基于 CDC + MQ 的 DB 与缓存双写一致性终极架构

在分布式的环境下,“数据库与缓存的双写一致性”是一个永恒的难题。尤其是在高并发、高吞吐、且存在大量热点数据的核心业务场景下,稍有不慎就会引发缓存击穿、数据乱序覆盖等灾难。

在我的项目中主要考虑有两种方案:

  • 方案一:业务代码更新DB后,发送MQ消息(附带业务版本号),消费者校验版本号后更新缓存。
  • 方案二:引入 CDC(如 Canal/Debezium)监听 Binlog,根据主键 Hash 发送到 MQ 同一分区,利用严格顺序性更新缓存(无需业务层维护版本号)。

经过深思熟虑和反复推演,对于追求高性能和高并发的核心系统,我最终拍板了方案二(CDC + MQ),并针对其中的并发陷阱进行了深度改造。本文将为你硬核拆解这套方案的演进过程及底层细节。


一、 为什么抛弃“业务发MQ”,选择“CDC监听”?

在高并发主链路上,任何多余的跨网络交互都是性能杀手。

方案一最大的硬伤在于双写一致性与性能的矛盾。如果先写DB再发MQ,遇到网络抖动MQ发送失败,缓存永远是脏数据;如果引入本地消息表或事务消息,又会极大拖慢主接口的响应时间。

引入 CDC 的最大魅力在于“极致的解耦”:
主链路业务只需要一句 SQL 更新数据库,直接返回响应,性能达到数据库的物理极限。CDC 组件伪装成从库,异步拉取 Binlog 并将其投递到 MQ。由于 Binlog 的存在,系统获得了天生的“最终一致性”保障——只要数据落盘,消息就必定送达。


二、 架构进阶:热点数据的“更新”与“删除”之争

在 CDC + MQ 架构下,消费者收到 Binlog 变更后,应该删除缓存(Cache Aside) 还是 更新缓存(覆盖写)

业界普遍推荐“删除缓存”,因为它轻量且天生幂等。但我们的业务包含大量热点数据。如果高并发下频繁删除热点缓存,会导致严重的缓存击穿,瞬间压垮数据库。既然 MQ 的分区机制保证了针对同一个主键的变更一定是有序的,那么“更新缓存(覆盖写)”显然是更优解。

但是,无脑覆盖写会遇到一个隐蔽的“幽灵脏数据”陷阱

  1. 缓存刚好过期,读请求 A 去 DB 查到了旧值 V1,此时网络卡顿。
  2. 写请求更新 DB 为新值 V2
  3. CDC 极速监听到变更,将缓存更新为 V2
  4. 读请求 A 恢复,将其刚才查到的旧值 V1 写入缓存,完美覆盖了正确的 V2!

破局之法:Binlog 时间戳 + Redis Lua 脚本

为了防止读写并发踩踏,我们需要“版本号”。但方案二的初衷就是不让业务层维护版本号!怎么办?
答案是:利用 Binlog 自带的执行时间戳(executeTime)作为天然版本号!

消费者不再使用简单的 SET,而是通过 Lua 脚本原子更新:

1
2
3
4
5
6
7
8
9
-- KEYS[1]: 缓存Key, ARGV[1]: Binlog时间戳(版本号), ARGV[2]: 数据内容
local current_version = redis.call('hget', KEYS[1], 'version')
-- 只有当传入的版本号 > 缓存现有版本号时,才允许覆盖
if (not current_version) or (tonumber(ARGV[1]) > tonumber(current_version)) then
redis.call('hset', KEYS[1], 'data', ARGV[2], 'version', ARGV[1])
return 1
else
return 0
end

前端读请求在未命中重建缓存时,也必须带上查询数据库时的数据版本号去执行这个 Lua 脚本。 这样,无论是读请求迟到,还是 MQ 消息重复投递,都能被完美拦截。


三、 绝对可靠:抛弃 Spring Retry,拥抱 Kafka 底层

在将 Binlog 发送到 MQ 的过程中,如何保证消息 100% 可靠?起初有人提议在 Canal Client 中使用 Spring Retry@Retryable)处理网络抖动。

这是流式处理中的大忌!

  1. 阻塞主线程:Spring Retry 是内存级别的线程阻塞,一旦发生网络故障,CDC 消费线程直接卡死,Binlog 严重积压。
  2. 破坏顺序性:如果结合 @Async 异步重试,后面的消息就会比失败的消息先进入 MQ,精心设计的“主键 Hash 顺序路由”彻底崩塌。

正确姿势:Kafka 底层重试 + 参数调优

