8.5.3 Filtering streamed messages
So far we’ve built a server to serve the streamed messages; now it’s time to filter through
the messages for streaming. We filter the messages so that a client making a request
only sees the messages they’re interested in. Though our social network may not have
a lot of traffic, sites like Twitter, Facebook, or even Google+ will see tens to hundreds of
thousands of events every second. And for both third parties and ourselves, the cost of bandwidth to send all of that information can be quite high, so it's important to send only
the messages that match each client's filters.
In this section, we’ll write functions and classes that will filter posted messages to
be streamed to clients. These filters will plug into the streaming web server we wrote
in section 8.5.2. As I mentioned at the beginning of section 8.5, we’ll support random
sampling of all messages and access to the full firehose, as well as filtering for specific
users, words, and the location of messages.
As mentioned way back in chapter 3, we’ll use Redis PUBLISH and SUBSCRIBE to implement at least part of the streaming functionality. More specifically, when users post messages, we’ll PUBLISH the posted message information to a channel in Redis.
Our filters will SUBSCRIBE to that same channel, receive the message, and yield messages that match the filters back to the web server for sending to the client.
UPDATING STATUS MESSAGE POSTING AND DELETION
Before we get ahead of ourselves, let’s first update our message posting function from
section 8.1.2 and message deletion function from section 8.4 to start producing messages
to filter. We’ll start with posting in the next listing, which shows that we’ve added
a line to our function that sends messages out to be filtered.
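
As a minimal sketch of that extra line (not the original listing), the helper below publishes a JSON copy of the status to a channel. The channel name 'streaming:status:', the helper name publish_posted(), and the status fields are assumptions made for illustration; conn can be any client object with a publish(channel, message) method, as redis-py provides.

```python
import json

STREAM_CHANNEL = 'streaming:status:'   # assumed channel name


def publish_posted(conn, status):
    """Announce a newly posted status on the filtering channel.

    `status` is the dict of fields stored for the message
    (assumed here to include id, login, and message).
    """
    # PUBLISH returns the number of subscribers that received the message.
    return conn.publish(STREAM_CHANNEL, json.dumps(status))
```

Calling this helper as the last step of the posting function would be the one-line change: every subscriber to the channel then sees each new status once.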
All it took was one more line to add streaming support on the posting side. But what about
deletion? The update to status message deletion is shown in the following listing.
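
A hedged sketch of the deletion side follows. It assumes the deletion function has already fetched the status fields before removing them, and it marks the published copy with a 'deleted' key; that key name, the channel name, and the helper name are assumptions, not taken from the original listing.

```python
import json

STREAM_CHANNEL = 'streaming:status:'   # assumed channel name


def publish_deleted(conn, status):
    """Publish the full status, flagged as deleted, to the filtering channel."""
    payload = dict(status)       # copy so the caller's dict is left untouched
    payload['deleted'] = True    # assumed marker that filters can check later
    return conn.publish(STREAM_CHANNEL, json.dumps(payload))
```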
At first glance, you’re probably wondering why we’d want to send the entire status
message that’s to be deleted to the channel for filtering. Conceptually, we should only
need to send message-deleted information to clients that received the status message
when it was posted. If we perform the same filtering on deleted messages as we do on
newly posted messages, then we can always send message-deleted notifications to
those clients that would’ve received the original message. This ensures that we don’t
need to keep a record of the status IDs for messages sent to all clients, which simplifies
our server and reduces memory use.
RECEIVING STREAMED MESSAGES FOR FILTERING
Now that we’re sending information about status messages being posted and deleted to
a channel in Redis, we only need to subscribe to that channel to start receiving messages
to filter. As was the case in chapter 3, we’ll need to construct a special pubsub object in
order to subscribe to a channel. When we’ve subscribed to the channel, we’ll perform
our filtering, and produce one of two different messages depending on whether the
message was posted or deleted. The code for handling these operations is next.
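
The receiving loop can be sketched as a generator. To keep the example runnable without a Redis server, it takes any iterable of items shaped like the dictionaries that redis-py's pubsub.listen() yields (with 'type' and 'data' keys). The quit_flag list, the check callable, and the 'deleted' marker are assumptions of this sketch.

```python
import json


def filter_content(items, check, quit_flag):
    """Yield JSON strings for pub/sub items accepted by `check`.

    `items` is an iterable of dicts with 'type' and 'data' keys;
    `quit_flag` is a one-element list set to [True] on client disconnect.
    """
    for item in items:
        if quit_flag[0]:
            break                    # client disconnected; stop consuming
        if item.get('type') != 'message':
            continue                 # skip subscribe confirmations
        decoded = json.loads(item['data'])
        if not check(decoded):
            continue                 # the client's filter rejected it
        if decoded.get('deleted'):
            # A deletion only needs the ID of the removed status.
            yield json.dumps({'id': decoded['id'], 'deleted': True})
        else:
            yield json.dumps(decoded)
```

With redis-py, items would come from pubsub = conn.pubsub(), then pubsub.subscribe(channel), then pubsub.listen(); the caller would unsubscribe once the generator finishes.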
As I said before, this function needs to subscribe to a channel in Redis in order to
receive posted/deleted notifications for status messages. But it also needs to handle
cases where the streaming client has disconnected, and it needs to properly clean up
the connection if Redis has been trying to send it too much data.
As we covered in chapter 3, there’s a Redis server setting to determine the maximum
outgoing buffer for subscriptions to support. To ensure that our Redis server
stays up even under heavy load, we’ll probably want to set client-output-buffer-limit
pubsub to lower than the default 32 megabytes per connection. Where to set
the limit will depend on how many clients we expect to support and how much other
data is in Redis.
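
One rough way to pick a limit is to budget for the worst case, where every streaming client's buffer is full at once. The helper below is only back-of-the-envelope arithmetic; the function name, the 512 MB budget, and the 256-client figure are made-up inputs for illustration.

```python
MB = 1024 * 1024


def pubsub_hard_limit(memory_budget_bytes, expected_clients):
    """Largest per-connection limit that keeps the worst case
    (every client's buffer full at once) inside the memory budget."""
    if expected_clients <= 0:
        raise ValueError('expected_clients must be positive')
    return memory_budget_bytes // expected_clients


# 256 clients at the default 32 MB could need 8 GB in the worst case.
worst_case_default = 256 * 32 * MB

# Capping pub/sub buffers at 512 MB in total allows 2 MB per connection.
limit = pubsub_hard_limit(512 * MB, 256)
```

In redis.conf the result would be written as the hard limit in a line of the form client-output-buffer-limit pubsub <hard> <soft> <seconds>, for example with 2mb as the hard limit; the soft limit and the time window still need to be chosen separately.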
At this point we’ve built every other layer; all that remains is to write the filters themselves. I
know, there was a lot of build-up, but you may be surprised to find out that filtering
messages isn’t difficult for any of our cases. To create filters, we’ll first define
our create_filters() function in listing 8.16, which will delegate to one of several
filtering classes, depending on the filter that we want to build. We’ll assume that
clients are sending reasonable arguments, but if you’re considering using any of this
in a production scenario, you’ll want to add validation and verification.
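
The delegation itself can be sketched as a small dispatch function. The signature below and the registry argument (a dict mapping a filter name to a factory) are assumptions of this sketch, used so the dispatch can be shown on its own without the filter classes; the original listing may be organized differently.

```python
def create_filters(id, method, name, args, registry):
    """Return the filter callable for one streaming request.

    `method` is assumed to be 'sample' or 'filter'; `name` selects the
    kind of filter; `registry` maps names to factories (hypothetical).
    """
    if method == 'sample':
        # Sampling also needs the client identifier to seed its choice.
        return registry['sample'](id, args)
    try:
        factory = registry[name]
    except KeyError:
        # This is where request validation would hook in for production.
        raise ValueError('Unknown filter: %r' % (name,))
    return factory(args)
```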
Nothing surprising there: we’re distinguishing the different kinds of filters. The first
filter we’ll create will be the sample filter, which will actually implement the functionality
of the Twitter-style firehose, gardenhose, and spritzer access levels, and anything
in between. The implementation of the sampling filter is shown next.
As you can see, we started using classes again, primarily because we need to encapsulate
data and behavior together. This first class that defines sampling does one interesting
thing—it uses a random number generator seeded with the user-provided
identifier to choose the IDs of status messages that it should accept. This allows the
sampling filters to receive a deleted notification for a message, even if the client had
disconnected (as long as the client reconnected before the delete notification came
through). We use Python sets here to quickly determine whether the ID modulo 100 is
in the group that we want to accept, as Python sets offer O(1) lookup time, compared
to O(n) for a Python list.
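
The seeded-sampling idea can be sketched as follows. This is an illustrative class written for Python 3, not the original listing; the constructor arguments and the 'id' field name are assumptions.

```python
import random


class SampleFilter:
    """Accept a fixed, repeatable percentage of status IDs."""

    def __init__(self, client_id, percent=10):
        percent = min(max(int(percent), 1), 100)   # clamp to 1..100
        buckets = list(range(100))
        # Seeding with the client identifier makes the shuffle repeatable,
        # so a reconnecting client is given the same buckets again.
        random.Random(client_id).shuffle(buckets)
        self.keep = set(buckets[:percent])

    def __call__(self, status):
        # Set membership is O(1), so each check is cheap.
        return (int(status['id']) % 100) in self.keep
```

Because the accepted buckets depend only on the identifier and the percentage, a delete notification for status 1234 passes the filter exactly when the original post of status 1234 did.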
Continuing on, we’ll now build the track filter, which will allow users to track
words or phrases in status messages. Similar to our sample filter in listing 8.17, we’ll
use a class to encapsulate the data and filtering functionality together. The filter class
definition is shown in the following listing.
The only subtle point in the tracking filter is that when someone wants to match a
group of words, the filter must match all of the words in the group, not just some
of them. We again use Python sets, which, like Redis SETs, offer
the ability to calculate intersections.
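
A minimal sketch of the all-words rule, using set intersection as described above. The class shape and the 'message' field name are assumptions; each phrase is treated as one group of whitespace-separated words.

```python
class TrackFilter:
    """Accept a status when every word of at least one phrase appears in it."""

    def __init__(self, phrases):
        self.groups = []
        for phrase in phrases:
            words = set(phrase.lower().split())
            if words:                      # ignore empty phrases
                self.groups.append(words)

    def __call__(self, status):
        message_words = set(status['message'].lower().split())
        # A group matches only if its intersection with the message
        # words is the whole group, meaning no word is missing.
        return any((group & message_words) == group
                   for group in self.groups)
```

For example, TrackFilter(['redis streaming']) accepts "Streaming with Redis is fun" but rejects a message that mentions only "redis".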
Moving on to the follow filter, we’re trying to match status messages that were
posted by one of a group of users, or where one of the users is mentioned in the message.
The class that implements user matching is shown here.
As before, we continue to use Python sets as a fast way to check whether a name is in
the set of names that we’re looking for, or whether any of the names to match are also
contained in a status message.
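
The follow filter can be sketched the same way. The 'login' and 'message' field names and the convention that mentions are written as @name are assumptions of this sketch.

```python
class FollowFilter:
    """Accept statuses posted by, or mentioning, any of the given users."""

    def __init__(self, names):
        # Normalize every name to a lowercase '@name' form.
        self.names = {'@' + name.lower().lstrip('@') for name in names}

    def __call__(self, status):
        words = set(status['message'].lower().split())
        # Treat the author as if they were mentioned in the message.
        words.add('@' + status['login'].lower())
        return bool(words & self.names)
```

Note that this simple word split will miss a mention followed directly by punctuation, such as "@bob!"; handling that would need extra tokenizing.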
We finally get to the location filter. This filter is different from the others in that we
didn’t explicitly talk about adding location information to our status messages. But
because of the way we wrote our create_status() and post_status() functions to
take additional optional keyword arguments, we can add additional information without
altering our status creation and posting functions. The location filter for this
optional data is shown next.
About the only thing that may surprise you about this particular filter is how we’re preparing
the boxes for filtering. We expect that requests will provide location boxes as
comma-separated sequences of numbers, where each chunk of four numbers defines
latitude and longitude ranges (minimum longitude, minimum latitude, maximum
longitude, maximum latitude—the same order as Twitter’s API).
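
The box preparation described above can be sketched like this. The sketch assumes the request's numbers arrive as a list of strings and that a status stores its position in a 'location' field formatted as "latitude,longitude"; both are assumptions made for illustration.

```python
class LocationFilter:
    """Accept statuses whose location falls inside any bounding box."""

    def __init__(self, numbers):
        values = [float(n) for n in numbers]
        # Take complete groups of four: min lon, min lat, max lon, max lat.
        # A trailing partial group is silently dropped.
        self.boxes = [tuple(values[i:i + 4])
                      for i in range(0, len(values) - 3, 4)]

    def __call__(self, status):
        location = status.get('location')
        if not location:
            return False                   # no location data to match
        lat, lon = (float(part) for part in location.split(','))
        return any(min_lon <= lon <= max_lon and min_lat <= lat <= max_lat
                   for min_lon, min_lat, max_lon, max_lat in self.boxes)
```

A request for the box -122.75,36.8,-121.75,37.8 would then accept a status located at 37.5,-122.0 and reject one at 40.0,-122.0.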
With all of our filters built, a working web server, and the back-end API for everything
else, it’s now up to you to get traffic!