Documentation - Redise Pack

A guide to Redise Pack installation, operation and administration

open all | close all

10.3.3 Scaling a social network

As we built our social network in chapter 8, I pointed out that it wasn’t designed to scale to the size of a social network like Twitter, but that it was primarily meant to help you understand what structures and methods it takes to build a social network. In this section, I’ll describe a few methods that can let us scale a social networking site with sharding, almost without bounds (mostly limited by our budget, which is always the case with large projects).

One of the first steps necessary to helping a social network scale is figuring out what data is read often, what data is written often, and whether it’s possible to separate often-used data from rarely used data. To start, say that we’ve already pulled out our posted message data into a separate Redis server, which has read slaves to handle the moderately high volume of reads that occurs on that data. That really leaves two major types of data left to scale: timelines and follower/following lists.

SCALING POSTED MESSAGE DATABASE SIZEIf you actually built this system out, and you had any sort of success, at some point you’d need to further scale the posted message database beyond just read slaves. Because each message is completely contained within a single HASH, these can be easily sharded onto a cluster of Redis servers based on the key where the HASH is stored. Because this data is easily sharded, and because we’ve already worked through how to fetch data from multiple shards as part of our search scaling in section 10.3.2, you shouldn’t have any difficulty here. Alternatively, you can also use Redis as a cache, storing recently posted messages in Redis, and older (rarely read) messages in a primarily on-disk storage server (like PostgreSQL, MySQL, Riak, MongoDB, and so on). If you’re finding yourself challenged, please feel free to post on the message board or on the Redis mailing list. As you may remember, we had three primary types of timelines: home timelines, profile timelines, and list timelines. Timelines themselves are all similar, though both list timelines and home timelines are limited to 1,000 items. Similarly, followers, following, list followers, and list following are also essentially the same, so we’ll also handle them the same. First, let’s look at how we can scale timelines with sharding.

SHARDING TIMELINES

When we say that we’re sharding timelines, it’s a bit of a bait-and-switch. Because home and list timelines are short (1,000 entries max, which we may want to use to inform how large to set zset-max-ziplist-size),1 there’s really no need to shard the contents of the ZSETs; we merely need to place those timelines on different shards based on their key names.

On the other hand, the size that profile timelines can grow to is currently unlimited. Though the vast majority of users will probably only be posting a few times a day at most, there can be situations where someone is posting significantly more often. As an example of this, the top 1,000 posters on Twitter have all posted more than 150,000 status messages, with the top 15 all posting more than a million messages.

On a practical level, it wouldn’t be unreasonable to cap the number of messages that are kept in the timeline for an individual user to 20,000 or so (the oldest being hidden or deleted), which would handle 99.999% of Twitter users generally. We’ll assume that this is our plan for scaling profile timelines. If not, we can use the technique we cover for scaling follower/following lists later in this section for scaling profile timelines instead.

In order to shard our timelines based on key name, we could write a set of functions that handle sharded versions of ZADDZREM, and ZRANGE, along with others, all of which would be short three-line functions that would quickly get boring. Instead, let’s write a class that uses Python dictionary lookups to automatically create connections to shards.

First, let’s start with what we want our API to look like by updating our follow_user() function from chapter 8. We’ll create a generic sharded connection object that will let us create a connection to a given shard, based on a key that we want
to access in that shard. After we have that connection, we can call all of the standard Redis methods to do whatever we want on that shard. We can see what we want our API to look like, and how we need to update our function, in the next listing.

Listing 10.10 An example of how we want our API for accessing shards to work
sharded_timelines = KeyShardedConnection('timelines', 8)

Create a connection that knows about the sharding information for a given component with a number of shards.

