EBOOK – REDIS IN ACTION

This book covers the use of Redis, an in-memory database/data structure server.


6.6.2 Sending files

To get the log data to our log processors, we’ll have two different components
operating on the data. The first is a script that takes the log files and stores
them in Redis under named keys, publishes the names of those keys to a chat
channel using our group chat method from section 6.5.2, and waits for notification
that they’re complete (so we don’t use more memory than our Redis machine has
available). It waits for a notification that a key named after the file stored in Redis
has a value equal to 10, which is our number of aggregation processes. The function
that copies logs and cleans up after itself is shown in the following listing.

Listing 6.30 The copy_logs_to_redis() function
def copy_logs_to_redis(conn, path, channel, count=10,
                       limit=2**30, quit_when_done=True):
   bytes_in_redis = 0
   waiting = deque()
   create_chat(conn, 'source', map(str, range(count)), '', channel)

Create the chat that will be used to send messages to clients.

   count = str(count)
   for logfile in sorted(os.listdir(path)):

Iterate over all of the log files.

      full_path = os.path.join(path, logfile)

      fsize = os.stat(full_path).st_size
      while bytes_in_redis + fsize > limit:
         cleaned = _clean(conn, channel, waiting, count)
         if cleaned:
            bytes_in_redis -= cleaned
         else:
            time.sleep(.25)

Clean out finished files if we need more room.

      with open(full_path, 'rb') as inp:
         block = ' '
         while block:
            block = inp.read(2**17)
            conn.append(channel + logfile, block)

Upload the file to Redis.

      send_message(conn, channel, 'source', logfile)

Notify the listeners that the file is ready.

      bytes_in_redis += fsize
      waiting.append((logfile, fsize))

Update our local information about Redis’ memory use.

   if quit_when_done:
      send_message(conn, channel, 'source', ':done')

We are out of files, so signal that it’s done.

   while waiting:
      cleaned = _clean(conn, channel, waiting, count)
      if cleaned:
         bytes_in_redis -= cleaned
      else:
         time.sleep(.25)

Wait for the remaining files to be finished, cleaning up as we go.

def _clean(conn, channel, waiting, count):
   if not waiting:
      return 0
   w0 = waiting[0][0]
   if conn.get(channel + w0 + ':done') == count:
      conn.delete(channel + w0, channel + w0 + ':done')
      return waiting.popleft()[1]
   return 0

How we actually perform the cleanup from Redis.
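For context on what _clean() is checking: each of the count log processors is expected to read the uploaded key and then increment the matching :done counter, and the file’s keys are deleted only once that counter reaches count. A minimal sketch of that reader side, with illustrative function names (the book’s actual processor code isn’t shown here), assuming a redis-py-style connection:

```python
def process_one_log(conn, channel, logfile, chunk_size=2**17):
    # Hypothetical reader-side sketch (names are illustrative, not from
    # the listing above): stream the uploaded log data back out of Redis
    # in chunks, then mark this processor as finished with the file.
    key = channel + logfile
    offset = 0
    while True:
        # GETRANGE is inclusive on both ends, so this reads chunk_size bytes.
        block = conn.getrange(key, offset, offset + chunk_size - 1)
        if not block:
            break
        handle_block(block)  # application-specific log parsing
        offset += len(block)
    # Each of the `count` processors increments this counter; when its
    # value reaches count, copy_logs_to_redis() deletes the file's keys.
    conn.incr(key + ':done')

def handle_block(block):
    pass  # stand-in for real processing
```

Reading with GETRANGE rather than GET keeps the processor’s memory use bounded to one chunk at a time, mirroring how the copier uploads with APPEND.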

Copying logs to Redis requires a lot of detailed steps, mostly involving being careful
not to put too much data into Redis at one time and cleaning up properly after a file
has been read by all clients. The part that actually notifies the log processors that a
new file is ready is easy, but the setup, sending, and cleanup are fairly involved.
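One portability note, which applies if you run this listing against a modern redis-py client under Python 3: GET returns bytes by default, so the conn.get(...) == count comparison in _clean() would never match unless the client is created with decode_responses=True or the reply is decoded first. A minimal illustration of the mismatch:

```python
# Under Python 3, a raw redis-py client returns bytes from GET, so the
# str comparison in _clean() would silently never succeed:
raw_reply = b'10'           # what conn.get() returns by default
count = str(10)             # what copy_logs_to_redis() compares against
assert raw_reply != count   # bytes never compare equal to str in Python 3

# Decoding the reply (or constructing the client with
# redis.Redis(decode_responses=True)) restores the intended comparison:
assert raw_reply.decode() == count
```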