EBOOK – REDIS IN ACTION

This book covers the use of Redis, an in-memory database/data structure server.

open all | close all

6.4.2 Delayed tasks

With list-based queues, we can handle single-call per queue, multiple callbacks per
queue, and we can handle simple priorities. But sometimes, we need a bit more. Fake
Game Company has decided that they’re going to add a new feature in their game:
delayed selling. Rather than putting an item up for sale now, players can tell the game
to put an item up for sale in the future. It’s our job to change or replace our task
queue with something that can offer this feature.

There are a few different ways that we could potentially add delays to our queue
items. Here are the three most straightforward ones:

  • We could include an execution time as part of queue items, and if a worker process
    sees an item with an execution time later than now, it can wait for a brief
    period and then re-enqueue the item.
  • The worker process could have a local waiting list for any items it has seen
    that need to be executed in the future, and every time it makes a pass through its while loop, it could check that list for any outstanding items that need to
    be executed.
  • Normally when we talk about times, we usually start talking about ZSETs. What if,
    for any item we wanted to execute in the future, we added it to a ZSET instead of
    a LIST, with its score being the time when we want it to execute? We then have a
    process that checks for items that should be executed now, and if there are any,
    the process removes it from the ZSET, adding it to the proper LIST queue.

We can’t wait/re-enqueue items as described in the first, because that’ll waste the
worker process’s time. We also can’t create a local waiting list as described in the second
option, because if the worker process crashes for an unrelated reason, we lose any
pending work items it knew about. We’ll instead use a secondary ZSET as described in
the third option, because it’s simple, straightforward, and we can use a lock from section
6.2 to ensure that the move is safe.

Each delayed item in the ZSET queue will be a JSON-encoded list of four items: a
unique identifier, the queue where the item should be inserted, the name of the callback
to call, and the arguments to pass to the callback. We include the unique identifier
in order to differentiate all calls easily, and to allow us to add possible reporting features
later if we so choose. The score of the item will be the time when the item should be executed.
If the item can be executed immediately, we’ll insert the item into the list queue
instead. For our unique identifier, we’ll again use a 128-bit randomly generated UUID.
The code to create an (optionally) delayed task can be seen next.

Listing 6.22The execute_later() function
def execute_later(conn, queue, name, args, delay=0):
   identifier = str(uuid.uuid4())

Generate a unique identifier.

   item = json.dumps([identifier, queue, name, args])

Prepare the item for the queue.

   if delay > 0:
      conn.zadd('delayed:', item, time.time() + delay)

Delay the item.

   else:
      conn.rpush('queue:' + queue, item)

Execute the item immediately.

   return identifier

Return the identifier.

When the queue item is to be executed without delay, we continue to use the old listbased
queue. But if we need to delay the item, we add the item to the delayed ZSET. An
example of the delayed queue emails to be sent can be seen in figure 6.10.

Figure 6.10A delayed task queue using a ZSET

Unfortunately, there isn’t a convenient
method in Redis to block on
ZSETs until a score is lower than the
current Unix timestamp, so we need
to manually poll. Because delayed
items are only going into a single
queue, we can just fetch the first item
with the score. If there’s no item, or if the item still needs to wait, we’ll wait a brief period and try again. If there is an item,
we’ll acquire a lock based on the identifier in the item (a fine-grained lock), remove the
item from the ZSET, and add the item to the proper queue. By moving items into queues
instead of executing them directly, we only need to have one or two of these running at
any time (instead of as many as we have workers), so our polling overhead is kept low.
The code for polling our delayed queue is in the following listing.

Listing 6.23The poll_queue() function
def poll_queue(conn):
   while not QUIT:
      item = conn.zrange('delayed:', 0, 0, withscores=True)

Get the first item in the queue.

      if not item or item[0][1] > time.time():
         time.sleep(.01)
         continue

No item or the item is still to be executed in the future.

      item = item[0][0]
      identifier, queue, function, args = json.loads(item)

Unpack the item so that we know where it should go.

      locked = acquire_lock(conn, identifier)
      if not locked:

Get the lock for the item.

We couldn’t get the lock, so skip it and try again.

         continue

      if conn.zrem('delayed:', item):
         conn.rpush('queue:' + queue, item)

Move the item to the proper list queue.

      release_lock(conn, identifier, locked)

Release the lock.

As is clear from listing 6.23, because ZSETs don’t have a blocking pop mechanism like
LISTs do, we need to loop and retry fetching items from the queue. This can increase
load on the network and on the processors performing the work, but because we’re
only using one or two of these pollers to move items from the ZSET to the LIST
queues, we won’t waste too many resources. If we further wanted to reduce overhead,
we could add an adaptive method that increases the sleep time when it hasn’t seen any
items in a while, or we could use the time when the next item was scheduled to help
determine how long to sleep, capping it at 100 milliseconds to ensure that tasks scheduled
only slightly in the future are executed in a timely fashion.

Respecting Priorities

In the basic sense, delayed tasks have the same sort of priorities that our first-in, firstout
queue had. Because they’ll go back on their original destination queues, they’ll be
executed with the same sort of priority. But what if we wanted delayed tasks to execute
as soon as possible after their time to execute has come up?

The simplest way to do this is to add some extra queues to make scheduled tasks
jump to the front of the queue. If we have our high-, medium-, and low-priority
queues, we can also create high-delayed, medium-delayed, and low-delayed queues,
which are passed to the worker_watch_queues() function as ["high-delayed",
"high", "medium-delayed", "medium", "low-delayed", "low"]. Each of the delayed
queues comes just before its nondelayed equivalent.

Some of you may be wondering, “If we’re having them jump to the front of the
queue, why not just use LPUSH instead of RPUSH?” Suppose that all of our workers are
working on tasks for the medium queue, and will take a few seconds to finish. Suppose
also that we have three delayed tasks that are found and LPUSHed onto the front of the
medium queue. The first is pushed, then the second, and then the third. But on the
medium queue, the third task to be pushed will be executed first, which violates our
expectations that things that we want to execute earlier should be executed earlier.

If you use Python and you’re interested in a queue like this, I’ve written a package
called RPQueue that offers delayed task execution semantics similar to the preceding
code snippets. It does include more functionality, so if you want a queue and are already
using Redis, give RPQueue a look at http://github.com/josiahcarlson/rpqueue/.

When we use task queues, sometimes we need our tasks to report back to other
parts of our application with some sort of messaging system. In the next section, we’ll
talk about creating message queues that can be used to send to a single recipient, or
to communicate between many senders and receivers.