Redis Stream多消费者组模式:原理、ACK机制与Java实践

在分布式系统中,消息队列是解耦服务、削峰填谷的核心组件。Redis作为高性能的内存数据库,从5.0版本开始引入Stream数据结构,原生支持消息队列功能。相较于Redis以往的Pub/Sub(无持久化)、List(简单队列),Stream提供了持久化、多消费者组、ACK确认、消息回溯等企业级特性,尤其在多消费者组场景下,能灵活实现消息的分发与处理。本文将深入剖析Stream多消费者组的实现原理,重点解读ACK确认机制与Pending List的作用,并结合Java代码演示实际运用场景。

一、Redis Stream核心概念与多消费者组模型

在理解多消费者组之前,我们先明确Stream的几个基础概念:

  • Stream:消息的载体,类似日志文件的结构,每条消息有唯一的ID(格式为“时间戳-序列号”,如1690000000000-0),消息内容以键值对存储。

  • 消费者组(Consumer Group):一组消费者的集合,同一个Stream可以创建多个消费者组。不同消费者组之间是独立的,即同一条消息会被每个消费者组消费一次;而同一个消费者组内的消费者则竞争消费消息,确保一条消息只被组内一个消费者处理。

  • 消费者(Consumer):隶属于某个消费者组,实际处理消息的个体。

  • Pending List(PENDING列表):消费者从组内获取消息后,若未发送ACK确认,消息会被放入该消费者的Pending List中,标记为“已读取但未确认”。

多消费者组的核心模型如下图所示(概念示意):

Stream(消息队列) → 消费者组A(Consumer1、Consumer2) → 各自Pending List

  └→ 消费者组B(Consumer3、Consumer4) → 各自Pending List

例如,一个订单Stream可以创建“支付组”和“物流组”,支付组处理订单支付状态,物流组处理订单发货,两者独立消费同一条订单消息,互不干扰。

二、关键机制:ACK确认与Pending List深度解析

Stream的可靠性依赖于ACK确认机制,而Pending List则是ACK机制的核心载体。这两者共同确保消息“至少被消费一次”(At-Least-Once),避免消息丢失。

2.1 ACK确认机制流程

当消费者从组内获取消息后,消息并不会立即从Stream中删除,而是需要消费者处理完成后主动发送ACK命令(XACK),Redis才会将消息从该消费者的Pending List中移除,标记为“已处理”。具体流程如下:

  1. 消费者通过XREADGROUP命令从组内读取消息(如XREADGROUP GROUP group1 consumer1 COUNT 1 STREAMS stream1 >,其中“>”表示读取组内未被消费过的消息)。

  2. Redis将消息标记为“已分配给consumer1”,并加入consumer1的Pending List。

  3. 消费者处理消息(如业务逻辑计算、数据库写入等)。

  4. 处理成功后,消费者发送XACK stream1 group1 msgId命令确认消息。

  5. Redis收到ACK后,从consumer1的Pending List中删除该消息。

2.2 Pending List的作用与核心操作

Pending List(简称PENDING)是每个消费者组内消费者独有的“未确认消息列表”,它解决了“消息已读取但处理失败”的问题。当消费者处理消息异常(如服务宕机、业务报错)时,消息会一直留在Pending List中,直到被重新处理并ACK。

核心特性:Pending List中的消息会记录三个关键信息——消息ID、消费者ID、未确认时长(idle time),方便后续进行消息重试或死信处理。

操作Pending List的常用命令:

  • XPENDING:查看消费者组或消费者的Pending消息统计,如XPENDING stream1 group1可查看group1的Pending总数、最小/最大idle time等。

  • XPENDING stream1 group1 - + 10 consumer1:查看consumer1的前10条Pending消息(“-”表示最小ID,“+”表示最大ID)。

  • XCLAIM:将Pending List中的消息“认领”给其他消费者处理,适用于消费者宕机后的消息转移,如XCLAIM stream1 group1 consumer2 60000 1690000000000-0(60000表示idle time超过60秒的消息)。

三、Java中的实践:从队列消费与Pending List重试

下面我们通过Java代码演示Stream的核心用法:先消费Stream中的新消息,若处理异常则将消息留在Pending List,下次消费前先处理Pending List中的消息。我们使用Spring Data Redis作为客户端(需引入依赖:spring-boot-starter-data-redis)。

3.1 初始化Stream与消费者组

首先需要创建Stream并初始化消费者组(可在项目启动时执行):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48

import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamInfo;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.Map;

