EBOOK – REDIS IN ACTION

This book covers the use of Redis, an in-memory database/data structure server.

    9.3.3 Calculating aggregates over sharded STRINGs

    There are two cases to handle when calculating aggregates: either we calculate over
    all of the information we know about, or we calculate over a subset. We’ll start with
    aggregates over the entire population, and then we’ll write code that calculates
    aggregates over a smaller group.

    To calculate aggregates over everyone we have information for, we’ll recycle some
    code that we wrote back in section 6.6.4, specifically the readblocks() function,
    which reads blocks of data from a given key. Using this function, we can perform a single
    command and round trip with Redis to fetch information about thousands of
    users at one time. Our function to calculate aggregates with this block-reading function
    is shown next.

    Listing 9.16 A function to aggregate location information for everyone
    def aggregate_location(conn):
    
       countries = defaultdict(int)
       states = defaultdict(lambda:defaultdict(int))
    

    Initialize two special structures that will allow us to update existing and missing counters quickly.

       max_id = int(conn.zscore('location:max', 'max'))
       max_block = max_id // USERS_PER_SHARD
    

    Fetch the maximum user ID known, and use that to calculate the maximum shard ID that we need to visit.

       for shard_id in xrange(max_block + 1):
    

    Sequentially check every shard...

          for block in readblocks(conn, 'location:%s'%shard_id):
    

    ... reading each block.

             for offset in xrange(0, len(block)-1, 2):
    

    Extract each code from the block and look up the original location information (like USA, CA for someone who lives in California).

                code = block[offset:offset+2]
    
                update_aggregates(countries, states, [code])
    

    Update our aggregates.

       return countries, states
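
    The readblocks() helper comes from section 6.6.4 and isn't reproduced in this section. As a rough
    illustration of the block-reading idea only, here is a minimal sketch that pages through a STRING
    with GETRANGE-style calls. FakeConn and readblocks_sketch are hypothetical names introduced for
    this example, and the in-memory connection stands in for a real Redis client so that the sketch
    runs on its own. It is not the book's implementation.

```python
class FakeConn:
    """Hypothetical in-memory stand-in for a Redis connection (illustration only)."""

    def __init__(self, data):
        self.data = data  # maps key -> bytes

    def getrange(self, key, start, end):
        # Like Redis GETRANGE, both offsets are inclusive.
        return self.data.get(key, b'')[start:end + 1]


def readblocks_sketch(conn, key, blocksize=2**17):
    """Yield successive non-empty blocks of the STRING stored at key."""
    pos = 0
    while True:
        block = conn.getrange(key, pos, pos + blocksize - 1)
        if block:
            yield block
        # A short (or empty) block means we've reached the end of the STRING.
        if len(block) < blocksize:
            break
        pos += len(block)


# Three users' two-byte location codes stored in one shard.
conn = FakeConn({'location:0': b'\x01\x02\x01\x03\x02\x01'})
blocks = list(readblocks_sketch(conn, 'location:0', blocksize=4))
```

    With an even block size, every two-byte code stays within a single block, which is what lets
    the aggregation loop slice codes out of each block independently.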
    

    This function, which calculates country- and state-level aggregates for everyone,
    uses a structure called a defaultdict. We first used defaultdict in chapter 6
    to calculate aggregates about location information before writing back to Redis.
    Inside this function, we call a helper function that decodes location codes back into
    their original ISO3 country codes and local state abbreviations and then updates the
    aggregates. That helper can be seen in this next listing.

    Listing 9.17 Convert location codes back to country/state information
    def update_aggregates(countries, states, codes):
       for code in codes:
    
          if len(code) != 2:
             continue
    

    Only look up codes that could be valid.

          country = ord(code[0]) - 1
          state = ord(code[1]) - 1
    

    Calculate the actual offset of the country and state in the lookup tables.

          if country < 0 or country >= len(COUNTRIES):
             continue
    

    If the country is out of the range of valid countries, continue to the next code.

          country = COUNTRIES[country]
    

    Fetch the ISO3 country code.

          countries[country] += 1
    

    Count this user in the decoded country.

          if country not in STATES:
             continue
          if state < 0 or state >= len(STATES[country]):
             continue
    

    If we don’t have state information or if the state is out of the range of valid states for the country, continue to the next code.

          state = STATES[country][state]
    

    Fetch the state name from the code.

          states[country][state] += 1
    

    Increment the count for the state.
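
    To make the decoding step concrete, here is a small worked sketch of how a two-character code
    maps back to a country and state. The COUNTRIES and STATES tables below are tiny, hypothetical
    stand-ins for the real lookup tables, and decode_sketch is a name introduced only for this
    example. Each character's value is its table index plus one, so a value of 0 means "unknown".

```python
# Hypothetical, deliberately tiny lookup tables (the real tables are much larger).
COUNTRIES = ['CAN', 'USA']
STATES = {
    'CAN': ['AB', 'BC'],
    'USA': ['AL', 'AK', 'CA'],
}


def decode_sketch(code):
    """Return (country, state); either is None when it can't be decoded."""
    if len(code) != 2:
        return None, None
    # Stored values are offset by one so that 0 can mean "not available".
    country_idx = ord(code[0]) - 1
    state_idx = ord(code[1]) - 1
    if not 0 <= country_idx < len(COUNTRIES):
        return None, None
    country = COUNTRIES[country_idx]
    states = STATES.get(country, [])
    if not 0 <= state_idx < len(states):
        return country, None
    return country, states[state_idx]


california = decode_sketch('\x02\x03')   # country index 1, state index 2
canada_only = decode_sketch('\x01\x00')  # known country, no state recorded
unknown = decode_sketch('\x00\x00')      # nothing recorded for this user
```

    Note that the state check compares against the length of the country's state list, so an
    out-of-range state still counts toward the country total but not toward any state.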

    With a function to convert location codes back into useful location information and
    update aggregate information, we have the building blocks to perform aggregates
    over a subset of users. As an example, say that we have location information for many
    Twitter users, along with follower information for each user. To discover
    where the followers of a given user are located, we’d only
    need to fetch location information for those followers and compute aggregates similar to
    our global aggregates. The next listing shows a function that will aggregate location
    information over a provided list of user IDs.

    Listing 9.18 A function to aggregate location information over provided user IDs
    def aggregate_location_list(conn, user_ids):
    
       pipe = conn.pipeline(False)
    

    Set up the pipeline so that we aren’t making too many round trips to Redis.

       countries = defaultdict(int)
       states = defaultdict(lambda: defaultdict(int))
    

    Set up our base aggregates as we did before.

       for i, user_id in enumerate(user_ids):
    
          shard_id, position = divmod(user_id, USERS_PER_SHARD)
          offset = position * 2
    

    Calculate the shard ID and offset into the shard for this user’s location.

          pipe.substr('location:%s'%shard_id, offset, offset+1)
    

    Queue another pipelined command to fetch the location information for the user.

          if (i+1) % 1000 == 0:
             update_aggregates(countries, states, pipe.execute())
    

    Every 1000 requests, we’ll actually update the aggregates using the helper function we defined before.

       update_aggregates(countries, states, pipe.execute())
    

    Handle the last batch of users that hasn’t been executed yet.

       return countries, states
    

    Return the aggregates.
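
    The shard and offset arithmetic in this listing can be checked by hand. The sketch below
    assumes USERS_PER_SHARD is 2**20 purely for illustration (use whatever value the location
    data was written with), and locate is a hypothetical helper name. It returns the key plus
    the inclusive start and end byte offsets that would be passed to the substring fetch.

```python
USERS_PER_SHARD = 2**20  # assumed value for illustration only


def locate(user_id):
    """Return (key, start, end) for the two bytes holding this user's location code."""
    shard_id, position = divmod(user_id, USERS_PER_SHARD)
    offset = position * 2  # each user occupies two bytes within the shard
    return 'location:%s' % shard_id, offset, offset + 1


first_user = locate(0)
last_in_shard_0 = locate(USERS_PER_SHARD - 1)
sixth_in_shard_1 = locate(USERS_PER_SHARD + 5)
```

    Because every user occupies exactly two bytes, the position of a user's data is a pure
    function of the user ID, and no index or lookup is needed to find it.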

    This technique of storing fixed-length data in sharded STRINGs isn’t limited to location
    codes. Though we stored multiple bytes of data per user, we can use GETBIT and SETBIT
    in the same way to store individual bits, or even groups of bits, per user.
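
    As a rough sketch of the single-bit variant, the example below stores one flag per user in
    sharded STRINGs. Everything named here is an assumption made for illustration: FakeBitConn is
    a hypothetical in-memory stand-in exposing setbit and getbit calls shaped like a Redis
    client's, BITS_PER_SHARD is an arbitrary shard size, and the 'flag' key prefix is made up. The
    stand-in follows Redis's convention that bit offset 0 is the most significant bit of the first byte.

```python
class FakeBitConn:
    """Hypothetical in-memory stand-in for a Redis connection (illustration only)."""

    def __init__(self):
        self.data = {}  # maps key -> bytearray

    def setbit(self, key, offset, value):
        buf = self.data.setdefault(key, bytearray())
        byte_idx, bit = divmod(offset, 8)
        if len(buf) <= byte_idx:
            # Grow the string with zero bytes, as Redis does for SETBIT.
            buf.extend(b'\x00' * (byte_idx + 1 - len(buf)))
        mask = 0x80 >> bit  # offset 0 is the most significant bit
        old = 1 if buf[byte_idx] & mask else 0
        if value:
            buf[byte_idx] |= mask
        else:
            buf[byte_idx] &= ~mask & 0xFF
        return old  # the previous bit value

    def getbit(self, key, offset):
        buf = self.data.get(key, b'')
        byte_idx, bit = divmod(offset, 8)
        if byte_idx >= len(buf):
            return 0  # bits past the end of the string read as 0
        return 1 if buf[byte_idx] & (0x80 >> bit) else 0


BITS_PER_SHARD = 2**20  # assumed shard size for illustration only


def set_flag(conn, user_id, value, base='flag'):
    """Set one user's flag bit in the shard that owns that user ID."""
    shard_id, position = divmod(user_id, BITS_PER_SHARD)
    return conn.setbit('%s:%s' % (base, shard_id), position, 1 if value else 0)


def get_flag(conn, user_id, base='flag'):
    """Read one user's flag bit; users never written read as 0."""
    shard_id, position = divmod(user_id, BITS_PER_SHARD)
    return conn.getbit('%s:%s' % (base, shard_id), position)


conn = FakeBitConn()
set_flag(conn, 3, True)
set_flag(conn, BITS_PER_SHARD + 1, True)
```

    The shard calculation is the same divmod used for the two-byte codes. Only the unit changes,
    from a byte offset to a bit offset.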