将重试的任务完全交给 Kafka Client 底层的 NIO 机制。它不仅不阻塞业务线程,还能在底层保证重试时的顺序不乱。

核心 application.yml 配置如下:

1
2
3
4
5
6
7
spring:
kafka:
producer:
acks: all # 必须等所有 ISR 副本确认
properties:
enable.idempotence: true # 核心:开启幂等性,保证重试绝对不乱序!
delivery.timeout.ms: 5000 # 投递总超时时间(比如 5秒),替代无限重试


四、 优雅降级:当 Kafka 彻底拒收,如何引入死信队列(DLQ)?

如果在 delivery.timeout.ms(如 5 秒)内,Kafka 穷尽了各种底层重试依然发送失败(比如单台机器网络物理断开),我们不能让程序无限卡死,必须引入死信队列(DLQ)来进行降级。

这里提供了一段基于 CompletableFutureEntryHandler(以 canal-spring-boot-starter 为例)的异步高吞吐 + 死信降级代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Component
@CanalTable("t_order")
@Slf4j
public class OrderEntryHandler implements EntryHandler<OrderEntity> {

@Resource
private KafkaTemplate<String, String> kafkaTemplate;
private static final String TOPIC = "binlog_cache_sync_topic";

@Override
public void update(OrderEntity before, OrderEntity after) {
// 注意:生产环境需确保你的 Starter 能透传 Binlog 的 executeTime
long binlogTimestamp = extractTimestamp();
// 指定key用于保证同一个数据的操作hash到同一个分区保证顺序性
String primaryKey = String.valueOf(after.getId());
String jsonMsg = buildJson(after, binlogTimestamp);

// 异步发送,绝对不要加 .get() 阻塞!
kafkaTemplate.send(TOPIC, primaryKey, jsonMsg)
.exceptionally(ex -> {
log.error("Kafka 彻底发送失败!主键: {}, 准备进入死信队列", primaryKey, ex);
// 【降级逻辑】将消息写入 MySQL 死信表或本地日志,等待人工/定时任务重放
sendToDeadLetterQueue(primaryKey, jsonMsg, ex);

// 返回 null 吞掉异常,让 Canal 继续推进位点,防止主链路永远卡死
return null;
});
}
}

(注:使用高度封装的 Starter 时务必当心,确保框架支持获取 Binlog Timestamp,否则 Lua 脚本的防乱序机制将失效。如果是极高并发场景,建议直接使用 Canal 原生 API 实现批量异步等待机制。)

附上更复杂的原生代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

@Slf4j
@Service
public class CanalToKafkaService {

@Resource
private CanalConnector canalConnector;
@Resource
private KafkaTemplate<String, String> kafkaTemplate;

private static final String TOPIC = "binlog_cache_sync_topic";
private volatile boolean running = true;

// 模拟应用启动时调用的监听方法
public void startListen() {
canalConnector.connect();
canalConnector.subscribe(".*\\..*"); // 订阅库表
canalConnector.rollback(); // 启动时回滚一次,防止有未确认的数据

while (running) {
// 1. 获取数据,不自动 ACK。每次最多拉取 1000 条
Message message = canalConnector.getWithoutAck(1000, 100L, TimeUnit.MILLISECONDS);
long batchId = message.getId();
int size = message.getEntries().size();

if (batchId == -1 || size == 0) {
// 没有数据,短暂休眠
sleep(100);
continue;
}

try {
// 2. 解析 Binlog 并发送到 Kafka
processAndSendToKafka(message.getEntries());

// 3. 核心:整批数据都成功发送到 Kafka 后,才向 Canal 提交 ACK!
canalConnector.ack(batchId);

} catch (Exception e) {
log.error("批量发送 Kafka 失败,准备回滚 Canal 位点,batchId: {}", batchId, e);
// 4. 核心:一旦发送失败(比如 Kafka 宕机超过2分钟超时),直接回滚!
// 下一次循环会重新拉取这批数据,绝对不丢数据!
canalConnector.rollback(batchId);

// 失败后休眠一会儿,防止疯狂重试压垮 CPU
sleep(2000);
}
}
}

private void processAndSendToKafka(List<CanalEntry.Entry> entries) throws Exception {
// 用于收集这一批所有的 Kafka 发送 Future,以实现等待全批次成功
List<CompletableFuture<SendResult<String, String>>> futures = new ArrayList<>();

for (CanalEntry.Entry entry : entries) {
// 过滤掉事务头/尾等非数据项
if (entry.getEntryType() != CanalEntry.EntryType.ROWDATA) {
continue;
}

CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
CanalEntry.EventType eventType = rowChange.getEventType();
String tableName = entry.getHeader().getTableName();

for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
// 提取变动后的数据(如果是 DELETE 就是 beforeColumns)
List<CanalEntry.Column> columns = (eventType == CanalEntry.EventType.DELETE) ?
rowData.getBeforeColumnsList() : rowData.getAfterColumnsList();

// 1. 核心提取:找到这行数据的主键 (Primary Key)
String primaryKey = getPrimaryKey(columns);
if (primaryKey == null) continue; // 防御性编程

// 2. 构造你要发送的消息体(这里简单用 JSON 字符串示意,包含版本号即时间戳)
long executeTime = entry.getHeader().getExecuteTime(); // Binlog 产生的时间戳(毫秒),作为版本号
String jsonMsg = buildJsonMsg(tableName, eventType, columns, executeTime);

// 3. 核心发送:按主键作为 Kafka 的 Key 发送!!!
// Kafka 会根据 Key 的 Hash 值发送到特定的 Partition,保证同一个主键绝对有序!
CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send(TOPIC, primaryKey, jsonMsg);
futures.add(future);
}
}

// 4. 核心等待:阻塞等待这批消息 *全部* 被 Kafka 确认接收
// 如果中间发生网络抖动,Kafka Client 会在底层自动重试,直到达到 delivery.timeout.ms (2分钟)
// 如果 2 分钟后还是失败,这里会抛出异常,外层 catch 捕获并触发 canalConnector.rollback()
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
}

