    6.4.1 First-in, first-out queues

    In the world of queues beyond task queues, normally a few different kinds of queues
    are discussed: first-in, first-out (FIFO), last-in, first-out (LIFO), and priority queues.
    We'll look first at a first-in, first-out queue, because it offers the most reasonable
    semantics for our first pass at a queue, can be implemented easily, and is fast. Later,
    we'll talk about adding a method for coarse-grained priorities, and even later, time-based
    queues.

    Let’s again look back to an example from Fake Game Company. To encourage users
    to play the game when they don’t normally do so, Fake Game Company has decided to
    add the option for users to opt in to emails about marketplace sales that have completed
    or that have timed out. Because outgoing email is one of those internet services
    that can have very high latencies and can fail, we need to keep the act of sending emails for completed or timed-out sales out of the typical code flow for those operations. To
    do this, we’ll use a task queue to keep a record of people who need to be emailed and
    why, and will implement a worker process that can be run in parallel to send multiple
    emails at a time if outgoing mail servers become slow.

    The queue that we'll write only needs to send emails out in a first-come, first-served
    manner, and will log both successes and failures. As we talked about in chapters
    3 and 5, Redis LISTs let us push and pop items from both ends with RPUSH/
    LPUSH and RPOP/LPOP. For our email queue, we’ll push emails to send onto the right
    end of the queue with RPUSH, and pop them off the left end of the queue with LPOP.
    (We do this because it makes sense visually for readers of left-to-right languages.)
    Because our worker processes are only going to be performing this emailing operation,
    we’ll use the blocking version of our list pop, BLPOP, with a timeout of 30 seconds.
    We’ll only handle item-sold messages in this version for the sake of simplicity,
    but adding support for sending timeout emails is also easy.
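
    As a quick reminder of what those commands return (this is a minimal sketch using the
    redis-py client; the local connection and the placeholder value are assumptions), a
    single push and blocking pop look like the following. BLPOP hands back a (key, value)
    pair, which is why the listings later in this section index into the result.

    import redis

    conn = redis.Redis()                            # assumed local Redis instance
    conn.rpush('queue:email', '{"example": true}')  # push onto the right end
    packed = conn.blpop(['queue:email'], 30)        # blocking pop from the left end
    # BLPOP returns a (key, value) pair, or None if the timeout expires.
    print(packed)                                   # (b'queue:email', b'{"example": true}')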

    Figure 6.9 A first-in, first-out queue using a LIST

    Our queue will simply be a list of JSON-encoded blobs of data, which will look like figure 6.9.

    To add an item to the queue, we'll get all of the necessary information together,
    serialize it with JSON, and RPUSH the result onto our email queue. As in previous
    chapters, we use JSON because it's human readable and because there are fast libraries
    for translation to/from JSON in most languages. The function that pushes an email onto
    the item-sold email task queue appears in the next listing.

    Listing 6.18 The send_sold_email_via_queue() function

    def send_sold_email_via_queue(conn, seller, item, price, buyer):
        data = {                                         # Prepare the item.
            'seller_id': seller,
            'item_id': item,
            'price': price,
            'buyer_id': buyer,
            'time': time.time()
        }
        conn.rpush('queue:email', json.dumps(data))      # Push the item onto the queue.

    Adding a message to a LIST queue shouldn’t be surprising.
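
    A quick usage sketch (the connection and the seller, item, price, and buyer values
    here are made up for illustration):

    import redis

    conn = redis.Redis()
    send_sold_email_via_queue(conn, seller=17, item='ItemL', price=97, buyer=4)
    print(conn.lrange('queue:email', 0, -1))    # inspect the queued JSON blob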

    Sending emails from the queue is easy. We use BLPOP to pull items from the email
    queue, prepare the email, and finally send it. The next listing shows our function for
    doing so.

    Listing 6.19 The process_sold_email_queue() function

    def process_sold_email_queue(conn):
        while not QUIT:
            # Try to get a message to send.
            packed = conn.blpop(['queue:email'], 30)
            # No message to send; try again.
            if not packed:
                continue
            # Load the packed email information.
            to_send = json.loads(packed[1])
            try:
                # Send the email using our prewritten emailing function.
                fetch_data_and_send_sold_email(to_send)
            except EmailSendError as err:
                log_error("Failed to send sold email", err, to_send)
            else:
                log_success("Sent sold email", to_send)
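
    Listings 6.18 and 6.19 lean on a few names defined elsewhere in this chapter. A
    minimal set of stand-ins (the QUIT flag, the logging helpers, and the stub email
    sender below are placeholders for illustration, not the book's implementations)
    could look like this:

    import json
    import logging
    import time

    import redis

    QUIT = False                         # flipped to True elsewhere to stop the worker
    logging.basicConfig(level=logging.INFO)

    class EmailSendError(Exception):
        pass

    def fetch_data_and_send_sold_email(to_send):
        # Stub: look up the seller's and buyer's addresses and hand off to SMTP.
        pass

    def log_error(message, err=None, data=None):
        logging.error("%s %r %r", message, err, data)

    def log_success(message, data=None):
        logging.info("%s %r", message, data)

    conn = redis.Redis()                 # assumed local Redis instance
    # process_sold_email_queue(conn)     # would loop, polling BLPOP every 30 seconds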
    

    Similarly, actually sending the email after pulling the message from the queue is also
    not surprising. But what about executing more than one type of task?

    Multiple Executable Tasks

    Because Redis only gives a single caller a popped item, we can be sure that none of the
    emails are duplicated and sent twice. Because we only put email messages to send in
    the queue, our worker process was simple. Having a single queue for each type of message
    is not uncommon for some situations, but for others, having a single queue able
    to handle many different types of tasks can be much more convenient. Take the
    worker process in listing 6.20: it watches the provided queue and dispatches the
    JSON-encoded function call to one of a set of known registered callbacks. The item to
    be executed will be of the form ['FUNCTION_NAME', [ARG1, ARG2, ...]].

    Listing 6.20 The worker_watch_queue() function

    def worker_watch_queue(conn, queue, callbacks):
        while not QUIT:
            # Try to get an item from the queue.
            packed = conn.blpop([queue], 30)
            # There's nothing to work on; try again.
            if not packed:
                continue
            # Unpack the work item.
            name, args = json.loads(packed[1])
            # The function is unknown; log the error and try again.
            if name not in callbacks:
                log_error("Unknown callback %s"%name)
                continue
            # Execute the task.
            callbacks[name](*args)

    With this generic worker process, our email sender could be written as a callback and passed with other callbacks.
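
    As a rough illustration (the queue name, the callback name, and the argument layout
    below are assumptions, not the book's exact wiring), registering a callback and
    enqueueing a matching work item might look like this:

    # (conn, json, and time as set up earlier in this section)
    def send_sold_email(seller_id, item_id, price, buyer_id, sold_at):
        # Hypothetical callback that reuses the email-sending logic from earlier.
        ...

    callbacks = {'send_sold_email': send_sold_email}

    # Producers enqueue ['FUNCTION_NAME', [ARG1, ARG2, ...]] items.
    conn.rpush('queue:general', json.dumps(
        ['send_sold_email', [17, 'ItemL', 97, 4, time.time()]]))

    # worker_watch_queue(conn, 'queue:general', callbacks)   # run in a worker process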

    Task Priorities

    Sometimes when working with queues, it’s necessary to prioritize certain operations
    before others. In our case, maybe we want to send emails about sales that completed
    before we send emails about sales that expired. Or maybe we want to send password
    reset emails before we send out emails for an upcoming special event. Remember the
    BLPOP/BRPOP commands: we can provide multiple LISTs from which to pop an item; the
    first LIST to have any items in it will have its first item popped (or its last item
    popped, if we're using BRPOP).

    Let's say that we want to have three priority levels: high, medium, and low.
    High-priority items should be executed if they're available. If there are no
    high-priority items, then items in the medium-priority level should be executed. If
    there are neither high- nor medium-priority items, then items in the low-priority
    level should be executed.
    Looking at our earlier code, we can change two lines to make that possible in
    the updated listing.

    Listing 6.21 The worker_watch_queues() function

    def worker_watch_queues(conn, queues, callbacks):    # The first changed line, to add priority support
        while not QUIT:
            packed = conn.blpop(queues, 30)              # The second changed line, to add priority support
            if not packed:
                continue
            name, args = json.loads(packed[1])
            if name not in callbacks:
                log_error("Unknown callback %s"%name)
                continue
            callbacks[name](*args)
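
    Used this way, priorities come from the order of the queue names passed in (the names
    below are just an assumption for illustration):

    # (conn, json, time, and callbacks as set up earlier in this section)
    # BLPOP checks the lists in the order given, so high-priority items win.
    priority_queues = ['queue:high', 'queue:medium', 'queue:low']
    # worker_watch_queues(conn, priority_queues, callbacks)

    # Producers choose a priority simply by pushing to the matching list:
    conn.rpush('queue:low', json.dumps(['send_sold_email', [17, 'ItemL', 97, 4, time.time()]]))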
    

    By using multiple queues, priorities can be implemented easily. There are situations
    where multiple queues are used as a way of separating different queue items
    (announcement emails, notification emails, and so forth) without any desire to be
    “fair.” In such situations, it can make sense to reorder the queue list occasionally to be
    more fair to all of the queues, especially in the case where one queue can grow quickly
    relative to the other queues.
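
    One simple way to get that occasional reordering (a sketch, assuming strict ordering
    between reshuffles isn't required; the shuffle interval is arbitrary) is to shuffle
    the queue list every so often before handing it to BLPOP:

    import random

    def worker_watch_fair_queues(conn, queues, callbacks, shuffle_every=100):
        # Like worker_watch_queues(), but periodically reshuffles the queue order
        # so that one fast-growing queue doesn't starve the others.
        iterations = 0
        while not QUIT:
            if iterations % shuffle_every == 0:
                random.shuffle(queues)       # note: mutates the caller's list
            iterations += 1
            packed = conn.blpop(queues, 30)
            if not packed:
                continue
            name, args = json.loads(packed[1])
            if name not in callbacks:
                log_error("Unknown callback %s"%name)
                continue
            callbacks[name](*args)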

    If you’re using Ruby, you can use an open source package called Resque that was
    put out by the programmers at GitHub. It uses Redis for Ruby-based queues using
    lists, which is similar to what we've talked about here. Resque offers many additional
    features over the 11-line function that we provided here, so it's worth checking out.
    Regardless, there are many more options for queues in Redis, and
    you should keep reading.