This book covers the use of Redis, an in-memory database/data structure server.

11.4.4 Performing blocking pops from the sharded LIST

We’ve stepped through pushing items onto both ends of a long LIST, popping items
off both ends, and even written a function to get the total length of a sharded LIST. In
this section, we’ll build a method to perform blocking pops from both ends of the
sharded LIST. In previous chapters, we’ve used blocking pops to implement messaging
and task queues, though other uses are also possible.

Whenever possible, if we don’t need to actually block and wait on a request, we
should use the nonblocking versions of the sharded LIST pop methods. This is
because, with the current semantics and commands available to Lua scripting and
WATCH/MULTI/EXEC transactions, there are still some situations where we may receive
incorrect data. These situations are rare, and we’ll go through a few steps to try to prevent
them from happening, but every system has limitations.

To perform a blocking pop, we’ll cheat somewhat. Inside a loop that runs until we either
get an item or run out of time, we’ll first try a nonblocking pop. If that works, then
we’re done. If it doesn’t get an item, we’ll go through a few more steps that end in a
short blocking pop, and then loop again.
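The shape of this loop can be sketched without Redis at all. This is a minimal sketch under stated assumptions: `pop_with_fallback`, `try_pop`, `wait_then_pop`, and the injectable `clock` are hypothetical names, and the fake clock exists only so the loop can be exercised deterministically.

```python
import itertools
from typing import Callable, Optional


def pop_with_fallback(try_pop: Callable[[], Optional[str]],
                      wait_then_pop: Callable[[], Optional[str]],
                      timeout: float,
                      clock: Callable[[], float]) -> Optional[str]:
    """Retry until an item arrives or the deadline passes.

    Each pass first tries a fast, nonblocking pop; if that finds nothing,
    it falls back to a short, bounded wait before looping again.
    """
    end = clock() + timeout
    while clock() < end:
        item = try_pop()            # nonblocking attempt
        if item is not None:
            return item
        item = wait_then_pop()      # bounded blocking attempt
        if item is not None:
            return item
    return None  # timed out without getting an item


# Usage with a fake clock that advances one "second" per reading.
ticks = itertools.count()
fast_results = iter([None, None])
slow_results = iter([None, 'job-1'])
print(pop_with_fallback(lambda: next(fast_results),
                        lambda: next(slow_results),
                        timeout=10,
                        clock=lambda: next(ticks)))  # prints job-1
```

Injecting the clock and both pop functions keeps the retry logic separate from any particular data store, which is also why the real implementation can reuse one helper for both ends of the LIST.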

Specifically, we start by trying the nonblocking pop. If that fails, we fetch the ID of
the shard at the end we want to pop from (the first shard for a left pop, the last shard
for a right pop), and then perform a blocking pop on that shard. Well, sort of.

Because the shard ID of the end we want to pop from could’ve changed since we
fetched the endpoints (due to round-trip latencies), we insert a pipelined Lua script
EVAL call just before the blocking pop. This script verifies whether we’re trying to pop
from the correct LIST. If we are, then it does nothing, and our blocking pop operation
occurs without issue. But if it’s the wrong LIST, then the script will push an extra
“dummy” item onto the LIST, which will then be popped with the blocking pop operation
immediately following.
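The decision the Lua script makes is small enough to model in plain Python. This is a sketch under stated assumptions, not the listing’s code: a dict of strings stands in for the shard-ID keys, a dict of `deque` objects stands in for the shard LISTs, and `verify_shard` is a hypothetical name. The key layout (`key + ':' + shard_id` for a shard, `key + ':first'` or `key + ':last'` for an endpoint) follows Listing 11.16.

```python
from collections import deque
from typing import Dict


def verify_shard(endpoints: Dict[str, str], lists: Dict[str, deque],
                 key: str, endp: str, expected: str,
                 push: str, dummy: str) -> None:
    """In-memory model of the Lua check.

    If the shard ID stored at key + endp no longer matches the ID the
    client fetched earlier, push a dummy item onto the shard the client
    is about to block on, so that its blocking pop returns immediately.
    """
    actual = endpoints.get(key + endp) or '0'  # a missing key means shard '0'
    if actual != expected:
        target = lists.setdefault(key + ':' + expected, deque())
        if push == 'lpush':
            target.appendleft(dummy)
        else:  # 'rpush'
            target.append(dummy)


# The client read '2' for the first shard, but it has since moved to '3'.
endpoints = {'queue:first': '3'}
lists = {'queue:2': deque(), 'queue:3': deque(['task'])}
verify_shard(endpoints, lists, 'queue', ':first', '2', 'lpush', 'DUMMY')
print(list(lists['queue:2']))  # prints ['DUMMY']
```

When the IDs match, nothing is pushed, and the following blocking pop waits on the correct shard as usual.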

There’s a potential race between when the Lua script is executed and when the
blocking pop operation is executed. If another client pops an item from or pushes an
item onto that same shard in between, then we could get bad data (the other popping
client getting our dummy item), or we could end up blocking on the wrong shard.
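Both failure modes can be seen by replaying one unlucky interleaving step by step. This sketch is a single-threaded, hypothetical replay that uses `deque` objects in place of Redis LISTs; it does not run any Redis commands, and the `DUMMY` string is a placeholder for the real random value.

```python
from collections import deque

DUMMY = 'dummy'  # hypothetical placeholder for the listing's uuid4() value

# Our client read shard ID '2' for the first shard, but it is now '3'.
shards = {'queue:2': deque(), 'queue:3': deque(['real-item'])}

# 1. Our Lua check sees the mismatch and pushes a dummy onto queue:2.
shards['queue:2'].appendleft(DUMMY)

# 2. The race: another client pops from queue:2 before our blocking pop.
other_client_got = shards['queue:2'].popleft()

# 3. Our blocking pop now finds queue:2 empty; in Redis it would wait on
#    the wrong shard while 'real-item' sits untouched in queue:3.
we_got = shards['queue:2'].popleft() if shards['queue:2'] else None

print(other_client_got, we_got, list(shards['queue:3']))
# prints dummy None ['real-item']
```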

Throughout the other chapters, we’ve used WATCH/MULTI/EXEC transactions as a way of
preventing race conditions. So why don’t we use WATCH/MULTI/EXEC to prepare information,
and then use a BLPOP/BRPOP operation as the last command before EXEC?
This is because if a BLPOP/BRPOP operation occurs on an empty LIST as part of
a MULTI/EXEC transaction, it’d block forever because no other commands can
be run in that time. To prevent such an error, BLPOP/BRPOP operations within
a MULTI/EXEC block will execute as their nonblocking LPOP/RPOP versions
(except allowing the client to pass multiple lists to attempt to pop from).

To address the issue with blocking on the wrong shard, we’ll only ever block for one
second at a time (even if we’re supposed to block forever). And to address the issue
with our blocking pop operations getting data that wasn’t actually on the end shard,
we’ll operate under the assumption that if data came in between two non-transactional
pipelined calls, it’s close enough to being correct. Our functions for handling
blocking pops can be seen in the next listing.
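The cost of blocking for only one second at a time is easy to quantify with a simulated clock: a finite timeout can be overshot by up to one blocking interval, and a timeout of zero never ends on its own. `normalize_timeout` and `worst_case_attempts` are hypothetical helpers; the first reuses the `max(timeout, 0) or 2**64` expression from Listing 11.16, and the second assumes every blocking call waits its full second and ignores all other latency.

```python
def normalize_timeout(timeout: float) -> float:
    """0 or a negative value means 'block forever', represented by an
    enormous number of seconds (same expression as Listing 11.16)."""
    return max(timeout, 0) or 2**64


def worst_case_attempts(timeout: float, block_for: float = 1.0):
    """Count blocking attempts when nothing ever arrives.

    Assumes each blocking pop waits its full block_for seconds and that
    all other round trips take no time. Returns (attempts, overshoot),
    where overshoot is how far past the deadline the loop finishes.
    """
    end = normalize_timeout(timeout)  # simulated clock starts at 0
    now, attempts = 0.0, 0
    while now < end:
        attempts += 1
        now += block_for
    return attempts, now - end


print(worst_case_attempts(2.5))  # prints (3, 0.5)
```

Calling `worst_case_attempts(0)` would loop forever, which mirrors the listing’s treatment of a zero timeout as “wait forever”.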

Listing 11.16 Our code to perform a blocking pop from a sharded LIST
```python
import time
import uuid

# script_load(), sharded_lpop(), and sharded_rpop() come from earlier
# listings in this chapter. Assumes a redis-py connection whose replies
# are str (for example, created with decode_responses=True); with raw
# bytes replies, key + ':' + shard below would raise a TypeError.

# Our defined dummy value, which we can change to be something that we
# shouldn't expect to see in our sharded LISTs.
DUMMY = str(uuid.uuid4())

# A helper function that actually performs the pop operations for both
# types of blocking pop operations.
def sharded_bpop_helper(conn, key, timeout, pop, bpop, endp, push):
    # Prepare the pipeline and timeout information.
    pipe = conn.pipeline(False)
    timeout = max(timeout, 0) or 2**64
    end = time.time() + timeout

    while time.time() < end:
        # Try to perform a nonblocking pop, returning the value if it
        # isn't missing or the dummy value.
        result = pop(conn, key)
        if result not in (None, DUMMY):
            return result

        # Get the shard that we think we need to pop from.
        shard = conn.get(key + endp) or '0'

        # Run the Lua helper, which will handle pushing a dummy value if
        # we're popping from the wrong shard. We use force_eval to ensure
        # an EVAL call instead of an EVALSHA, because we can't afford to
        # perform a potentially failing EVALSHA inside a pipeline.
        sharded_bpop_helper_lua(pipe, [key + ':', key + endp],
            [shard, push, DUMMY], force_eval=True)

        # Try to block on popping the item from the LIST, using the
        # proper BLPOP or BRPOP command passed in.
        getattr(pipe, bpop)(key + ':' + shard, 1)

        # If we got an item, then we're done; otherwise, retry.
        result = (pipe.execute()[-1] or [None])[-1]
        if result not in (None, DUMMY):
            return result

# These functions prepare the actual call to the underlying blocking
# pop operations.
def sharded_blpop(conn, key, timeout=0):
    return sharded_bpop_helper(
        conn, key, timeout, sharded_lpop, 'blpop', ':first', 'lpush')

def sharded_brpop(conn, key, timeout=0):
    return sharded_bpop_helper(
        conn, key, timeout, sharded_rpop, 'brpop', ':last', 'rpush')

sharded_bpop_helper_lua = script_load('''
-- Get the actual shard for the end we want to pop from.
local shard = redis.call('get', KEYS[2]) or '0'
-- If we were going to try to pop from the wrong shard, push an extra value.
if shard ~= ARGV[1] then
    redis.call(ARGV[2], KEYS[1]..ARGV[1], ARGV[3])
end
''')
```


There are a lot of pieces that come together to make this work, but remember that
there are three basic pieces. The first piece is a helper that handles the loop to
actually fetch the item. Inside this loop, we call the second piece, the pair of Lua
helper and blocking pop calls, which handles the blocking portion of the operation.
The third piece is the API that users actually call, which passes all of the proper
arguments to the helper.
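The third piece relies on a common Python pattern: one parameterized helper plus thin wrappers that pass method names as strings. This sketch isolates that pattern with a hypothetical `FakeList` class in place of a Redis pipeline; `pop_helper`, `pop_left`, and `pop_right` are illustrative names, not part of the listing.

```python
from collections import deque
from typing import Optional


class FakeList:
    """Hypothetical stand-in for a client with lpop/rpop methods."""

    def __init__(self, items):
        self.items = deque(items)

    def lpop(self) -> Optional[str]:
        return self.items.popleft() if self.items else None

    def rpop(self) -> Optional[str]:
        return self.items.pop() if self.items else None


def pop_helper(lst: FakeList, method: str) -> Optional[str]:
    # Shared logic lives here; getattr() picks the method by name, the
    # same way the listing calls getattr(pipe, bpop).
    return getattr(lst, method)()


# Thin public wrappers: they only choose which end to pop from.
def pop_left(lst: FakeList) -> Optional[str]:
    return pop_helper(lst, 'lpop')


def pop_right(lst: FakeList) -> Optional[str]:
    return pop_helper(lst, 'rpop')


items = FakeList(['a', 'b', 'c'])
print(pop_left(items), pop_right(items))  # prints a c
```

Any fix to the shared helper automatically applies to both ends, which is the main reason to structure the API this way.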

We could implement each of the commands operating on sharded LISTs with
WATCH/MULTI/EXEC transactions. But a practical issue comes up with even a modest
amount of contention: each of these operations manipulates multiple structures
simultaneously, including structures whose names are calculated as part of the
transaction itself. Using a lock over the entire structure can help somewhat, but
using Lua improves performance significantly.
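Why contention hurts WATCH/MULTI/EXEC can be seen in a toy model of optimistic concurrency. This is a hypothetical single-threaded simulation, not Redis’s implementation: `ToyStore` and `optimistic_incr` are illustrative names, and each conflicting write between the read and the commit forces a full retry. A Lua script, by contrast, runs atomically on the server, so no other command can interleave with it and it never needs to retry.

```python
class ToyStore:
    """Hypothetical store that tracks a version number per key, the way
    WATCH notices whether a key changed before EXEC."""

    def __init__(self):
        self.values = {}
        self.versions = {}

    def set(self, key, value):
        self.values[key] = value
        self.versions[key] = self.versions.get(key, 0) + 1


def optimistic_incr(store, key, concurrent_writes):
    """Increment store.values[key] optimistically, retrying on conflict.

    concurrent_writes is a list of callables; one runs between our read
    and our commit on each attempt, simulating another client.
    Returns the number of attempts needed.
    """
    attempts = 0
    while True:
        attempts += 1
        watched = store.versions.get(key, 0)      # like WATCH key
        new_value = store.values.get(key, 0) + 1  # work done client-side
        if concurrent_writes:                     # another client sneaks in
            concurrent_writes.pop(0)(store)
        if store.versions.get(key, 0) == watched:  # like a successful EXEC
            store.set(key, new_value)
            return attempts
        # Otherwise EXEC would fail, and the whole transaction is retried.


store = ToyStore()
writers = [lambda s: s.set('first', 10)] * 3
print(optimistic_incr(store, 'first', writers), store.values['first'])
# prints 4 11
```

With three interfering writes, the increment needs four attempts; the work redone on every retry is what makes this approach slow down as contention grows.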