This book covers the use of Redis, an in-memory database/data structure server.

6.6.3 Receiving files

The second part of the process is a group of functions and generators that fetch
log filenames from the group chat. After receiving each name, they process the log
file directly from Redis and update the keys that the copy process is waiting on.
They also call our callback on each incoming line, updating our aggregates. The
next listing shows the code for the first of these functions.

Listing 6.31 The process_logs_from_redis() function
import time

# fetch_pending_messages(), readlines(), readblocks(), and readblocks_gz()
# are helper functions that are not shown in this listing.

def process_logs_from_redis(conn, id, callback):
   while 1:
      # Fetch the list of files.
      fdata = fetch_pending_messages(conn, id)

      for ch, mdata in fdata:
         for message in mdata:
            logfile = message['message']

            # No more log files.
            if logfile == ':done':
               return
            elif not logfile:
               continue

            # Choose a block reader.
            block_reader = readblocks
            if logfile.endswith('.gz'):
               block_reader = readblocks_gz

            # Iterate over the lines.
            for line in readlines(conn, ch+logfile, block_reader):
               # Pass each line to the callback.
               callback(conn, line)
            # Force a flush of our aggregate caches.
            callback(conn, None)

            # Report that we’re finished with the log.
            conn.incr(ch + logfile + ':done')

      # Nothing was pending, so wait briefly before polling again.
      if not fdata:
         time.sleep(.1)
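
The listing calls the callback once per line and then once more with None to force a flush. As a rough sketch of a callback that honors that contract, the following builds one that tallies lines in memory and hands the batch off when it sees None. The names make_line_counter and flush_to, and the choice to count lines by their first field, are assumptions made for illustration and are not part of the original code.

```python
from collections import Counter

def make_line_counter(flush_to):
    """Build a callback(conn, line) that tallies lines and flushes on None.

    flush_to is a hypothetical function that receives each finished batch
    as a plain dict; in practice it might write the totals to Redis.
    """
    pending = Counter()

    def callback(conn, line):
        if line is None:
            # End of one log file: hand the batch over and start fresh.
            flush_to(dict(pending))
            pending.clear()
            return
        # Illustrative aggregate: count lines by their first field.
        fields = line.split()
        if fields:
            pending[fields[0]] += 1

    return callback
```

Because the batch is only handed off when None arrives, a callback of this shape does no output work per line, which is presumably why the listing forces a flush at the end of every file.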

Receiving information about log files is straightforward, though we do defer a lot of
the hard work of actually reading the file from Redis to helper functions that generate
sequences of log lines. We also need to be careful to notify the file sender by incrementing
the counter for the log file; otherwise the file sending process won’t know to
clean up finished log files.
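
To give a feel for what a helper that generates a sequence of log lines has to do, here is a minimal sketch that turns an iterable of byte blocks into complete lines. It is not the book's readlines() function: the name lines_from_blocks is hypothetical, and it takes blocks from any iterable rather than reading them from Redis.

```python
def lines_from_blocks(blocks):
    """Yield complete lines from an iterable of byte blocks.

    A line may be split across two blocks, so the trailing partial line
    of each block is held back until the next block arrives.
    """
    pending = b''
    for block in blocks:
        pending += block
        # Everything before the last newline is complete; the rest waits.
        *complete, pending = pending.split(b'\n')
        for line in complete:
            yield line
    if pending:
        # Final line with no trailing newline.
        yield pending
```

For example, list(lines_from_blocks([b'ab\ncd', b'ef\n', b'gh'])) rejoins the line that was split across the first two blocks and yields b'ab', b'cdef', and b'gh'.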