This book covers the use of Redis, an in-memory database/data structure server.


6.5.2 Multiple-recipient publish/subscribe replacement

Single-recipient messaging is useful, but it doesn’t get us far in replacing the PUBLISH
and SUBSCRIBE commands when we have multiple recipients. To do that, we need to
turn our problem around. In many ways, Redis PUBLISH/SUBSCRIBE is like group chat
where whether someone’s connected determines whether they’re in the group chat.
We want to remove that “need to be connected all the time” requirement, and we’ll
implement it in the context of chatting.

Let’s look at Fake Garage Startup’s next problem. After quickly implementing
their user-to-user messaging system, Fake Garage Startup realized that replacing SMS is good, but they’ve had many requests to add group chat functionality. Like before,
their clients may connect or disconnect at any time, so we can’t use the built-in PUBLISH/SUBSCRIBE method.

Figure 6.12 Some example chat and user data. The chat ZSETs show users and the maximum IDs
of messages in that chat that they’ve seen. The seen ZSETs list chat IDs per user, again with the
maximum message ID in the given chat that they’ve seen.

Each new group chat will have a set of original recipients of the group messages,
and users can join or leave the group if they want. Information about what users are in
the chat will be stored as a ZSET with members being the usernames of the recipients,
and scores being the highest message ID the user has received in the chat. Which
chats an individual user is a part of will also be stored as a ZSET, with members being
the groups that the user is a part of, and scores being the highest message ID that the
user has received in that chat. Information about some users and chats can be seen in
figure 6.12.

As you can see, user jason22 has seen five of six chat messages sent in chat:827, in
which jason22 and jeff24 are participating.
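
To make the layout concrete, here is a minimal in-memory sketch that models each ZSET as a Python dictionary of member to score. The key names follow the text, but this is plain Python rather than Redis, the helper names are hypothetical, and jeff24's score is a made-up value, since only jason22's position is stated above.

```python
# Each ZSET is modeled as a dict of member -> score (plain Python, no Redis).
chat_zsets = {
    # chat:<id> maps each participant to the highest message ID they have seen.
    'chat:827': {'jason22': 5, 'jeff24': 6},   # jeff24's score is hypothetical
}
seen_zsets = {
    # seen:<user> maps each chat ID to the highest message ID seen in that chat.
    'seen:jason22': {'827': 5},
    'seen:jeff24': {'827': 6},                 # hypothetical, mirrors chat:827
}


def unread_count(user, chat_id, latest_message_id):
    """Number of messages in chat_id that user has not yet received."""
    return latest_message_id - seen_zsets['seen:' + user][chat_id]


def consistent(chat_id):
    """Both views of the data must agree on every participant's position."""
    members = chat_zsets['chat:' + chat_id]
    return all(seen_zsets['seen:' + user][chat_id] == score
               for user, score in members.items())
```

The same score is stored twice on purpose: the chat ZSET answers "who is furthest behind in this chat?", and the seen ZSET answers "which chats is this user in, and where did they stop?"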


The content of chat sessions themselves will be stored in ZSETs, with messages as members
and message IDs as scores. To create and start a chat, we’ll increment a global
counter to get a new chat ID. We’ll then create a ZSET with all of the users that we want
to include with seen IDs being 0, and add the group to each user’s group list ZSET.
Finally, we’ll send the initial message to the users by placing the message in the chat
ZSET. The code to create a chat is shown here.

Listing 6.24 The create_chat() function
def create_chat(conn, sender, recipients, message, chat_id=None):
    # Get a new chat ID.
    chat_id = chat_id or str(conn.incr('ids:chat:'))

    # Set up a dictionary of users to scores to add to the chat ZSET.
    recipientsd = dict((r, 0) for r in recipients)

    pipeline = conn.pipeline(True)
    # Create the set with the list of people participating.
    pipeline.zadd('chat:' + chat_id, **recipientsd)
    # Initialize the seen ZSETs.
    for rec in recipients:
        pipeline.zadd('seen:' + rec, chat_id, 0)
    # Run the queued commands.
    pipeline.execute()

    # Send the message.
    return send_message(conn, chat_id, sender, message)
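
The zadd() calls in listing 6.24 use the older redis-py calling convention (keyword arguments, or member followed by score). In redis-py 3.0 and later, zadd() takes a dictionary mapping members to scores instead. The following is a minimal sketch of the membership setup under that assumption; the helper name init_chat_membership() is hypothetical and not part of the book's code.

```python
def init_chat_membership(conn, chat_id, recipients):
    """Membership setup from create_chat(), using the mapping form of zadd.

    Assumes conn is a redis-py 3.x (or later) client. The function name is
    hypothetical and only illustrates the changed zadd() call shape.
    """
    pipeline = conn.pipeline(True)
    # One ZADD adds every participant to the chat ZSET with a score of 0.
    pipeline.zadd('chat:' + chat_id, {r: 0 for r in recipients})
    for rec in recipients:
        # Each user's seen ZSET records the chat with a seen ID of 0.
        pipeline.zadd('seen:' + rec, {chat_id: 0})
    # Send all queued commands to Redis in one round trip.
    return pipeline.execute()
```

The data written is the same as in the listing; only the way members and scores are passed to the client differs.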

About the only thing that may be surprising is our use of what’s called a generator expression
from within a call to the dict() object constructor. This shortcut lets us
quickly construct a dictionary that maps users to an initially 0-valued score, which
ZADD can accept in a single call.

Python dictionaries can be easily constructed by passing a sequence of pairs of values. The first item
in the pair becomes the key; the second item becomes the value. Listing 6.24
shows some code that looks odd, where we actually generate the sequence to be
passed to the dictionary in-line. This type of sequence generation is known as
a generator expression, which you can read more about at http://mng.bz/TTKb.
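
As a small illustration of the point above, the following sketch builds the same dictionary three ways. The usernames are taken from figure 6.12; the variable names other than recipientsd are made up for this example.

```python
recipients = ['jason22', 'jeff24']

# A generator expression yields (key, value) pairs one at a time...
pairs = ((r, 0) for r in recipients)
# ...and dict() consumes them to build the mapping.
recipientsd = dict(pairs)

# The same result from an explicit list of pairs, or from a dict comprehension.
from_list = dict([('jason22', 0), ('jeff24', 0)])
from_comprehension = {r: 0 for r in recipients}
```

All three produce a dictionary mapping each username to 0.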


To send a message, we must get a new message ID, and then add the message to the
chat’s messages ZSET. Unfortunately, there’s a race condition in sending messages, but
it’s easily handled with the use of a lock from section 6.2. Our function for sending a
message using a lock is shown next.

Listing 6.25 The send_message() function
import json
import time

def send_message(conn, chat_id, sender, message):
    identifier = acquire_lock(conn, 'chat:' + chat_id)
    if not identifier:
        raise Exception("Couldn't get the lock")
    try:
        # Prepare the message.
        mid = conn.incr('ids:' + chat_id)
        ts = time.time()
        packed = json.dumps({
            'id': mid,
            'ts': ts,
            'sender': sender,
            'message': message,
        })

        # Send the message to the chat.
        conn.zadd('msgs:' + chat_id, packed, mid)
    finally:
        # Release the lock even if preparing or sending the message fails.
        release_lock(conn, 'chat:' + chat_id, identifier)
    return chat_id

Most of the work involved in sending a chat message is preparing the information to
be sent itself; actually sending the message involves adding it to a ZSET. We use locking
around the packed message construction and addition to the ZSET for the same reasons
that we needed a lock for our counting semaphore earlier. Generally, when we
use a value from Redis to construct another value that we then add to Redis, we need either a WATCH/MULTI/EXEC transaction or a lock to remove race conditions.
We use a lock here for the same performance reasons that led us to develop it in the first place.
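
One way the race can show up: a sender receives message ID n and stalls, a second sender receives n+1 and stores its message, and a reader fetches n+1 before n has been written. Because readers only ask for IDs above the last one they saw, message n would then never be delivered. The following in-memory sketch shows the ordering guarantee the lock provides. It uses threading.Lock as a stand-in for acquire_lock() and release_lock(), and the class and function names are hypothetical.

```python
import threading


class InMemoryChat:
    """Stand-in for one chat's ids: counter and msgs: ZSET (not Redis)."""

    def __init__(self):
        self.counter = 0
        self.messages = []            # appended in arrival order
        self.lock = threading.Lock()  # plays the role of the section 6.2 lock

    def send(self, sender, text):
        # Allocating the ID and storing the message happen as one unit, so
        # ID n+1 can never be stored before ID n.
        with self.lock:
            self.counter += 1
            mid = self.counter
            self.messages.append({'id': mid, 'sender': sender, 'message': text})
        return mid


def run_senders(chat, senders=8, per_sender=50):
    """Send messages from several threads; return IDs in storage order."""
    def work(name):
        for i in range(per_sender):
            chat.send(name, 'msg %d' % i)

    threads = [threading.Thread(target=work, args=('user%d' % n,))
               for n in range(senders)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return [m['id'] for m in chat.messages]
```

With the lock held across both steps, the stored IDs are always consecutive and in order, which is what lets a reader safely remember only the highest ID it has seen.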

Now that we’ve created the chat and sent the initial message, users need to find
out information about the chats they’re a part of and how many messages are pending,
and they need to actually receive the messages.


To fetch all pending messages for a user, we need to fetch group IDs and message IDs
seen from the user’s ZSET with ZRANGE. When we have the group IDs and the messages
that the user has seen, we can perform ZRANGEBYSCORE operations on all of the message
ZSETs. After we’ve fetched the messages for the chat, we update the seen ZSET
with the proper ID and the user entry in the group ZSET, and we go ahead and clean
out any messages from the group chat that have been received by everyone in the
chat, as shown in the following listing.

Listing 6.26 The fetch_pending_messages() function
import json

def fetch_pending_messages(conn, recipient):
    # Get the last message IDs received.
    seen = conn.zrange('seen:' + recipient, 0, -1, withscores=True)

    pipeline = conn.pipeline(True)

    # Fetch all new messages.
    for chat_id, seen_id in seen:
        pipeline.zrangebyscore(
            'msgs:' + chat_id, seen_id+1, 'inf')

    # Prepare information about the data to be returned.
    chat_info = list(zip(seen, pipeline.execute()))

    for i, ((chat_id, seen_id), messages) in enumerate(chat_info):
        if not messages:
            continue
        messages[:] = map(json.loads, messages)
        # Update the "chat" ZSET with the most recently received message.
        seen_id = messages[-1]['id']
        conn.zadd('chat:' + chat_id, recipient, seen_id)

        # Discover messages that have been seen by all users.
        min_id = conn.zrange(
            'chat:' + chat_id, 0, 0, withscores=True)

        # Update the "seen" ZSET.
        pipeline.zadd('seen:' + recipient, chat_id, seen_id)
        if min_id:
            # Clean out messages that have been seen by all users.
            pipeline.zremrangebyscore(
                'msgs:' + chat_id, 0, min_id[0][1])
        chat_info[i] = (chat_id, messages)
    pipeline.execute()

    return chat_info

Fetching pending messages is primarily a matter of iterating through all of the chats
for the user, pulling the messages, and cleaning up messages that have been seen by
all users in a chat.
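
The fetch-then-clean-up logic can be followed without a Redis server. The sketch below models a single chat with two dictionaries, one standing in for the msgs: ZSET and one for the chat: ZSET. It is a simplified illustration under those assumptions, not the book's implementation, and the function name fetch_pending() is hypothetical.

```python
def fetch_pending(messages, chat_members, user):
    """Return the payloads user has not yet seen, oldest first.

    messages: dict of message ID -> payload (stands in for msgs:<chat>).
    chat_members: dict of user -> highest seen ID (stands in for chat:<chat>).
    Both dictionaries are updated in place.
    """
    seen_id = chat_members[user]
    pending_ids = sorted(mid for mid in messages if mid > seen_id)
    pending = [messages[mid] for mid in pending_ids]
    if not pending_ids:
        return pending
    # Record the newest ID delivered to this user.
    chat_members[user] = pending_ids[-1]
    # Anything at or below the lowest seen ID has reached every member.
    min_seen = min(chat_members.values())
    for mid in [m for m in messages if m <= min_seen]:
        del messages[mid]
    return pending
```

For example, with messages 4 through 6 stored and one member at ID 5 and another at ID 3, the first member receives only message 6 and nothing is deleted. Once the second member fetches and catches up, every stored message has been seen by all members and the message store is emptied.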


We’ve sent and fetched messages from group chats; all that remains is joining and
leaving the group chat. To join a group chat, we fetch the most recent message ID for the chat, and we add the chat information to the user’s seen ZSET with the score being
the most recent message ID. We also add the user to the group’s member list, again
with the score being the most recent message ID. See the next listing for the code for
joining a group.

Listing 6.27 The join_chat() function
def join_chat(conn, chat_id, user):
    # Get the most recent message ID for the chat.
    message_id = int(conn.get('ids:' + chat_id))

    pipeline = conn.pipeline(True)
    # Add the user to the chat member list.
    pipeline.zadd('chat:' + chat_id, user, message_id)
    # Add the chat to the user's seen list.
    pipeline.zadd('seen:' + user, chat_id, message_id)
    pipeline.execute()


Joining a chat only requires adding a reference to the user in the chat ZSET and a
reference to the chat in the user’s seen ZSET.
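
A consequence of using the most recent message ID as the starting score is that a new member receives only messages sent after they joined. The following in-memory sketch illustrates this with plain dictionaries; the function names and the user amy are hypothetical.

```python
def join(chat_members, seen, latest_message_id, chat_id, user):
    """chat_members models chat:<chat_id>; seen models seen:<user>."""
    # Starting at the latest ID means earlier messages are not delivered.
    chat_members[user] = latest_message_id
    seen[chat_id] = latest_message_id


def pending_ids(message_ids, seen_id):
    """IDs a user with the given seen ID would still receive, oldest first."""
    return [mid for mid in sorted(message_ids) if mid > seen_id]


members = {'jason22': 5}
amy_seen = {}
join(members, amy_seen, 6, '827', 'amy')
```

After the call, amy has nothing pending among messages 4 through 6, but a later message with ID 7 would be delivered to her.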

To remove a user from the group chat, we remove the user ID from the chat ZSET,
and we remove the chat from the user’s seen ZSET. If there are no more users in the
chat ZSET, we delete the messages ZSET and the message ID counter. If there are users
remaining, we’ll again take a pass and clean out any old messages that have been seen
by all users. The function to leave a chat is shown in the following listing.

Listing 6.28 The leave_chat() function
def leave_chat(conn, chat_id, user):
    pipeline = conn.pipeline(True)
    # Remove the user from the chat.
    pipeline.zrem('chat:' + chat_id, user)
    pipeline.zrem('seen:' + user, chat_id)
    # Find the number of remaining group members.
    pipeline.zcard('chat:' + chat_id)

    if not pipeline.execute()[-1]:
        # Delete the chat.
        pipeline.delete('msgs:' + chat_id)
        pipeline.delete('ids:' + chat_id)
        pipeline.execute()
    else:
        # Find the oldest message seen by all users.
        oldest = conn.zrange(
            'chat:' + chat_id, 0, 0, withscores=True)
        # Delete old messages from the chat.
        conn.zremrangebyscore('msgs:' + chat_id, 0, oldest[0][1])

Cleaning up after a user when they leave a chat isn’t that difficult, but requires taking
care of a lot of little details to ensure that we don’t end up leaking a ZSET or ID somewhere.
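
The two branches of leaving a chat can be sketched in memory as follows. The dictionaries stand in for the chat: and msgs: ZSETs of a single chat, and the function name leave() is hypothetical; the sketch only illustrates the cleanup rules described above.

```python
def leave(chat_members, messages, user):
    """Remove user from a chat and clean up; return True if the chat is gone.

    chat_members: dict of user -> highest seen ID (stands in for chat:<id>).
    messages: dict of message ID -> payload (stands in for msgs:<id>).
    """
    chat_members.pop(user, None)
    if not chat_members:
        # Last member gone: nothing may be left behind.
        messages.clear()
        return True
    # The slowest remaining reader decides what can be discarded.
    min_seen = min(chat_members.values())
    for mid in [m for m in messages if m <= min_seen]:
        del messages[mid]
    return False
```

Note that when the member who was furthest behind leaves, messages that were being held only for that member become eligible for deletion right away.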

We’ve now finished creating a complete multiple-recipient pull messaging system
in Redis. Though we’re looking at it in terms of chat, this same method can be used to
replace the PUBLISH/SUBSCRIBE functions when you want your recipients to be able to
receive messages that were sent while they were disconnected. With a bit of work, we
could replace the ZSET with a LIST, and we could move our lock use from sending a
message to old message cleanup. We used a ZSET instead, because it saves us from having
to fetch the current message ID for every chat. Also, by making the sender do more work (locking around sending a message), the multiple recipients are saved
from having to request more data and to lock during cleanup, which will improve performance.
We now have a multiple-recipient messaging system to replace PUBLISH and SUBSCRIBE
for group chat. In the next section, we’ll use it as a way of sending information
about key names available in Redis.