Try Redis Cloud Essentials for Only $5/Month!

Learn More

Low-latency message queue & broker software

What is a message broker?

Modern software applications have moved from being a single monolithic unit to loosely coupled collections of services. While this new architecture brings many benefits, those services still need to interact with each other, creating the need for robust and efficient messaging solutions.

Redis Streams doubles as a communication channel for building streaming architectures and as a log-like data structure for persisting data, making Streams the perfect solution for event sourcing.

Redis Pub/Sub is an extremely lightweight messaging protocol designed for broadcasting live notifications within a system. It’s ideal for propagating short-lived messages when low latency and huge throughput are critical.

Redis Lists and Redis Sorted Sets are the basis for implementing message queues. They can be used both directly to build bespoke solutions, or via a framework that makes message processing more idiomatic for your programming language of choice.

Challenges in building message broker solutions

1. Communication between services must be reliable

When one service wants to communicate with another service, it can’t always do so immediately. Failures happen and independent deployments might make a service unavailable for periods of time. For applications at scale, it’s not a matter of “if” or “when” a service becomes unavailable, but how often. To mitigate this problem, a best practice is to limit the amount of synchronous communication between services (i.e., directly calling the service’s APIs, for example by sending a HTTP(S) request) and instead prefer persisted channels whenever practical, so that services can then consume messages at their convenience. The two main paradigms for this type of asynchronous communication are event streams and message queues.

Message queues

  1. Message queues are based on mutable lists and are sometimes consumed through tools that help implement common patterns. There are two main differences between message queues and event streams: Message queues use a “push” type of communication—a service pushes a new message to another service’s inbox whenever something new needs attention. Streams operate in the opposite way.
  2. Messages contain mutable state (e.g., number of retries) and, when successfully processed, they are deleted from the system. Stream events are immutable and the history, when trimmed, is often saved in cold storage.

Task Queues

A task queue is a way to distribute work across threads or machines.  It works by adding tasks to a queue and having worker processes pick tasks off the queue and execute them.  This allows you to parallelize the execution of tasks which can make your application more efficient and responsive.

In task queues tasks are added to a queue using the “push” operation.  Workers can then use the “pop” operation to retrieve and execute tasks from the queue.  If a worker is unable to complete a task they can “bury” the task, which moves it to a separate queue for later processing.   

Redis task queues are useful in a variety of situations, such as:

  • Offloading long-running tasks from a web server to improve performance and scalability
  • Executing periodic tasks, such as sending emails or performing backups
  • Processing large amounts of data, such as data import or export

To use task queues you will need to set up a Redis server and install the Redis Python library.  Then you can use the Python client library to add tasks to the queue and create worker processes to execute them.

Redis Lists and Sorted Sets are the two data types that implement this type of behavior and both can be used to build bespoke solutions, as well as backends for ecosystem-specific frameworks such as Celery (Python), Bull (JavaScript), Sidekiq (Ruby), Machinery (Go), and many others.

Event streams

Event streams are based on the log data type, which is extremely efficient at seeking through its history and appending new items to its end. These two properties make the immutable log both a great communication primitive, and an efficient way to store data.

Communicating through a stream is different than using a message queue. As mentioned previously, message queues are “push,” while streams are “pull.” In practice, this means that each service writes to its own stream and other services will optionally observe (i.e. “pull” from) it. This makes one-to-many communication much more efficient than with message queues.

Message queues work best when one service wants another one to perform an operation. In that situation, the second service’s message queue acts as a “request inbox.” When a service needs instead to publish an event (i.e., a message that is of interest to multiple services), the publishing service will need to push a message to the queue of each service interested in the event. In practice most tools (e.g. Enterprise Service Buses) can do that transparently, but generating and storing a separate message copy for each recipient remains inefficient.

Event streams outperform message queues in one-to-many communication patterns by inverting the protocol: only one copy of the original event exists, and whichever service wants to access it can seek through the event stream (i.e. the publishing service’s stream) at its own pace. Event streams have another practical advantage over message queues: you don’t need to specify the event-subscribers in advance. In message queues, the system needs to know to which queues to deliver a copy of the event, so if you add a new service later, it will receive only new events. With event streams, this problem doesn’t exist—a new service can even be made to go through the full event history, which is great for adding new analytics and still being able to compute them retroactively. This means that you don’t have to come up immediately with every metric you might need in the future. You can just track the ones you need now, and add more as you go because you know that you will still be able to see the full history even for the ones added at a later date.

2. Storage must be space efficient

Space efficiency is a welcome property for all communication channels that persist messages. For event streams, though, it’s fundamental, as they are often used for long-term information storage. (We mentioned above that immutable logs are fast at appending new entries and at seeking through history.)

Redis Streams is an implementation of the immutable log that uses radix trees as the underlying data structure. Each stream entry is identified by a timestamp and can contain an arbitrary set of field-value pairs. Entries of the same stream can have different fields, but Redis is able to compress multiple events in a row that share the same schema. This means that if your events have stable set of fields you won’t pay a storage price for each field name, letting you use longer and more descriptive key names without any downside.

As noted, streams can be trimmed to remove older entries and deleted history often gets preserved in an archival format. Another feature of Redis Streams is the ability to mark any mid-stream entry as “deleted” to help with compliance with regulations such as GDPR.

Scaling processing throughput

Event streams and message queues help cope with communication bursts. But another problem of direct API invocation is that services can get overwhelmed when traffic spikes. Asynchronous communication channels can act as a buffer, which helps smooth out spikes, but the processing throughput has to be robust enough to sustain normal traffic or the system will collapse and the buffer will need to grow indefinitely.

