This book covers the use of Redis, an in-memory database/data structure server.

6.4.1 First-in, first-out queues

In the world of queues beyond task queues, normally a few different kinds of queues
are discussed—first-in, first-out (FIFO), last-in first-out (LIFO), and priority queues.
We’ll look first at a first-in, first-out queue, because it offers the most reasonable
semantics for our first pass at a queue, can be implemented easily, and is fast. Later,
we’ll talk about adding a method for coarse-grained priorities, and even later, time-based queues.

Let’s again look back to an example from Fake Game Company. To encourage users
to play the game when they don’t normally do so, Fake Game Company has decided to
add the option for users to opt in to emails about marketplace sales that have completed
or that have timed out. Because outgoing email is one of those internet services
that can have very high latencies and can fail, we need to keep the act of sending emails for completed or timed-out sales out of the typical code flow for those operations. To
do this, we’ll use a task queue to keep a record of people who need to be emailed and
why, and will implement a worker process that can be run in parallel to send multiple
emails at a time if outgoing mail servers become slow.

The queue that we’ll write only needs to send emails out in a first-come, first-served
manner, and will log both successes and failures. As we talked about in chapters
3 and 5, Redis LISTs let us push and pop items from both ends with RPUSH/LPUSH
and RPOP/LPOP. For our email queue, we’ll push emails to send onto the right
end of the queue with RPUSH, and pop them off the left end of the queue with LPOP.
(We do this because it makes sense visually for readers of left-to-right languages.)
Because our worker processes are only going to be performing this emailing operation,
we’ll use the blocking version of our list pop, BLPOP, with a timeout of 30 seconds.
We’ll only handle item-sold messages in this version for the sake of simplicity,
but adding support for sending timeout emails is also easy.
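
The ordering that RPUSH and LPOP give us can be sketched in a few lines. To keep the example runnable without a Redis server, it uses a small in-memory stand-in, FakeListConn, which is a hypothetical class written for this illustration and not part of any Redis client. It mimics only the two calls used here.

```python
from collections import defaultdict, deque


class FakeListConn:
    """Hypothetical in-memory stand-in for a Redis connection (LISTs only)."""

    def __init__(self):
        self.lists = defaultdict(deque)

    def rpush(self, key, *values):
        # Like RPUSH: append to the right end and return the new length.
        self.lists[key].extend(values)
        return len(self.lists[key])

    def lpop(self, key):
        # Like LPOP: remove from the left end, or return None if empty.
        return self.lists[key].popleft() if self.lists[key] else None


conn = FakeListConn()
for message in ('first', 'second', 'third'):
    conn.rpush('queue:email', message)

# Items come back in the order they were pushed: first in, first out.
popped = [conn.lpop('queue:email') for _ in range(3)]
```

Pushing on the right and popping on the left is what makes the LIST behave as a FIFO queue; popping from the same end we push to would give a LIFO stack instead.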

Figure 6.9 A first-in, first-out queue using a LIST

Our queue will simply be a list of JSON-encoded blobs of data, which will look like figure 6.9.
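
As a rough illustration of one such blob, the sketch below encodes and decodes a single item-sold message. The field names follow the message used by the email queue in this section; the values are made up for the example.

```python
import json

# Hypothetical values; only the field names come from this section.
data = {
    'seller_id': 17,
    'item_id': 'ItemM',
    'price': 97,
    'buyer_id': 27,
    'time': 1322700540.934,
}

packed = json.dumps(data)      # the string that would be stored in the LIST
unpacked = json.loads(packed)  # what a worker gets back after decoding
```

Each element of the LIST is one such string, so a worker can decode an item without knowing anything about the items around it.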

To add an item to the queue, we’ll
get all of the necessary information
together, serialize it with JSON, and
RPUSH the result onto our email queue.
As in previous chapters, we use JSON because it’s human readable and because there
are fast libraries for translation to/from JSON in most languages. The function that
pushes an email onto the item-sold email task queue appears in the next listing.

Listing 6.18 The send_sold_email_via_queue() function
import json
import time

def send_sold_email_via_queue(conn, seller, item, price, buyer):
   # Prepare the item.
   data = {
      'seller_id': seller,
      'item_id': item,
      'price': price,
      'buyer_id': buyer,
      'time': time.time()
   }
   # Push the item onto the queue.
   conn.rpush('queue:email', json.dumps(data))

Adding a message to a LIST queue shouldn’t be surprising.

Sending emails from the queue is easy. We use BLPOP to pull items from the email
queue, prepare the email, and finally send it. The next listing shows our function for
doing so.

Listing 6.19 The process_sold_email_queue() function
def process_sold_email_queue(conn):
   while not QUIT:
      # Try to get a message to send.
      packed = conn.blpop(['queue:email'], 30)
      # No message to send; try again.
      if not packed:
         continue

      # Load the packed email information.
      to_send = json.loads(packed[1])
      try:
         # Send the email using our prewritten emailing function
         # (defined elsewhere).
         fetch_data_and_send_sold_email(to_send)
      except EmailSendError as err:
         log_error("Failed to send sold email", err, to_send)
      else:
         log_success("Sent sold email", to_send)

Similarly, actually sending the email after pulling the message from the queue is also
not surprising. But what about executing more than one type of task?

Multiple Executable Tasks

Because Redis only gives a single caller a popped item, we can be sure that none of the
emails are duplicated and sent twice. Because we only put email messages to send in
the queue, our worker process was simple. Having a single queue for each type of message
is not uncommon for some situations, but for others, having a single queue able
to handle many different types of tasks can be much more convenient. Take the
worker process in listing 6.20: it watches the provided queue and dispatches the JSON-encoded
function call to one of a set of known registered callbacks. The item to be
executed will be of the form ['FUNCTION_NAME', [ARG1, ARG2, ...]].

Listing 6.20 The worker_watch_queue() function
def worker_watch_queue(conn, queue, callbacks):
   while not QUIT:
      # Try to get an item from the queue.
      packed = conn.blpop([queue], 30)
      # There's nothing to work on; try again.
      if not packed:
         continue

      # Unpack the work item.
      name, args = json.loads(packed[1])
      # The function is unknown; log the error and try again.
      if name not in callbacks:
         log_error("Unknown callback %s"%name)
         continue
      # Execute the task.
      callbacks[name](*args)

With this generic worker process, our email sender could be written as a callback and passed with other callbacks.
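
A minimal sketch of that idea follows. The callback name send_sold_email and its argument values are hypothetical, and the callback only records its arguments so that the example runs without a mail server or Redis. The worker-side steps mirror the unpack-and-dispatch logic of the generic worker.

```python
import json

sent = []  # records calls so the example has something to inspect


def send_sold_email(seller, item, price, buyer):
    # Hypothetical callback; a real one would build and send the email.
    sent.append((seller, item, price, buyer))


# Registry mapping task names to the functions that handle them.
callbacks = {'send_sold_email': send_sold_email}

# Producer side: encode the task as ['FUNCTION_NAME', [ARG1, ARG2, ...]].
packed = json.dumps(['send_sold_email', [17, 'ItemM', 97, 27]])

# Worker side: decode, look up the callback, and call it with the arguments.
name, args = json.loads(packed)
if name in callbacks:
    callbacks[name](*args)
```

Because the task names are plain strings looked up in a dictionary, a worker can only run functions that were registered explicitly, which keeps arbitrary queue contents from calling arbitrary code.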

Task Priorities

Sometimes when working with queues, it’s necessary to prioritize certain operations
before others. In our case, maybe we want to send emails about sales that completed
before we send emails about sales that expired. Or maybe we want to send password
reset emails before we send out emails for an upcoming special event. Remember the
BLPOP/BRPOP commands: we can provide multiple LISTs to pop an item from. The LISTs
are checked in the order given, and the first one that has any items in it will have
its first item popped (or its last, if we’re using BRPOP).
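
The key-ordering behavior can be sketched with an in-memory stand-in. FakeListConn below is a hypothetical class written for this illustration: it never blocks, whereas real BLPOP waits up to the timeout, and the queue names are assumptions. It returns a (key, value) pair in the same shape that the listings in this section unpack with packed[1].

```python
from collections import defaultdict, deque


class FakeListConn:
    """Hypothetical in-memory stand-in; it never blocks, unlike real BLPOP."""

    def __init__(self):
        self.lists = defaultdict(deque)

    def rpush(self, key, *values):
        self.lists[key].extend(values)
        return len(self.lists[key])

    def blpop(self, keys, timeout=0):
        # Check the keys in the order given; pop from the first non-empty one.
        for key in keys:
            if self.lists[key]:
                return key, self.lists[key].popleft()
        return None  # real BLPOP would wait up to `timeout` seconds first


conn = FakeListConn()
conn.rpush('queue:low', 'event announcement')   # pushed first
conn.rpush('queue:high', 'password reset')      # pushed second

queues = ['queue:high', 'queue:medium', 'queue:low']
first = conn.blpop(queues, 30)
second = conn.blpop(queues, 30)
third = conn.blpop(queues, 30)
```

Even though the low-priority item was pushed first, the high-priority item is returned first, because its LIST appears earlier in the list of keys.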

Let’s say that we want to have three priority levels: high, medium, and low. High-priority
items should be executed if they’re available. If there are no high-priority items,
then items in the medium-priority level should be executed. If there are neither
high- nor medium-priority items, then items in the low-priority level should be executed.
Looking at our earlier code, we can change two lines to make that possible in
the updated listing.

Listing 6.21 The worker_watch_queues() function
# This is the first changed line to add priority support.
def worker_watch_queues(conn, queues, callbacks):
   while not QUIT:
      # This is the second changed line to add priority support.
      packed = conn.blpop(queues, 30)
      if not packed:
         continue

      name, args = json.loads(packed[1])
      if name not in callbacks:
         log_error("Unknown callback %s"%name)
         continue
      callbacks[name](*args)

By using multiple queues, priorities can be implemented easily. There are situations
where multiple queues are used as a way of separating different queue items
(announcement emails, notification emails, and so forth) without any desire to be
“fair.” In such situations, it can make sense to reorder the queue list occasionally to be
more fair to all of the queues, especially in the case where one queue can grow quickly
relative to the other queues.
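
One simple way to reorder the queue list is to rotate it between pops. The sketch below is only one possible approach, not something this section prescribes; the helper rotate_queues and the queue names are hypothetical.

```python
def rotate_queues(queues):
    # Move the first queue to the end so that the next pop checks a
    # different queue first. Returns a new list; the input is unchanged.
    return queues[1:] + queues[:1]


queues = ['queue:announce', 'queue:notify', 'queue:digest']
orders = []
for _ in range(3):
    orders.append(queues)       # the order a worker would pass to BLPOP
    queues = rotate_queues(queues)
```

A worker could call such a helper after every pop or after every few pops, trading strict ordering for a more even share of attention across the queues.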

If you’re using Ruby, you can use an open source package called Resque that was
put out by the programmers at GitHub. It uses Redis for Ruby-based queues using
lists, which is similar to what we’ve talked about here. Resque offers many additional
features over the 11-line function that we provided here, so if you’re using Ruby, you
should check it out. Regardless, there are many more options for queues in Redis, and
you should keep reading.