def follow_user(conn, uid, other_uid):
   fkey1 = 'following:%s'%uid
   fkey2 = 'followers:%s'%other_uid

   if conn.zscore(fkey1, other_uid):
      print "already followed", uid, other_uid
      return None

   now = time.time()

   pipeline = conn.pipeline(True)
   pipeline.zadd(fkey1, other_uid, now)
   pipeline.zadd(fkey2, uid, now)
   pipeline.zcard(fkey1)
   pipeline.zcard(fkey2)
   following, followers = pipeline.execute()[-2:]
   pipeline.hset('user:%s'%uid, 'following', following)
   pipeline.hset('user:%s'%other_uid, 'followers', followers)
   pipeline.execute()

   pkey = 'profile:%s'%other_uid
   status_and_score = sharded_timelines[pkey].zrevrange(
      pkey, 0, HOME_TIMELINE_SIZE-1, withscores=True)

Fetch the recent status messages from the profile timeline of the nowfollowed user.

   if status_and_score:
      hkey = 'home:%s'%uid
      pipe = sharded_timelines[hkey].pipeline(True)

Get a connection based on the shard key provided, and fetch a pipeline from that.

      pipe.zadd(hkey, **dict(status_and_score))
      pipe.zremrangebyrank(hkey, 0, -HOME_TIMELINE_SIZE-1)

Add the statuses to the home timeline ZSET on the shard, and then trim it.

      pipe.execute()

Execute the transaction.

   return True

Now that we have an idea of what we want our API to look like, let’s build it. We first need
an object that takes the component and number of shards. When a key is referenced via
dictionary lookup on the object, we need to return a connection to the shard that the
provided key should be stored on. The class that implements this follows.

Listing 10.11A class that implements sharded connection resolution based on key
class KeyShardedConnection(object):
   def __init__(self, component, shards):
      self.component = component
      self.shards = shards

The object is initialized with the component name and number of shards.

   def __getitem__(self, key):

When an item is fetched from the object, this method is called with the item that was requested.

      return get_sharded_connection(
         self.component, key, self.shards)

Use the passed key along with the previously known component and shards to fetch the sharded connection.

For simple key-based sharding, this is all that’s necessary to support almost every call
that we’d perform in Redis. All that remains is to update the remainder of
unfollow_user()refill_timeline(), and the rest of the functions that access home
timelines and list timelines. If you intend to scale this social network, go ahead and
update those functions yourself. For those of us who aren’t scaling the social network,
we’ll continue on.

Exercise: Syndicating posts to home and list timelines

With the update to where data is stored for both home and list timelines, can you
update your list timeline supporting syndication task from chapter 8 to support
sharded profiles? Can you keep it almost as fast as the original version? Hint: If
you’re stuck, we include a fully updated version that supports sharded follower lists
in listing 10.15.

Up next is scaling follower and following lists.

SCALING FOLLOWER AND FOLLOWING LISTS WITH SHARDING

Though our scaling of timelines is pretty straightforward, scaling followers, following,
and the equivalent “list” ZSETs is more difficult. The vast majority of these ZSETs will
be short (99.99% of users on Twitter have fewer than 1,000 followers), but there may
be a few users who are following a large number of users, or who have a large number
of followers. As a practical matter, it wouldn’t be unreasonable to limit the number of
users that a given user or list can follow to be somewhat small (perhaps up to 1,000, to
match the limits on home and list timelines), forcing them to create lists if they really
want to follow more people. But we still run into issues when the number of followers
of a given user grows substantially.

To handle the situation where follower/following lists can grow to be very large,
we’ll shard these ZSETs across multiple shards. To be more specific, a user’s followers
will be broken up into as many pieces as we have shards. For reasons we’ll get
into in a moment, we only need to implement specific sharded versions of ZADD,
ZREM, and ZRANGEBYSCORE.

I know what you’re thinking: since we just built a method to handle sharding automatically,
we could use that. We will (to some extent), but because we’re sharding data
and not just keys, we can’t just use our earlier class directly. Also, in order to reduce
the number of connections we need to create and call, it makes a lot of sense to have
data for both sides of the follower/following link on the same shard, so we can’t just
shard by data like we did in chapter 9 and in section 10.2.

In order to shard our follower/following data such that both sides of the follower/
following relationship are on the same shard, we’ll use both IDs as part of the key to look
up a shard. Like we did for sharding timelines, let’s update follow_user() to show the
API that we’d like to use, and then we’ll create the class that’s necessary to implement
the functionality. The updated follow_user() with our desired API is next.

