This book covers the use of Redis, an in-memory database/data structure server.

6.6.4 Processing files

We’re deferring some of the work of decoding our files to functions that return generators
over data. The readlines() function takes the connection, key, and a block-iterating
callback. It’ll iterate over blocks of data yielded by the block-iterating callback,
discover line breaks, and yield lines. When provided with blocks as in listing 6.32, it finds
the last line ending in the block, and then splits the lines up to that last line ending, yielding
the lines one by one. When it’s done, it keeps any partial lines to prepend onto the
next block. If there’s no more data, it yields the last line by itself. There are other ways
of finding line breaks and extracting lines in Python, but the rfind()/split() combination
is faster than other methods.

Listing 6.32 The readlines() function
def readlines(conn, key, rblocks):
   out = ''
   for block in rblocks(conn, key):
      out += block
      posn = out.rfind('\n')

Find the rightmost line break if any; rfind() returns -1 on failure.

      if posn >= 0:

We found a line break.

         for line in out[:posn].split('\n'):

Split on all of the line breaks.

            yield line + '\n'

Yield each line.

         out = out[posn+1:]

Keep track of the trailing data.

      if not block:

We’re out of data.

         yield out
         break

For our higher-level line-generating function, we’re iterating over blocks produced by
one of two readers, which allows us to focus on finding line breaks.
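To see how partial lines are carried from one block to the next, here is a minimal sketch that feeds readlines() from an in-memory reader instead of Redis. The fake_blocks() function and its sample chunks are hypothetical and exist only for this illustration. The readlines() logic is restated so the example runs on its own.

```python
def readlines(conn, key, rblocks):
    # Same logic as listing 6.32, restated so this sketch is self-contained.
    out = ''
    for block in rblocks(conn, key):
        out += block
        posn = out.rfind('\n')
        if posn >= 0:
            for line in out[:posn].split('\n'):
                yield line + '\n'
            out = out[posn+1:]
        if not block:
            yield out
            # Stop once the empty end-of-data block arrives.
            break

def fake_blocks(conn, key):
    # Hypothetical stand-in for a block reader: ignores conn and key and
    # yields fixed chunks that cut lines at awkward places.
    for chunk in ['GET /a\nGET', ' /b\nPO', 'ST /c', '']:
        yield chunk

lines = list(readlines(None, 'unused-key', fake_blocks))
# lines == ['GET /a\n', 'GET /b\n', 'POST /c']
```

The second line is split across the first two chunks and the third line has no trailing line break, yet each comes out whole. The final, unterminated line is yielded without a line break added to it.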

GENERATORS WITH YIELD Listing 6.32 offers our first real use of Python generators
with the yield statement. Generally, this allows Python to suspend and
resume execution of code primarily to allow for easy iteration over sequences
or pseudo-sequences of data. For more details on how generators work, you can
visit the Python language tutorial with this short URL: http://mng.bz/Z2b1.
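As a small illustration of that suspend-and-resume behavior, the sketch below uses a hypothetical countdown() generator that isn't part of our file-processing code.

```python
def countdown(n):
    # Hypothetical generator used only for illustration.
    while n > 0:
        # Execution pauses here until the caller asks for another value.
        yield n
        n -= 1

gen = countdown(3)
first = next(gen)   # runs the body up to the first yield
rest = list(gen)    # resumes after the yield and drains the generator
# first == 3, rest == [2, 1]
```

Calling countdown(3) runs none of the body. The code only starts executing when the first value is requested, and it picks up where it left off on each later request.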

Each of the two block-yielding callbacks, readblocks() and readblocks_gz(), will
read blocks of data from Redis. The first yields the blocks directly, whereas the other
automatically decompresses gzip files. We’ll use this particular layer separation in
order to offer the most useful and reusable data reading method possible. The following
listing shows the readblocks generator.

Listing 6.33 The readblocks() generator
def readblocks(conn, key, blocksize=2**17):
   lb = blocksize
   pos = 0
   while lb == blocksize:

Keep fetching more data as long as we don’t have a partial read.

      block = conn.substr(key, pos, pos + blocksize - 1)

Fetch the block.

      yield block
      lb = len(block)
      pos += lb

Prepare for the next pass.

   yield ''
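The paging logic can be exercised without a Redis server. In the sketch below, FakeConn is a hypothetical stand-in that provides only a substr() method with Redis-style inclusive end offsets. It isn't a real client and doesn't handle negative offsets. The readblocks() logic is restated so the example runs on its own.

```python
class FakeConn(object):
    # Hypothetical stand-in for a Redis connection; only substr() exists.
    def __init__(self, data):
        self.data = data

    def substr(self, key, start, end):
        # Redis treats the end offset as inclusive, so add 1 for slicing.
        return self.data.get(key, '')[start:end + 1]

def readblocks(conn, key, blocksize=2**17):
    # Same logic as listing 6.33.
    lb = blocksize
    pos = 0
    while lb == blocksize:
        block = conn.substr(key, pos, pos + blocksize - 1)
        yield block
        lb = len(block)
        pos += lb
    yield ''

conn = FakeConn({'log': 'abcdefghij', 'even': 'abcdefgh'})
blocks = list(readblocks(conn, 'log', blocksize=4))
# blocks == ['abcd', 'efgh', 'ij', '']
even_blocks = list(readblocks(conn, 'even', blocksize=4))
# even_blocks == ['abcd', 'efgh', '', '']
```

A short read ends the loop, and the trailing empty string tells the consumer that there's no more data. When the value's length is an exact multiple of the block size, the last fetch is itself empty, so two empty blocks are produced. Consumers should stop at the first one.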

The readblocks() generator is primarily meant to offer an abstraction over our block
reading, which allows us to replace it later with other types of readers, like maybe a filesystem
reader, a memcached reader, a ZSET reader, or in our case, a block reader that
handles gzip files in Redis. The next listing shows the readblocks_gz() generator.

Listing 6.34 The readblocks_gz() generator
import zlib

def readblocks_gz(conn, key):
   inp = ''
   decoder = None
   for block in readblocks(conn, key, 2**17):

Read the raw data from Redis.

      if not decoder:
         inp += block
         try:
            if inp[:3] != "\x1f\x8b\x08":
               raise IOError("invalid gzip data")
            i = 10
            flag = ord(inp[3])
            if flag & 4:
               i += 2 + ord(inp[i]) + 256*ord(inp[i+1])
            if flag & 8:
               i = inp.index('\0', i) + 1
            if flag & 16:
               i = inp.index('\0', i) + 1
            if flag & 2:
               i += 2

Parse the header information so that we can find where the compressed data starts.

            if i > len(inp):
               raise IndexError("not enough data")
         except (IndexError, ValueError):
            continue

We haven’t read the full header yet.

         else:
            block = inp[i:]
            inp = None
            decoder = zlib.decompressobj(-zlib.MAX_WBITS)

We found the header; prepare the decompressor.

            if not block:
               continue

      if not block:
         yield decoder.flush()
         break

We’re out of data; yield the last chunk.

      yield decoder.decompress(block)

Yield a decompressed block.
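The header checks in listing 6.34 can be tried against data produced by Python's own gzip module. The following is a minimal sketch written for Python 3, where the data is bytes and indexing a byte returns an integer, so the ord() calls disappear. The gzip_payload_offset() helper and the sample log line are hypothetical, and the helper assumes that the whole header is already in memory.

```python
import gzip
import io
import zlib

def gzip_payload_offset(data):
    # Return the index of the first byte of deflate data in a gzip member,
    # using the same flag checks as listing 6.34.
    if data[:3] != b'\x1f\x8b\x08':
        raise IOError("invalid gzip data")
    i = 10                  # fixed-size part of the header
    flag = data[3]          # indexing bytes gives an int in Python 3
    if flag & 4:            # FEXTRA: 2-byte little-endian length, then payload
        i += 2 + data[i] + 256 * data[i + 1]
    if flag & 8:            # FNAME: zero-terminated original file name
        i = data.index(b'\0', i) + 1
    if flag & 16:           # FCOMMENT: zero-terminated comment
        i = data.index(b'\0', i) + 1
    if flag & 2:            # FHCRC: 2-byte header checksum
        i += 2
    if i > len(data):
        raise IndexError("not enough data")
    return i

original = b'127.0.0.1 - - "GET /index.html" 200\n' * 100
buf = io.BytesIO()
with gzip.GzipFile(filename='access.log', mode='wb', fileobj=buf) as f:
    f.write(original)
compressed = buf.getvalue()

offset = gzip_payload_offset(compressed)
# A negative window size tells zlib to expect raw deflate data with no header.
decoder = zlib.decompressobj(-zlib.MAX_WBITS)
restored = decoder.decompress(compressed[offset:]) + decoder.flush()
```

Because the file name is stored in the header, the payload starts past the fixed 10 bytes. The checksum and length that follow the compressed data aren't part of the deflate stream, so the decompressor leaves them in decoder.unused_data.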

Much of the body of readblocks_gz() is gzip header parsing code, which is unfortunately
necessary. For log files (like we’re parsing), gzip can offer a reduction of 2–5
times in storage space requirements, while offering fairly high-speed decompression.
Though more modern compression methods are able to compress better (bzip2,
lzma/xz, and many others) or faster (lz4, lzop, snappy, QuickLZ, and many others),
no other method is as widely available (bzip2 comes close) or has such a useful range
of compression ratio and CPU utilization trade-off options.
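To get a feel for those trade-off options, the sketch below compresses synthetic log-like data at three zlib levels. The data is made up and highly repetitive, so the ratios it produces are illustrative only, and real logs will differ. It uses zlib.compress(), which wraps the same deflate algorithm as gzip in a slightly different container.

```python
import zlib

# Synthetic, repetitive log-like data; an assumption made for illustration.
lines = ['10.0.0.%d - - "GET /page/%d HTTP/1.1" 200 %d\n'
         % (i % 50, i % 300, 500 + i % 97) for i in range(20000)]
data = ''.join(lines).encode('ascii')

sizes = {}
for level in (1, 6, 9):
    # Lower levels spend less CPU; higher levels usually compress smaller.
    packed = zlib.compress(data, level)
    assert zlib.decompress(packed) == data
    sizes[level] = len(packed)

# Ratio of original size to compressed size for each level.
ratios = dict((level, float(len(data)) / size)
              for level, size in sizes.items())
```

Timing each call with the timeit module on a sample of your own logs is a reasonable way to choose a level.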