First, some background on the problem:
Far and away the most common way of persisting and calculating state
is in a relational database (RDBMS). RDBMSs are awesome
creatures. They sit on top of some excellent and well understood
mathematics: set theory. They have well known and well understood
concurrency mechanisms: transactions. They have been designed,
built, tested, and optimized over the last generation. RDBMSs offer a
simple set of commands (SELECT, DELETE, INSERT, UPDATE) as well as a
generally human understandable set of semantics: people understand that
RDBMSs are sets of things and there are simple ways to ask about
these sets. RDBMSs have evolved along with ERP systems and have
evolved to meet the needs of these systems.
However, there are well-known things that RDBMSs don't do well,
including tree structures (yeah, Oracle and others have extensions for
tree walks, and SQL:1999 added recursive queries, but support varies
from product to product and the performance is not always the same as
other models: a tree walk in an RDBMS costs O(log n) for each node,
whereas a tree walk in an OO system costs O(1) per node). Social
networks/social graphs are another place where RDBMSs do not excel.
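To make the per-node cost concrete, here's a small Scala sketch (all names are mine, for illustration). The RDBMS side is modeled as a sorted map of (parentId, childId) keys standing in for a B-tree index on the parent column, so every step of the walk starts with an O(log n) seek; the object side just follows references. It's a model of the cost argument, not of how any particular database executes tree queries.

```scala
import scala.collection.immutable.TreeMap

object TreeWalk {
  // Object-graph version: each node holds direct references to its children,
  // so stepping from a node to its children costs O(1).
  final case class Node(id: Int, children: List[Node])

  def walkObjects(n: Node): List[Int] =
    n.id :: n.children.flatMap(walkObjects)

  // Table-plus-index version: rows are (parentId, childId) keys in a sorted map,
  // standing in for a B-tree index on the parent column. Finding a node's
  // children needs an O(log n) seek into the index before a short scan.
  final case class IndexedTree(index: TreeMap[(Int, Int), Unit]) {
    def children(parent: Int): List[Int] =
      index.keysIteratorFrom((parent, Int.MinValue)) // O(log n) seek
        .takeWhile(_._1 == parent)                   // scan this parent's rows
        .map(_._2)
        .toList

    def walk(root: Int): List[Int] =
      root :: children(root).flatMap(walk)
  }
}
```

Both walks visit the same nodes in the same order; the difference is that the indexed version pays a logarithmic seek for every node it expands.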
Let's dive down into this.
A naive implementation of a social messaging site uses something
like these tables:
- Users(id, name, password)
- Friends(owner, friend)
- Messages(id, poster, content, date)
So, if we wanted to calculate the timeline for a given user at a
given instant, the query would look like:
SELECT messages.* FROM messages, friends
WHERE friends.owner = current_user AND messages.poster = friends.friend
ORDER BY messages.date DESC LIMIT 20
Assuming we've got indexes on friends.owner, messages.poster and
messages.date, the query is still O(n log n), where n is the
aggregate number of messages posted by the people you follow. If
you follow someone who has posted 20,000 messages, that cost becomes
non-trivial.
Basically, each time a client asks for the latest timeline, you've
got an O(n log n) operation to determine state. This doesn't
scale.
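Here's the naive query mirrored in plain Scala, to show where the O(n log n) comes from: every message from every followed user is a candidate, all candidates get sorted, and then all but 20 are thrown away. The case classes mirror the Friends and Messages tables above; this is a model of the query's logic, not of any real query planner.

```scala
object NaiveTimeline {
  final case class Friend(owner: Long, friend: Long)
  final case class Message(id: Long, poster: Long, content: String, date: Long)

  // Mirrors: SELECT messages.* FROM messages, friends
  //          WHERE friends.owner = current_user AND messages.poster = friends.friend
  //          ORDER BY messages.date DESC LIMIT 20
  def timeline(currentUser: Long, friends: Seq[Friend], messages: Seq[Message],
               limit: Int = 20): Seq[Message] = {
    val followed = friends.filter(_.owner == currentUser).map(_.friend).toSet
    messages
      .filter(m => followed.contains(m.poster)) // n = every message by followed users
      .sortBy(m => -m.date)                     // O(n log n) sort over all of them...
      .take(limit)                              // ...only to keep 20
  }
}
```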
The first obvious response to the issue is caching (capturing the
state beyond the duration of a short-lived session). I'm going to
skip caching for a moment and do a more sophisticated implementation of
timelines so we can get better performance.
Let's create a mailbox table. Each time someone publishes a
message, a reference to that message will be put in a Mailbox(owner,
message, date) table, one row for each follower, and we'll create an
index on the table: (owner, date DESC).
This changes the query to:
SELECT messages.* FROM messages, mailbox
WHERE mailbox.owner = current_user AND messages.id = mailbox.message
ORDER BY mailbox.date DESC LIMIT 20
Depending on your RDBMS, you will wind up with an O(log n)
operation: you find the newest mailbox entry for the user (O(log n))
and do an index walk until you've found 20 entries. (I'm putting aside
the fact that looking up the 20 messages costs another 20 index
lookups, because 20 is a small number and the messages will likely be
in the database's cache... this operation is going to be fast.)
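A sketch of the mailbox approach in memory, with a per-owner list standing in for the Mailbox table and its (owner, date DESC) index. It assumes posts arrive in date order (so prepending keeps each list newest-first) and that a hypothetical `followers` map tells us who gets each post. Writes now fan out; reads touch at most 20 entries of one mailbox.

```scala
import scala.collection.mutable

object MailboxFanOut {
  final case class MailboxEntry(owner: Long, message: Long, date: Long)

  // `followers` (hypothetical): poster id -> ids of the users who follow them.
  final class Mailboxes(followers: Map[Long, Seq[Long]]) {
    // Per-owner list, newest first: plays the role of the (owner, date DESC) index.
    // Assumes posts arrive in date order, so prepending keeps each list sorted.
    private val boxes = mutable.Map.empty[Long, List[MailboxEntry]]

    // Write path: one mailbox entry per follower (the fan-out).
    def post(poster: Long, messageId: Long, date: Long): Unit =
      followers.getOrElse(poster, Nil).foreach { f =>
        boxes(f) = MailboxEntry(f, messageId, date) :: boxes.getOrElse(f, Nil)
      }

    // Read path: walk the head of a single mailbox and stop after `limit` entries.
    def timeline(owner: Long, limit: Int = 20): List[Long] =
      boxes.getOrElse(owner, Nil).iterator.take(limit).map(_.message).toList
  }
}
```

The trade is explicit here: `post` does one write per follower, which is exactly where the 100x fan-out in the sizing below comes from.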
I'm going to take a detour for a moment. I had the pleasure of
talking over a few beers at a baseball game with one of the senior
engineers at Facebook. We were talking about Facebook's scaling
success. His comment was that it was successful but very
expensive. If there were more than 3% cache misses from MySQL
queries, the system would back up. If they got more than 2% cache
misses from the memcached stuff in front of their MySQL servers the
system would back up. So, basically Facebook has 195% of their
data in RAM.
The net is that O(log n) is only going to work if you've got your
entire index in the cache of your RDBMS. Even a dozen disk reads
is going to turn a 10ms query into a 250ms query and if you've got
1,000 users asking for a status update, you'll wind up with disk
thrashing and ultimately you will not be able to satisfy all of those
requests.
Let's make our discussion more concrete. I'm assuming that a
social networking instance will support 25,000 users. On average,
a user will follow 100 people (100x fan-out of messages). Users
will post one message every 30 minutes (48 messages a day). The
day lasts 10 hours (this is a reasonable approximation for peakiness...
basically, you're compressing 48 message sends into a 10-hour
period). There are 300 days in a year. These numbers are
averages: some folks will be above average in terms of fan-out
(the CEO will have a 25,000x fan-out) and some folks will be above
average in number of messages per day.
So, that means that each year there will be 36,000M (36B) mailbox
entries (25,000 users x 48 messages/day x 100 followers x 300 days).
If each entry costs us 16 bytes of RAM for index purposes, that means
we're at 576 billion bytes (576GB) of index. There's no way that
amount of index will fit in RAM. Even if the average drops to 1
message a day, you're still looking at 750M entries, or roughly 12GB of index.
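The arithmetic above, written out so it's easy to rerun with different assumptions. It only multiplies the numbers already stated (25,000 users, 100x fan-out, 300 days, 16 bytes per index entry).

```scala
object IndexSizing {
  val users = 25000L
  val fanOut = 100L        // average followers per poster (100x fan-out)
  val daysPerYear = 300L
  val bytesPerEntry = 16L  // index cost per mailbox entry

  // One mailbox entry per message per follower, per day, over a year.
  def entriesPerYear(msgsPerDay: Long): Long = users * msgsPerDay * fanOut * daysPerYear

  def indexBytes(msgsPerDay: Long): Long = entriesPerYear(msgsPerDay) * bytesPerEntry
}
```

`entriesPerYear(48)` is 36,000,000,000 and `indexBytes(48)` is 576,000,000,000; at one message a day the same formulas give 750,000,000 entries and 12,000,000,000 bytes.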
Alternatively, you could purge messages after 3 weeks or limit
timelines to a certain number of messages. That's not
unreasonable, but it's also adding a constraint to the system to deal
with limitations of the RDBMS. There are other alternatives.
Let me talk about memcached for a minute. In my opinion, memcached
means that you have a failed design. Memcached means abandoning
all the awesome things that you get with an RDBMS: a mathematical
model, a concurrency/transactional model, durability guarantees, etc.
But, we could move our state from the calculate-on-demand model of
the RDBMS to a calculate-once-and-cache model using
memcached. This means that you only take the nasty hits if the
cache is not valid. Putting aside the cost of cache invalidation
(I haven't covered the costs of updates in this discussion because
there's no need to go there... the implementation failures can be
demonstrated with just reads), if you have a simple cache invalidation
scheme, most of the cache entries will not survive for 15 minutes (I
can go through the math, but I'm going to leave this one to the
reader). You risk cache stampedes (more than 1 process rebuilding
the cache entry). Basically, the naive memcached implementation
buys you a little bit of head room over the naive (non-mailbox)
approach. In order to get more than 5x or so improvement
(something that will serve a few thousand rather than a few hundred
users), you need to manipulate the cache entries inserting/deleting
individual messages.
The above paragraph in fact leads us in the direction of a better
answer.
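Here's roughly what "manipulate the cache entries" means, sketched in-process with a `ConcurrentHashMap` standing in for memcached (memcached's own client API looks different, and this ignores distribution and eviction entirely). On a new post the message is prepended to each cached follower timeline instead of invalidating it, and `computeIfAbsent`, which applies its function at most once per missing key, keeps a burst of readers from all rebuilding the same entry. `rebuild` is a hypothetical stand-in for the expensive timeline query.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger

object TimelineCache {
  // `rebuild` (hypothetical) is the expensive query, e.g. the mailbox SELECT.
  final class Cache(rebuild: Long => List[Long], maxEntries: Int = 20) {
    private val cache = new ConcurrentHashMap[Long, List[Long]]()
    val rebuilds = new AtomicInteger(0) // counts expensive rebuilds, for illustration

    // Read path: on a miss, computeIfAbsent applies the rebuild at most once for
    // the key; other threads updating that key may block until it finishes.
    def timeline(owner: Long): List[Long] =
      cache.computeIfAbsent(owner, (key: Long) => {
        rebuilds.incrementAndGet()
        rebuild(key).take(maxEntries)
      })

    // Write path: prepend the new message to each cached follower timeline
    // instead of invalidating it; followers with no cached entry are skipped.
    def onPost(messageId: Long, followers: Seq[Long]): Unit =
      followers.foreach { f =>
        cache.computeIfPresent(f, (_: Long, tl: List[Long]) => (messageId :: tl).take(maxEntries))
      }
  }
}
```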
But first, let me state that I have proven that an RDBMS cannot be
the sole locus of state for a social messaging site that services more
than a few hundred users. Period. We must move state
somewhere else and manage the cached state manually rather than with
queries and indexes. Let's walk through a design that gives us a
concurrency model as well as the performance we want.
Imagine a model where you interact with a User with a limited set of
(asynchronous) messages:
- add/remove friend
- add message to timeline
- post message (the user has created a message and it needs to be processed)
- get current timeline (with offsets and number of entries)
These are the basic messages needed to implement a social messaging
site. If we guarantee that a User will only process 1 message at a time,
we have a concurrency model. It's simple, and simple is
good. We have not defined how/where Users store their state (it
could be on a filesystem, in an RDBMS, in a NoSQL store, who
knows). But we can say that adding a message is an O(1) operation
(prepending to the head of a singly linked list). Each User can
have a caching policy (and that caching policy could be dynamic based
on the access characteristics for the User). The sender of the
message doesn't block on the processing of the message (although the
get current timeline message will have an asynchronous response that
the sender will likely block on).
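A minimal sketch of such a User using only the JDK and the Scala standard library, not any actor library: a single-threaded executor serves as each User's mailbox, so it handles one message at a time in arrival order; `!` enqueues and returns immediately; and the timeline is an immutable `List`, so adding an entry is an O(1) prepend. The message names follow the list above. The `directory` function used to route posts to followers, the `Promise`-based reply and the 5-second wait are assumptions of the sketch. Here `AddFriend` is delivered to the User being followed, which is how the "add Dick as a friend" case plays out.

```scala
import java.util.concurrent.{ExecutorService, Executors}
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

object UserActors {
  sealed trait UserMsg
  // Delivered to the User being followed; `follower` is who wants its posts.
  final case class AddFriend(follower: Long) extends UserMsg
  final case class RemoveFriend(follower: Long) extends UserMsg
  final case class AddToTimeline(msgId: Long) extends UserMsg
  final case class PostMessage(msgId: Long) extends UserMsg
  final case class GetTimeline(offset: Int, count: Int, reply: Promise[List[Long]]) extends UserMsg

  // `directory` (hypothetical) resolves a user id to its User.
  final class User(val id: Long, directory: Long => Option[User]) {
    // A single-threaded executor is this User's mailbox: one message at a time, in order.
    private val mailbox: ExecutorService = Executors.newSingleThreadExecutor()
    // Only touched from the mailbox thread, so no locks are needed.
    private var followers = Set.empty[Long]
    private var timeline = List.empty[Long] // newest first

    // Asynchronous send: enqueue and return immediately.
    def !(msg: UserMsg): Unit = mailbox.execute(() => handle(msg))

    private def handle(msg: UserMsg): Unit = msg match {
      case AddFriend(f)     => followers += f
      case RemoveFriend(f)  => followers -= f
      case AddToTimeline(m) => timeline = m :: timeline // O(1) prepend
      case PostMessage(m) =>
        timeline = m :: timeline
        followers.foreach(f => directory(f).foreach(_ ! AddToTimeline(m)))
      case GetTimeline(off, n, reply) => reply.success(timeline.slice(off, off + n))
    }

    // The one place a caller blocks: waiting for the timeline answer.
    def askTimeline(offset: Int, count: Int): List[Long] = {
      val p = Promise[List[Long]]()
      this ! GetTimeline(offset, count, p)
      Await.result(p.future, 5.seconds)
    }

    def shutdown(): Unit = mailbox.shutdown()
  }
}
```

`askTimeline` blocks, which matches the text: senders don't wait on ordinary messages, only on the timeline answer. Calling it from inside a User's own mailbox thread would deadlock, so it's meant for callers outside the actor world.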
We have changed our abstraction from one where all data (tables and
indexes) are created equal to one where certain data structures are
more prominent (User and Message) than others (mailbox, friends).
We have lost something: transactions. In this model, if I add
Dick as a friend, I am not guaranteed that I will receive Dick's next
update... it may take time for the messages to propagate to Dick's User
and his Message may be sent before the "add friend" message gets to
him. In the case of a financial transaction, this would be
fatal. In the case of social networking, this is a perfectly
reasonable trade-off.
One of the ways that RDBMSs get performance (and the way products
like Oracle distinguish themselves from the likes of MySQL) is the
ability to cache optimized query plans, cache the right data, and
invalidate the right caches at the right time. The same
requirements are going to come up in a social networked application.
So far, I've described an Actor based system... and there are a
plethora of Actor-based systems around. Erlang is the best
system for distributed Actors. Akka
is the best Scala Actor implementation around. But Actors provide
a single service: ordered messaging. The additional services that
are needed to create scalable social systems are:
- A unified transaction model such that different components in the
system have a single mechanism to engage in transactions;
- A simple interface to extract data from the system without being
an Actor in the system;
- A scalable storage model that provides transactions (integrated
with the system's transaction mechanism) and a richer data model than
key-value pairs.
Additionally, the system should behave the same way when everything
is running in a single JVM or distributed across the network. The
application-level code should neither know nor differ based on whether the
resources are local or remote. Code should go from the developer's laptop
to the production system unchanged. Additionally, the business
logic in the code should be testable without having to set up a large
number of different software systems.
Goat Rodeo achieves all the above goals... plus one additional goal:
the single-machine implementation is open source under an Apache 2.0
license. The scalable multi-machine version is closed
source. What this means is that if you've got a site that's going to
run on a single system, you can build it with Goat Rodeo. If you
become the next Twitter or FourSquare, you can rest assured that your
system will scale just by adding money. The benefit to the open
source community is that development is based on money rather than the
goodness of someone's heart. Anyway, back to the bits and bytes.
Goat Rodeo ensures, at the compiler level, that all messages can be
JSON serialized and distributed across the network. Anything in
Goat Rodeo that could be serialized must be a subclass of the QBase
trait. Goat Rodeo includes a compiler plugin that ensures that
all classes that extend QBase can be serialized (they contain only
immutable List, Map, String, primitive types or things that subclass
from QBase). This means that all messages are immutable.
The container of business logic in Goat Rodeo is a Worker. A
Worker is like an Actor, but there are enough syntactic and semantic
differences that I chose a new name. Workers are typed based on
the WorkerId:
trait WorkerId extends QBase {
  type MyType <: WorkerId
  type MsgType <: QBase
  type IdType
  def myType: Manifest[MyType]
  def id: IdType
  def uniqueNameForFile: String
}
Here's an example of a SimpleId:
final case class SimpleId(id: Long) extends WorkerId {
  type MyType = SimpleId
  type MsgType = SimpleMsg
  type IdType = Long
  def myType = manifest[SimpleId]
  def uniqueNameForFile: String = "simple_" + id
}
A Worker is defined:
/**
 * The interface to a Worker. A Worker receives messages, performs
 * computations and communicates with other workers
 */
trait Worker[IdType <: WorkerId] extends SimpleActor[IdType#MsgType] {
  /**
   * The unique ID of the Worker
   */
  def id: IdType
  /**
   * Send a message to the worker
   */
  def !(msg: IdType#MsgType): Unit
}
You can only send an asynchronous message to a Worker. Here's
an example of an asynchronous message:
case class Moo(num: Int) extends SimpleMsg
You can obtain a Worker[WorkerId] from the WorkerMgr:
WorkerMgr.find(SimpleId(4)): Box[Worker[SimpleId]]
And here's the code for a Worker[SimpleId]:
class SimpleWorker(id: SimpleId, calcFunc: SimpleId => ConnectionManager)
    extends WorkerImpl[SimpleId, SimpleMsg](id, calcFunc) {
  var mooRes = 0
  private var balance: Long = id.id
  def doMoo(in: Moo): Unit = {
    mooRes = in.num
  }
}
The code does not look like the typical Actor's react/receive
loop. While it's possible to write that kind of dispatch for
Workers, I opted to use reflection to build the dispatch table.
Public methods that start with "do", "perform" or "handle" and take a
subclass of IdType#MsgType are placed in the Worker's dispatch table.
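The reflection rule can be sketched with plain `java.lang.reflect`. This is not Goat Rodeo's code, just an illustration of the rule as described: gather the public methods whose names start with `do`, `perform` or `handle` and whose single parameter is a subclass of the message type, key them by parameter class, and route each message by its runtime class. Matching on the exact class is a simplification (a handler declared for a message supertype wouldn't be found), and `SimpleMsg`, `Moo`, `Baa` and `CounterWorker` here are local stand-ins.

```scala
import java.lang.reflect.Method

object ReflectiveDispatch {
  private val prefixes = List("do", "perform", "handle")

  // Build the table once per worker class: message class -> handler method.
  def dispatchTable(workerClass: Class[_], msgBase: Class[_]): Map[Class[_], Method] =
    workerClass.getMethods.toList // public methods only, including inherited ones
      .filter(m => prefixes.exists(p => m.getName.startsWith(p)))
      .filter(m => m.getParameterCount == 1 && msgBase.isAssignableFrom(m.getParameterTypes()(0)))
      .map(m => (m.getParameterTypes()(0): Class[_]) -> m)
      .toMap

  final class Dispatcher(worker: AnyRef, msgBase: Class[_]) {
    private val table = dispatchTable(worker.getClass, msgBase)

    // Exact runtime-class lookup; returns false if no handler matches.
    def dispatch(msg: AnyRef): Boolean =
      table.get(msg.getClass) match {
        case Some(m) => m.invoke(worker, msg); true
        case None    => false
      }
  }

  // Hypothetical message hierarchy and worker, for the usage below.
  trait SimpleMsg
  final case class Moo(num: Int) extends SimpleMsg
  final case class Baa(text: String) extends SimpleMsg

  class CounterWorker {
    var mooRes = 0
    def doMoo(in: Moo): Unit = { mooRes = in.num } // dispatched: "do" prefix
    def helper(in: Moo): Unit = ()                 // not dispatched: no prefix
  }
}
```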
So, you can send a message to a Worker (either from within another
worker or from the outside world):
for {
  worker <- WorkerMgr.find(SimpleId(17))
} worker ! StartTrans(SimpleId(18))
So far, nothing revolutionary. But, you may be asking, how
does the outside world ask a question of a Worker? All messages
that lead to a response must mix in:
/**
 * A message that generates a response with a particular type
 */
trait MsgWithResponse[T <: QBase] extends QBase
So, all messages that lead to a response are strongly typed.
So, we might have the following message:
case class GetBalance() extends SimpleMsg with MsgWithResponse[QLong]
And from the outside world, we can send it to a worker:
OutsideOfWorker on myWorker complete { resp: Box[QLong] =>
  resp.foreach(q => println("The balance is " + q))
} ask GetBalance()
The method on the Worker looks like:
def doGetBalance(in: GetBalance): QLong = {
  QLong(balance)
}
Alternatively:
def doGetBalance(in: GetBalance, answer: Box[QLong] => Unit): Unit = {
  answer(Full(QLong(balance)))
}
So, we can send messages to Workers and get answers to questions
from Workers from outside the Worker infrastructure. We can also
ask questions from within the Worker context:
on myWorker complete { resp: Box[QLong] =>
  resp.foreach(q => println("The balance is " + q))
} ask GetBalance()
The only difference between the two is that from within the context
of a Worker, you ask questions via the Worker. The answer
function is executed within the context of the Worker's mailbox and
within the context of the Worker.
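The typing trick behind `MsgWithResponse` can be sketched without any Goat Rodeo machinery. In this hypothetical version `Option` stands in for Lift's `Box`, `QLong` is a minimal wrapper, and `BalanceWorker`, `ask` and `GetBalanceAsync` are illustrative names rather than Goat Rodeo's API. Because the message type fixes the response type, the compiler checks the `complete` callback; the worker shows both handler styles from above, returning a value directly and calling an answer function.

```scala
object TypedAsk {
  trait QBase
  final case class QLong(value: Long) extends QBase
  // The type parameter records what kind of answer a message produces.
  trait MsgWithResponse[T <: QBase] extends QBase

  final case class GetBalance() extends MsgWithResponse[QLong]
  final case class GetBalanceAsync() extends MsgWithResponse[QLong]

  final class BalanceWorker(balance: Long) {
    // Style 1: return the answer directly.
    def doGetBalance(in: GetBalance): QLong = QLong(balance)

    // Style 2: receive an answer function and call it (possibly later).
    def doGetBalanceAsync(in: GetBalanceAsync, answer: Option[QLong] => Unit): Unit =
      answer(Some(QLong(balance)))

    // Callers get a callback typed by the message, so a wrong callback type
    // fails to compile. The casts are sound because each message class fixes T;
    // a reflective dispatcher would need the same step internally.
    def ask[T <: QBase](msg: MsgWithResponse[T])(complete: Option[T] => Unit): Unit =
      msg match {
        case m: GetBalance      => complete(Some(doGetBalance(m)).asInstanceOf[Option[T]])
        case m: GetBalanceAsync => doGetBalanceAsync(m, complete.asInstanceOf[Option[QLong] => Unit])
        case _                  => complete(None) // no handler: answer with an empty result
      }
  }
}
```

A callback expecting the wrong result type, such as `Option[String]`, for `GetBalance()` won't compile, which is the point of tagging messages with their response type.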
Workers can also engage in transactions:
for {
  t <- msg.xacts
  fromWorker <- WorkerMgr.find(t.from)
} transaction avec fromWorker send SetupTransfer(t)
To send a message transactionally to another Worker (of the same
type or of a different type), you designate that the message is
transactional, the transaction is with (avec) another worker and the
message to send. There are a number of transaction modifiers:
postCommit {
  f(Full(QLong(balance)))
}
This says that after the commit operation, apply the function f...
send the balance to the asker of the question. We can also define
the action to take in case of a rollback:
rollback { (a, b) =>
  TestRecord.findAll().foreach { r => balance = r.balance }
  f(ParamFailure("rollback", Empty, Empty, (a, b)))
}
That is: re-read the balance from the database, then reply to the
balance inquiry with a Failure.
Particular messages can be marked as non-re-entrant... they can only
be serviced once per transaction:
transaction notReentrant "doSetup called a second time with " + msg
Transactions can be rolled back explicitly:
if (balance < msg.transfer.amount)
  transaction rollbackTransaction("Insufficient balance: " + balance + " need " + msg.transfer.amount)
But if all messages that are part of a transaction are processed
successfully, then the transaction commits... basically, the leaf nodes
of the transaction are the message handlers that do not themselves send
transactional messages. If all the leaf
message handlers succeed, they notify their callers that they are
available to commit. This process rolls up to the original
handler and once all the methods have completed, the worker that
initiated the transaction sends out a Commit message which ripples
through the Workers that participated in the transaction.
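The commit rule in that paragraph is a tree-shaped commit: leaves vote, votes roll up to the initiator, and the initiator's decision ripples back down. Here's a synchronous, single-process sketch of just that rule, with hypothetical names (`Participant`, `runTransaction`) and none of the messaging, timeouts or durability a real implementation needs. Each participant's `postCommit` or `rollback` hook runs once the outcome is known, mirroring the transaction modifiers shown earlier.

```scala
object TreeCommit {
  final class Participant(
      val name: String,
      handler: () => Either[String, Unit], // Right(()) = success, Left(reason) = roll back
      val children: List[Participant],     // workers this one sent transactional messages to
      postCommit: () => Unit,
      rollback: String => Unit) {

    // Phase 1: run this handler, then each child's; a leaf reports its own result.
    // After the first failure the remaining children are skipped.
    def prepare(): Either[String, Unit] =
      handler().flatMap { _ =>
        children.foldLeft[Either[String, Unit]](Right(())) { (acc, c) => acc.flatMap(_ => c.prepare()) }
      }

    // Phase 2: the initiator's decision ripples down to every participant.
    def finish(outcome: Either[String, Unit]): Unit = {
      outcome match {
        case Right(_)     => postCommit()
        case Left(reason) => rollback(reason)
      }
      children.foreach(_.finish(outcome))
    }
  }

  // The initiator: collect the rolled-up vote, then broadcast Commit or Rollback.
  def runTransaction(root: Participant): Either[String, Unit] = {
    val outcome = root.prepare()
    root.finish(outcome)
    outcome
  }
}
```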
During a transaction, a worker can only process messages related to
that transaction. I am working on deadlock detection (right now,
it's via timeout, but it could be more sophisticated) as well as
supporting two-phase commits.
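One plausible way to get "only messages related to the current transaction" is a stash: while a transaction is open, anything tagged with a different transaction (or none) is set aside and replayed when it ends. The sketch assumes each message carries an optional transaction id; `Envelope` and `TxnGate` are hypothetical, and `timedOut` is only the simplest form of the timeout-based deadlock handling mentioned above, with the caller supplying the clock.

```scala
import scala.collection.mutable

object TxnStash {
  // Hypothetical envelope: `txn` is the transaction a message belongs to, if any.
  final case class Envelope(txn: Option[Long], body: String)

  final class TxnGate(timeoutMillis: Long, process: Envelope => Unit) {
    private var current: Option[(Long, Long)] = None // (transaction id, start time)
    private val stash = mutable.Queue.empty[Envelope]

    def begin(txn: Long, now: Long): Unit = { current = Some((txn, now)) }

    // While a transaction is open, only its own messages are processed;
    // everything else is stashed in arrival order.
    def receive(env: Envelope): Unit = current match {
      case Some((id, _)) if !env.txn.contains(id) => stash.enqueue(env)
      case _                                      => process(env)
    }

    // Close the transaction and replay what was stashed, oldest first.
    def end(): Unit = {
      current = None
      val pending = stash.toList
      stash.clear()
      pending.foreach(receive)
    }

    // The simplest deadlock guard: a transaction open longer than the timeout is suspect.
    def timedOut(now: Long): Boolean =
      current.exists { case (_, start) => now - start > timeoutMillis }
  }
}
```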
Goat Rodeo includes a powerful backing store: SQL. Each Worker
has its own private relational data store. Each Worker can store
complex data structures and perform powerful queries against a
relational database. Each Worker's RDBMS connection commits (or rolls back
if the handler exits via an exception or if the transaction is explicitly
rolled back) at the end of processing each message that is
outside of a Goat Rodeo transaction. For messages within a Goat
Rodeo transaction, the RDBMS connection is committed or rolled back
based on the success or failure of the transaction.
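The commit timing for a Worker's private database can be sketched against a tiny connection interface. `Conn` is hypothetical (a JDBC `Connection` with auto-commit turned off would fit the same shape); the sketch only shows when `commit` and `rollback` get called: right after each standalone message, or once the enclosing Goat Rodeo transaction has an outcome.

```scala
object WorkerDb {
  // Hypothetical minimal connection; a JDBC Connection with auto-commit off fits this shape.
  trait Conn {
    def commit(): Unit
    def rollback(): Unit
  }

  final class MessageRunner(conn: Conn) {
    // Outside a transaction: commit when the handler returns,
    // roll back (and rethrow) if it throws.
    def runStandalone(handler: () => Unit): Unit =
      try { handler(); conn.commit() }
      catch { case e: Exception => conn.rollback(); throw e }

    // Inside a transaction: run the handler but leave the connection open...
    def runInTransaction(handler: () => Unit): Unit = handler()

    // ...until the transaction as a whole commits or rolls back.
    def onTransactionOutcome(committed: Boolean): Unit =
      if (committed) conn.commit() else conn.rollback()
  }
}
```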
Goat Rodeo's semantics for message passing, ask/answer and
transactions are identical for local systems or remote systems.
The Worker implementation will behave the same way in either
case. How is this achieved? All messages are guaranteed to
be serializable and not contain any mutable references. There's
no way to know where a message came from. The Worker interface is
simple (there are two additional private methods used for out-of-band
communications but those methods take limited and well-defined system
control messages). All communication between Workers is handled
via messages.
If you are interested in a distributed version of Goat Rodeo, please
contact me and we can discuss your project, your needs, etc.
So, the Goat Rodeo code is up at Assembla (http://liftweb.assembla.com/spaces/goat_rodeo/stream)
[I chose Assembla over GitHub because Assembla has a much better
ticketing system and it has more flexible administrative controls, but
I might move it back to GitHub if there's a good reason]. There's
a Google Group for discussing Goat Rodeo at http://groups.google.com/group/goat-rodeo.
I look forward to seeing what people think and seeing how Goat Rodeo
can evolve.
Oh... and a few side notes. Goat Rodeo runs on Scala 2.8 Beta
1, so it's not ready for production. Goat Rodeo was inspired by
Philipp Haller's awesome work with Scala Actors and concurrency.
I think that the work Jonas and his team have done with Akka is great
and I am interested in seeing how we can share ideas, code,
inspiration, and exploration to make Goat Rodeo and Akka part of one
larger, better thing. I'll get the Maven bits and pieces up on
scala-tools.org sometime the week of 2/15/2010. Right now, Goat
Rodeo uses Maven as its build system; that will change to SBT very soon.
Thanks!