Listing 10.12Access follower/following ZSET shards
sharded_timelines = KeyShardedConnection('timelines', 8)
sharded_followers = KeyDataShardedConnection('followers', 16)

Create a connection that knows about the sharding information for a given component with a number of shards.

def follow_user(conn, uid, other_uid):
   fkey1 = 'following:%s'%uid
   fkey2 = 'followers:%s'%other_uid
   sconn = sharded_followers[uid, other_uid]

Fetch the connection object for the uid, other_uid pair.

   if sconn.zscore(fkey1, other_uid):

Check to see if other_uid pair. the other_uid is already followed

      return None
   now = time.time()
   spipe = sconn.pipeline(True)
   spipe.zadd(fkey1, other_uid, now)
   spipe.zadd(fkey2, uid, now)

Add the follower/ following information to the ZSETs.

   following, followers = spipe.execute()

   pipeline = conn.pipeline(True)
   pipeline.hincrby('user:%s'%uid, 'following', int(following))
   pipeline.hincrby('user:%s'%other_uid, 'followers', int(followers))

Update the follower and following information for both users.

   pipeline.execute()

   pkey = 'profile:%s'%other_uid
   status_and_score = sharded_timelines[pkey].zrevrange(
      pkey, 0, HOME_TIMELINE_SIZE-1, withscores=True)

   if status_and_score:
      hkey = 'home:%s'%uid
      pipe = sharded_timelines[hkey].pipeline(True)
      pipe.zadd(hkey, **dict(status_and_score))
      pipe.zremrangebyrank(hkey, 0, -HOME_TIMELINE_SIZE-1)
      pipe.execute()

   return True

Aside from a bit of rearrangement and code updating, the only difference between
this change and the change we made earlier for sharding timelines is that instead of
passing a specific key to look up the shard, we pass a pair of IDs. From these two IDs,
we’ll calculate the proper shard that data involving both IDs should be stored on. The
class that implements this API appears next.

Listing 10.13Sharded connection resolution based on ID pairs
class KeyDataShardedConnection(object):
   def __init__(self, component, shards):
      self.component = component
      self.shards = shards

The object is initialized with the component name and number of shards.

   def __getitem__(self, ids):

When the pair of IDs is passed as part of the dictionary lookup, this method is called.

      id1, id2 = map(int, ids)

Unpack the pair of IDs, and ensure that they are integers.

      if id2 < id1:
         id1, id2 = id2, id1

If the second is less than the first, swap them so that the first ID is less than or equal to the second.

      key = "%s:%s"%(id1, id2)

Construct a key based on the two IDs.

      return get_sharded_connection(
         self.component, key, self.shards)

Use the computed key along with the previously known component and shards to fetch the sharded connection.

The only thing different for this sharded connection generator, compared to listing
10.11, is that this sharded connection generator takes a pair of IDs instead of a
key. From those two IDs, we generate a key where the lower of the two IDs is first,
and the higher is second. By constructing the key in this way, we ensure that whenever we reference the same two IDs, regardless of initial order, we always end
up on the same shard.

With this sharded connection generator, we can update almost all of the remaining follower/following ZSET operations. The one remaining operation that’s left is to properly handle ZRANGEBYSCORE, which we use in a few places to fetch a “page” of followers. Usually this is done to syndicate messages out to home and list timelines when an update is posted. When syndicating to timelines, we could scan through all of one shard’s ZSET, and then move to the next. But with a little extra work, we could instead pass through all ZSETs simultaneously, which would give us a useful sharded ZRANGEBYSCORE operation that can be used in other situations.

As we saw in section 10.3.2, in order to fetch items 100–109 from sharded ZSETs, we needed to fetch items 0–109 from all ZSETs and merge them together. This is because we only knew the index that we wanted to start at. Because we have the opportunity to scan based on score instead, when we want to fetch the next 10 items with scores greater than X, we only need to fetch the next 10 items with scores greater than X from all shards, followed by a merge. A function that implements ZRANGEBYSCORE across multiple shards is shown in the following listing.

