EBOOK – REDIS IN ACTION

This book covers the use of Redis, an in-memory database/data structure server.

open all | close all

6.6.1 Aggregating users by location

Let’s take a moment and look back at an earlier problem that we solved for Fake
Game Company. With the ability to discover where users are accessing the game from
thanks to our IP-to-city lookup in chapter 5, Fake Game Company has found itself
needing to reparse many gigabytes of log files. They’re looking to aggregate user visitation
patterns over time in a few different dimensions: per country, per region, per city, and more. Because we need this to be run in real time over new data, we’ve
already implemented callbacks to perform the aggregate operations.

As you may remember from chapter 5, Fake Game Company has been around for
about 2 years. They have roughly 1,000,000 users per day, but they have roughly 10
events per user per day. That gives us right around 7.3 billion log lines to process. If
we were to use one of the earlier methods, we’d copy the log files to various machines
that need to process the data, and then go about processing the log files. This works,
but then we need to copy the data, potentially delaying processing, and using storage
space on every machine that processes the data, which later needs to be cleaned up.

In this particular case, instead of copying files around, we could write a one-time
map-reduce3 process to handle all of this data. But because map-reduces are designed
to not share memory between items to be processed (each item is usually one log
line), we can end up taking more time with map-reduce than if we spent some time
writing it by hand to share memory. More specifically, if we load our IP-to-city lookup
table into memory in Python (which we’d only want to do if we had a lot of processing
to do, and we do), we can perform about 200k IP-to-city ID lookups per second, which
is faster than we could expect a single Redis instance to respond to the same queries.
Also, to scale with map-reduce, we’d have to run at least a few instances of Redis to
keep up with the map-reduces.

With the three standard methods of handling this already discounted (NFS/
Samba, copying files, map-reduce), let’s look at some other practical pieces that we’ll
need to solve to actually perform all of our lookups.

AGGREGATING DATA LOCALLY

In order to process that many log entries efficiently, we’ll need to locally cache aggregates
before updating Redis in order to minimize round trips. Why? If we have
roughly 10 million log lines to process for each day, then that’s roughly 10 million
writes to Redis. If we perform the aggregates locally on a per-country basis for the
entire day (being that there are around 300 countries), we can instead write 300 values
to Redis. This will significantly reduce the number of round trips between Redis,
reducing the number of commands processed, which in turn will reduce the total processing
time.

If we don’t do anything intelligent about local caching, and we have 10 aggregates
that we want to calculate, we’re looking at around 10 days to process all of the data.
But anything on the country or region level can be aggregated completely (for the
day) before being sent to Redis. And generally because the top 10% of cities (there
are roughly 350,000 cities in our sample dataset) amount for more than 90% of our
game’s users, we can also locally cache any city-level aggregates. So by performing
local caching of aggregates, we’re not limited by Redis on aggregate throughput.

Assuming that we’ve already cached a copy of our ZSET and HASH table for IP lookups
from section 5.3, we only need to worry about aggregating the data. Let’s start with the log lines that contain an IP address, date, time, and the operation that was
performed, similar to the following snippet:

173.194.38.137 2011-10-10 13:55:36 achievement-762

Given log lines of that form, let’s aggregate those lines on a daily basis per country. To
do this, we’ll receive the line as part of a call and increment the appropriate counter.
If the log line is empty, then we’ll say that the day’s worth of data is done, and we
should write to Redis. The source code for performing this aggregate is shown next.

Listing 6.29A locally aggregating callback for a daily country-level aggregate
aggregates = defaultdict(lambda: defaultdict(int))

Prepare the local aggregate dictionary.

def daily_country_aggregate(conn, line):
   if line:
      line = line.split()
      ip = line[0]
      day = line[1]

Extract the information from our log lines.

      country = find_city_by_ip_local(ip)[2]

Find the country from the IP address.

      aggregates[day][country] += 1

Increment our local aggregate.

      return

   for day, aggregate in aggregates.items():
      conn.zadd('daily:country:' + day, **aggregate)
      del aggregates[day]

The day file is done; write our aggregate to Redis.

Now that we’ve written and seen one of these aggregate functions, the rest are fairly
similar and just as easy to write. Let’s move on to more interesting topics, like how
we’re going to send files through Redis.

3 MapReduce (or Map/Reduce) is a type of distributed computation popularized by Google, which can offer
high performance and simplicity for some problems.