深入理解 Kafka:Offset 提交、重试机制与重复消费的“罗生门”

在分布式系统的面试和生产实践中,Kafka 的“重复消费”是一个绕不开的话题。很多开发者会有这样的疑惑:“我明明配置了自动提交,代码里也写了 try-catch,为什么数据库里还是会有重复数据?”

要彻底解决这个问题,我们不能只看表象,必须深入 Kafka 的 Offset 提交机制重试策略,理解为什么 Kafka 只能保证 “At Least Once”(至少一次),而不能天然保证 “Exactly Once”(恰好一次)。

一、 Offset 的双重人格:内存 vs 磁盘

在 Kafka 消费者(Consumer)端,Offset(位移) 其实有两个完全不同的概念,理解它们的区别是破解重复消费的第一步。

1. Current Offset(当前读取位置)

  • 存在哪里? 消费者的内存中。
  • 代表什么? “我现在读到哪一行了”。
  • 怎么变? 只要消费者调用 poll() 拉取到了新消息,这个指针就会自动往后移。哪怕你没有任何提交操作,只要进程不挂,消费者就能一直顺畅地往下读。

2. Committed Offset(已提交位置)

  • 存在哪里? Kafka 服务端的 __consumer_offsets 主题中(持久化)。
  • 代表什么? “我已经处理完并确认到哪一行了”。
  • 怎么变? 只有触发了 commitSync()commitAsync()(无论是自动还是手动),这个指针才会更新。它是消费者崩溃重启后的唯一恢复依据

比喻:

  • Current Offset 是你的眼睛,你看书读到了第 50 页。
  • Committed Offset 是书签,你上次把书签夹在了第 40 页。
  • 如果你没移动书签(提交 Offset)就睡着了(宕机),下次醒来(重启),你只能从第 40 页(书签位置)开始重读,尽管你梦里已经读到了 50 页。

二、 Spring Boot 中的重试机制:一把双刃剑

在使用 Spring for Apache Kafka 时,框架为我们提供了强大的重试机制,但这也埋下了重复消费的伏笔。

1. 阻塞式重试(Default)

Spring Kafka 默认在监听器抛出异常时,会执行原地重试(Blocking Retry)。

  • 机制: 消费者线程暂停拉取新消息,反复调用你的 @KafkaListener 方法处理同一条失败的消息。
  • 后果: 如果一直失败,Offset 永远不会提交。

2. 非阻塞式重试(@RetryableTopic)

为了不阻塞主流程,通常我们会配置死信队列策略:

1
2
3
4
5
6
7
8
9
@RetryableTopic(
attempts = "4",
backoff = @Backoff(delay = 1000),
dltStrategy = DltStrategy.FAIL_ON_ERROR
)
@KafkaListener(topics = "orders")
public void listen(String msg) {
// 业务逻辑
}

  • 机制: 失败后,消息被转发到 orders-retry 主题,主主题的 Offset 立刻提交
  • 风险: 在转发成功但提交 Offset 失败的极低概率下,可能导致主主题重发 + 重试主题消费,造成双倍重复。

三、 谁动了我的数据?重复消费的三大“元凶”

除了代码逻辑错误,Kafka 架构层面导致重复消费的场景主要有以下三种:

场景一:消费者宕机(The Crash)—— 最经典

这是最容易理解的情况。

  1. 消费者拉取了 Offset 100~110 的消息。
  2. 业务逻辑全部处理完毕,数据已入库。
  3. 就在准备提交 Offset 的前一毫秒,消费者进程被 kill -9 或断电了。
  4. 结果: 消费者重启后,问 Kafka:“我上次去哪了?” Kafka 说:“你还在 100。” 于是 100~110 被再次消费。

场景二:消费者“假死”引发 Rebalance(The Timeout)—— 最隐蔽

