跳到主要内容

Redis 流数据类型

Redis流(Stream)是Redis 5.0引入的一种新的数据类型,专门用于处理消息流。它类似于日志数据结构,允许你将消息追加到流的末尾,并支持按时间顺序读取消息。流数据类型非常适合用于实现消息队列、事件溯源等场景。

什么是Redis流?

Redis流是一个有序的、持久化的消息日志。每条消息都有一个唯一的ID,通常由时间戳和序列号组成。流中的消息是不可变的,一旦被追加到流中,就无法修改或删除(除非使用特殊命令)。

流的主要特点包括:

  • 有序性:消息按照追加的顺序存储,并且可以通过ID进行范围查询。
  • 持久化:流中的数据会持久化到磁盘,确保数据不会丢失。
  • 消费者组:支持多个消费者组,每个消费者组可以独立地读取和处理消息。

基本操作

追加消息

使用 XADD 命令可以向流中追加一条消息。每条消息由一个或多个键值对组成。

bash
XADD mystream * sensor-id 1234 temperature 25.3
  • mystream 是流的名称。
  • * 表示由Redis自动生成消息ID。
  • sensor-idtemperature 是消息的字段名,123425.3 是对应的值。

输出示例:

"1640995200000-0"

读取消息

使用 XREAD 命令可以从流中读取消息。你可以指定从某个ID开始读取,或者从最新的消息开始读取。

bash
XREAD COUNT 2 STREAMS mystream 0
  • COUNT 2 表示最多读取2条消息。
  • STREAMS mystream 0 表示从流的开头开始读取。

输出示例:

1) 1) "mystream"
2) 1) 1) "1640995200000-0"
2) 1) "sensor-id"
2) "1234"
3) "temperature"
4) "25.3"
2) 1) "1640995200001-0"
2) 1) "sensor-id"
2) "5678"
3) "temperature"
4) "26.1"

消费者组

Redis流支持消费者组(Consumer Group),允许多个消费者并行处理消息。每个消费者组可以独立地读取消息,并且每条消息只会被组内的一个消费者处理。

创建消费者组

使用 XGROUP CREATE 命令可以创建一个消费者组。

bash
XGROUP CREATE mystream mygroup 0
  • mystream 是流的名称。
  • mygroup 是消费者组的名称。
  • 0 表示从流的开头开始读取消息。

消费者读取消息

使用 XREADGROUP 命令可以让消费者从消费者组中读取消息。

bash
XREADGROUP GROUP mygroup consumer1 COUNT 1 STREAMS mystream >
  • GROUP mygroup consumer1 表示消费者 consumer1 属于消费者组 mygroup
  • COUNT 1 表示读取1条消息。
  • STREAMS mystream > 表示读取最新的消息。

输出示例:

1) 1) "mystream"
2) 1) 1) "1640995200000-0"
2) 1) "sensor-id"
2) "1234"
3) "temperature"
4) "25.3"

实际应用场景

消息队列

Redis流非常适合用于实现消息队列。生产者可以将消息追加到流中,而消费者可以从流中读取并处理消息。由于流支持消费者组,多个消费者可以并行处理消息,从而提高系统的吞吐量。

事件溯源

在事件溯源(Event Sourcing)系统中,所有的事件都会被记录下来,并且按照发生的顺序存储。Redis流的有序性和持久化特性使其成为实现事件溯源的理想选择。

总结

Redis流是一种强大的数据结构,适用于处理消息流、实现消息队列和事件溯源等场景。通过 XADDXREADXREADGROUP 等命令,你可以轻松地追加、读取和处理消息。消费者组功能进一步增强了流的并行处理能力。

附加资源

练习

  1. 创建一个名为 events 的流,并向其中追加几条消息。
  2. 使用 XREAD 命令从 events 流中读取消息。
  3. 创建一个消费者组,并让多个消费者从流中读取消息。