基于 CDC + MQ 的 DB 与缓存双写一致性终极架构
在分布式的环境下,“数据库与缓存的双写一致性”是一个永恒的难题。尤其是在高并发、高吞吐、且存在大量热点数据的核心业务场景下,稍有不慎就会引发缓存击穿、数据乱序覆盖等灾难。
在我的项目中主要考虑有两种方案:
- 方案一:业务代码更新DB后,发送MQ消息(附带业务版本号),消费者校验版本号后更新缓存。
- 方案二:引入 CDC(如 Canal/Debezium)监听 Binlog,根据主键 Hash 发送到 MQ 同一分区,利用严格顺序性更新缓存(无需业务层维护版本号)。
经过深思熟虑和反复推演,对于追求高性能和高并发的核心系统,我最终拍板了方案二(CDC + MQ),并针对其中的并发陷阱进行了深度改造。本文将为你硬核拆解这套方案的演进过程及底层细节。
一、 为什么抛弃“业务发MQ”,选择“CDC监听”?
在高并发主链路上,任何多余的跨网络交互都是性能杀手。
方案一最大的硬伤在于双写一致性与性能的矛盾。如果先写DB再发MQ,遇到网络抖动MQ发送失败,缓存永远是脏数据;如果引入本地消息表或事务消息,又会极大拖慢主接口的响应时间。
引入 CDC 的最大魅力在于“极致的解耦”:
主链路业务只需要一句 SQL 更新数据库,直接返回响应,性能达到数据库的物理极限。CDC 组件伪装成从库,异步拉取 Binlog 并将其投递到 MQ。由于 Binlog 的存在,系统获得了天生的“最终一致性”保障——只要数据落盘,消息就必定送达。
二、 架构进阶:热点数据的“更新”与“删除”之争
在 CDC + MQ 架构下,消费者收到 Binlog 变更后,应该删除缓存(Cache Aside) 还是 更新缓存(覆盖写)?
业界普遍推荐“删除缓存”,因为它轻量且天生幂等。但我们的业务包含大量热点数据。如果高并发下频繁删除热点缓存,会导致严重的缓存击穿,瞬间压垮数据库。既然 MQ 的分区机制保证了针对同一个主键的变更一定是有序的,那么“更新缓存(覆盖写)”显然是更优解。
但是,无脑覆盖写会遇到一个隐蔽的“幽灵脏数据”陷阱:
- 缓存刚好过期,读请求 A 去 DB 查到了旧值 V1,此时网络卡顿。
- 写请求更新 DB 为新值 V2。
- CDC 极速监听到变更,将缓存更新为 V2。
- 读请求 A 恢复,将其刚才查到的旧值 V1 写入缓存,完美覆盖了正确的 V2!
破局之法:Binlog 时间戳 + Redis Lua 脚本
为了防止读写并发踩踏,我们需要“版本号”。但方案二的初衷就是不让业务层维护版本号!怎么办?
答案是:利用 Binlog 自带的执行时间戳(executeTime)作为天然版本号!
消费者不再使用简单的 SET,而是通过 Lua 脚本原子更新:
1 2 3 4 5 6 7 8 9
| local current_version = redis.call('hget', KEYS[1], 'version')
if (not current_version) or (tonumber(ARGV[1]) > tonumber(current_version)) then redis.call('hset', KEYS[1], 'data', ARGV[2], 'version', ARGV[1]) return 1 else return 0 end
|
前端读请求在未命中重建缓存时,也必须带上查询数据库时的数据版本号去执行这个 Lua 脚本。 这样,无论是读请求迟到,还是 MQ 消息重复投递,都能被完美拦截。
三、 绝对可靠:抛弃 Spring Retry,拥抱 Kafka 底层
在将 Binlog 发送到 MQ 的过程中,如何保证消息 100% 可靠?起初有人提议在 Canal Client 中使用 Spring Retry(@Retryable)处理网络抖动。
这是流式处理中的大忌!
- 阻塞主线程:Spring Retry 是内存级别的线程阻塞,一旦发生网络故障,CDC 消费线程直接卡死,Binlog 严重积压。
- 破坏顺序性:如果结合
@Async 异步重试,后面的消息就会比失败的消息先进入 MQ,精心设计的“主键 Hash 顺序路由”彻底崩塌。
正确姿势:Kafka 底层重试 + 参数调优
将重试的任务完全交给 Kafka Client 底层的 NIO 机制。它不仅不阻塞业务线程,还能在底层保证重试时的顺序不乱。
核心 application.yml 配置如下:
1 2 3 4 5 6 7
| spring: kafka: producer: acks: all properties: enable.idempotence: true delivery.timeout.ms: 5000
|
四、 优雅降级:当 Kafka 彻底拒收,如何引入死信队列(DLQ)?
如果在 delivery.timeout.ms(如 5 秒)内,Kafka 穷尽了各种底层重试依然发送失败(比如单台机器网络物理断开),我们不能让程序无限卡死,必须引入死信队列(DLQ)来进行降级。
这里提供了一段基于 CompletableFuture 和 EntryHandler(以 canal-spring-boot-starter 为例)的异步高吞吐 + 死信降级代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| @Component @CanalTable("t_order") @Slf4j public class OrderEntryHandler implements EntryHandler<OrderEntity> {
@Resource private KafkaTemplate<String, String> kafkaTemplate; private static final String TOPIC = "binlog_cache_sync_topic";
@Override public void update(OrderEntity before, OrderEntity after) { long binlogTimestamp = extractTimestamp(); String primaryKey = String.valueOf(after.getId()); String jsonMsg = buildJson(after, binlogTimestamp);
kafkaTemplate.send(TOPIC, primaryKey, jsonMsg) .exceptionally(ex -> { log.error("Kafka 彻底发送失败!主键: {}, 准备进入死信队列", primaryKey, ex); sendToDeadLetterQueue(primaryKey, jsonMsg, ex); return null; }); } }
|
(注:使用高度封装的 Starter 时务必当心,确保框架支持获取 Binlog Timestamp,否则 Lua 脚本的防乱序机制将失效。如果是极高并发场景,建议直接使用 Canal 原生 API 实现批量异步等待机制。)
附上更复杂的原生代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121
| import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.Message; import lombok.extern.slf4j.Slf4j; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Service;
import javax.annotation.Resource; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit;
@Slf4j @Service public class CanalToKafkaService {
@Resource private CanalConnector canalConnector; @Resource private KafkaTemplate<String, String> kafkaTemplate;
private static final String TOPIC = "binlog_cache_sync_topic"; private volatile boolean running = true;
public void startListen() { canalConnector.connect(); canalConnector.subscribe(".*\\..*"); canalConnector.rollback();
while (running) { Message message = canalConnector.getWithoutAck(1000, 100L, TimeUnit.MILLISECONDS); long batchId = message.getId(); int size = message.getEntries().size();
if (batchId == -1 || size == 0) { sleep(100); continue; }
try { processAndSendToKafka(message.getEntries()); canalConnector.ack(batchId); } catch (Exception e) { log.error("批量发送 Kafka 失败,准备回滚 Canal 位点,batchId: {}", batchId, e); canalConnector.rollback(batchId); sleep(2000); } } }
private void processAndSendToKafka(List<CanalEntry.Entry> entries) throws Exception { List<CompletableFuture<SendResult<String, String>>> futures = new ArrayList<>();
for (CanalEntry.Entry entry : entries) { if (entry.getEntryType() != CanalEntry.EntryType.ROWDATA) { continue; }
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); CanalEntry.EventType eventType = rowChange.getEventType(); String tableName = entry.getHeader().getTableName();
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) { List<CanalEntry.Column> columns = (eventType == CanalEntry.EventType.DELETE) ? rowData.getBeforeColumnsList() : rowData.getAfterColumnsList();
String primaryKey = getPrimaryKey(columns); if (primaryKey == null) continue;
long executeTime = entry.getHeader().getExecuteTime(); String jsonMsg = buildJsonMsg(tableName, eventType, columns, executeTime);
CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send(TOPIC, primaryKey, jsonMsg); futures.add(future); } }
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); }
private String getPrimaryKey(List<CanalEntry.Column> columns) { for (CanalEntry.Column column : columns) { if (column.getIsKey()) { return column.getValue(); } } return null; }
private String buildJsonMsg(String table, CanalEntry.EventType type, List<CanalEntry.Column> columns, long executeTime) { return "{\"table\":\"" + table + "\", \"version\":" + executeTime + ", \"data\":{...}}"; }
private void sleep(long millis) { try { Thread.sleep(millis); } catch (InterruptedException ignored) {} } }
|
五、 终极拷问:死信队列会破坏 Kafka 的顺序与数据一致性吗?
有同学可能会提出一个极其硬核的问题:
如果我开启了 Kafka 的幂等性(防乱序),连续发送消息 1、2、3、4、5。结果 1 发送失败进入了 DLQ。Kafka 底层会拒收 2、3、4、5 吗?如果 2、3、4、5 发送成功了,产生的数据空洞会破坏缓存一致性吗?
1. Kafka 底层的自我疗愈
在现代版本 Kafka(>= 2.4)中,如果 1 因为超时被抛弃导致序列号(SeqNum)断层,Producer 会在底层自动执行 PID Reset(重置生产者 ID),然后带着新身份将 2、3、4、5 重新打标并成功发往 Broker。所以,后续消息不会被永久拒收。
2. 业务层的一致性闭环
此时,消费者实际收到了 2、3、4、5(1 被落在了 MySQL 死信表中)。
看似发生了乱序,但请回忆一下我们第二节讲的 “Binlog 时间戳 + Lua 脚本”!
- 消费者收到 2 时,缓存更新为版本 2。收到 5 时,缓存更新为版本 5。
- 第二天,运维人员处理死信队列,手动将 消息 1(版本 1)重新发往 Kafka。
- 消费者收到迟来的 消息 1,执行 Lua 脚本:发现当前缓存版本是 5,
1 < 5,Lua 脚本直接拦截并拒绝更新!
至此,逻辑形成了完美闭环!
总结
面对高并发的双写一致性挑战,我最终沉淀出了这套方案:
- 主链路:仅更新 DB,保持极致轻量。
- 同步链路:CDC 监听 Binlog,结合主键 Hash 发送到 Kafka,利用底层幂等参数实现高性能异步保序发送。
- 降级保障:利用
delivery.timeout.ms 与异步回调捕获异常,平滑降级到 DLQ,防止阻塞断流。 - 终端消费:摒弃有风险的 Cache Aside(删除),改用基于 Binlog 真实时间戳的 Lua 脚本覆盖写,完美抗击高并发并发写踩踏以及死信重放的乱序问题。
这套架构无需在业务表引入任何侵入性版本字段,既扛住了极高的并发吞吐,又守住了数据最终一致性的底线,是一个较为完美的解决方案。