这是生产环境的大坑。消费者没挂,网络也没断,但消息重复了。

  • 原因: 某批消息处理耗时过长,超过了 max.poll.interval.ms(默认 5 分钟)。
  • 过程:
    1. Kafka 协调者认为该消费者已死,触发 Rebalance(重平衡),将该 Partition 分配给消费者 B。
    2. 消费者 A 其实还在处理,最终处理完成并入库,但提交 Offset 时被拒绝(因为已经不在群组里了)。
    3. 消费者 B 从旧的 Committed Offset 开始,又把这批消息处理了一遍
  • 结果: 两个消费者同时处理同一批数据。

场景三:生产者网络抖动(The Producer)—— 物理层面重复

有时候,Kafka 里的消息本来就是双份的

  • 原因: 生产者发送消息 A 成功,写入了 Broker 磁盘。但在 Broker 返回 ACK 确认包时,网络断了。
  • 过程: 生产者根据重试配置(默认按时间无限重试),再次发送消息 A。Broker 以为是新消息,再次写入。
  • 结果: Log 里存了两条内容一模一样的消息(Offset 不同),消费者自然会消费两次。

四、 终极解决方案:幂等性(Idempotency)

既然 Kafka 无法保证不重复投递,我们就必须在消费端构建防线。接口幂等性是系统稳定性的最后一道墙。

方案 1:利用数据库唯一索引(兜底方案)

利用关系型数据库的 UNIQUE KEY 约束。

1
ALTER TABLE t_order ADD UNIQUE INDEX idx_order_id (order_id);

  • 逻辑: 无论 Kafka 推送多少次,只有第一次 INSERT 会成功。后续的重复请求会报 DuplicateKeyException,我们在代码中捕获并忽略即可。
  • 优点: 数据绝对安全,实现简单。
  • 缺点: 依赖数据库,吞吐量受限。

方案 2:利用 Redis 原子操作(高性能方案)

利用 Redis 的 SETNX(Set if Not Exists)命令。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public void listen(String orderId, String message) {
// 1. 尝试获取锁 / 标记状态
// key = "processed:order:" + orderId
Boolean isNew = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 1, TimeUnit.HOURS);

if (Boolean.FALSE.equals(isNew)) {
// 2. 也是重复消费,直接 ACK,不处理业务
ack.acknowledge();
return;
}

try {
// 3. 执行业务逻辑
processOrder(message);
} catch (Exception e) {
// 4. 如果业务失败,需要删除 Redis key,允许下次重试
stringRedisTemplate.delete(key);
throw e;
}
}
  • 优点: 性能极高,不给数据库造成压力。
  • 注意: 需要合理设置 Key 的过期时间,防止内存无限膨胀。

方案 3:利用redis的set结构

想利用集合结构的内存优势,必须解决“过期”和“大 Key”问题。通常采用 “分桶 + 定期轮转” 的策略。

优化方案:按时间分桶

不要把所有 ID 放在一个 processed_ids 里,而是按时间粒度生成 Key。

  • Key 的设计: idempotent:20231027:10 (表示 2023-10-27 10点的数据)
  • 操作:
    1. 请求来了,计算当前时间所在的桶(Key)。
    2. SADD idempotent:20231027:10 <order_id>
    3. 设置过期: 对这个 Key 设置过期时间。例如,如果你只需要防 1 小时内的重放,那么这个 Key 可以设置 2 小时后过期。
  • 判断: 可能需要查当前桶和上一个时间桶(防止跨小时边界的重复)。

优点:

  1. 利用了 Set/Hash 的紧凑内存结构。
  2. 解决了无限增长问题(整个桶一起过期)。
  3. 缓解了大 Key 问题(将数据分散到了 24 个或更多的小 Key 中)。

五、 总结

Kafka 的设计哲学是 “高吞吐优于精确性”,它把保证数据不丢失(At Least Once)放在首位,而把去重的工作交给了下游业务。

  1. Offset 提交 只是一个“保存进度”的动作,不影响当前的拉取。
  2. 重试机制 是为了系统的容错,但副作用是打乱顺序和重复消费。
  3. 重复消费 无法避免(宕机、Rebalance、网络抖动)。
  4. 幂等性 是必须实现的业务逻辑,Redis + 数据库唯一索引 是黄金搭档。