This book covers the use of Redis, an in-memory database/data structure server.


8.5.2 Serving the data

In preceding sections and chapters, when we showed functions that made calls to
Redis, we built on the assumption that we had an existing web server that would be
calling these functions at just the right time. In the case of a streaming API, the details
of streaming data to a client can be more complicated than just plugging these functions
into an existing web service stack. In particular, most web servers operate under
the assumption that we’ll be returning the entire response to a request at once, but
this is definitely not the case with a streaming API.

Responses from a streaming API are received status message by status message as
they’re produced and matched. Though modern technologies like WebSockets and
SPDY can offer incremental data production, or even server-side push messages, the
protocols involved are still in the process of being finalized, and client-side support in
many programming languages is incomplete. But there is a method of producing
incremental content with a plain HTTP server: sending data using chunked transfer
encoding.

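In chunked transfer encoding, each chunk is framed as a hexadecimal byte count, a CRLF, the payload, and another CRLF, with a zero-length chunk marking the end of the body. A minimal sketch of that framing (the `encode_chunk()` helper is ours, not part of the book's code):

```python
def encode_chunk(payload):
    # One chunk: hex length of the payload, CRLF, the payload, CRLF.
    return b'%X\r\n%s\r\n' % (len(payload), payload)

# Two messages streamed as chunks, then the zero-length terminator.
body = encode_chunk(b'{"id": 1}') + encode_chunk(b'{"id": 2}') + b'0\r\n\r\n'
```

Because each chunk carries its own length, the server can keep the connection open and emit a new chunk whenever another message arrives.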
In this section, we’ll build a simple web server that supports streaming to clients
that can handle chunked HTTP responses. This will support our later sections, in
which we’ll actually implement filtering options for streamed message data.

To build this streaming HTTP web server, we have to delve deeper into the Python
programming language. In the past, we’ve attempted to keep everything to standard
functions, and in chapter 6, we even started using generators (that was the code that
included yield). But here, we’ll have to use Python classes. This is primarily because
we don’t want to have to build an entire web server from scratch, and Python already
includes servers that we can mix together to handle all of the difficult parts of web
serving. If you’ve used classes in other languages, you’ll be comfortable with Python,
because classes in Python are similar. They’re meant to encapsulate data, with methods
to manipulate the data. In our case, most of the functionality that we want to use is
already available in existing libraries; we just need to plug them together.


Within Python we have a series of socket server libraries that can be mixed together to
offer varying types of functionality. To start, we’ll create a server that uses threads in order to process each incoming request separately. When the server receives a
request, the server will create a thread to execute a request handler. This request handler
is where we’ll perform some initial basic routing for GET and POST HTTP requests.
Both the threaded server and the request handler are shown in the next listing.

Listing 8.9 Server and request handler for our streaming HTTP server
class StreamingAPIServer(
      SocketServer.ThreadingMixIn,
      BaseHTTPServer.HTTPServer):

Create a new class called “StreamingAPIServer”.

This new class should have the ability to create new threads with each request, and should be an HTTPServer.

   daemon_threads = True

Tell the internals of the threading server to shut down all client request threads if the main server thread dies.

class StreamingAPIRequestHandler(
      BaseHTTPServer.BaseHTTPRequestHandler):

Create a new class called “StreamingAPIRequestHandler”.

This new class should be able to handle HTTP requests.

   def do_GET(self):
      parse_identifier(self)

Create a method that is called do_GET(), which will be executed on GET requests performed against this server.

Call a helper function that handles the fetching of an identifier for the client.

      if self.path != '/statuses/sample.json':
         return self.send_error(404)

If the request is not a “sample” or “firehose” streaming GET request, return a “404 not found” error.

      process_filters(self)

Otherwise, call a helper function that actually handles the filtering.

   def do_POST(self):
      parse_identifier(self)

Create a method that is called do_POST(), which will be executed on POST requests performed against this server.

Call a helper function that handles the fetching of an identifier for the client.

      if self.path != '/statuses/filter.json':
         return self.send_error(404)

If the request is not a user, keyword, or location filter, return a “404 not found” error.

      process_filters(self)

Otherwise, call a helper function that actually handles the filtering.

What we didn’t write is the code that actually starts up the server, but we’ll get to that
in a moment. For now, you can see that we defined a server that created threads on
each request. Those threads execute methods on a request handler object, which
eventually lead to either do_GET() or do_POST(), which handle the two major types of
streaming API requests: filtered and sampled.
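The listing above uses the Python 2 module names SocketServer and BaseHTTPServer; Python 3 moved these classes into http.server and ships a ready-made threading server. A rough Python 3 equivalent of the same structure (the handler name and the smoke test against it are ours):

```python
import threading
import http.client
from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler

class SketchRequestHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        # Only the "sample" streaming endpoint is recognized.
        if self.path != '/statuses/sample.json':
            return self.send_error(404)
        self.send_response(200)
        self.end_headers()

    def log_message(self, *args):
        pass  # silence per-request logging in this sketch

# Port 0 asks the OS for any free port; each request runs in its own thread.
server = ThreadingHTTPServer(('localhost', 0), SketchRequestHandler)
threading.Thread(target=server.serve_forever, daemon=True).start()

conn = http.client.HTTPConnection('localhost', server.server_address[1])
conn.request('GET', '/statuses/unknown.json')
status = conn.getresponse().status  # an unrecognized path is rejected
conn.close()
server.shutdown()
```

The daemon thread plays the same role as daemon_threads = True above: the server will not keep the process alive on its own.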

To actually run this server, we’ll use a bit of Python magic. This magic allows us to
later import a module to use these predefined classes, or it allows us to run the module
directly in order to start up a streaming API server. The code that lets us both
import the module and run it as a daemon can be seen in the next listing.

Before you put these two blocks of code into a file and run them, remember that
we’re still missing two functions that are called as part of the streaming API server,
parse_identifier() and process_filters(), which we’ll cover next.

Listing 8.10 The code to actually start and run the streaming HTTP server
if __name__ == '__main__':

Run the block of code below if this module is being run from the command line.

   server = StreamingAPIServer(
      ('localhost', 8080), StreamingAPIRequestHandler)

Create an instance of the streaming API server listening on localhost port 8080, and use the StreamingAPIRequestHandler to process requests.

   print 'Starting server, use <Ctrl-C> to stop'

Print an informational line.


   server.serve_forever()

Run the server until someone kills it.

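The “magic” here is the standard `if __name__ == '__main__'` idiom, which distinguishes running a module directly from importing it. A minimal illustration (the module and function names are hypothetical):

```python
# A hypothetical module, greeter.py.
def greet(name):
    return 'Hello, ' + name

if __name__ == '__main__':
    # This block runs only when the file is executed directly
    # (python greeter.py), not when another module does `import greeter`.
    print(greet('world'))
```

This is why the same file can serve both as a library of server classes and as a runnable daemon.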

The first of these two functions is a way of fetching identifying information about the
client. This basic method extracts an identifier from the request query arguments. For
a production scenario, we’d want to perform some amount of client validation of the
identifier. Our simple method to parse an identifier from the request can be seen in
the next listing.

Listing 8.11 An example function to parse and store the client identifier
def parse_identifier(handler):
   handler.identifier = None
   handler.query = {}

Set the identifier and query arguments to be placeholder values.

   if '?' in handler.path:

If there were query arguments as part of the request, process them.

      handler.path, _, query = handler.path.partition('?')

Extract the query portion from the path and update the path.

      handler.query = urlparse.parse_qs(query)

Parse the query.

      identifier = handler.query.get('identifier') or [None]

Fetch the list of query arguments with the name “identifier.”

      handler.identifier = identifier[0]

Use the first identifier passed.

That function shouldn’t do anything surprising; we set some initial values for the
query arguments (if we want to use them later) and the identifier, parse the query
arguments, and then store the identifier from the query if it was available.
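The parsing step relies on the standard library’s query-string parser, which maps each argument name to a list of values (a name may repeat in a query string); that’s why the code takes `identifier[0]`. A quick sketch in Python 3, where the Python 2 urlparse module became urllib.parse:

```python
from urllib.parse import parse_qs

path = '/statuses/sample.json?identifier=abc123&track=redis'

# Split off the query portion, keeping the bare path.
path, _, query = path.partition('?')
args = parse_qs(query)

# Each value is a list; take the first identifier passed, if any.
identifier = (args.get('identifier') or [None])[0]
```
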


There’s one final piece to the HTTP server portion of our request—actually sending
the filtered responses. To prepare to send these filtered messages one by one, we first
need to verify the requests are valid. Assuming that everything is okay, we must then
send to the client the notification that we’ll be entering an HTTP mode called chunked
transfer encoding, which will allow us to send messages one at a time as they come in.
The function that performs this validation and the actual transfer of streamed messages
to the client is shown next.

Listing 8.12 A function that will verify the request and stream data to the client
FILTERS = ('track', 'filter', 'location')

Keep a listing of filters that need arguments.

def process_filters(handler):
   id = handler.identifier
   if not id:
      return handler.send_error(401, "identifier missing")

Return an error if an identifier was not provided by the client.

   method = handler.path.rsplit('/')[-1].split('.')[0]

Fetch the method; should be one of “sample” or “filter”.

   name = None
   args = None
   if method == 'filter':

If this is a filtering method, we need to fetch the arguments.

      data = cgi.FieldStorage(
         fp=handler.rfile,
         headers=handler.headers,
         environ={'REQUEST_METHOD': 'POST',
            'CONTENT_TYPE': handler.headers['Content-Type']})

Parse the POST request to discover the type and arguments to the filter.

      for name in data:
         if name in FILTERS:
            args = data.getfirst(name).lower().split(',')
            break

Fetch any of the filters provided by the client request.

      if not args:
         return handler.send_error(401, "no filter provided")

If there were no filters specified, return an error.

   else:
      args = handler.query

For sample requests, pass the query arguments as the “args”.

   handler.send_response(200)
   handler.send_header('Transfer-Encoding', 'chunked')
   handler.end_headers()

Finally, return a response to the client, informing them that they will be receiving a streaming response.

   quit = [False]

Use a Python list as a holder for a pass-by-reference variable, which will allow us to tell the content filter to stop receiving messages.

   for item in filter_content(id, method, name, args, quit):

Iterate over the results of the filter.

      try:
         handler.wfile.write('%X\r\n%s\r\n' % (len(item), item))

Send the pre-encoded response to the client using the chunked encoding.

      except socket.error:
         quit[0] = True

If sending to the client caused an error, then we need to tell the subscriber to unsubscribe and shut down.

   if not quit[0]:
      handler.wfile.write('0\r\n\r\n')

Send the “end of chunks” message to the client if we haven’t already disconnected.

A few details in this function are tricky, but the basic idea is that we make sure that we
have an identifier for the client and fetch the filtering arguments for the specific calls.
If everything is okay, we then announce to the client that we’ll be streaming responses
and pass the actual filtering off to a generator, which will produce the sequence of
messages that match the filter criteria.
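The trickiest of those details is the `quit` flag: a one-element list works as a mutable, shared cell, so the generator can check it on every iteration and stop when the caller flips it. In isolation (the `produce()` generator here is illustrative, standing in for filter_content()):

```python
def produce(quit):
    # Yield an increasing counter until the caller sets the shared flag.
    count = 0
    while not quit[0]:
        count += 1
        yield count

quit = [False]
received = []
for item in produce(quit):
    received.append(item)
    if item == 3:
        # Simulate a client disconnect: tell the generator to stop.
        quit[0] = True
```

A plain boolean wouldn’t work here, because rebinding it in the caller would not be visible inside the generator; mutating the list’s contents is.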

And that’s it for the streaming HTTP server. In the next section, we’ll build the
methods that will filter messages that pass through the system.