Listing 10.14A function that implements a sharded ZRANGEBYSCORE
def sharded_zrangebyscore(component, shards, key, min, max, num):

We need to take arguments for the component and number of shards, and we’ll limit the arguments to be passed on to only those that’ll ensure correct behavior in sharded situations.

   data = []
   for shard in xrange(shards):
      conn = get_redis_connection("%s:%s"%(component, shard))

Fetch the sharded connection for the current shard.

      data.extend(conn.zrangebyscore(
         key, min, max, start=0, num=num, withscores=True))

Get the data from Redis for this shard.

   def key(pair):
      return pair[1], pair[0]
   data.sort(key=key)

Sort the data based on score, and then by member.

   return data[:num]

Return only the number of items requested.

This function works a lot like the query/merge that we did in section 10.3.2, only we can start in the middle of the ZSET because we have scores (and not indexes).

USING THIS METHOD FOR SHARDING PROFILE TIMELINESYou’ll notice that we use timestamps for follower/following lists, which avoided some of the drawbacks to paginate over sharded ZSETs that we covered in section 10.3.2. If you’d planned on using this method for sharding profile timelines, you’ll need to go back and update your code to use timestamps instead of offsets, and you’ll need to implement a ZREVRANGEBYSCORE version of listing 10.14, which should be straightforward. With this new sharded ZRANGEBYSCORE function, let’s update our function that syndicates posts to home and list timelines in the next listing. While we’re at it, we may as well add support for sharded home timelines.

Listing 10.15Updated syndicate status function
def syndicate_status(uid, post, start=0, on_lists=False):
   root = 'followers'
   key = 'followers:%s'%uid
   base = 'home:%s'
   if on_lists:
      root = 'list:out'
      key = 'list:out:%s'%uid
      base = 'list:statuses:%s'

s

   followers = sharded_zrangebyscore(root,
      sharded_followers.shards, key, start, 'inf', POSTS_PER_PASS)

Fetch the next group of followers using the sharded ZRANGEBYSCORE call.

   to_send = defaultdict(list)

Prepare a structure that will group profile information on a per-shard basis.

   for follower, start in followers:
      timeline = base % follower

Calculate the key for the timeline.

      shard = shard_key('timelines',
         timeline, sharded_timelines.shards, 2)

Find the shard where this timeline would go.

      to_send[shard].append(timeline)

Add the timeline key to the rest of the timelines on the same shard.

   for timelines in to_send.itervalues():
      pipe = sharded_timelines[timelines[0]].pipeline(False)

Get a connection to the server for the group of timelines, and create a pipeline.

      for timeline in timelines:
         pipe.zadd(timeline, **post)
         pipe.zremrangebyrank(
            timeline, 0, -HOME_TIMELINE_SIZE-1)

Add the post to the timeline, and remove any posts that are too old.

      pipe.execute()
      
   conn = redis.Redis()
   if len(followers) >= POSTS_PER_PASS:
      execute_later(conn, 'default', 'syndicate_status',
         [uid, post, start, on_lists])

   elif not on_lists:
      execute_later(conn, 'default', 'syndicate_status',
         [uid, post, 0, True])

As you can see from the code, we use the sharded ZRANGEBYSCORE function to fetch those users who are interested in this user’s posts. Also, in order to keep the syndication process fast, we group requests that are to be sent to each home or list timeline shard server together. Later, after we’ve grouped all of the writes together, we add the post to all of the timelines on a given shard server with a pipeline. Though this may be slower than the nonsharded version, this does allow us to scale our social network much larger than before.

All that remains is to finish updating the rest of the functions to support all of the sharding that we’ve done in the rest of section 10.3.3. Again, if you’re going to scale this social network, feel free to do so. But if you have some nonsharded code that you want to shard, you can compare the earlier version of syndicate_status() from section 8.4 with our new version to get an idea of how to update your code.

1 Because of the way we add items to home and list timelines, they can actually grow to roughly 2,000 entries for a short time. And because Redis doesn’t turn structures back into ziplist-encoded versions of themselves when they’ve gotten too large, setting zset-max-ziplist-size to be a little over 2,000 entries can keep these two timelines encoded efficiently.