EBOOK – REDIS IN ACTION

This book covers the use of Redis, an in-memory database/data structure server.

  • Foreword
  • Preface
  • Acknowledgments
  • About this Book
  • About the Cover Illustration
  • Part 1: Getting Started
  • Part 2: Core concepts
  • Part 3: Next steps
  • Appendix A
  • Appendix B
  • Buy the paperback

    6.6.2 Sending files

    In order to get the log data to our logs processors, we’ll have two different components
    operating on the data. The first is a script that will be taking the log files and
    putting them in Redis under named keys, publishing the names of the keys to a chat
    channel using our group chat method from section 6.5.2, and waiting for notification
    when they’re complete (to not use more memory than our Redis machine has). It’ll
    be waiting for a notification that a key with a name similar to the file stored in Redis
    has a value equal to 10, which is our number of aggregation processes. The function
    that copies logs and cleans up after itself is shown in the following listing.

    Listing 6.30The copy_logs_to_redis() function
    def copy_logs_to_redis(conn, path, channel, count=10,
                         limit=2**30, quit_when_done=True):
       bytes_in_redis = 0
       waiting = deque()
    
       create_chat(conn, 'source', map(str, range(count)), '', channel)
    

    Create the chat that will be used to send messages to clients.

       count = str(count)
    
       for logfile in sorted(os.listdir(path)):
    

    Iterate over all of the log files.

          full_path = os.path.join(path, logfile)
    
    
          fsize = os.stat(full_path).st_size
    
          while bytes_in_redis + fsize > limit:
             cleaned = _clean(conn, channel, waiting, count)
             if cleaned:
                bytes_in_redis -= cleaned
          else:
                time.sleep(.25)
    
    

    Clean out finished files if we need more room.

       with open(full_path, 'rb') as inp:
          block = ' '
          while block:
             block = inp.read(2**17)
             conn.append(channel+logfile, block)
    
    

    Upload the file to Redis.

       send_message(conn, channel, 'source', logfile)
    
    

    Notify the listeners that the file is ready.

       bytes_in_redis += fsize
       waiting.append((logfile, fsize))
    
    

    Update our local information about Redis’ memory use.

    if quit_when_done:
       send_message(conn, channel, 'source', ':done')
    
    

    We are out of files, so signal that it’s done.

    while waiting:
       cleaned = _clean(conn, channel, waiting, count)
       if cleaned:
          bytes_in_redis -= cleaned
       else:
          time.sleep(.25)
    
    

    Clean out finished files if we need more room.

    def _clean(conn, channel, waiting, count):
       if not waiting:
          return 0
       w0 = waiting[0][0]
       if conn.get(channel + w0 + ':done') == count:
          conn.delete(channel + w0, channel + w0 + ':done')
          return waiting.popleft()[1]
       return 0
    

    How we actually perform the cleanup from Redis.

    Copying logs to Redis requires a lot of detailed steps, mostly involving being careful
    to not put too much data into Redis at one time and properly cleaning up after ourselves
    when a file has been read by all clients. The actual aspect of notifying logs processors
    that there’s a new file ready is easy, but the setup, sending, and cleanup are
    pretty detailed.