DPP's Blog

Prances With Horses May 2, 2007

Wednesday, May 2, 2007

Posted by David Pollak in Scala

Prance with the Horses, Skittr with the Mice

How hard is it to build a highly scalable social messaging system? Not hard, if you use the right tools. Over the last week and a half, in my spare time, I wrote a Twitter clone in Scala and lift. It's 884 lines of code (as counted by 'wc -l'), including comments. It can handle 1M+ users on two Intel Core 2 Duo boxes (one as the web front end and the other as the message broker). With a week of full-time coding, it could be modified to scale linearly with the number of boxes in the system.

You can check out the code from the lift repository and look at it.

You can play with Skittr.com (it's on a box on my DSL line, so it's going to be bandwidth-constrained).
First, here are the 170 lines (including comments and blank lines) of code that do all the logic:
/**
 * All the "current state" and logic necessary to deal with messages between users
 */
class UserActor extends Actor {
  // the maximum messages to keep in memory
  private val maxMessages = 20

  // Information about the user
  private var userName: String = _
  private var userId: Long = _
  private var fullName: String = _

  // the list of the latest messages for the user
  private var latestMsgs: List[Message] = Nil

  // The timeline for the current user
  private var localTimeline: List[Message] = Nil

  // the folks who are following this user 
  private var followers: List[Actor] = Nil

  // the listeners (either message listeners and/or listeners to the timeline)
  private var messageViewers: List[Actor] = Nil
  private var timelineViewers: List[Actor] = Nil

  // a list of friends
  private var friends: List[String] = Nil

  /**
   * When the Actor is started, this method is invoked
   */
  def act = {
    this.trapExit = true // send messages on exit of linked Actors

    // The main loop that receives messages and processes them
    loop {
      react {
        // The user sends a message containing text and a source.  This can
        // be from the web, from IM, from SMS, etc.
        case SendMessage(text, src) =>
          // create a new Message object to be added to the user's local messages and sent
          // to followers
          val msg = Message(text, System.currentTimeMillis, userName, src)
          // add to our local messages (keeping only the maxMessages most recent messages)
          latestMsgs = (msg :: latestMsgs).take(maxMessages)
          // update all our followers (and ourselves) with the message 
          (this :: followers).foreach(_ ! msg)
          // and put it in the database
          MsgStore.create.message(text).source(src).who(userId).save
          // if the message was autogenerated, then autogenerate another message in 5 or so minutes
          if (src == "autogen") autoGen

        // someone is asking us for our messages
        case GetMessages => reply(Messages(latestMsgs)) // send them back

        // someone is asking us for our timeline
        case GetTimeline => reply(Timeline(localTimeline))  // send it back

        // someone wants to know our name... tell them
        case GetUserIdAndName => reply(UserIdInfo(userId, userName, fullName, friends))

        // shut the user down
        case Bye =>
          UserList.remove(userName)
          this.exit(Bye)

        // set up the user
        case Setup(id, name, full) =>
          userId = id
          userName = name
          fullName = full
          UserList.add(userName, this) // add the user to the list
          // get the latest messages from the database
          latestMsgs = MsgStore.findAll(By(MsgStore.who, userId), 
                                      OrderBy(MsgStore.when, false),
                                      MaxRows(maxMessages)).map(s => Message(s.message, s.when, userName, s.source))
          localTimeline = latestMsgs  // set the local timeline to our messages (the folks we follow will update us)
          // get friends
          friends = User.findAllByInsecureSql("SELECT users.* FROM users, friends WHERE users.id = friends.friend AND friends.owner = " + userId,
            true).map(_.name.get).sort(_ < _)

        // tell all our friends that we follow them
        case ConfigFollowers =>
          friends.flatMap(f => UserList.find(f).toList).foreach(_ ! AddFollower)
          // if the "autogen" property is set, then have each of the actors
          // randomly generate a message 
          if (User.shouldAutogen_? || System.getProperty("autogen") != null) autoGen

        // add a friend: follow them and persist the friendship
        case AddFriend(name) =>
          friends = (name :: friends).sort(_ < _)
          // find the user
          UserList.find(name).foreach{
            ua =>
            ua ! AddFollower // tell him we're a follower
            (ua !? GetUserIdAndName) match { // get the user info
              case UserIdInfo(id, _,_, _) => Friend.create.owner(userId).friend(id).save // and persist a friend connection in the DB
              case _ =>
            }
          }

        // We are removing a friend
        case RemoveFriend(name) =>
          friends = friends.remove(_ == name)
          // find the user
          UserList.find(name).foreach{
            ua =>
            ua ! RemoveFollower // tell them we're no longer following
            (ua !? GetUserIdAndName) match { // delete from database
              case UserIdInfo(id, _,_,_) => Friend.findAll(By(Friend.owner, userId), By(Friend.friend, id)).foreach(_.delete_!)
              case _ =>
            }
          }
          // remove from local timeline
          localTimeline = localTimeline.filter(_.who != name)
          // update timeline viewers with the former-friend-free timeline
          timelineViewers.foreach(_ ! Timeline(localTimeline))

        // merge the messages (from a friend) into our local timeline
        case MergeIntoTimeline(msg) => localTimeline = merge(localTimeline ::: msg)

        // add someone who is watching the timeline.  This Actor will get updates each time
        // the local timeline updates.  We link to them so we can remove them if they exit
        case AddTimelineViewer =>
          timelineViewers = sender :: timelineViewers
          this.link(sender)

        // remove the timeline viewer
        case RemoveTimelineViewer =>
          timelineViewers = timelineViewers.remove(_ == sender)
          this.unlink(sender)

        // Add an Actor to the list of folks who want to see when we get a message
        // this might be an IM or SMS output
        case AddMessageViewer =>
          messageViewers = sender :: messageViewers
          this.link(sender)

        // removes the message viewer
        case RemoveMessageViewer =>
          messageViewers = messageViewers.remove(_ == sender)
          this.unlink(sender)

        // add someone who is following us
        case AddFollower =>
          followers = sender :: followers // merge it in
          sender ! MergeIntoTimeline(latestMsgs) // give the follower our messages to merge into his timeline

        // remove the follower
        case RemoveFollower =>  followers = followers.remove(_ == sender) // filter out the sender of the message

        // We get a message
        case msg : Message =>
          messageViewers.foreach(_ ! Messages(latestMsgs)) // send it to the message viewers
          localTimeline = (msg :: localTimeline).take(maxMessages) // update our timeline
          timelineViewers.foreach(_ ! Timeline(localTimeline)) // send the updated timeline to the timeline viewers

        // If someone is exiting, remove them from our lists
        case ('EXIT, who : Actor, why) =>
          messageViewers = messageViewers.remove(_ == who)
          timelineViewers = timelineViewers.remove(_ == who)
          Console.println(why)

        case s => Console.println("User "+userName+" Got msg "+s)
      }
    }
  }

  /**
  * Sort the list in reverse chronological order and take the first maxMessages elements
  */
  private def merge(bigList: List[Message]) = bigList.sort((a,b) => b.when < a.when).take(maxMessages) 

  /**
  * Autogenerate and schedule a message
  */
  def autoGen = ActorPing.schedule(this, SendMessage("This is a random message @ "+timeNow+" for "+userName, "autogen"), User.randomPeriod)
}

The system contains one UserActor per user. The Actors process messages. A device (the web, IM, SMS) sends a "SendMessage" to the Actor. The Actor creates a Message instance, puts it in the local message store (we keep the 20 most recent messages around), and then forwards the message to all the followers. Each user also keeps a local "timeline", an ordered list of the messages from that user and from all the users that user is following.
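
To make that flow concrete, here is a minimal, single-threaded sketch of the bookkeeping just described. It is not the Skittr code: UserState, sendMessage, receive, and mergeIntoTimeline are hypothetical names, plain method calls stand in for Actor messages, and there is no database. The 20-message cap and the newest-first merge mirror maxMessages and merge in UserActor.

```scala
// Hypothetical single-threaded stand-in for UserActor's bookkeeping; the real
// code runs the same steps inside an Actor's react loop and persists to MsgStore.
final case class Message(text: String, when: Long, who: String, source: String)

final class UserState(val userName: String, maxMessages: Int = 20) {
  var latestMsgs: List[Message] = Nil    // this user's own recent messages
  var localTimeline: List[Message] = Nil // own messages plus those of followed users
  var followers: List[UserState] = Nil   // users who receive our messages

  // Like `case SendMessage(text, src)`: keep the newest maxMessages, then fan out
  // to ourselves and to every follower.
  def sendMessage(text: String, src: String, now: Long): Unit = {
    val msg = Message(text, now, userName, src)
    latestMsgs = (msg :: latestMsgs).take(maxMessages)
    (this :: followers).foreach(_.receive(msg))
  }

  // Like `case msg: Message`: prepend to the timeline and trim it.
  def receive(msg: Message): Unit =
    localTimeline = (msg :: localTimeline).take(maxMessages)

  // Like `merge`: combine, sort newest first, trim.
  def mergeIntoTimeline(msgs: List[Message]): Unit =
    localTimeline = (localTimeline ::: msgs).sortWith((a, b) => b.when < a.when).take(maxMessages)
}
```

Because both latestMsgs and localTimeline are trimmed to maxMessages, each user holds at most 40 messages in memory no matter how many people they follow.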

Any timeline viewer gets a "Timeline(msg)" message when the timeline is updated. The controller (which causes the web page to be redrawn when an update happens) performs the following on a timeline update:
      case Timeline(msg) =>
        messages = msg
        reRender
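
The viewer mechanism is essentially the observer pattern: viewers register, and every timeline change is pushed to all of them. Here's a minimal sketch of that idea under stated assumptions: TimelineFeed, TimelineViewer, and TimelineView are hypothetical names, method calls replace Actor messages, and the link/exit cleanup is reduced to an explicit removeViewer call.

```scala
// Hypothetical observer-style sketch of UserActor's timeline viewers.
trait TimelineViewer {
  def onTimeline(msgs: List[String]): Unit
}

final class TimelineFeed(maxMessages: Int) {
  private var messages: List[String] = Nil
  private var viewers: List[TimelineViewer] = Nil

  def addViewer(v: TimelineViewer): Unit = viewers = v :: viewers

  // Like RemoveTimelineViewer (or the exit cleanup): drop this exact viewer.
  def removeViewer(v: TimelineViewer): Unit = viewers = viewers.filterNot(_ eq v)

  // A new message updates the timeline and pushes the whole list to each viewer.
  def add(msg: String): Unit = {
    messages = (msg :: messages).take(maxMessages)
    viewers.foreach(_.onTimeline(messages))
  }
}

// Stand-in for the Comet controller: cache the list, then re-render.
final class TimelineView extends TimelineViewer {
  var messages: List[String] = Nil
  var renders = 0

  def onTimeline(msgs: List[String]): Unit = {
    messages = msgs
    reRender()
  }

  private def reRender(): Unit = renders += 1 // a real page would redraw here
}
```

Pushing the whole (short) list rather than a diff keeps the viewer trivial: it just replaces what it has and redraws.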

And the code that actually renders the timeline in the browser is:
    messages.flatMap(msg =>
      Helpers.bind("sk", defaultXml,
        "username" -> (msg.who + " @ " + toInternetDate(msg.when)),
        "content" -> msg.text))

The view passes in the XHTML template and the messages are bound to the template.
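
lift's Helpers.bind replaces placeholder elements such as <sk:username/> and <sk:content/> in the template with the bound values. The sketch below is a hypothetical, string-based stand-in, not lift's API (the real bind works on XML node sequences): BindSketch, bind, escape, and renderAll are illustrative names, and values are HTML-escaped before they are inserted.

```scala
// Hypothetical string-based stand-in for lift's Helpers.bind.
object BindSketch {
  // Escape the characters that are significant in XHTML text ('&' first).
  def escape(s: String): String =
    s.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;").replace("\"", "&quot;")

  // Replace each <prefix:name/> placeholder with its escaped value.
  def bind(prefix: String, template: String, bindings: (String, String)*): String =
    bindings.foldLeft(template) { case (acc, (name, value)) =>
      acc.replace(s"<$prefix:$name/>", escape(value))
    }

  // One filled-in copy of the template per (who, text) pair, concatenated,
  // like the flatMap over messages above.
  def renderAll(template: String, msgs: List[(String, String)]): String =
    msgs.map { case (who, text) => bind("sk", template, "username" -> who, "content" -> text) }.mkString
}
```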

Anything (another user, a device, etc.) can ask the user for its messages or its current timeline. When the web app displays a user's messages, it asks the Actor representing that user (which need not be in the local address space) for them, without making a database query.
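
The ask-and-reply style used here (reply in the Actor, !? in AddFriend and RemoveFriend) can be sketched without scala.actors, which is no longer part of current Scala releases. This sketch assumes one thread per actor draining a FIFO mailbox, with a Promise carrying the reply. MiniUserActor, Post, GetLatest, and Shutdown are hypothetical names, and everything runs in one process, so the remote case isn't shown.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

// The messages this hypothetical mini-actor understands.
sealed trait UserMsg
final case class Post(text: String) extends UserMsg
final case class GetLatest(replyTo: Promise[List[String]]) extends UserMsg
case object Shutdown extends UserMsg

// One thread drains a FIFO mailbox, so `latest` is only touched by that thread.
final class MiniUserActor(maxMessages: Int) {
  private val mailbox = new LinkedBlockingQueue[UserMsg]()
  private var latest: List[String] = Nil

  private val worker = new Thread(() => {
    var running = true
    while (running) {
      mailbox.take() match {
        case Post(text)   => latest = (text :: latest).take(maxMessages)
        case GetLatest(p) => p.success(latest) // the "reply"
        case Shutdown     => running = false
      }
    }
  })
  worker.setDaemon(true)
  worker.start()

  // Fire-and-forget send, like `actor ! msg`.
  def !(m: UserMsg): Unit = mailbox.put(m)

  // Send-and-wait, like `actor !? GetMessages`, but with a timeout.
  def askLatest(timeout: FiniteDuration): List[String] = {
    val p = Promise[List[String]]()
    this ! GetLatest(p)
    Await.result(p.future, timeout)
  }
}
```

Because every state change happens on the mailbox thread, no locks are needed; callers only ever see the state through replies.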

What this means in practice is that the database is only touched when a new message is added (an insert), when a friend is added (an insert), or when a friend is removed (a delete), plus the queries that load a user's recent messages and friends when that user's Actor is set up. There's currently no "history" feature in Skittr, but if there were, it would require a query. Put another way, Skittr demonstrates how to build a distributed messaging infrastructure that models the actual messages in memory and uses the database only for persistence and history.
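
In caching terms, each UserActor acts as a write-through cache in front of the database: writes go to memory and to the store, and reads are served from memory. A minimal sketch, assuming a hypothetical in-memory CountingStore in place of MsgStore so the number of queries can be checked; CachedUser is likewise an illustrative name.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical in-memory stand-in for MsgStore that counts queries.
final class CountingStore {
  private val rows = ArrayBuffer.empty[(Long, String)]
  var inserts = 0
  var selects = 0

  def insert(userId: Long, text: String): Unit = {
    rows += ((userId, text))
    inserts += 1
  }

  // Newest first, at most `max` rows, like the findAll in Setup.
  def latestFor(userId: Long, max: Int): List[String] = {
    selects += 1
    rows.reverseIterator.filter(_._1 == userId).map(_._2).take(max).toList
  }
}

// Write-through: every post goes to memory and to the store; reads never hit the store.
final class CachedUser(userId: Long, store: CountingStore, max: Int = 20) {
  private var latest: List[String] = store.latestFor(userId, max) // one query at setup

  def post(text: String): Unit = {
    latest = (text :: latest).take(max)
    store.insert(userId, text)
  }

  def messages: List[String] = latest // served from memory
}
```

However many times the messages are read, the store sees one select at setup and one insert per post.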

lift fully supports Comet-style applications. Rather than the browser "polling" the server, the browser opens a connection to the server and waits for an instruction. If there's no new instruction (e.g., redraw part of the screen) by the end of the timeout (lift uses a 110-second timeout), the server returns a "default" (no-op) message. The average cost of composing a complete page (more than is required to compose just the part of the page containing the messages) is 15 ms. The cost of returning a no-op message is under 1 ms. Twitter's new-message velocity is 50,000 messages per hour. That's about 14 messages per second, which (if each user has 10 friends, half of whom are online) results in 70 "updates" per second. On a dual-core machine, there's plenty of headroom for sending out these messages.
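
Both halves of that paragraph fit in a few lines. LongPoll.awaitUpdate is a hypothetical long-poll handler that blocks on a queue until an update arrives or the timeout passes, then answers with the update or a no-op; it is not lift's Comet implementation. LoadEstimate redoes the arithmetic, assuming (as an upper bound) that each update costs a full 15 ms page render. That works out to about 1,050 ms of rendering work per second, roughly half of what a dual-core box offers.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

// Hypothetical long-poll handler: wait for an update or give up at the timeout
// (110,000 ms in lift's case) and answer with a cheap no-op.
object LongPoll {
  def awaitUpdate(updates: LinkedBlockingQueue[String], timeoutMs: Long): String = {
    val next = updates.poll(timeoutMs, TimeUnit.MILLISECONDS) // null on timeout
    if (next == null) "noop" else next
  }
}

// The back-of-the-envelope numbers from the paragraph above.
object LoadEstimate {
  val messagesPerHour = 50000.0
  val messagesPerSecond = messagesPerHour / 3600 // about 13.9; rounded to 14
  val onlineRecipients = 10 / 2                  // 10 friends, half online
  val updatesPerSecond = math.round(messagesPerSecond) * onlineRecipients // 14 * 5 = 70
  val fullPageMs = 15.0                          // upper bound per update
  val renderMsPerSecond = updatesPerSecond * fullPageMs // about 1,050 ms per second
  val dualCoreMsPerSecond = 2 * 1000.0
  val coreUtilization = renderMsPerSecond / dualCoreMsPerSecond // about 0.53
}
```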

On a broader note, "He that is good with a hammer tends to think everything is a nail." The LAMP folks default to solving problems with a database. Sometimes, a database is not the best tool for the job. Erlang is all about message passing. There are a ton of problems that are really hard to solve with an RDBMS but get solved really easily with message passing. I spent a while with Seaside and Magma (an object database), and it totally freed me to think about things in a new way because I was persisting objects and walking object graphs rather than worrying about how big a VARCHAR was.

In many ways, Scala is a blend of a lot of these good worlds. I like that I could choose to blend Actors, servlets, and all the other stuff that's part of the Scala world. It means that I have a choice about how I solve a problem. I hope you've enjoyed my solution to the "scaling Twitter problem." I hope you'll enjoy how the Erlang/Erlyweb folks solve it with their chitter project.

If you're wondering about the title of the post, it's from a children's book:

Barnyard Dance

Stomp your feet! Clap your hands! Everybody ready for a barnyard dance! Bow to the horse. Bow to the cow. Twirl with the pig if you know how. Bounce with the bunny. Strut with the duck. Spin with the chickens now- Cluck Cluck Cluck! With a BAA and a MOO and a COCKADOODLEDOO. Everybody promenade two by two. Prance with the horses, Skitter with the mice. Swing with your partner once or twice. Stand with the donkey. Slide with the sheep. Scramble with the little chicks- CHEEP CHEEP CHEEP! With a NEIGH and a MOO and a COCKADOODLEDOO another little promenade two by two. Trot with the turkey. Leap with the frog. Take another spin with the barnyard dog. Turn with the cow in a patch of clover. All take a bow, and the dance is over. With an OINK and a MOO and a QUACK QUACK QUACK, the dance is done but we'll all be back!





(c) 2010-2025 David Pollak