private String getPrimaryKey(List<CanalEntry.Column> columns) {
for (CanalEntry.Column column : columns) {
if (column.getIsKey()) {
return column.getValue(); // 返回主键的值
}
}
return null; // 实际业务中建议抛出异常或报警
}

private String buildJsonMsg(String table, CanalEntry.EventType type, List<CanalEntry.Column> columns, long executeTime) {
// ... 组装你的 JSON 数据,记得把 executeTime 塞进去 ...
return "{\"table\":\"" + table + "\", \"version\":" + executeTime + ", \"data\":{...}}";
}

private void sleep(long millis) {
try { Thread.sleep(millis); } catch (InterruptedException ignored) {}
}
}


五、 终极拷问:死信队列会破坏 Kafka 的顺序与数据一致性吗?

有同学可能会提出一个极其硬核的问题:
如果我开启了 Kafka 的幂等性(防乱序),连续发送消息 1、2、3、4、5。结果 1 发送失败进入了 DLQ。Kafka 底层会拒收 2、3、4、5 吗?如果 2、3、4、5 发送成功了,产生的数据空洞会破坏缓存一致性吗?

1. Kafka 底层的自我疗愈

在现代版本 Kafka(>= 2.4)中,如果 1 因为超时被抛弃导致序列号(SeqNum)断层,Producer 会在底层自动执行 PID Reset(重置生产者 ID),然后带着新身份将 2、3、4、5 重新打标并成功发往 Broker。所以,后续消息不会被永久拒收。

2. 业务层的一致性闭环

此时,消费者实际收到了 2、3、4、5(1 被落在了 MySQL 死信表中)。
看似发生了乱序,但请回忆一下我们第二节讲的 “Binlog 时间戳 + Lua 脚本”

  • 消费者收到 2 时,缓存更新为版本 2。收到 5 时,缓存更新为版本 5。
  • 第二天,运维人员处理死信队列,手动将 消息 1(版本 1)重新发往 Kafka。
  • 消费者收到迟来的 消息 1,执行 Lua 脚本:发现当前缓存版本是 5,1 < 5,Lua 脚本直接拦截并拒绝更新!

至此,逻辑形成了完美闭环!


总结

面对高并发的双写一致性挑战,我最终沉淀出了这套方案:

  1. 主链路:仅更新 DB,保持极致轻量。
  2. 同步链路:CDC 监听 Binlog,结合主键 Hash 发送到 Kafka,利用底层幂等参数实现高性能异步保序发送。
  3. 降级保障:利用 delivery.timeout.ms 与异步回调捕获异常,平滑降级到 DLQ,防止阻塞断流。
  4. 终端消费:摒弃有风险的 Cache Aside(删除),改用基于 Binlog 真实时间戳的 Lua 脚本覆盖写,完美抗击高并发并发写踩踏以及死信重放的乱序问题。

这套架构无需在业务表引入任何侵入性版本字段,既扛住了极高的并发吞吐,又守住了数据最终一致性的底线,是一个较为完美的解决方案。