e-Book - Redis in Action

This book covers the use of Redis, an in-memory database/data structure server.

    8.5.2 Serving the data

    In preceding sections and chapters, when we showed functions that made calls to
    Redis, we built on the assumption that we had an existing web server that would be
    calling these functions at just the right time. In the case of a streaming API, the details
    of streaming data to a client can be more complicated than just plugging these functions
    into an existing web service stack. In particular, most web servers operate under
    the assumption that we’ll be returning the entire response to a request at once, but
    this is definitely not the case with a streaming API.

    Responses from a streaming API are received status message by status message as
    they’re produced and matched. Though modern technologies like WebSockets and
    SPDY can offer incremental data production, or even server-side push messages, the
    protocols involved are still in the process of being finalized, and client-side support in
    many programming languages is incomplete. But there is a method of producing
    incremental content with an HTTP server: sending data using the chunked transfer
    encoding.
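
    To make the wire format concrete, here is a minimal sketch of how chunked transfer
    encoding frames a response body. Each chunk is its length in hexadecimal, a CRLF, the
    payload, and another CRLF; a zero-length chunk ends the body. The names encode_chunk,
    encode_stream, and END_OF_CHUNKS are ours for illustration only, and the sketch assumes
    Python 3 rather than the Python 2 used in this chapter's listings.

```python
END_OF_CHUNKS = b'0\r\n\r\n'   # a zero-length chunk terminates the body


def encode_chunk(payload):
    """Frame one bytes payload as a single HTTP/1.1 chunk."""
    return b'%X\r\n%s\r\n' % (len(payload), payload)


def encode_stream(payloads):
    """Yield a framed chunk for each payload, then the terminator."""
    for payload in payloads:
        if payload:            # an empty chunk would end the stream early
            yield encode_chunk(payload)
    yield END_OF_CHUNKS


# Two small JSON messages framed as one chunked body.
body = b''.join(encode_stream([b'{"id": 1}', b'{"id": 22}']))
```

    Because every chunk carries its own length, the server never needs to know the total
    size of the response in advance, which is what makes this encoding suitable for an
    open-ended stream.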

    In this section, we’ll build a simple web server that supports streaming to clients
    that can handle chunked HTTP responses. This is to support our later sections which
    will actually implement filtering options for streamed message data.

    To build this streaming HTTP web server, we have to delve deeper into the Python
    programming language. In the past, we’ve attempted to keep everything to standard
    functions, and in chapter 6, we even started using generators (that was the code that
    included yield). But here, we’ll have to use Python classes. This is primarily because
    we don’t want to have to build an entire web server from scratch, and Python already
    includes servers that we can mix together to handle all of the difficult parts of web
    serving. If you’ve used classes in other languages, you’ll be comfortable with Python,
    because classes in Python are similar. They’re meant to encapsulate data, with methods
    to manipulate the data. In our case, most of the functionality that we want to use is
    already available in existing libraries; we just need to plug them together.


    Within Python we have a series of socket server libraries that can be mixed together to
    offer varying types of functionality. To start, we’ll create a server that uses threads in
    order to process each incoming request separately. When the server receives a
    request, the server will create a thread to execute a request handler. This request handler
    is where we’ll perform some initial basic routing for GET and POST HTTP requests.
    Both the threaded server and the request handler are shown in the next listing.

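    Listing 8.9 Server and request handler for our streaming HTTP server
    import BaseHTTPServer
    import SocketServer

    Import the Python 2 standard library modules that provide the HTTP server and the threading mix-in.

    class StreamingAPIServer(

    Create a new class called “StreamingAPIServer”.

       SocketServer.ThreadingMixIn,
       BaseHTTPServer.HTTPServer):

    This new class should have the ability to create new threads with each request, and should be an HTTPServer.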

       daemon_threads = True

    Tell the internals of the threading server to shut down all client request threads if the main server thread dies.

    class StreamingAPIRequestHandler(

    Create a new class called “StreamingAPIRequestHandler”.

       BaseHTTPServer.BaseHTTPRequestHandler):

    This new class should be able to handle HTTP requests.

       def do_GET(self):

    Create a method that is called do_GET(), which will be executed on GET requests performed against this server.

          parse_identifier(self)

    Call a helper function that handles the fetching of an identifier for the client.

          if self.path != '/statuses/sample.json':
             return self.send_error(404)

    If the request is not a “sample” or “firehose” streaming GET request, return a “404 not found” error.

          process_filters(self)

    Otherwise, call a helper function that actually handles the filtering.

       def do_POST(self):

    Create a method that is called do_POST(), which will be executed on POST requests performed against this server.

          parse_identifier(self)

    Call a helper function that handles the fetching of an identifier for the client.

          if self.path != '/statuses/filter.json':
             return self.send_error(404)

    If the request is not a user, keyword, or location filter, return a “404 not found” error.

          process_filters(self)

    Otherwise, call a helper function that actually handles the filtering.

    What we didn’t write is the code that actually starts up the server, but we’ll get to that
    in a moment. For now, you can see that we defined a server that created threads on
    each request. Those threads execute methods on a request handler object, which
    eventually lead to either do_GET() or do_POST(), which handle the two major types of
    streaming API requests: sampled and filtered, respectively.
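
    As a rough Python 3 counterpart to the listing above, the following sketch shows the
    same pattern of a threading mix-in combined with an HTTP server, plus path-based
    routing in do_GET(). In Python 3 the modules are named socketserver and http.server.
    The class names SketchServer and SketchHandler, the helper fetch_status(), and the
    fixed JSON body are assumptions made for illustration; they are not part of the
    chapter's code.

```python
import http.client
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer
from socketserver import ThreadingMixIn


class SketchServer(ThreadingMixIn, HTTPServer):
    daemon_threads = True          # request threads exit with the main thread


class SketchHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        # Route on the path; anything unexpected gets a 404.
        if self.path != '/statuses/sample.json':
            return self.send_error(404)
        body = b'{"ok": true}'
        self.send_response(200)
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format, *args):
        pass                       # keep the example quiet


def fetch_status(port, path):
    """Return the HTTP status code for a GET against the local sketch server."""
    conn = http.client.HTTPConnection('127.0.0.1', port, timeout=5)
    try:
        conn.request('GET', path)
        response = conn.getresponse()
        response.read()
        return response.status
    finally:
        conn.close()


# Port 0 asks the operating system for any free port.
server = SketchServer(('127.0.0.1', 0), SketchHandler)
threading.Thread(target=server.serve_forever, daemon=True).start()
port = server.server_address[1]

good = fetch_status(port, '/statuses/sample.json')
missing = fetch_status(port, '/nope')

server.shutdown()
server.server_close()
```

    Running the server in a background thread here is only a convenience so that the
    example can make requests against itself and then shut down cleanly.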

    To actually run this server, we’ll use a bit of Python magic. This magic allows us to
    later import a module to use these predefined classes, or it allows us to run the module
    directly in order to start up a streaming API server. The code that lets us both
    import the module and run it as a daemon can be seen in the next listing.

    Before you put these two blocks of code into a file and run them, remember that
    we’re still missing two functions that are called as part of the streaming API server,
    parse_identifier() and process_filters(), which we’ll cover next.

    Listing 8.10 The code to actually start and run the streaming HTTP server
    if __name__ == '__main__':

    Run the block of code below if this module is being run from the command line.

       server = StreamingAPIServer(
          ('localhost', 8080), StreamingAPIRequestHandler)

    Create an instance of the streaming API server listening on localhost port 8080, and use the StreamingAPIRequestHandler to process requests.

       print 'Starting server, use <Ctrl-C> to stop'

    Print an informational line.

       server.serve_forever()

    Run the server until someone kills it.


    The first of these two functions, parse_identifier(), fetches identifying information about the
    client. This basic method extracts an identifier from the request query arguments. For
    a production scenario, we’d want to perform some amount of client validation of the
    identifier. Our simple method to parse an identifier from the request can be seen in
    the next listing.

    Listing 8.11 An example function to parse and store the client identifier
    import urlparse

    Import the Python 2 standard library module used to parse query strings.

    def parse_identifier(handler):
       handler.identifier = None
       handler.query = {}

    Set the identifier and query arguments to be placeholder values.

       if '?' in handler.path:

    If there were query arguments as part of the request, process them.

          handler.path, _, query = handler.path.partition('?')

    Extract the query portion from the path and update the path.

          handler.query = urlparse.parse_qs(query)

    Parse the query.

          identifier = handler.query.get('identifier') or [None]

    Fetch the list of query arguments with the name “identifier.”

          handler.identifier = identifier[0]

    Use the first identifier passed.

    That function shouldn’t do anything surprising; we set some initial values for the
    query arguments (if we want to use them later) and the identifier, parse the query
    arguments, and then store the identifier from the query if it was available.
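
    The same steps can be tried in isolation on a plain path string. The sketch below
    assumes Python 3, where parse_qs lives in urllib.parse, and uses a hypothetical
    helper named split_identifier() that returns its results instead of storing them on
    a handler object.

```python
from urllib.parse import parse_qs


def split_identifier(path):
    """Return (path_without_query, query_dict, identifier_or_None)."""
    query = {}
    if '?' in path:
        # Separate the query string from the path, then parse it.
        path, _, raw_query = path.partition('?')
        query = parse_qs(raw_query)
    # parse_qs maps each name to a list of values; use the first identifier.
    identifier = (query.get('identifier') or [None])[0]
    return path, query, identifier


result = split_identifier('/statuses/sample.json?identifier=abc&percent=10')
```

    Note that parse_qs returns a list for every argument name, which is why the
    identifier is taken from position 0 rather than used directly.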


    There’s one final piece to the HTTP server portion of our request—actually sending
    the filtered responses. To prepare to send these filtered messages one by one, we first
    need to verify the requests are valid. Assuming that everything is okay, we must then
    send to the client the notification that we’ll be entering an HTTP mode called chunked
    transfer encoding, which will allow us to send messages one at a time as they come in.
    The function that performs this validation and the actual transfer of streamed messages
    to the client is shown next.

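    Listing 8.12 A function that will verify the request and stream data to the client
    import cgi
    import socket

    Import the standard library modules used to parse POST bodies and to catch socket errors.

    FILTERS = ('track', 'filter', 'location')

    Keep a listing of filters that need arguments.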

    def process_filters(handler):
       id = handler.identifier
       if not id:
          return handler.send_error(401, "identifier missing")

    Return an error if an identifier was not provided by the client.

       method = handler.path.rsplit('/')[-1].split('.')[0]

    Fetch the method; should be one of “sample” or “filter”.

       name = None
       args = None
       if method == 'filter':

    If this is a filtering method, we need to fetch the arguments.

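          data = cgi.FieldStorage(
             fp=handler.rfile,
             headers=handler.headers,
             environ={
                'REQUEST_METHOD': 'POST',
                'CONTENT_TYPE': handler.headers['Content-Type'],
             })

    Parse the POST request to discover the type and arguments to the filter.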

          for name in data:
             if name in FILTERS:
                args = data.getfirst(name).lower().split(',')
                break

    Fetch the first filter provided by the client request, stopping there so that name still refers to the matched filter.

          if not args:
             return handler.send_error(401, "no filter provided")

    If there were no filters specified, return an error.

       else:
          args = handler.query

    For sample requests, pass the query arguments as the “args”.

       handler.send_response(200)
       handler.send_header('Transfer-Encoding', 'chunked')
       handler.end_headers()

    Finally, return a response to the client, informing them that they will be receiving a streaming response.

       quit = [False]

    Use a Python list as a holder for a pass-by-reference variable, which will allow us to tell the content filter to stop receiving messages.

       for item in filter_content(id, method, name, args, quit):

    Iterate over the results of the filter.

          try:
             handler.wfile.write('%X\r\n%s\r\n'%(len(item), item))

    Send the pre-encoded response to the client using the chunked encoding.

          except socket.error:
             quit[0] = True

    If sending to the client caused an error, then we need to tell the subscriber to unsubscribe and shut down.

       if not quit[0]:
          handler.wfile.write('0\r\n\r\n')

    Send the “end of chunks” message to the client if we haven’t already disconnected.

    A few details in this function are tricky, but the basic idea is that we make sure that we
    have an identifier for the client and fetch the filtering arguments for the specific calls.
    If everything is okay, we then announce to the client that we’ll be streaming responses
    and pass the actual filtering off to a generator, which will produce the sequence of
    messages that match the filter criteria.
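
    One of the tricky details is the single-item list used as a stop flag. A generator
    cannot see a rebinding of a plain variable in its caller, but it can see a change
    made inside a shared list. The sketch below assumes Python 3 and uses two
    hypothetical stand-ins: produce_messages() in place of the filter_content()
    generator, and a send callable in place of handler.wfile.write().

```python
def produce_messages(source, quit):
    """Yield messages from source until the caller sets quit[0] to True."""
    for message in source:
        if quit[0]:
            break              # the consumer asked us to stop
        yield message


def stream(source, send):
    """Send each message; on a send failure, tell the producer to stop."""
    quit = [False]             # shared, mutable flag visible to the generator
    sent = []
    for item in produce_messages(source, quit):
        try:
            send(item)
            sent.append(item)
        except OSError:        # socket.error is an alias of OSError in Python 3
            quit[0] = True
    return sent, quit[0]


def flaky_send(item):
    """Pretend the client disconnects when the third message is written."""
    if item == 'c':
        raise OSError('client went away')


sent, stopped = stream(['a', 'b', 'c', 'd', 'e'], flaky_send)
```

    After the failed send, the loop asks the generator for its next item, the generator
    sees the flag, and it exits without yielding anything further.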

    And that’s it for the streaming HTTP server. In the next section, we’ll build the
    methods that will filter messages that pass through the system.