This book covers the use of Redis, an in-memory database/data structure server.


9.3.3 Calculating aggregates over sharded STRINGs

To calculate aggregates, we have two use cases. Either we’ll calculate aggregates over
all of the information we know about, or we’ll calculate over a subset. We’ll start by calculating
aggregates over the entire population, and then we’ll write code that calculates
aggregates over a smaller group.

To calculate aggregates over everyone we have information for, we’ll recycle some
code that we wrote back in section 6.6.4, specifically the readblocks() function,
which reads blocks of data from a given key. Using this function, we can perform a single
command and round trip with Redis to fetch information about thousands of
users at one time. Our function to calculate aggregates with this block-reading function
is shown next.

Listing 9.16 A function to aggregate location information for everyone
def aggregate_location(conn):
   countries = defaultdict(int)
   states = defaultdict(lambda:defaultdict(int))

Initialize two special structures that will allow us to update existing and missing counters quickly.

   max_id = int(conn.zscore('location:max', 'max'))
   max_block = max_id // USERS_PER_SHARD

Fetch the maximum user ID known, and use that to calculate the maximum shard ID that we need to visit.

   for shard_id in xrange(max_block + 1):

Sequentially check every shard...

      for block in readblocks(conn, 'location:%s'%shard_id):

... reading each block.

         for offset in xrange(0, len(block)-1, 2):

Extract each code from the block and look up the original location information (like US, CA for someone who lives in California).

            code = block[offset:offset+2]
            update_aggregates(countries, states, [code])

Update our aggregates.

   return countries, states
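The readblocks() helper from section 6.6.4 drives the outer loops of listing 9.16. As a reminder of its behavior, here's a minimal sketch (with a hypothetical name, iterating over a plain byte string rather than issuing GETRANGE calls against a live Redis STRING):

```python
def readblocks_sketch(data, blocksize=2**16):
    # Yield successive fixed-size chunks of data, mimicking how readblocks()
    # from section 6.6.4 fetches large blocks of a STRING one round trip
    # at a time; an empty block means we've read past the end.
    offset = 0
    while True:
        block = data[offset:offset + blocksize]
        if not block:
            break
        yield block
        offset += blocksize
```

Each yielded block is then scanned two bytes at a time, exactly as the inner loop of listing 9.16 does.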

This function to calculate aggregates over country- and state-level information for
everyone uses a structure called a defaultdict, which we also first used in chapter 6
to calculate aggregates about location information before writing back to Redis.
Inside this function, we refer to a helper function that actually updates the aggregates
and decodes location codes back into their original ISO3 country codes and local state
abbreviations, which can be seen in this next listing.

Listing 9.17 Convert location codes back to country/state information
def update_aggregates(countries, states, codes):
   for code in codes:
      if len(code) != 2:
         continue

Only look up codes that could be valid.

      country = ord(code[0]) - 1
      state = ord(code[1]) - 1

Calculate the actual offset of the country and state in the lookup tables.

      if country < 0 or country >= len(COUNTRIES):
         continue

If the country is out of the range of valid countries, continue to the next code.

      country = COUNTRIES[country]

Fetch the ISO3 country code.

      countries[country] += 1

Count this user in the decoded country.

      if country not in STATES:
         continue
      if state < 0 or state >= len(STATES[country]):
         continue

If we don’t have state information or if the state is out of the range of valid states for the country, continue to the next code.

      state = STATES[country][state]

Fetch the state name from the code.

      states[country][state] += 1

Increment the count for the state.
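The decoding in listing 9.17 inverts a simple packing scheme: each byte stores an index plus one, so that a zero byte can mean "unknown." To make that concrete, here's a sketch of the encoding side, using toy lookup tables and a hypothetical helper name rather than the full ISO3 and state lists built earlier in chapter 9:

```python
# Toy lookup tables standing in for the full lists from earlier in
# chapter 9; the packed bytes encode indexes into these sequences.
COUNTRIES = ['CAN', 'USA']
STATES = {'USA': ['CA', 'NY']}

def encode_location(country, state=None):
    # Store (index + 1) in each byte so that chr(0) can mean "unknown";
    # update_aggregates() subtracts 1 to recover the index.
    cindex = COUNTRIES.index(country) + 1
    sindex = 0
    if state and country in STATES:
        sindex = STATES[country].index(state) + 1
    return chr(cindex) + chr(sindex)
```

A user in California would be stored as `chr(2) + chr(1)`, which listing 9.17 decodes back to `COUNTRIES[1]` ('USA') and `STATES['USA'][0]` ('CA').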

With a function to convert location codes back into useful location information and
update aggregate information, we have the building blocks to perform aggregates
over a subset of users. As an example, say that we have location information for many
Twitter users. And also say that we have follower information for each user. To discover
information about where the followers of a given user are located, we’d only
need to fetch location information for those users and compute aggregates similar to
our global aggregates. The next listing shows a function that will aggregate location
information over a provided list of user IDs.

Listing 9.18 A function to aggregate location information over provided user IDs
def aggregate_location_list(conn, user_ids):
   pipe = conn.pipeline(False)

Set up the pipeline so that we aren’t making too many round trips to Redis.

   countries = defaultdict(int)
   states = defaultdict(lambda: defaultdict(int))

Set up our base aggregates as we did before.

   for i, user_id in enumerate(user_ids):
      shard_id, position = divmod(user_id, USERS_PER_SHARD)
      offset = position * 2

Calculate the shard ID and offset into the shard for this user’s location.

      pipe.substr('location:%s'%shard_id, offset, offset+1)

Send another pipelined command to fetch the location information for the user.

      if (i+1) % 1000 == 0:
         update_aggregates(countries, states, pipe.execute())

Every 1000 requests, we’ll actually update the aggregates using the helper function we defined before.

   update_aggregates(countries, states, pipe.execute())

Handle the last hunk of users that we might have missed before.

   return countries, states

Return the aggregates.
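The shard arithmetic in listing 9.18 is worth seeing on concrete numbers. Assuming the USERS_PER_SHARD value used earlier in section 9.3 (and a hypothetical helper name for illustration):

```python
USERS_PER_SHARD = 2**20  # the shard size assumed earlier in section 9.3

def location_key_and_offset(user_id):
    # Which sharded STRING holds this user, and the byte offset of the
    # user's two-byte location code within that shard.
    shard_id, position = divmod(user_id, USERS_PER_SHARD)
    return 'location:%s' % shard_id, position * 2
```

User 0 lands at offset 0 of `location:0`, while user 2**20 + 3 lands at offset 6 of `location:1`; the pipelined SUBSTR in listing 9.18 then fetches bytes offset through offset+1.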

This technique of storing fixed-length data in sharded STRINGs can be useful. Though
we stored multiple bytes of data per user, we can use GETBIT and SETBIT identically to
store individual bits, or even groups of bits.
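For the bit-level variant, here's a minimal sketch that models Redis's SETBIT/GETBIT semantics (bit 0 is the most significant bit of byte 0, and the value grows with zero bytes as needed) against a local bytearray instead of a live server:

```python
def setbit(buf, offset, value):
    # Mirror Redis SETBIT: bit 0 is the most significant bit of byte 0;
    # extend the buffer with zero bytes if the offset is past the end.
    byte, bit = divmod(offset, 8)
    while len(buf) <= byte:
        buf.append(0)
    mask = 1 << (7 - bit)
    if value:
        buf[byte] |= mask
    else:
        buf[byte] &= ~mask

def getbit(buf, offset):
    # Mirror Redis GETBIT: reads past the end of the value return 0.
    byte, bit = divmod(offset, 8)
    if byte >= len(buf):
        return 0
    return (buf[byte] >> (7 - bit)) & 1
```

Against a real server you'd replace these with `conn.setbit(key, offset, value)` and `conn.getbit(key, offset)`, sharding the key exactly as we did for two-byte codes.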