10.3.3 Scaling a social network
As we built our social network in chapter 8, I pointed out that it wasn’t designed to scale to the size of a social network like Twitter, but that it was primarily meant to help you understand what structures and methods it takes to build a social network. In this section, I’ll describe a few methods that can let us scale a social networking site with sharding, almost without bounds (mostly limited by our budget, which is always the case with large projects).
One of the first steps necessary to helping a social network scale is figuring out what data is read often, what data is written often, and whether it’s possible to separate often-used data from rarely used data. To start, say that we’ve already pulled out our posted message data into a separate Redis server, which has read slaves to handle the moderately high volume of reads that occurs on that data. That really leaves two major types of data left to scale: timelines and follower/following lists.
SCALING POSTED MESSAGE DATABASE SIZEIf you actually built this system out, and you had any sort of success, at some point you’d need to further scale the posted message database beyond just read slaves. Because each message is completely contained within a single HASH, these can be easily sharded onto a cluster of Redis servers based on the key where the HASH is stored. Because this data is easily sharded, and because we’ve already worked through how to fetch data from multiple shards as part of our search scaling in section 10.3.2, you shouldn’t have any difficulty here. Alternatively, you can also use Redis as a cache, storing recently posted messages in Redis, and older (rarely read) messages in a primarily on-disk storage server (like PostgreSQL, MySQL, Riak, MongoDB, and so on). If you’re finding yourself challenged, please feel free to post on the message board or on the Redis mailing list. As you may remember, we had three primary types of timelines: home timelines, profile timelines, and list timelines. Timelines themselves are all similar, though both list timelines and home timelines are limited to 1,000 items. Similarly, followers, following, list followers, and list following are also essentially the same, so we’ll also handle them the same. First, let’s look at how we can scale timelines with sharding.
When we say that we’re sharding timelines, it’s a bit of a bait-and-switch. Because home and list timelines are short (1,000 entries max, which we may want to use to inform how large to set zset-max-ziplist-size),1 there’s really no need to shard the contents of the ZSETs; we merely need to place those timelines on different shards based on their key names.
On the other hand, the size that profile timelines can grow to is currently unlimited. Though the vast majority of users will probably only be posting a few times a day at most, there can be situations where someone is posting significantly more often. As an example of this, the top 1,000 posters on Twitter have all posted more than 150,000 status messages, with the top 15 all posting more than a million messages.
On a practical level, it wouldn’t be unreasonable to cap the number of messages that are kept in the timeline for an individual user to 20,000 or so (the oldest being hidden or deleted), which would handle 99.999% of Twitter users generally. We’ll assume that this is our plan for scaling profile timelines. If not, we can use the technique we cover for scaling follower/following lists later in this section for scaling profile timelines instead.
In order to shard our timelines based on key name, we could write a set of functions that handle sharded versions of ZADD, ZREM, and ZRANGE, along with others, all of which would be short three-line functions that would quickly get boring. Instead, let’s write a class that uses Python dictionary lookups to automatically create connections to shards.
First, let’s start with what we want our API to look like by updating our follow_user() function from chapter 8. We’ll create a generic sharded connection object that will let us create a connection to a given shard, based on a key that we want
to access in that shard. After we have that connection, we can call all of the standard Redis methods to do whatever we want on that shard. We can see what we want our API to look like, and how we need to update our function, in the next listing.
Now that we have an idea of what we want our API to look like, let’s build it. We first need
an object that takes the component and number of shards. When a key is referenced via
dictionary lookup on the object, we need to return a connection to the shard that the
provided key should be stored on. The class that implements this follows.
For simple key-based sharding, this is all that’s necessary to support almost every call
that we’d perform in Redis. All that remains is to update the remainder of
unfollow_user(), refill_timeline(), and the rest of the functions that access home
timelines and list timelines. If you intend to scale this social network, go ahead and
update those functions yourself. For those of us who aren’t scaling the social network,
we’ll continue on.
Exercise: Syndicating posts to home and list timelines
With the update to where data is stored for both home and list timelines, can you
update your list timeline supporting syndication task from chapter 8 to support
sharded profiles? Can you keep it almost as fast as the original version? Hint: If
you’re stuck, we include a fully updated version that supports sharded follower lists
in listing 10.15.
Up next is scaling follower and following lists.
SCALING FOLLOWER AND FOLLOWING LISTS WITH SHARDING
Though our scaling of timelines is pretty straightforward, scaling followers, following,
and the equivalent “list” ZSETs is more difficult. The vast majority of these ZSETs will
be short (99.99% of users on Twitter have fewer than 1,000 followers), but there may
be a few users who are following a large number of users, or who have a large number
of followers. As a practical matter, it wouldn’t be unreasonable to limit the number of
users that a given user or list can follow to be somewhat small (perhaps up to 1,000, to
match the limits on home and list timelines), forcing them to create lists if they really
want to follow more people. But we still run into issues when the number of followers
of a given user grows substantially.
To handle the situation where follower/following lists can grow to be very large,
we’ll shard these ZSETs across multiple shards. To be more specific, a user’s followers
will be broken up into as many pieces as we have shards. For reasons we’ll get
into in a moment, we only need to implement specific sharded versions of ZADD,
ZREM, and ZRANGEBYSCORE.
I know what you’re thinking: since we just built a method to handle sharding automatically,
we could use that. We will (to some extent), but because we’re sharding data
and not just keys, we can’t just use our earlier class directly. Also, in order to reduce
the number of connections we need to create and call, it makes a lot of sense to have
data for both sides of the follower/following link on the same shard, so we can’t just
shard by data like we did in chapter 9 and in section 10.2.
In order to shard our follower/following data such that both sides of the follower/
following relationship are on the same shard, we’ll use both IDs as part of the key to look
up a shard. Like we did for sharding timelines, let’s update follow_user() to show the
API that we’d like to use, and then we’ll create the class that’s necessary to implement
the functionality. The updated follow_user() with our desired API is next.
Aside from a bit of rearrangement and code updating, the only difference between
this change and the change we made earlier for sharding timelines is that instead of
passing a specific key to look up the shard, we pass a pair of IDs. From these two IDs,
we’ll calculate the proper shard that data involving both IDs should be stored on. The
class that implements this API appears next.
The only thing different for this sharded connection generator, compared to listing
10.11, is that this sharded connection generator takes a pair of IDs instead of a
key. From those two IDs, we generate a key where the lower of the two IDs is first,
and the higher is second. By constructing the key in this way, we ensure that whenever we reference the same two IDs, regardless of initial order, we always end
up on the same shard.
With this sharded connection generator, we can update almost all of the remaining follower/following ZSET operations. The one remaining operation that’s left is to properly handle ZRANGEBYSCORE, which we use in a few places to fetch a “page” of followers. Usually this is done to syndicate messages out to home and list timelines when an update is posted. When syndicating to timelines, we could scan through all of one shard’s ZSET, and then move to the next. But with a little extra work, we could instead pass through all ZSETs simultaneously, which would give us a useful sharded ZRANGEBYSCORE operation that can be used in other situations.
As we saw in section 10.3.2, in order to fetch items 100–109 from sharded ZSETs, we needed to fetch items 0–109 from all ZSETs and merge them together. This is because we only knew the index that we wanted to start at. Because we have the opportunity to scan based on score instead, when we want to fetch the next 10 items with scores greater than X, we only need to fetch the next 10 items with scores greater than X from all shards, followed by a merge. A function that implements ZRANGEBYSCORE across multiple shards is shown in the following listing.
This function works a lot like the query/merge that we did in section 10.3.2, only we can start in the middle of the ZSET because we have scores (and not indexes).
USING THIS METHOD FOR SHARDING PROFILE TIMELINESYou’ll notice that we use timestamps for follower/following lists, which avoided some of the drawbacks to paginate over sharded ZSETs that we covered in section 10.3.2. If you’d planned on using this method for sharding profile timelines, you’ll need to go back and update your code to use timestamps instead of offsets, and you’ll need to implement a ZREVRANGEBYSCORE version of listing 10.14, which should be straightforward. With this new sharded ZRANGEBYSCORE function, let’s update our function that syndicates posts to home and list timelines in the next listing. While we’re at it, we may as well add support for sharded home timelines.
As you can see from the code, we use the sharded ZRANGEBYSCORE function to fetch those users who are interested in this user’s posts. Also, in order to keep the syndication process fast, we group requests that are to be sent to each home or list timeline shard server together. Later, after we’ve grouped all of the writes together, we add the post to all of the timelines on a given shard server with a pipeline. Though this may be slower than the nonsharded version, this does allow us to scale our social network much larger than before.
All that remains is to finish updating the rest of the functions to support all of the sharding that we’ve done in the rest of section 10.3.3. Again, if you’re going to scale this social network, feel free to do so. But if you have some nonsharded code that you want to shard, you can compare the earlier version of syndicate_status() from section 8.4 with our new version to get an idea of how to update your code.
1 Because of the way we add items to home and list timelines, they can actually grow to roughly 2,000 entries for a short time. And because Redis doesn’t turn structures back into ziplist-encoded versions of themselves when they’ve gotten too large, setting zset-max-ziplist-size to be a little over 2,000 entries can keep these two timelines encoded efficiently.