In Redis Streams it’s possible to increase the processing throughput by reading a stream through a consumer group. Readers that are part of the same consumer group see messages in a mutually exclusive fashion. Of course, a single stream can have multiple consumer groups. In practice, you’ll want to create a separate consumer group for every service, so that each service can then spin up multiple reader instances to increase parallelism as needed.

3. Messaging semantics must be clear

When communicating asynchronously, it’s fundamental to consider possible failure scenarios. A service instance might crash or lose connectivity while processing a message, for example. Because communication failures are inevitable, messaging systems divide themselves into two categories: at-most-once and at-least-once delivery. (Some messaging systems claim to offer exactly once delivery, but that is not the complete picture. In any reliable messaging system, messages will occasionally need to be delivered more than once in order to overcome failures. This is an unavoidable characteristic of communication over unreliable networks.)

To handle failures correctly, all services participating in the system must be able to perform idempotent message processing. ‘Idempotent’ means that the system’s state doesn’t change in the event of duplicate message delivery. Idempotence is typically achieved by applying any necessary state change and saving the last message processed atomically (e.g., in a transaction). This way, failure will never be left in an inconsistent state in the case of failure, and the reader will be able to tell if a given message was already processed or not by checking if the new message identifier precedes the last processed message.

Redis Streams, being a reliable streaming communication channel, is an at-least-once system. When reading a stream through a consumer group, Redis remembers which event was dispatched to which consumer. It’s the consumer’s duty then to properly acknowledge that the message was successfully processed. When a consumer dies, an event might remain stuck. To solve this problem consumer groups offer a way of inspecting the state of pending messages and, when necessary, to reassign an event to another consumer.

We noted above that transactions (and atomic operations) are the main way to achieve idempotency. To help with that, Redis Transactions and Lua scripting allow composition of multiple commands with all-or-nothing transactional semantics.

Redis Pub/Sub is an at-most-once messaging system that allows publishers to broadcast messages to one or more channels. More precisely, Redis Pub/Sub is designed for real-time communication between instances where low latency is of the utmost importance, and as such doesn’t feature any form of persistence or acknowledgment. The result is the leanest possible real-time messaging system, perfect for financial and gaming applications, where every millisecond matters.

Why Redis Enterprise for messaging?

Redis Enterprise is based on a shared-nothing, symmetric architecture that allows dataset sizes to grow linearly and seamlessly without requiring changes to the application code.

Redis Enterprise offers multiple models of high availability and geographic distribution enabling local latencies for your users when needed.

Multiple persistence options (AOF per write or per second and snapshots) with no impact to performance ensures that you don’t have to rebuild your database servers after failures.

Support for extremely large datasets with the use of intelligent tiered access to memory (RAM, persistent memory or Flash) ensures that you can scale your datasets to meet the demands of your users without significantly impacting performance.

How to use a Pub/Sub with Redis Enterprise

Redis Streams and Pub/Sub have stable APIs across different programming languages, so the following Python examples can easily be translated to your language of choice.

Connecting to Redis:

import redis# Connect to a local redis instance
r = redis.Redis(host = 'localhost', port = 6379, db = 0)

Writing to a stream:

event = {"eventType": "purchase", "amount": 5, "item_id": "XXX"}
r.xadd("stream_key", '*', event)
# the `*` means that redis generates and event id automatically

Reading a stream directly:

last_id = '$' # `$` means only new messages
while True:
    events = r.xread({
        "stream_key": last_id
    }, block = 0, count = 10)
for _, e in events:
    print(f "new event, amount: {e['amount']}")
last_id = e['id']

Reading a stream through a consumer group:

# Start by reading any potential pending events# that were not previously aknowledged(e.g., #because of a crash). "0" indicates pending events.
pending = r.xreadgroup("service-1", "consumer-A", {
    "stream_key": "0"
})
pending_ids = []
 
for _, e in pending:
    print(f "old event found, amount: {e['amount']}")
pending_ids.append(e['id'])# mark pending events as processed
r.xack("stream_key", "service-1", * pending_ids)# Now that we handled all previous events, #start asking
for new ones.“ & gt;”
indicates“ only new events”.
while True:
    events = r.xreadgroup(“service - 1”, “consumer - A”, {“
        stream_key”: “ & gt;”
    }, count = 10)
event_ids = []
 
for _, e in events:
    print(f” new event, amount: {
        e[‘amount’]
    }”)
event_ids.append(e[‘id’])
r.xack(“stream_key”, “service - 1”, * event_ids) 
# If we crash before `r.xack`, on reload, 
# we will retry this message batch.

Processing some events, acknowledging and applying changes atomically:

while True:
    events = r.xreadgroup("service-1", "consumer-A", {
            "stream_key": ">"
        }, count = 10 event_ids = []# initiate a redis transaction transaction = r.multi() for _, e in events:
        transaction.incrby(f” item: {
                e[‘item_id’
                }: total”,
                e[‘amount’]) event_ids.append(e[‘id’]) transaction.xack(“stream_key”, “service - 1”, * event_ids) transaction.exec()# If we crash before committing the transaction, none# of the other operations will happen, ensuring consistency.

Publishing on Pub/Sub:

#publish a message to the `redis`
channel
r.publish("redis", "hello world")

Subscribing to a channel on Pub/Sub:

sub = r.pubsub()
sub.subscribe("redis")
while True:
    msg = sub.get_message()
print(f "new message: {msg['data']}")

Subscribing to a pattern on Pub/Sub:

sub = r.pubsub()# this subscription will
return messages# from all channels that start with `red`.
sub.psubscribe("red*") 
    while True:
    msg = sub.get_message()
print(f "new message in channel {msg['channel']}: {msg['data']}")

Next steps