Redis Stream多消费者组模式:原理、ACK机制与Java实践 在分布式系统中,消息队列是解耦服务、削峰填谷的核心组件。Redis作为高性能的内存数据库,从5.0版本开始引入Stream数据结构,原生支持消息队列功能。相较于Redis以往的Pub/Sub(无持久化)、List(简单队列),Stream提供了持久化、多消费者组、ACK确认、消息回溯 等企业级特性,尤其在多消费者组场景下,能灵活实现消息的分发与处理。本文将深入剖析Stream多消费者组的实现原理,重点解读ACK确认机制与Pending List的作用,并结合Java代码演示实际运用场景。
一、Redis Stream核心概念与多消费者组模型 在理解多消费者组之前,我们先明确Stream的几个基础概念:
Stream :消息的载体,类似日志文件的结构,每条消息有唯一的ID(格式为“时间戳-序列号”,如1690000000000-0),消息内容以键值对存储。
消费者组(Consumer Group) :一组消费者的集合,同一个Stream可以创建多个消费者组。不同消费者组之间是独立的 ,即同一条消息会被每个消费者组消费一次;而同一个消费者组内的消费者则竞争消费消息,确保一条消息只被组内一个消费者处理。
消费者(Consumer) :隶属于某个消费者组,实际处理消息的个体。
Pending List(PENDING列表) :消费者从组内获取消息后,若未发送ACK确认,消息会被放入该消费者的Pending List中,标记为“已读取但未确认”。
多消费者组的核心模型如下图所示(概念示意):
Stream(消息队列) → 消费者组A(Consumer1、Consumer2) → 各自Pending List
└→ 消费者组B(Consumer3、Consumer4) → 各自Pending List
例如,一个订单Stream可以创建“支付组”和“物流组”,支付组处理订单支付状态,物流组处理订单发货,两者独立消费同一条订单消息,互不干扰。
二、关键机制:ACK确认与Pending List深度解析 Stream的可靠性依赖于ACK确认机制 ,而Pending List则是ACK机制的核心载体。这两者共同确保消息“至少被消费一次”(At-Least-Once),避免消息丢失。
2.1 ACK确认机制流程 当消费者从组内获取消息后,消息并不会立即从Stream中删除,而是需要消费者处理完成后主动发送ACK命令(XACK),Redis才会将消息从该消费者的Pending List中移除,标记为“已处理”。具体流程如下:
消费者通过XREADGROUP命令从组内读取消息(如XREADGROUP GROUP group1 consumer1 COUNT 1 STREAMS stream1 >,其中“>”表示读取组内未被消费过的消息)。
Redis将消息标记为“已分配给consumer1”,并加入consumer1的Pending List。
消费者处理消息(如业务逻辑计算、数据库写入等)。
处理成功后,消费者发送XACK stream1 group1 msgId命令确认消息。
Redis收到ACK后,从consumer1的Pending List中删除该消息。
2.2 Pending List的作用与核心操作 Pending List(简称PENDING)是每个消费者组内消费者独有的“未确认消息列表”,它解决了“消息已读取但处理失败”的问题。当消费者处理消息异常(如服务宕机、业务报错)时,消息会一直留在Pending List中,直到被重新处理并ACK。
核心特性 :Pending List中的消息会记录三个关键信息——消息ID、消费者ID、未确认时长(idle time),方便后续进行消息重试或死信处理。
操作Pending List的常用命令:
XPENDING:查看消费者组或消费者的Pending消息统计,如XPENDING stream1 group1可查看group1的Pending总数、最小/最大idle time等。
XPENDING stream1 group1 - + 10 consumer1:查看consumer1的前10条Pending消息(“-”表示最小ID,“+”表示最大ID)。
XCLAIM:将Pending List中的消息“认领”给其他消费者处理,适用于消费者宕机后的消息转移,如XCLAIM stream1 group1 consumer2 60000 1690000000000-0(60000表示idle time超过60秒的消息)。
三、Java中的实践:从队列消费与Pending List重试 下面我们通过Java代码演示Stream的核心用法:先消费Stream中的新消息,若处理异常则将消息留在Pending List,下次消费前先处理Pending List中的消息 。我们使用Spring Data Redis作为客户端(需引入依赖:spring-boot-starter-data-redis)。
3.1 初始化Stream与消费者组 首先需要创建Stream并初始化消费者组(可在项目启动时执行):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 import org.springframework.data.redis.connection.RedisConnection;import org.springframework.data.redis.connection.stream.Consumer;import org.springframework.data.redis.connection.stream.ObjectRecord;import org.springframework.data.redis.connection.stream.ReadOffset;import org.springframework.data.redis.connection.stream.StreamInfo;import org.springframework.data.redis.core.RedisTemplate;import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;import java.util.Map;@Component public class StreamInitializer { private final RedisTemplate<String, Object> redisTemplate; private static final String STREAM_KEY = "order_stream" ; private static final String GROUP_NAME = "order_group" ; public StreamInitializer (RedisTemplate<String, Object> redisTemplate) { this .redisTemplate = redisTemplate; } @PostConstruct public void initStreamAndGroup () { redisTemplate.execute((RedisConnection connection) -> { StreamInfo.XInfoStream info = connection.streamCommands().xInfoStream(STREAM_KEY.getBytes()); if (info == null ) { System.out.println("Stream不存在,创建Stream:" + STREAM_KEY); } try { connection.streamCommands().xInfoGroup(STREAM_KEY.getBytes(), GROUP_NAME.getBytes()); System.out.println("消费者组已存在:" + GROUP_NAME); } catch (Exception e) { connection.streamCommands().xGroupCreate( STREAM_KEY.getBytes(), GROUP_NAME.getBytes(), ReadOffset.latest(), true ); System.out.println("创建消费者组:" + GROUP_NAME); } return null ; }); } }
3.2 消息生产者 模拟发送订单消息到Stream:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 import org.springframework.data.redis.connection.stream.ObjectRecord;import org.springframework.data.redis.core.RedisTemplate;import org.springframework.stereotype.Component;@Component public class StreamProducer { private final RedisTemplate<String, Object> redisTemplate; private static final String STREAM_KEY = "order_stream" ; public StreamProducer (RedisTemplate<String, Object> redisTemplate) { this .redisTemplate = redisTemplate; } public void sendOrderMessage (OrderMessage message) { ObjectRecord<String, OrderMessage> record = ObjectRecord.create(STREAM_KEY, message); redisTemplate.opsForStream().add(record); System.out.println("发送消息:" + message); } public static class OrderMessage { private String orderId; private String userId; private Double amount; } }
3.3 消息消费者:优先处理Pending List 核心逻辑:消费者启动后,先处理Pending List中的消息 ,处理完后再持续消费Stream中的未被消费的消息;若处理消息异常,不发送ACK,消息留在Pending List下次重试。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 private static final ExecutorService SAVE_ORDER_EXECUTOR = Executors.newSingleThreadExecutor(); @PostConstruct public void init () { SAVE_ORDER_EXECUTOR.submit(new VoucherOrderTask ()); } private class VoucherOrderTask implements Runnable { @Override public void run () { while (true ){ try { List<MapRecord<String, Object, Object>> read = stringRedisTemplate.opsForStream().read( Consumer.from("g1" , "c1" ), StreamReadOptions.empty().count(1 ).block(Duration.ofSeconds(2 )), StreamOffset.create("stream.order" , ReadOffset.lastConsumed()) ); if (read == null || read.isEmpty()) { continue ; } MapRecord<String, Object, Object> record = read.getFirst(); Map<Object, Object> orderMap = record.getValue(); VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(orderMap, new VoucherOrder (), true ); voucherOrderSaveHandler(voucherOrder); stringRedisTemplate.opsForStream().acknowledge("stream.order" ,"g1" ,record.getId()); } catch (Exception e) { log.error("处理订单异常" ); handPendingMessagesList(); } } } } private void handPendingMessagesList () { while (true ){ try { List<MapRecord<String, Object, Object>> read = stringRedisTemplate.opsForStream().read( Consumer.from("g1" , "c1" ), StreamReadOptions.empty().count(1 ), StreamOffset.create("stream.order" , ReadOffset.from("0" )) ); if (read == null || read.isEmpty()) { break ; } MapRecord<String, Object, Object> record = read.getFirst(); Map<Object, Object> orderMap = record.getValue(); VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(orderMap, new VoucherOrder (), true ); voucherOrderSaveHandler(voucherOrder); stringRedisTemplate.opsForStream().acknowledge("stream.order" ,"g1" ,record.getId()); } catch (Exception e) { log.error("处理pending-list订单异常" ); try { Thread.sleep(50 ); } catch (InterruptedException ex) { throw new RuntimeException (ex); } } } }
四、总结与注意事项 Redis Stream的多消费者组模式通过ACK机制和Pending List,实现了消息的可靠消费,尤其适合中小型分布式系统的轻量级消息队列场景。但在使用时需注意以下几点:
消息持久化 :Stream消息默认持久化到Redis的RDB/AOF中,需确保Redis的持久化配置合理,避免宕机丢失消息。
Pending List积压 :需定期监控Pending List的消息量,若积压过多,需排查消费者是否异常,并通过XCLAIM转移消息或设置死信机制(如超过重试次数后转移到其他Stream)。
消费者弹性 :同一个消费者组内可动态增减消费者,实现负载均衡,但需避免消费者ID重复。
性能考量 :Redis单线程处理命令,高并发场景下需评估Stream的消息生产速率,避免Redis成为瓶颈(可结合分片或集群扩展)。
总之,Redis Stream以其轻量、高效、可靠的特性,为分布式系统提供了一种灵活的消息队列解决方案,尤其在多消费者组和消息重试场景下,展现了独特的优势。