This book covers the use of Redis, an in-memory database/data structure server.


8.5.3 Filtering streamed messages

So far we’ve built a server to serve the streamed messages; now it’s time to filter through
the messages for streaming. We filter the messages so that a client making a request
only sees the messages they’re interested in. Though our social network may not have
a lot of traffic, sites like Twitter, Facebook, or even Google+ will see tens to hundreds of
thousands of events every second. For both third parties and ourselves, the bandwidth
needed to send all of that information can be expensive, so it’s important to send only
the messages that match a client’s filters.

In this section, we’ll write functions and classes that will filter posted messages to
be streamed to clients. These filters will plug into the streaming web server we wrote
in section 8.5.2. As I mentioned at the beginning of section 8.5, we’ll support random
sampling of all messages and access to the full firehose, as well as filtering for specific
users, words, and the location of messages.

As mentioned way back in chapter 3, we’ll use Redis PUBLISH and SUBSCRIBE to implement at least part of the streaming functionality. More specifically, when users post messages, we’ll PUBLISH the posted message information to a channel in Redis.
Our filters will SUBSCRIBE to that same channel, receive the message, and yield messages that match the filters back to the web server for sending to the client.
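To make that flow concrete, here is a minimal sketch of what travels over the channel. It doesn't talk to Redis: the item dictionary only imitates the shape that redis-py's pubsub.listen() yields for a published message, and the status fields and values are made up for illustration.

```python
import json

# What the posting side would PUBLISH: a JSON-encoded status (illustrative values).
status = {'id': 42, 'uid': 7, 'login': 'alice', 'message': 'Hello, Redis!'}
payload = json.dumps(status)

# What a subscriber would receive from listen(): a dict whose 'data' is the payload.
item = {'type': 'message', 'pattern': None,
        'channel': 'streaming:status:', 'data': payload}

# The filtering side decodes the payload back into a dictionary before matching.
decoded = json.loads(item['data'])
print(decoded['login'], decoded['message'])
```

The filters only ever see the decoded dictionary, so anything we want to filter on has to be part of the published payload.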


Before we get ahead of ourselves, let’s first update our message posting function from
section 8.1.2 and message deletion function from section 8.4 to start producing messages
to filter. We’ll start with posting in the next listing, which shows that we’ve added
a line to our function that sends messages out to be filtered.

Listing 8.13 Updated create_status() from listing 8.2 to support streaming filters
import json
import time

def create_status(conn, uid, message, **data):
   pipeline = conn.pipeline(True)
   pipeline.hget('user:%s'%uid, 'login')
   # Reserve the next status ID in the same round trip, as in listing 8.2.
   pipeline.incr('status:id:')
   login, id = pipeline.execute()

   if not login:
      return None

   data.update({
      'message': message,
      'posted': time.time(),
      'id': id,
      'uid': uid,
      'login': login,
   })
   pipeline.hmset('status:%s'%id, data)
   pipeline.hincrby('user:%s'%uid, 'posts')
   # The added line to send a message to streaming filters.
   pipeline.publish('streaming:status:', json.dumps(data))
   pipeline.execute()
   return id

All it took was one more line to add streaming support on the posting side. But what about
deletion? The update to status message deletion is shown in the following listing.

Listing 8.14 Updated delete_status() from listing 8.8 to support streaming filters
import json

def delete_status(conn, uid, status_id):
   key = 'status:%s'%status_id
   lock = acquire_lock_with_timeout(conn, key, 1)
   if not lock:
      return None

   if conn.hget(key, 'uid') != str(uid):
      # Not the owner of the message: release the lock before giving up.
      release_lock(conn, key, lock)
      return None

   pipeline = conn.pipeline(True)
   # Fetch the status message so that streaming filters can perform the same
   # filters to determine whether the deletion should be passed to the client.
   status = conn.hgetall(key)
   # Mark the status message as deleted.
   status['deleted'] = True
   # Publish the deleted status message to the stream.
   pipeline.publish('streaming:status:', json.dumps(status))
   pipeline.delete(key)
   pipeline.zrem('profile:%s'%uid, status_id)
   pipeline.zrem('home:%s'%uid, status_id)
   pipeline.hincrby('user:%s'%uid, 'posts', -1)
   pipeline.execute()

   release_lock(conn, key, lock)
   return True

At first glance, you’re probably wondering why we’d want to send the entire status
message that’s to be deleted to the channel for filtering. Conceptually, we should only
need to send message-deleted information to clients that received the status message
when it was posted. If we perform the same filtering on deleted messages as we do on
newly posted messages, then we can always send message-deleted notifications to
those clients that would’ve received the original message. This ensures that we don’t
need to keep a record of the status IDs for messages sent to all clients, which simplifies
our server and reduces memory use.
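A small sketch may make this argument easier to follow. The words_filter() and notify() helpers below are hypothetical stand-ins (they aren't part of our server); they only show that running the same filter over the posted and the deleted copy of a status routes both notifications to the same clients, with no per-client bookkeeping.

```python
import json

def words_filter(required):
    """Hypothetical filter: match when every required word is in the message."""
    required = set(required)
    def check(status):
        return required <= set(status['message'].lower().split())
    return check

def notify(match, payload):
    """Return what one client should be sent for a published payload, or None."""
    decoded = json.loads(payload)
    if not match(decoded):
        return None
    if decoded.get('deleted'):
        # Deleted messages are reduced to a small placeholder.
        return json.dumps({'id': decoded['id'], 'deleted': True})
    return payload

posted = {'id': 1, 'login': 'alice', 'message': 'redis is fast'}
deleted = dict(posted, deleted=True)   # same content, flagged as deleted

redis_fans = words_filter(['redis'])
python_fans = words_filter(['python'])

seen = notify(redis_fans, json.dumps(posted))    # the original message
gone = notify(redis_fans, json.dumps(deleted))   # the matching placeholder
```

A client using python_fans never saw the original message, and because the deleted copy fails the same filter, it never hears about the deletion either.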


Now that we’re sending information about status messages being posted and deleted to
a channel in Redis, we only need to subscribe to that channel to start receiving messages
to filter. As was the case in chapter 3, we’ll need to construct a special pubsub object in
order to subscribe to a channel. When we’ve subscribed to the channel, we’ll perform
our filtering, and produce one of two different messages depending on whether the
message was posted or deleted. The code for handling these operations is next.

Listing 8.15 A function to receive and process streamed messages
import json

# Use our automatic connection decorator from chapter 5
# (the component name passed here is assumed).
@redis_connection('social-network')
def filter_content(conn, id, method, name, args, quit):
   # Create the filter that will determine whether a message should be sent to the client.
   match = create_filters(id, method, name, args)

   # Prepare the subscription.
   pubsub = conn.pubsub()
   pubsub.subscribe(['streaming:status:'])

   # Receive messages from the subscription.
   for item in pubsub.listen():
      # Skip subscription confirmations, whose data isn't a JSON status.
      if item['type'] != 'message':
         continue
      # Get the status message information from the subscription structure.
      message = item['data']
      decoded = json.loads(message)

      # Check if the status message matched the filter.
      if match(decoded):
         if decoded.get('deleted'):
            # For deleted messages, send a special "deleted" placeholder for the message.
            yield json.dumps({
               'id': decoded['id'], 'deleted': True})
         else:
            # For matched status messages that are not deleted, send the message itself.
            yield message

      # If the web server no longer has a connection to the client, stop filtering messages.
      if quit[0]:
         break

   # Reset the Redis connection to ensure that the Redis server clears its
   # outgoing buffers if this wasn't fast enough.
   pubsub.reset()

As I said before, this function needs to subscribe to a channel in Redis in order to
receive posted/deleted notifications for status messages. But it also needs to handle
cases where the streaming client has disconnected, and it needs to properly clean up
the connection if Redis has been trying to send it too much data.
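The disconnect handling relies on quit being a one-element list, so the web server and the generator share the same mutable flag. The sketch below illustrates that pattern without Redis; FakePubSub is a hypothetical stand-in that implements only the two methods this example needs.

```python
import json

class FakePubSub:
    """Hypothetical stand-in for a redis-py pubsub object."""
    def __init__(self, payloads):
        self.payloads = payloads
        self.was_reset = False
    def listen(self):
        for payload in self.payloads:
            yield {'type': 'message', 'data': payload}
    def reset(self):
        self.was_reset = True

def stream(pubsub, quit):
    for item in pubsub.listen():
        yield item['data']
        # The web server sets quit[0] = True when the client goes away.
        if quit[0]:
            break
    # Drop the subscription so the server can free its outgoing buffer.
    pubsub.reset()

quit = [False]
pubsub = FakePubSub([json.dumps({'id': n}) for n in range(5)])
received = []
for payload in stream(pubsub, quit):
    received.append(json.loads(payload)['id'])
    if len(received) == 2:
        quit[0] = True   # simulate the client disconnecting
```

Note that the flag is only checked after a message arrives, so a filter on a quiet channel will notice the disconnect on the next published message rather than immediately.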

As we covered in chapter 3, there’s a Redis server setting to determine the maximum
outgoing buffer for subscriptions to support. To ensure that our Redis server
stays up even under heavy load, we’ll probably want to set client-output-buffer-limit
pubsub to lower than the default 32 megabytes per connection. Where to set
the limit will depend on how many clients we expect to support and how much other
data is in Redis.
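As a hedged sketch, the limit could be lowered on a running server with redis-py's config_set(). The limit_pubsub_buffers() helper and the 8mb/2mb/60 values are illustrative assumptions, not recommendations; the three values are the hard limit, the soft limit, and the number of seconds the soft limit may be exceeded.

```python
def limit_pubsub_buffers(conn, hard='8mb', soft='2mb', soft_seconds=60):
    """Lower the pub/sub output buffer limits via CONFIG SET.

    conn is expected to be a redis-py client; the default values here
    are placeholders chosen only for illustration.
    """
    value = 'pubsub %s %s %d' % (hard, soft, soft_seconds)
    conn.config_set('client-output-buffer-limit', value)
    return value
```

The same setting can be made permanent in redis.conf with a line of the form client-output-buffer-limit pubsub 8mb 2mb 60.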


At this point we’ve built every other layer; all that remains is to write the filters themselves. I
know, there was a lot of build-up, but you may be surprised to find out that actually filtering
messages isn’t difficult for any of our cases. To create filters, we’ll first define
our create_filters() function in listing 8.16, which will delegate off to one of a variety
of filtering classes, depending on the filter that we want to build. We’ll assume that
clients are sending reasonable arguments, but if you’re considering using any of this
in a production scenario, you’ll want to add validation and verification.

Listing 8.16 A factory function to dispatch to the actual filter creation
def create_filters(id, method, name, args):
   # For the "sample" method, we don't need to worry about names, just the arguments.
   if method == 'sample':
      return SampleFilter(id, args)
   # For the "filter" method, we actually worry about which of the filters we
   # want to apply, so return the specific filters for them.
   elif name == 'track':
      return TrackFilter(args)
   elif name == 'follow':
      return FollowFilter(args)
   elif name == 'location':
      return LocationFilter(args)
   # If no filter matches, then raise an exception.
   raise Exception("Unknown filter")

Nothing surprising there: we’re distinguishing the different kinds of filters. The first
filter we’ll create will be the sample filter, which will actually implement the functionality
of the Twitter-style firehose, gardenhose, and spritzer access levels, and anything
in between. The implementation of the sampling filter is shown next.

Listing 8.17 The function to handle firehose, gardenhose, and spritzer
import random

# SampleFilter is a factory function: it takes the "id" and "args" parameters
# and returns the function that does the actual filtering.
def SampleFilter(id, args):
   # The "args" parameter is actually a dictionary based on the parameters
   # passed as part of the GET request.
   percent = int(args.get('percent', ['10'])[0], 10)
   # We use the "id" parameter to randomly choose a subset of IDs, the count
   # of which is determined by the "percent" argument passed.
   ids = list(range(100))
   shuffler = random.Random(id)
   shuffler.shuffle(ids)
   # We'll use a Python set to allow us to quickly determine whether a status
   # message matches our criteria.
   keep = set(ids[:max(percent, 1)])

   # The returned closure is called once for each status message.
   def check(status):
      # To filter status messages, we fetch the status ID, find its value
      # modulo 100, and return whether it's in the status IDs that we want to
      # accept. Deleted statuses come from a Redis hash, so the ID may be a string.
      return (int(status['id']) % 100) in keep
   return check

As you can see, each filter is a function that returns a closure, which lets us keep the
filter’s data and behavior together. This first filter, which defines sampling, does one
interesting thing: it uses a random number generator seeded with the user-provided
identifier to choose the IDs of status messages that it should accept. Because the same
identifier always selects the same IDs, a sampling filter can receive a deleted
notification for a message even if the client had disconnected (as long as the client
reconnected before the delete notification came through). We use Python sets here to
quickly determine whether the ID modulo 100 is in the group that we want to accept, as
Python sets offer O(1) average lookup time, compared to O(n) for a Python list.
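The effect of seeding is easy to check in isolation. The sample_filter() below is a simplified, hypothetical variant of the sampling filter that takes the percentage directly instead of reading it from the request arguments, and the 'client-17' identifier is made up.

```python
import random

def sample_filter(id, percent):
    """Simplified sampling filter: percent is passed in directly."""
    ids = list(range(100))
    # Seeding with the client's identifier makes the shuffle repeatable.
    random.Random(id).shuffle(ids)
    keep = set(ids[:max(percent, 1)])
    def check(status):
        return (int(status['id']) % 100) in keep
    return check

first = sample_filter('client-17', 10)
second = sample_filter('client-17', 10)   # the same client reconnecting

statuses = [{'id': n} for n in range(1000)]
accepted_first = [s['id'] for s in statuses if first(s)]
accepted_second = [s['id'] for s in statuses if second(s)]
```

Both filters accept exactly the same statuses, and with 10 of the 100 residues kept, one in ten sequential IDs passes.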

Continuing on, we’ll now build the track filter, which will allow users to track
words or phrases in status messages. Similar to our sample filter in listing 8.17, we’ll
use a closure to keep the data and filtering functionality together. The filter
definition is shown in the following listing.

Listing 8.18 A filter that matches groups of words that are posted in status messages
def TrackFilter(list_of_strings):
   groups = []
   # The filter has been provided with a list of word groups, and the filter
   # matches if a message has all of the words in any of the groups.
   for group in list_of_strings:
      group = set(group.lower().split())
      # We'll only keep groups that have at least 1 word.
      if group:
         groups.append(group)

   def check(status):
      # We'll split words in the message on whitespace.
      message_words = set(status['message'].lower().split())
      # Then we'll iterate over all of the groups.
      for group in groups:
         # If all of the words in any of the groups match, we'll accept the
         # message with this filter.
         if len(group & message_words) == len(group):
            return True
      return False
   return check

About the only interesting thing about the tracking filter is making sure that if someone
wants to match a group of words, all of the words in the group appear in the message,
and not just some of them. We again use Python sets, which, like Redis SETs, offer
the ability to calculate intersections.
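To illustrate the all-words-in-a-group rule, here is a compact sketch of the same idea using a subset test instead of comparing intersection sizes. The track_filter() name and the sample messages are ours, for illustration only.

```python
def track_filter(list_of_strings):
    """Match when every word of at least one group appears in the message."""
    groups = [set(s.lower().split()) for s in list_of_strings]
    groups = [g for g in groups if g]          # drop empty groups
    def check(status):
        words = set(status['message'].lower().split())
        # group <= words is True when group is a subset of words.
        return any(group <= words for group in groups)
    return check

check = track_filter(['redis streams', 'python'])
results = [
    check({'message': 'Trying out Redis Streams today'}),  # all of group 1
    check({'message': 'Redis is fast'}),                   # only part of group 1
    check({'message': 'I like Python'}),                   # all of group 2
]
```

Because words are split on whitespace only, a word with trailing punctuation such as "Redis!" would not match "redis" here.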

Moving on to the follow filter, we’re trying to match status messages that were
posted by one of a group of users, or where one of the users is mentioned in the message.
The function that implements user matching is shown here.

Listing 8.19 Messages posted by or mentioning any one of a list of users
def FollowFilter(names):
   # We'll match login names against posters and messages.
   nset = set()
   # Store all names consistently as '@username'.
   for name in names:
      nset.add('@' + name.lower().lstrip('@'))

   def check(status):
      # Construct a set of words from the message and the poster's name.
      message_words = set(status['message'].lower().split())
      message_words.add('@' + status['login'].lower())
      # Consider the message a match if any of the usernames provided match
      # any of the whitespace-separated words in the message.
      return message_words & nset
   return check

As before, we continue to use Python sets as a fast way to check whether a name is in
the set of names that we’re looking for, or whether any of the names to match are also
contained in a status message.
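A short sketch of the name normalization may help here. The follow_filter() helper and the usernames are illustrative, and the filter returns a plain boolean rather than the intersection itself.

```python
def follow_filter(names):
    """Match statuses posted by, or mentioning, any of the given users."""
    # Normalize every name to lowercase '@username', with or without a leading '@'.
    wanted = {'@' + name.lower().lstrip('@') for name in names}
    def check(status):
        words = set(status['message'].lower().split())
        words.add('@' + status['login'].lower())   # the poster counts too
        return bool(words & wanted)
    return check

check = follow_filter(['Alice', '@bob'])
by_alice = check({'login': 'alice', 'message': 'good morning'})
mentions_bob = check({'login': 'carol', 'message': 'lunch with @Bob today'})
unrelated = check({'login': 'carol', 'message': 'bob without the at sign'})
```

Only a whitespace-separated '@bob' counts as a mention, so the bare word "bob" in the last message doesn't match.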

We finally get to the location filter. This filter is different from the others in that we
didn’t explicitly talk about adding location information to our status messages. But
because of the way we wrote our create_status() and post_status() functions to
take additional optional keyword arguments, we can add additional information without
altering our status creation and posting functions. The location filter for this
optional data is shown next.

Listing 8.20 Messages within boxes defined by ranges of latitudes and longitudes
def LocationFilter(list_of_boxes):
   # We'll create a set of boxes that define the regions that should return messages.
   boxes = []
   for start in range(0, len(list_of_boxes)-3, 4):
      boxes.append(list(map(float, list_of_boxes[start:start+4])))

   def check(status):
      # Try to fetch "location" data from a status message.
      location = status.get('location')
      # If the message has no location information, then it can't be inside the boxes.
      if not location:
         return False
      # Otherwise, extract the latitude and longitude of the location.
      lat, lon = map(float, location.split(','))
      # To match one of the boxes, we need to iterate over all boxes.
      for box in boxes:
         # If the message status location is within the required latitude and
         # longitude range, then the status message matches the filter.
         if (box[1] <= lat <= box[3] and
            box[0] <= lon <= box[2]):
            return True
      return False
   return check

About the only thing that may surprise you about this particular filter is how we’re preparing
the boxes for filtering. We expect that requests will provide location boxes as
comma-separated sequences of numbers, where each chunk of four numbers defines
latitude and longitude ranges (minimum longitude, minimum latitude, maximum
longitude, maximum latitude—the same order as Twitter’s API).
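The chunking is easier to see with concrete numbers. In this sketch, location_filter() is our own restatement of the filter, and we assume the request's comma-separated string has already been split into a list of number strings; the coordinates are illustrative. Note that boxes are longitude-first, whereas a status location is stored as "latitude,longitude".

```python
def location_filter(list_of_boxes):
    """Match statuses whose 'lat,lon' location falls inside any box."""
    boxes = []
    # Take the numbers four at a time: min lon, min lat, max lon, max lat.
    for start in range(0, len(list_of_boxes) - 3, 4):
        boxes.append([float(v) for v in list_of_boxes[start:start + 4]])
    def check(status):
        location = status.get('location')
        if not location:
            return False
        lat, lon = (float(v) for v in location.split(','))
        return any(b[1] <= lat <= b[3] and b[0] <= lon <= b[2] for b in boxes)
    return check

raw = '-122.75,36.8,-121.75,37.8,-74,40,-73,41'   # two boxes, eight numbers
check = location_filter(raw.split(','))
inside = check({'location': '37.0,-122.0'})        # within the first box
outside = check({'location': '51.5,-0.1'})         # in neither box
no_location = check({'message': 'no coordinates here'})
```

Any trailing numbers that don't form a complete group of four are ignored by the range() bounds.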

With all of our filters built, a working web server, and the back-end API for everything
else, it’s now up to you to get traffic!