In this story, we will dive into Redis Streams: what they are, how they work, and how to easily get started with them.
As the need to process real-time streaming data continues to grow, it’s no surprise that Redis has introduced support for stream-like data structures.
A stream allows us to process and analyze data as it arrives instead of waiting for all the data to be collected before processing it. This real-time approach lets us respond to events as they happen, making it easier to identify and react to patterns, trends, or anomalies as they occur.
Understanding Streams
Imagine you’re boarding a plane today. Your flight is scheduled to depart at 15:30. You arrive at the airport three hours earlier and patiently wait for your flight.
The time to board is getting closer, most passengers have already arrived, and the gate has already been announced. You walk towards the gate and realize there’s already a long line of people waiting to board the plane.
You join the line and are finally allowed in. You board the plane and watch the chaos: people trying to fit their baggage in the overhead compartments while others can’t find their seats. Definitely not efficient.
Most of these people arrived at the airport hours in advance, but still, they had to wait for all the other passengers to arrive before they could board their plane.
And this is how many applications work today. They wait for big files of data, collected throughout the previous day, week, or month, to arrive, and only then do they process them.
Data that has been sitting there just waiting to be processed.
With airplanes, there’s not much we can do. We won’t dispatch an airplane per passenger. With software applications, it’s a different story.
Imagine all the benefits the users of an application could have if they didn’t have to wait for these big files to arrive. If they had access to data as soon as it was available. Real-time data.
A stream allows us to analyze data as soon as it arrives, so we can respond to events, patterns, trends, or anomalies as they happen.
Moreover, you can have multiple applications, or many replicas of the same application, processing data in parallel, making it much more efficient to handle high volumes of data.
Streams offer great flexibility. You can read all the records, read from a specific point in the stream, or read a certain number of records at a time.
Additionally, multiple applications can read from the same stream simultaneously, much like how several people can look at the same ticket to validate passengers’ information.
From there, we can do all sorts of things with our data, like filtering it, grouping it, and analyzing it. For example, you could use it to see how many people on the plane are wearing red shirts or how many people are from a certain city.
Redis’ take on streams is inspired by Apache Kafka, but it keeps the simplicity of Redis, which makes it fun and effective to use.
Understanding Redis Streams
To understand Redis Streams, we need to go through a few of its core concepts. Redis Streams is a sophisticated feature, but you can make sense of it by understanding the following ideas.
It’s a new data structure
Redis Streams is a data structure introduced in Redis 5.0. Like all Redis data structures, each stream is addressed by a key, and that key points to a value of type stream.
This means that all basic operations that you can perform on any Redis key-value pair can also be used on a stream.
For example, you can use the DEL command to delete a stream, and you can also use the powerful and often essential EXPIRE command to set a time to live (TTL) on the stream’s key.
Besides that, just like any other Redis data structure, streams are stored in memory and can be persisted to disk with RDB snapshots or the append-only file (AOF), which helps ensure data durability in the event of a crash or restart.
It is immutable
Stream entries are immutable: once an element has been added, it cannot be edited, and the order of elements cannot be changed. A stream represents a continuous flow of data, a sequence where new elements are added to the end over time. Therefore, the only way to add data is to append it to the end of the stream. (Elements can still be deleted or trimmed, for example with the XDEL command.)
A stream element is a set of key-value pairs
Each stream element is structured as a set of key-value pairs, similar to a Redis Hash (but not the same). It is a collection of related pieces of information, where each piece of information has a name (key) and its corresponding value.
Think of it like a small container holding various labels and their associated details. For instance, you could create a ticket stream element containing fields for ID, title, date & time, seat, and ticket holder’s name.
Each element has a unique time-prefixed ID
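Each element of the stream is identified by a unique ID. Since a stream is commonly used to represent a continuous flow of data, the ID is time-based by default: it consists of the Unix time in milliseconds at which the element was added, followed by a sequence number that distinguishes elements added within the same millisecond (for example, 1678900012656-0).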
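Because stream IDs have the form <milliseconds>-<sequence>, they need to be compared as pairs of numbers rather than as strings. The following Python snippet is a small sketch (the helper name parse_stream_id is our own, not part of any Redis client) that shows why: sorting IDs as plain strings puts 1678900012656-10 before 1678900012656-9.

```python
from typing import List, Tuple


def parse_stream_id(entry_id: str) -> Tuple[int, int]:
    """Split a full stream ID such as '1678900012656-0' into (milliseconds, sequence)."""
    ms, seq = entry_id.split("-")
    return int(ms), int(seq)


ids: List[str] = ["1678900012656-10", "1678900012656-9", "1678900012655-99"]

# Plain string sorting compares character by character, so "-10" lands before "-9".
print(sorted(ids))
# Sorting by the numeric (milliseconds, sequence) pair gives the real stream order.
print(sorted(ids, key=parse_stream_id))
```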
Streams can be processed by multiple consumers
Redis provides two ways for streams to be consumed.
The first one is allowing multiple consumers to read the same data from any point in the stream. This is similar to showing your boarding pass to multiple people at different sections of the airport.
First, you’re going to present your ticket to a security guard to be allowed in the boarding zone. Then, you’re going to show your ticket to an airline employee to be allowed on the plane. Multiple people validate the same information.
When you think about applications, we could have multiple applications validating the same tickets and each one doing its own processing on it.
That’s convenient. But what if you have millions of tickets and want to scale your application horizontally? You need replicas of your application, all of them performing the same validations.
At the same time, you don’t want the replicas to validate the same tickets. You want each replica to validate a different ticket so that you can process more tickets at a time.
To enable efficient and distributed processing of data, Redis Streams adopted the concept of Consumer Groups.
Consumer groups are a way to organize multiple consumers that work together to process data from a stream. They help distribute the work among different consumers, allowing them to share the load and process the data more efficiently.
Each replica of the same application will be responsible for processing a different ticket. This allows all tickets to be processed more efficiently.
Putting Our Hands in the Fire
Now that we have a fundamental understanding of Redis Streams, let’s see how we can manipulate them.
Publishing to a Stream
A publisher in Redis Streams is the agent responsible for adding new elements to the stream. In this role, the publisher is central to real-time data processing. By publishing data to a stream, other parts of your application (the consumers) can react to that data almost instantly.
To add new elements to the stream, we can use the XADD command. It appends an element consisting of key-value pairs to the specified stream.
If the stream doesn’t exist, Redis will create it automatically.
The command also generates a unique identifier for the element, which is based on the current timestamp and a sequence number by default.
Its basic syntax is:
XADD key ID field value [field value ...]
Breaking down this command, we have:
- Key of the stream: The name of the stream you want to add the element to.
- ID: The identifier for the element. You can use the special character * to let Redis automatically generate an ID based on the current timestamp and a sequence number.
- Element in the Stream: The key-value pairs that make up the element.
As a publisher, you can add elements to the stream continuously. For instance, in the context of a concert, the ticket scanner could be the publisher. As it scans each ticket, it could publish an element to the stream, including information like the ticket ID, seat number, and scan time. Other parts of the system could consume these messages in real time, for instance, to keep track of occupied and available seats.
Here is an example of adding a new element to the ‘tickets’ stream:
XADD tickets * name "Raphael De Lio" seat "B12" movieId 53 sessionId 832
When you run this command, Redis responds with the generated ID of the new element, which can be used later to read or reference the element in the stream:
"1678900012656-0"
The above element can then be consumed by a consumer or consumer group for further processing.
Publishers play a crucial role in Redis Streams, facilitating real-time data processing by pushing new elements to the stream. They are not constrained by the number of consumers; they can continue publishing irrespective of the number of consumers or the speed at which consumers are processing elements.
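To make the automatic ID generation of XADD with * more concrete, here is a toy Python model of the rule Redis follows: use the current Unix time in milliseconds with sequence 0, unless that time is not greater than the last ID’s time (several entries in the same millisecond, or a clock that moved backwards), in which case keep the last timestamp and increment the sequence. The class ToyStreamIdGenerator is hypothetical and only illustrates the idea; it ignores details such as sequence overflow.

```python
import time
from typing import Optional, Tuple


class ToyStreamIdGenerator:
    """Toy model of how XADD with '*' picks IDs (an illustration, not Redis code)."""

    def __init__(self) -> None:
        self.last: Tuple[int, int] = (0, 0)

    def next_id(self, now_ms: Optional[int] = None) -> str:
        if now_ms is None:
            now_ms = int(time.time() * 1000)
        last_ms, last_seq = self.last
        if now_ms > last_ms:
            # A new millisecond: the sequence starts again at 0.
            new = (now_ms, 0)
        else:
            # Same millisecond, or the clock went backwards: keep the last
            # timestamp and bump the sequence so IDs always increase.
            new = (last_ms, last_seq + 1)
        self.last = new
        return f"{new[0]}-{new[1]}"


gen = ToyStreamIdGenerator()
print(gen.next_id(1678900012656))  # 1678900012656-0
print(gen.next_id(1678900012656))  # 1678900012656-1
print(gen.next_id(1678900012650))  # clock moved back: 1678900012656-2
print(gen.next_id(1678900012700))  # 1678900012700-0
```

This monotonic rule is what lets readers use the last ID they saw as a reliable bookmark.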
Consuming from a stream
In a real-time processing scenario, it’s not just about adding elements to the stream but also reading them, analyzing the data, and possibly taking some action based on that data. That’s where stream consumers come into play.
A consumer in Redis Streams reads the data from the stream and processes it. There can be multiple consumers reading from the same stream simultaneously, which allows for parallel processing of the data.
To read elements from a stream, we use the XREAD command:
XREAD [COUNT count] [BLOCK milliseconds] STREAMS stream_name [stream_name ...] ID [ID ...]
Breaking down this command, we have:
- COUNT count: This is optional and is used to limit the number of messages returned by the command.
- BLOCK milliseconds: This is also optional and sets how long, in milliseconds, the command should wait when no data is available. If the timeout expires and no data is received, the command returns an empty (nil) reply. A timeout of 0 blocks indefinitely.
- STREAMS stream_name: The name of the stream(s) from which you want to read elements.
- ID: XREAD returns only elements with an ID greater than this one. If you want to read all elements, you can use “0-0” as the ID.
For example, the following command returns all elements in the “tickets” stream:
XREAD STREAMS tickets 0-0
We use 0-0 to read from the beginning of the stream because IDs are incremental by default. They combine a millisecond timestamp and a sequence number (for example, 1678900012656-0), so 0-0 is the lowest possible one.
But if you want to listen only to new messages, you can use the special $ ID together with the BLOCK option. The $ ID returns only messages that are added to the stream after the XREAD command is sent. For example, this command waits up to 5 seconds for a new ticket:
XREAD BLOCK 5000 STREAMS tickets $
It is important to understand that you should use the $ ID only for the first call to XREAD. In later calls, the ID should be the one from the last element you received. Otherwise, you could miss any entries that are added in between calls.
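The “remember the last ID” pattern can be sketched with a toy in-memory model. The function toy_xread below is hypothetical, not a Redis client call; it mimics XREAD’s rule of returning only entries whose ID is strictly greater than the one you pass, and shows why resuming from the last received ID is safer than sending $ again.

```python
from typing import Dict, List, Optional, Tuple

Entry = Tuple[str, Dict[str, str]]


def parse_stream_id(entry_id: str) -> Tuple[int, int]:
    ms, seq = entry_id.split("-")
    return int(ms), int(seq)


def toy_xread(stream: List[Entry], last_id: str, count: Optional[int] = None) -> List[Entry]:
    """Toy model of XREAD: entries whose ID is strictly greater than last_id."""
    start = parse_stream_id(last_id)
    newer = [entry for entry in stream if parse_stream_id(entry[0]) > start]
    return newer if count is None else newer[:count]


tickets: List[Entry] = [
    ("1678900012656-0", {"seat": "B12"}),
    ("1678900012656-1", {"seat": "B13"}),
]

# First call: read from the very beginning, one entry at a time.
batch = toy_xread(tickets, "0-0", count=1)
last_id = batch[-1][0]  # remember the last ID we actually received

# A new ticket arrives while we are still busy with the first batch.
tickets.append(("1678900012700-0", {"seat": "C01"}))

# Resuming from the remembered ID returns everything we have not seen yet.
print([entry_id for entry_id, _ in toy_xread(tickets, last_id)])
# ['1678900012656-1', '1678900012700-0']

# Sending "$" again would resolve to the newest ID at that moment, skipping both.
newest = tickets[-1][0]
print(toy_xread(tickets, newest))  # []
```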
Stream Consumer Groups
As mentioned earlier, Redis Streams allows us to create Consumer Groups. They are particularly useful when the rate of incoming data is high or the processing time per message is significant.
Consumer groups allow you to distribute the data processing among multiple consumers while making sure they won’t read the same elements.
Creating a Consumer Group
First, you need to create a consumer group associated with the stream using the XGROUP CREATE command:
XGROUP CREATE stream_name group_name ID
The parameters are as follows:
- stream_name: The name of the stream you want to associate with the group.
- group_name: The name you want to give to the consumer group.
- ID: The ID from which the group will start reading. If you want the group to read all elements since the beginning, use “0” as the ID. You can also use the special $ ID to read only elements that are added after the group was created.
For example, to create the “officer_group” group and have it read the “tickets” stream from the beginning:
XGROUP CREATE tickets officer_group 0
Reading from a Consumer Group
To read data from a consumer group, you use the XREADGROUP command. It is similar to XREAD but takes the consumer group and consumer name as additional parameters:
XREADGROUP GROUP group_name consumer_name [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS stream_name ID
The command reads as follows:
- Consumer Group Name: the name of the group you want to read from.
- Name of the consumer: the name you want to give to this consumer. Redis creates the consumer automatically the first time it is mentioned.
- Key of the Stream: the name of the stream you want this consumer to read from.
- ID: usually the special > ID, which means the consumer should receive new elements that have never been delivered to any other consumer in the group. Any other ID, such as 0, instead returns this consumer’s own pending elements: the ones already delivered to it but not yet acknowledged.
For example, this command asks for one new ticket on behalf of “consumer1” in the “officer_group” group:
XREADGROUP GROUP officer_group consumer1 COUNT 1 STREAMS tickets >
Acknowledging Messages with XACK
Once a message is processed by a consumer, it needs to be acknowledged. This is done with the XACK command.
This command removes the message from the Pending Entries List (PEL) of the group, indicating that the message has been processed successfully and does not need to be delivered again.
XACK stream_name group_name message_id
An example of acknowledging a message with the ID ‘1526569495631-0’ from the “tickets” stream in the “officer_group” consumer group would be:
XACK tickets officer_group 1526569495631-0
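Putting XREADGROUP and XACK together, a consumer group can be thought of as a single “last delivered ID” shared by the whole group, plus a Pending Entries List that records which consumer received each unacknowledged entry. The Python class below is a toy model of that bookkeeping (ToyConsumerGroup and its methods are hypothetical names, not a Redis client API). It shows that two consumers reading with > receive different entries and that acknowledging an entry removes it from the PEL.

```python
from typing import Dict, List, Tuple


def parse_stream_id(entry_id: str) -> Tuple[int, int]:
    ms, seq = entry_id.split("-")
    return int(ms), int(seq)


class ToyConsumerGroup:
    """Toy model of a consumer group: one shared last-delivered ID plus a PEL."""

    def __init__(self, entry_ids: List[str], start_id: str = "0-0") -> None:
        self.entry_ids = entry_ids        # stream entry IDs in ascending order
        self.last_delivered = start_id    # plays the role of the ID given to XGROUP CREATE
        self.pel: Dict[str, str] = {}     # pending entry ID -> consumer that received it

    def read_new(self, consumer: str, count: int) -> List[str]:
        """Mimics XREADGROUP ... STREAMS key >: hand out entries never delivered before."""
        start = parse_stream_id(self.last_delivered)
        fresh = [i for i in self.entry_ids if parse_stream_id(i) > start][:count]
        for entry_id in fresh:
            self.pel[entry_id] = consumer  # delivered, but not acknowledged yet
        if fresh:
            self.last_delivered = fresh[-1]
        return fresh

    def ack(self, *entry_ids: str) -> int:
        """Mimics XACK: remove entries from the PEL and return how many were removed."""
        return sum(1 for i in entry_ids if self.pel.pop(i, None) is not None)


group = ToyConsumerGroup(["1526569495631-0", "1526569495631-1", "1526569495632-0"])
print(group.read_new("consumer1", count=2))  # ['1526569495631-0', '1526569495631-1']
print(group.read_new("consumer2", count=2))  # ['1526569495632-0']
print(group.ack("1526569495631-0"))          # 1
print(group.pel)  # {'1526569495631-1': 'consumer1', '1526569495632-0': 'consumer2'}
```

Entries that stay in the PEL, like the two above, are exactly the ones the next section’s commands inspect and reassign.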
Claiming Stalled Messages with XPENDING, XCLAIM and XAUTOCLAIM
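If a consumer does not acknowledge a message, for example because it crashed halfway through processing it, the message stays in the group’s Pending Entries List. Redis does not redeliver it on its own; it is up to your application to decide how long a message may stay pending before its processing is considered failed.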
A list of pending to-be-acknowledged messages can be returned with the XPENDING command:
XPENDING key group [[IDLE min-idle-time] start end count [consumer]]
For example, to check which messages in the “tickets” stream have been pending in the “officer_group” group for at least 30 seconds, limited to the first 10 delivered to “consumer2”:
XPENDING tickets officer_group IDLE 30000 - + 10 consumer2
Note that the IDLE option requires Redis 6.2 or later.
Another consumer can claim ownership of these messages using the XCLAIM command. This command changes the ownership of the message from the original consumer to a new consumer.
XCLAIM stream_name group_name new_consumer min_idle_time message_id [message_id ...]
A simple example of a consumer named “consumer3” claiming a message with the ID ‘1526569495631-0’ from the “tickets” stream in the “officer_group” group would be:
XCLAIM tickets officer_group consumer3 30000 1526569495631-0
This command claims messages that have been idle for at least 30 seconds (30000 milliseconds).
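Remember, Redis doesn’t automatically run XCLAIM or XACK when you use XREADGROUP. These are manual operations that need to be performed by the consumer. (The exception is the NOACK option of XREADGROUP, which skips the Pending Entries List entirely, so messages never need to be acknowledged.)
Since Redis 6.2, the XAUTOCLAIM command can combine both steps: it scans the group’s Pending Entries List and, in a single call, claims the messages that have been idle for at least the given time. For example, to let “consumer3” claim up to 10 messages idle for 30 seconds or more, scanning from the start of the list:
XAUTOCLAIM tickets officer_group consumer3 30000 0-0 COUNT 10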
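The idle-time logic behind the IDLE filter of XPENDING and behind XCLAIM can also be sketched as a toy model. In the Python below, PendingEntry, toy_pending_idle, and toy_claim are hypothetical names, and timestamps are passed in explicitly so the example is deterministic. As in Redis, a claim only succeeds for entries idle for at least the minimum idle time, and a successful claim changes the owner, resets the idle time, and increments the delivery counter.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class PendingEntry:
    owner: str             # consumer the entry is currently assigned to
    last_delivery_ms: int  # when it was last delivered to that consumer
    delivery_count: int = 1


def toy_pending_idle(pel: Dict[str, PendingEntry], now_ms: int, min_idle_ms: int) -> List[str]:
    """Like XPENDING ... IDLE min_idle_ms: IDs idle for at least min_idle_ms."""
    return [i for i, p in pel.items() if now_ms - p.last_delivery_ms >= min_idle_ms]


def toy_claim(pel: Dict[str, PendingEntry], new_owner: str, min_idle_ms: int,
              now_ms: int, *entry_ids: str) -> List[str]:
    """Like XCLAIM: move idle-enough entries to new_owner and return their IDs."""
    claimed = []
    for entry_id in entry_ids:
        entry = pel.get(entry_id)
        if entry is not None and now_ms - entry.last_delivery_ms >= min_idle_ms:
            entry.owner = new_owner
            entry.last_delivery_ms = now_ms  # idle time starts again from zero
            entry.delivery_count += 1
            claimed.append(entry_id)
    return claimed


pel = {
    "1526569495631-0": PendingEntry("consumer2", last_delivery_ms=1_000),
    "1526569495632-0": PendingEntry("consumer2", last_delivery_ms=40_000),
}
now = 45_000
stale = toy_pending_idle(pel, now, 30_000)
print(stale)                                             # ['1526569495631-0']
print(toy_claim(pel, "consumer3", 30_000, now, *stale))  # ['1526569495631-0']
print(pel["1526569495631-0"])
# PendingEntry(owner='consumer3', last_delivery_ms=45000, delivery_count=2)
```

The growing delivery counter is a useful signal in practice: an entry that keeps getting claimed without being acknowledged is probably one that cannot be processed.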
Other commands
XLEN
The XLEN command returns the number of entries in a stream.
XLEN stream_name
This command will return the number of elements in the given stream. For instance, if you want to get the number of elements in the “tickets” stream, you would use:
XLEN tickets
XRANGE and XREVRANGE
XRANGE and XREVRANGE are used to fetch a range of messages from a stream. XRANGE fetches messages in ascending order of IDs, while XREVRANGE fetches them in descending order.
XRANGE stream_name start end [COUNT count]
XREVRANGE stream_name end start [COUNT count]
For instance, to fetch the first 10 messages from the “tickets” stream, you would use the following command, where - and + stand for the lowest and highest possible IDs:
XRANGE tickets - + COUNT 10
Similarly, to fetch the last 10 messages, you would use:
XREVRANGE tickets + - COUNT 10
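The range semantics can be illustrated with a toy Python model: both bounds are inclusive, - and + stand for the smallest and largest possible IDs, COUNT truncates the result, and XREVRANGE takes its bounds in the opposite order (end first) and walks backwards. The functions toy_xrange and toy_xrevrange are hypothetical helpers working on a plain list of IDs, not a Redis client API.

```python
from typing import List, Optional, Tuple

MAX_U64 = 2**64 - 1


def parse_bound(bound: str) -> Tuple[int, int]:
    """Turn '-', '+' or a full 'ms-seq' ID into a comparable (ms, seq) pair."""
    if bound == "-":
        return (0, 0)
    if bound == "+":
        return (MAX_U64, MAX_U64)
    ms, seq = bound.split("-")
    return int(ms), int(seq)


def toy_xrange(ids: List[str], start: str, end: str, count: Optional[int] = None) -> List[str]:
    """Entries with start <= ID <= end, oldest first (ids must be in stream order)."""
    lo, hi = parse_bound(start), parse_bound(end)
    hits = [i for i in ids if lo <= parse_bound(i) <= hi]
    return hits if count is None else hits[:count]


def toy_xrevrange(ids: List[str], end: str, start: str, count: Optional[int] = None) -> List[str]:
    """Same range, newest first; note the (end, start) argument order."""
    lo, hi = parse_bound(start), parse_bound(end)
    hits = [i for i in reversed(ids) if lo <= parse_bound(i) <= hi]
    return hits if count is None else hits[:count]


ids = ["1678900012656-0", "1678900012656-1", "1678900012700-0", "1678900012800-0"]
print(toy_xrange(ids, "-", "+", count=2))     # ['1678900012656-0', '1678900012656-1']
print(toy_xrevrange(ids, "+", "-", count=2))  # ['1678900012800-0', '1678900012700-0']
```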
XDEL
The XDEL command is used to delete one or more messages from a stream using their IDs.
XDEL stream_name id [id ...]
For example, to delete a message with the ID ‘1526569495631-0’ from the “tickets” stream, you would use:
XDEL tickets 1526569495631-0
XPENDING
The XPENDING command is used to fetch data about messages that have been delivered to a group but have not yet been acknowledged.
XPENDING stream_name group_name [[IDLE min-idle-time] start end count [consumer_name]]
For instance, to fetch data about all unacknowledged messages in the “officer_group” group in the “tickets” stream, you would use:
XPENDING tickets officer_group
Note that you can optionally specify a range of IDs and a count to limit the results, as well as a specific consumer name to fetch messages delivered to that consumer.
Remember that Redis provides a rich set of commands for working with streams, and this is just a selection of the most commonly used ones. You should check their official documentation for a list of all commands and a more in-depth explanation of them.
Conclusion
Real-time data processing has become an essential part of many modern applications, providing immediate insights and facilitating rapid decision-making, and Redis Streams serves this purpose by providing a robust, scalable, simple, and fun solution.
At its core, Redis Streams is an ordered collection of elements — each marked with a unique, timestamped ID — that enables multiple publishers to append information and multiple consumers to process it. Key features such as immutability and consumer groups make Redis Streams an ideal tool for managing data in a distributed and efficient manner.
With its support for Streams, Redis offers powerful solutions for managing and processing real-time data. By understanding these features, you can make more informed decisions about how to handle real-time data in your applications.