
10.3.2 Scaling search index size

If there’s one thing we can expect of a search engine, it’s that the search index will
grow over time. As search indexes grow, the memory used by those search indexes also grows. Depending on the speed of the growth, we may or may not be able to keep buying/
renting larger computers to run our index on. But for many of us, getting bigger
and bigger computers is just not possible.

In this section, we’ll talk about how to structure our data to support sharded
search queries, and will include code to execute sharded search queries against a collection
of sharded Redis masters (or slaves of sharded masters, if you followed the
instructions in section 10.3.1).

In order to shard our search queries, we must first shard our indexes so that for
each document that we index, all of the data about that document is on the same
shard. It turns out that our index_document() function from chapter 7 takes a connection
object, which we can shard by hand with the docid that’s passed. Or, because
index_document() takes a connection followed by the docid, we can use our automatic
sharding decorator from listing 10.3 to handle sharding for us.
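
As a minimal sketch of that second option (assuming that the sharded_connection() decorator from listing 10.3 shards on the first argument after the connection, and picking an arbitrary component name and shard count):

# Wrap the chapter 7 indexing function so that every call is routed to
# the shard chosen by the docid ('search' and 16 shards are assumptions).
sharded_index_document = sharded_connection('search', 16)(index_document)

# The decorator supplies the connection itself; the trailing argument is
# a hypothetical stand-in for index_document()'s remaining parameters.
sharded_index_document('docid:287', document_content)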

When we have our documents indexed across shards, we only need to perform
queries against the shards to get the results. The details of what we need to do will
depend on our type of index—whether it’s SORT-based or ZSET-based. Let’s first update
our SORT-based index for sharded searches.

SHARDING SORT-BASED SEARCH

As is the case with all sharded searches, we need a way to combine the results of the
sharded searches. In our implementation of search_and_sort() from chapter 7, we
received a total count of results and the document IDs that were the result of the
required query. This is a great building block to start from, but overall we’ll need to
write functions to perform the following steps:

  1. Perform the search and fetch the values to sort on for a query against a single shard.
  2. Execute the search on all shards.
  3. Merge the results of the queries, and choose the subset desired.

First, let’s look at what it takes to perform the search and fetch the values from a single shard.

Because we already have search_and_sort() from chapter 7, we can start by using
that to fetch the result of a search. After we have the results, we can then fetch the
data associated with each search result. But we have to be careful about pagination,
because we don’t know which shard each result from a previous search came from. So,
in order to always return the correct search results for items 91–100, we need to fetch
the first 100 search results from every shard. Our code for fetching all of the necessary
results and data can be seen in the next listing.

Listing 10.5 SORT-based search that fetches the values that were sorted

def search_get_values(conn, query, id=None, ttl=300, sort="-updated",
                      start=0, num=20):
   # We need to take all of the same parameters to pass on to
   # search_and_sort().

   # First get the results of a search and sort.
   count, docids, id = search_and_sort(
      conn, query, id, ttl, sort, 0, start+num)

   key = "kb:doc:%s"
   sort = sort.lstrip('-')

   # Fetch the data that the results were sorted by.
   pipe = conn.pipeline(False)
   for docid in docids:
      pipe.hget(key % docid, sort)
   sort_column = pipe.execute()

   # Pair up the document IDs with the data they were sorted by.
   data_pairs = zip(docids, sort_column)

   # Return the count, the data, and the cache ID of the results.
   return count, data_pairs, id

This function fetches all of the information necessary from a single shard in preparation
for the final merge. Our next step is to execute the query on all of the shards.

To execute a query on all of our shards, we have two options. We can either run
each query on each shard one by one, or we can execute our queries across all of our
shards simultaneously. To keep it simple, we’ll execute our queries one by one on
each shard, collecting the results together in the next listing.

Listing 10.6 A function to perform queries against all shards

def get_shard_results(component, shards, query, ids=None, ttl=300,
               sort="-updated", start=0, num=20, wait=1):
   # In order to know what servers to connect to, we assume that all of
   # our shard information is kept in the standard configuration location.

   # Prepare structures to hold all of our fetched data.
   count = 0
   data = []

   # Use cached results if we have any; otherwise, start over.
   ids = ids or shards * [None]

   for shard in xrange(shards):
      # Get or create a connection to the desired shard.
      conn = get_redis_connection('%s:%s' % (component, shard), wait)

      # Fetch the search results and their sort values.
      c, d, i = search_get_values(
         conn, query, ids[shard], ttl, sort, start, num)

      # Combine this shard's results with all of the other results.
      count += c
      data.extend(d)
      ids[shard] = i

   # Return the raw results from all of the shards.
   return count, data, ids

This function works as explained: we execute queries against each shard one at a time
until we have results from all shards. Remember that in order to perform queries against
all shards, we must pass the proper shard count to the get_shard_results() function.
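
For example, a hypothetical call (the 'search' component name, the shard count of 8, and the query string are all assumptions here) looks like this:

count, data, ids = get_shard_results('search', 8, 'content indexing')

# data holds (docid, sort-value) pairs gathered from every shard, and
# ids holds each shard's cache ID, which can be passed back in on the
# next call to reuse cached results.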

Exercise: Run queries in parallel

Python includes a variety of methods to run calls against Redis servers in parallel.
Because most of the work with performing a query is actually just waiting for Redis
to respond, we can easily use Python’s built-in threading and queue libraries to send
requests to the sharded Redis servers and wait for a response. Can you write a version
of get_shard_results() that uses threads to fetch results from all shards in parallel?
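
One possible sketch of such a version follows (my own, not the book’s official answer); it reuses the helpers above along with Python 2’s built-in threading and Queue modules, and the function name is hypothetical:

import threading
from Queue import Queue   # Python 2; this is the queue module in Python 3

def get_shard_results_threaded(component, shards, query, ids=None, ttl=300,
               sort="-updated", start=0, num=20, wait=1):
   ids = ids or shards * [None]
   results = Queue()

   def worker(shard):
      # Each thread queries a single shard and enqueues what it found.
      conn = get_redis_connection('%s:%s' % (component, shard), wait)
      results.put((shard,) + search_get_values(
         conn, query, ids[shard], ttl, sort, start, num))

   # Start one thread per shard, then wait for them all to finish.
   threads = [threading.Thread(target=worker, args=(shard,))
              for shard in xrange(shards)]
   for thread in threads:
      thread.setDaemon(True)
      thread.start()
   for thread in threads:
      thread.join()

   # Drain the queue, merging the per-shard counts, data, and cache IDs.
   count = 0
   data = []
   while not results.empty():
      shard, c, d, i = results.get()
      count += c
      data.extend(d)
      ids[shard] = i
   return count, data, ids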

Now that we have all of the results from all of the queries, we only need to re-sort our
results so that we can get an ordering on all of the results that we fetched. This isn’t
terribly complicated, but we have to be careful about numeric and non-numeric sorts,
handling missing values, and handling non-numeric values during numeric sorts. Our
function for merging results and returning only the requested results is shown in the
next listing.

Listing 10.7 A function to merge sharded search results

from decimal import Decimal

# We use the Decimal numeric type here because it transparently handles
# both integers and floats reasonably, defaulting to 0 if the value
# wasn't numeric or was missing.
def to_numeric_key(data):
   try:
      return Decimal(data[1] or '0')
   except:
      return Decimal('0')

# Always return a string, even if there was no value stored.
def to_string_key(data):
   return data[1] or ''

def search_shards(component, shards, query, ids=None, ttl=300,
                  sort="-updated", start=0, num=20, wait=1):
   # We need to take all of the sharding and searching arguments, mostly
   # to pass on to lower-level functions, but we use the sort and search
   # offsets here.

   # Fetch the results of the unsorted sharded search.
   count, data, ids = get_shard_results(
      component, shards, query, ids, ttl, sort, start, num, wait)

   # Prepare all of our sorting options.
   reversed = sort.startswith('-')
   sort = sort.strip('-')
   key = to_numeric_key
   if sort not in ('updated', 'id', 'created'):
      key = to_string_key

   # Actually sort our results based on the sort parameter.
   data.sort(key=key, reverse=reversed)

   # Fetch just the page of results that we want.
   results = []
   for docid, score in data[start:start+num]:
      results.append(docid)

   # Return the results, including the sequence of cache IDs for each shard.
   return count, results, ids

In order to handle sorting properly, we needed to write two functions to convert data
returned by Redis into values that could be consistently sorted against each other.
You’ll notice that we chose to use Python Decimal values for sorting numerically. This
is because we get the same sorted results with less code, along with transparent support
for handling infinity correctly. From there, all of our code does exactly what’s expected:
we fetch the results, prepare to sort the results, sort the results, and then return only
those document IDs from the search that are in the requested range.
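
As a quick illustration of why Decimal is convenient here (a hedged example of my own, not one of the book’s listings): raw strings from Redis compare lexicographically, while Decimal values compare numerically and understand infinity.

from decimal import Decimal

# Strings compare character by character, so '9' sorts after '10'.
assert '9' > '10'

# Decimals compare numerically, and handle ints, floats, and infinity.
assert Decimal('9') < Decimal('10')
assert Decimal('3.5') < Decimal('4')
assert Decimal('inf') > Decimal('1e100')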

Now that we have a version of our SORT-based search that works across sharded
Redis servers, it only remains to shard searching on ZSET-based sharded indexes.

SHARDING ZSET-BASED SEARCH

Like a SORT-based search, searching over ZSET-based indexes requires running our
queries against all shards, fetching the scores to sort by, and merging the results
properly. In this section, we’ll go through the same steps that we followed for SORT-based
search: search on one shard, search on all shards, and then merge the results.

To search on one shard, we’ll wrap the chapter 7 search_and_zsort() function on
ZSETs, fetching the results and scores from the cached ZSET in the next listing.

Listing 10.8 ZSET-based search that returns scores for each result

def search_get_zset_values(conn, query, id=None, ttl=300, update=1,
                     vote=0, start=0, num=20, desc=True):
   # We need to accept all of the standard arguments for
   # search_and_zsort().

   # Call the underlying search_and_zsort() function to get the cached
   # result ID and the total number of results.
   count, r, id = search_and_zsort(
      conn, query, id, ttl, update, vote, 0, 1, desc)

   # Fetch all of the results we need from the cached ZSET, including
   # their scores.
   if desc:
      data = conn.zrevrange(id, 0, start + num - 1, withscores=True)
   else:
      data = conn.zrange(id, 0, start + num - 1, withscores=True)

   # Return the count, the results with scores, and the cache ID.
   return count, data, id

Compared to the SORT-based search that does similar work, this function tries to keep
things simple by ignoring the returned results without scores, and just fetches the
results with scores directly from the cached ZSET. Because we have our scores already
as floating-point numbers for easy sorting, we’ll combine the function to search on all
shards with the function that merges and sorts the results.

As before, we’ll perform searches for each shard one at a time, combining the
results. When we have the results, we’ll sort them based on the scores that were
returned. After the sort, we’ll return the results to the caller. The function that implements
this is shown next.

Listing 10.9 Sharded search query over ZSETs that returns paginated results

def search_shards_zset(component, shards, query, ids=None, ttl=300,
               update=1, vote=0, start=0, num=20, desc=True, wait=1):
   # We need to take all of the sharding arguments along with all of the
   # search arguments.

   # Prepare structures for the data to be returned.
   count = 0
   data = []

   # Use cached results, if any; otherwise, start from scratch.
   ids = ids or shards * [None]
   for shard in xrange(shards):
      # Fetch or create a connection to each shard.
      conn = get_redis_connection('%s:%s' % (component, shard), wait)

      # Perform the search on a shard and fetch its scores.
      c, d, i = search_get_zset_values(conn, query, ids[shard],
         ttl, update, vote, start, num, desc)

      # Merge this shard's results with the rest.
      count += c
      data.extend(d)
      ids[shard] = i

   # A simple sort helper that returns only the score.
   def key(result):
      return result[1]

   # Sort all of the results together.
   data.sort(key=key, reverse=desc)

   # Extract the document IDs from the results, removing the scores.
   results = []
   for docid, score in data[start:start+num]:
      results.append(docid)

   # Return the search results to the caller.
   return count, results, ids
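
As a usage sketch (the 'search' component name, the shard count of 8, and the query string are assumptions), paginating a sharded ZSET search might look like this; passing the returned cache IDs back in lets each shard reuse its cached result ZSET:

count, results, ids = search_shards_zset(
   'search', 8, 'content indexing', vote=1, start=0, num=20)

# Fetch the next page, reusing the cached ZSETs via the cache IDs.
count, results, ids = search_shards_zset(
   'search', 8, 'content indexing', ids=ids, vote=1, start=20, num=20)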

With this code, you should have a good idea of the kinds of things necessary for handling
sharded search queries. Generally, when confronted with a situation like this, I
find myself questioning whether it’s worth attempting to scale these queries in this
way. Given that we now have working sharded search code, the question is easier to
answer. Note that as our number of shards increases, we’ll need to fetch more and
more data in order to satisfy our queries; to return results 91–100, for example, every
shard must return its top 100 results, so 20 shards means fetching and merging 2,000
candidates. At some point, we may even need to delegate fetching and merging to
other processes, or perform the merge in a tree-like structure. At that point, we may
want to consider other solutions that were purpose-built for search (like Lucene,
Solr, Elasticsearch, or even Amazon’s CloudSearch).

Now that you know how to scale a second type of search, only one other problem
covered in earlier sections is likely to grow to the point of needing to be scaled.
Let’s take a look at what it would take to scale our social network from chapter 8.