@Component
public class StreamInitializer {
private final RedisTemplate<String, Object> redisTemplate;
private static final String STREAM_KEY = "order_stream";
private static final String GROUP_NAME = "order_group";

public StreamInitializer(RedisTemplate<String, Object> redisTemplate) {
this.redisTemplate = redisTemplate;
}

@PostConstruct
public void initStreamAndGroup() {
redisTemplate.execute((RedisConnection connection) -> {
// 检查Stream是否存在,不存在则创建(可选,XADD会自动创建)
StreamInfo.XInfoStream info = connection.streamCommands().xInfoStream(STREAM_KEY.getBytes());
if (info == null) {
System.out.println("Stream不存在,创建Stream:" + STREAM_KEY);
}
// 检查消费者组是否存在,不存在则创建
try {
connection.streamCommands().xInfoGroup(STREAM_KEY.getBytes(), GROUP_NAME.getBytes());
System.out.println("消费者组已存在:" + GROUP_NAME);
} catch (Exception e) {
// MKSTREAM选项:若Stream不存在则创建
connection.streamCommands().xGroupCreate(
STREAM_KEY.getBytes(),
GROUP_NAME.getBytes(),
ReadOffset.latest(),
true
);
System.out.println("创建消费者组:" + GROUP_NAME);
}
return null;
});
}
}

3.2 消息生产者

模拟发送订单消息到Stream:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

@Component
public class StreamProducer {
private final RedisTemplate<String, Object> redisTemplate;
private static final String STREAM_KEY = "order_stream";

public StreamProducer(RedisTemplate<String, Object> redisTemplate) {
this.redisTemplate = redisTemplate;
}

// 发送订单消息
public void sendOrderMessage(OrderMessage message) {
// ObjectRecord会自动将对象序列化为Hash(需配置RedisTemplate的序列化方式)
ObjectRecord<String, OrderMessage> record = ObjectRecord.create(STREAM_KEY, message);
// XADD命令:添加消息到Stream
redisTemplate.opsForStream().add(record);
System.out.println("发送消息:" + message);
}

// 订单消息实体
public static class OrderMessage {
private String orderId;
private String userId;
private Double amount;

// getter、setter、toString
}
}

3.3 消息消费者:优先处理Pending List

核心逻辑:消费者启动后,先处理Pending List中的消息,处理完后再持续消费Stream中的未被消费的消息;若处理消息异常,不发送ACK,消息留在Pending List下次重试。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72

//定义线程池
private static final ExecutorService SAVE_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();

//启动时执行 持续监听消息队列
@PostConstruct
public void init() {
SAVE_ORDER_EXECUTOR.submit(new VoucherOrderTask());
}

private class VoucherOrderTask implements Runnable{
@Override
public void run() {
while(true){
try {
//从消息队列里面读取消息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS streams.order >
List<MapRecord<String, Object, Object>> read = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
StreamOffset.create("stream.order", ReadOffset.lastConsumed())
);
if(read == null || read.isEmpty())
{
continue;
}
//获取消息 并且将消息转化为订单
MapRecord<String, Object, Object> record = read.getFirst();
Map<Object, Object> orderMap = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(orderMap, new VoucherOrder(), true);
voucherOrderSaveHandler(voucherOrder);
//确认消息 发送ack
stringRedisTemplate.opsForStream().acknowledge("stream.order","g1",record.getId());

} catch (Exception e) {
log.error("处理订单异常");
//遇到异常应先处理pending list中的消息 确保每个消息都被处理一次
handPendingMessagesList();
}
}
}
}

private void handPendingMessagesList() {
while (true){
try {
//从pending队列里面获取待处理的消息
List<MapRecord<String, Object, Object>> read = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1),
StreamOffset.create("stream.order", ReadOffset.from("0"))
);
if(read == null || read.isEmpty())
{
break;
}
//获取消息 并且将消息转化为订单
MapRecord<String, Object, Object> record = read.getFirst();
Map<Object, Object> orderMap = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(orderMap, new VoucherOrder(), true);
voucherOrderSaveHandler(voucherOrder);
//确认消息 发送ack
stringRedisTemplate.opsForStream().acknowledge("stream.order","g1",record.getId());
} catch (Exception e) {
log.error("处理pending-list订单异常");
try {
Thread.sleep(50);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
}
}

四、总结与注意事项

Redis Stream的多消费者组模式通过ACK机制和Pending List,实现了消息的可靠消费,尤其适合中小型分布式系统的轻量级消息队列场景。但在使用时需注意以下几点:

  • 消息持久化:Stream消息默认持久化到Redis的RDB/AOF中,需确保Redis的持久化配置合理,避免宕机丢失消息。

  • Pending List积压:需定期监控Pending List的消息量,若积压过多,需排查消费者是否异常,并通过XCLAIM转移消息或设置死信机制(如超过重试次数后转移到其他Stream)。

  • 消费者弹性:同一个消费者组内可动态增减消费者,实现负载均衡,但需避免消费者ID重复。

  • 性能考量:Redis单线程处理命令,高并发场景下需评估Stream的消息生产速率,避免Redis成为瓶颈(可结合分片或集群扩展)。

总之,Redis Stream以其轻量、高效、可靠的特性,为分布式系统提供了一种灵活的消息队列解决方案,尤其在多消费者组和消息重试场景下,展现了独特的优势。