如何使用cannal来做双写一致
🏗️ Spring Boot 2.7.3 + MySQL 8.1 + Canal 实现缓存双写一致性 · 全量落地指南
包含:理论剖析 → MySQL 配置 → Canal 部署 → 直连 Canal 版(完整代码)→ Kafka 版(完整代码)→ 生产级加固方案,可直接照抄落地。
目录
- 1. 双写一致性:为什么传统方案必翻车
- 2. Canal 方案的核心理念
- 3. 环境版本一览
- Step 0 · MySQL 8.1 侧准备
- Step 1 · Canal Server 部署 & 基础配置
- ✅ 方案一:直连 Canal 模式 · 完整落地
- ✅ 方案二:Canal → Kafka → Spring Boot 消费 · 完整落地
- 8. 生产级加固 & 踩坑清单
- 9. 两种方案怎么选
1. 双写一致性:为什么传统方案必翻车
缓存和数据库是两个独立存储,无法包进同一个本地事务。
经典翻车矩阵
| 策略 | 翻车场景 | 后果 |
|---|---|---|
| 先更新 DB → 再删缓存 | DB 更新成功,但删缓存网络超时 / Redis 宕机 | 缓存永远存旧值,直到 TTL 过期 |
| 先删缓存 → 再更新 DB | 删完瞬间,并发读未命中 → 查 DB(旧值)→ 回填缓存 | 缓存被"旧数据污染",可能长期不一致 |
| 先改缓存 → 再改 DB | 缓存改了,DB 写入失败但缓存无法回滚 | 缓存永久脏数据 |
业界唯一靠谱的底层共识
数据库是唯一 Source of Truth。 缓存只是派生数据。正确姿势是 Cache Aside:
- 读:缓存 hit 直接返;miss 则从 DB 加载 → 回填缓存
- 写:更新 DB 后,令缓存失效(DELETE),而不是 UPDATE 缓存
但问题是:业务代码里的「删缓存」仍然可能失败(进程崩、网络闪断)。
Canal 解决的是什么?
把"缓存失效"这件事从业务代码中剥离,挂靠到 MySQL 自己的 binlog 这条最可靠的事件源上:
App ──只写──▶ MySQL ──(binlog, ROW, 事务提交后才写)──▶ Canal 感知 ──▶ 删 Redis 缓存
- binlog 在事务 commit 之后才写 → 绝不会把"没提交的数据"同步出去
- binlog 是 全局有序 的 → 同一行不会乱序
- DB 写成功 → binlog 必有 → Canal 必能感知(只要 Canal 活着)
本质:让缓存失效变成最终一致的、事件驱动的后置动作,而不是业务链路上的同步步骤。
2. Canal 方案的核心理念
┌──────────────┐ binlog(ROW) ┌──────────────┐
│ MySQL 8.1 │═════════════════════▶│ Canal Server │
│ (主库/从库) │ dump 协议,伪装Slave │ (deployer) │
└──────────────┘ └──────┬───────┘
│
┌─────────────────────────┼─────────────────────────┐
▼ ▼ ▼
直连 Canal Client Canal → Kafka → Canal Adapter
(Simple/HA模式) Consumer Group (ES/JDBC等)
│ │
▼ ▼
删 Redis 缓存 删 Redis 缓存
策略选择:Canal 侧一律做 DEL key(失效),而不是 SET 新值。理由——
如果 Canal 去
SET缓存值,万一因为网络乱序导致旧事件覆盖新事件,反而造脏数据。DEL是幂等的,下次读请求走 Cache Aside 自然从 DB 加载最新值,安全得多。
3. 环境版本一览
| 组件 | 版本 | 备注 |
|---|---|---|
| Spring Boot | 2.7.3 | Java 8+ |
| MySQL | 8.1 | 8.0+ 通用,注意认证插件 |
| Canal | 1.1.7 / 1.1.8 | 推荐 1.1.7 稳定;1.1.8 也可 |
| Redis 客户端 | Lettuce(Boot 自带) | 无需额外依赖 |
| Kafka(方案二) | 2.8+ | Canal 原生支持发到 Kafka |
| canal.client | com.alibaba.otter:canal.client:1.1.8 |
直连模式用 |
Step 0 · MySQL 8.1 侧准备
① my.cnf / my.ini
[mysqld]
server-id = 1
log-bin = mysql-bin
binlog-format = ROW
binlog_row_image = FULL
expire_logs_days = 7
# 如果你的 mysql-bin.index 路径有权限问题,可显式指定:
# log_bin_index = /var/lib/mysql/mysql-bin.index
修改后 重启 MySQL。
-- 验证
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';
SHOW VARIABLES LIKE 'binlog_row_image';
-- log_bin=ON / binlog_format=ROW / binlog_row_image=FULL
② 创建 Canal 专用账号(⚠️ MySQL 8.x 关键坑)
MySQL 8.x 默认认证插件是 caching_sha2_password,Canal 的老版本 JDBC 握手会报 Auth failed。必须为 canal 用户强制指定 mysql_native_password:
CREATE USER 'canal'@'%'
IDENTIFIED WITH mysql_native_password BY 'Canal#2024';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
验证:
SELECT host, user, plugin
FROM mysql.user
WHERE user = 'canal';
-- plugin 必须是 mysql_native_password
如果不想改认证插件,Canal 1.1.6+ 理论上可通过
canal.instance.rsaPublicKeyFile走 RSA 公钥方式兼容caching_sha2_password,但实测坑多。生产上强烈建议用上面mysql_native_password方式,简单稳妥。
③ 确认 binlog 当前位点(后面 Canal 配置要用)
SHOW MASTER STATUS;
-- 记下 File(如 mysql-bin.000003) 和 Position(如 157)
Step 1 · Canal Server 部署 & 基础配置
下载解压
wget https://github.com/alibaba/canal/releases/download/canal-1.1.7/canal.deployer-1.1.7.tar.gz
tar -zxf canal.deployer-1.1.7.tar.gz -C /opt/canal
cd /opt/canal
目录结构:
canal/
├── bin/
├── conf/
│ ├── canal.properties ← 全局配置
│ └── example/
│ └── instance.properties ← 实例级配置(连哪个MySQL、监听哪些表)
├── lib/
└── logs/
全局配置 conf/canal.properties
两种模式只需要改一处开关:
| 模式 | canal.serverMode |
说明 |
|---|---|---|
| 直连 | tcp (默认) |
Canal 开 11111 TCP,Client 直连拉取 |
| Kafka | kafka |
binlog 事件序列化后推 Kafka |
先按 直连模式 把基础配好(Kafka 模式在后文单独标出差异):
# ---- conf/canal.properties ----
canal.serverMode = tcp
canal.destinations = example
# 监听端口(直连客户端连这个)
canal.port = 11111
# HA / 单机都行,单机忽略 zk
# canal.zkServers =
# binlog解析工作线程
canal.instance.parser.parallel = true
实例配置 conf/example/instance.properties
# ---- conf/example/instance.properties ----
# 标记(Canal 1.1.7+ 可自动生成,不设也行)
# canal.instance.mysql.slaveId = 1234
# MySQL 地址
canal.instance.master.address = 127.0.0.1:3306
# 【可选】指定从某个 binlog 位点开始(首次可不设,让 Canal 自动从最新位点接)
# canal.instance.master.journal.name = mysql-bin.000003
# canal.instance.master.position = 157
# 认证(MySQL 8.1 的 canal 账号)
canal.instance.dbUsername = canal
canal.instance.dbPassword = Canal#2024
canal.instance.connectionCharset = UTF-8
# 监听表过滤(按需改,这里是 your_db 下所有表)
canal.instance.filter.regex = your_db\\..*
# 黑名单(一般不需要)
# canal.instance.filter.black.regex =
# 心跳频率(秒),让 binlog 流不断
canal.instance.detecting.enable = true
canal.instance.detecting.sql = SELECT 1
canal.instance.detecting.interval.time = 3
启动 & 验活
sh bin/startup.sh
tail -f logs/canal/canal.log
# 看到 "start successful..." 且无 Auth failed 即可
如果日志出现
caching_sha2_password Auth failed→ 回到 Step 0 确认 canal 用户的plugin=mysql_native_password。
✅ 方案一:直连 Canal 模式 · 完整落地
Canal Server 运行在
tcp模式(11111),Spring Boot 作为 Canal Client 长连接拉取 binlog 事件,解析后删 Redis 缓存。
方案一方针
Spring Boot 启动时 → 建一个守护线程
→ CanalConnector.connect()
→ 循环 getWithoutAck(batchSize)
→ 解析 Entry → 提取 库名/表名/主键值
→ redisTemplate.delete(cacheKey)
→ connector.ack(batchId)
→ 异常 → 退避重连
① pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.3</version>
<relativePath/>
</parent>
<groupId>com.demo</groupId>
<artifactId>canal-direct-demo</artifactId>
<version>1.0.0</version>
<description>Canal Direct Client Demo</description>
<properties>
<java.version>1.8</java.version>
<canal.version>1.1.8</canal.version>
</properties>
<dependencies>
<!-- Web(可去掉,留着方便健康检查) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- Canal 官方 Client -->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>${canal.version}</version>
<!-- 排除可能的旧日志冲突 -->
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- protobuf(canal.protocol 传递进来的,一般跟随 canal.client 的传递依赖即可)
如遇 NoClassDefFound: com/google/protobuf/xxx,
显式加:
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.21.12</version>
</dependency>
-->
<!-- JSON(用于打日志/调试) -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
② application.yml
server:
port: 8080
spring:
redis:
host: 127.0.0.1
port: 6379
password:
database: 0
timeout: 3s
lettuce:
pool:
max-active: 8
max-idle: 8
min-idle: 1
canal:
# Canal Server 地址
server-host: 127.0.0.1
server-port: 11111
destination: example
username: ""
password: ""
batch-size: 1000
# 每次空轮询休眠 ms
idle-interval-ms: 500
# 连接断开后退避初始 ms
retry-initial-delay-ms: 2000
# 最大退避 ms
retry-max-delay-ms: 30000
# 关心的 schema(用于过滤)
schema: your_db
logging:
level:
com.demo.canal: debug
③ 配置属性绑定
package com.demo.canal.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
@Data
@Component
@ConfigurationProperties(prefix = "canal")
public class CanalProperties {
private String serverHost;
private int serverPort = 11111;
private String destination = "example";
private String username = "";
private String password = "";
private int batchSize = 1000;
private long idleIntervalMs = 500;
private long retryInitialDelayMs = 2000;
private long retryMaxDelayMs = 30000;
private String schema;
}
④ Cache Key 构建器(统一缓存 key 规范)
package com.demo.canal.cache;
/**
* 你们项目的缓存 key 约定
* 例:cache:product:123 、 cache:user:456
*/
public final class CacheKeyBuilder {
private static final String PREFIX = "cache";
public static String of(String tableName, Object id) {
return PREFIX + ":" + tableName + ":" + id;
}
public static String ofProduct(Long productId) {
return PREFIX + ":product:" + productId;
}
public static String ofUser(Long userId) {
return PREFIX + ":user:" + userId;
}
private CacheKeyBuilder() {}
}
⑤ ⭐ 核心:Canal 直连客户端(带重连 / ACK / 优雅关闭)
package com.demo.canal.client;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.demo.canal.cache.CacheKeyBuilder;
import com.demo.canal.config.CanalProperties;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@Slf4j
@Component
public class CanalDirectClient implements ApplicationRunner, DisposableBean {
@Autowired
private CanalProperties props;
@Autowired
private RedisTemplate<String, Object> redisTemplate;
/** 单线程串行消费 binlog(保证同表内有序) */
private final ExecutorService executor = Executors.newSingleThreadExecutor(
r -> {
Thread t = new Thread(r, "canal-direct-client");
t.setDaemon(true);
return t;
});
private final AtomicBoolean running = new AtomicBoolean(false);
private volatile CanalConnector connector;
// ---------------------- 启停 ----------------------
@Override
public void run(ApplicationArguments args) {
running.set(true);
executor.submit(this::loopWithRetry);
}
@Override
public void destroy() {
running.set(false);
executor.shutdownNow();
safeDisconnect();
log.info("[Canal] client destroyed");
}
// ---------------------- 核心循环 ----------------------
private void loopWithRetry() {
long retryDelay = props.getRetryInitialDelayMs();
while (running.get()) {
try {
ensureConnected();
retryDelay = props.getRetryInitialDelayMs(); // 成功连上就重置退避
innerLoop();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
log.error("[Canal] connector error, will retry after {}ms", retryDelay, e);
safeDisconnect();
sleepQuietly(retryDelay);
// 指数退避
retryDelay = Math.min(retryDelay * 2, props.getRetryMaxDelayMs());
}
}
}
/** 单次连接后的正常消费循环 */
private void innerLoop() throws InterruptedException {
while (running.get() && connector != null) {
Message message = connector.getWithoutAck(props.getBatchSize());
long batchId = message.getId();
if (batchId == -1 || message.getEntries().isEmpty()) {
connector.ack(-1); // ack 空批次
sleepQuietly(props.getIdleIntervalMs());
continue;
}
try {
for (CanalEntry.Entry entry : message.getEntries()) {
if (entry.getEntryType() != CanalEntry.EntryType.ROWDATA) continue;
handleEntry(entry);
}
// 全部处理完才 ack —— 保证 at-least-once 语义
connector.ack(batchId);
} catch (Exception handleEx) {
// 处理某条 binlog 事件失败:rollback 让 Canal 下次重发同一批
log.error("[Canal] handleEntry failed, rollback batchId={}", batchId, handleEx);
connector.rollback(batchId);
sleepQuietly(1000);
}
}
}
// ---------------------- 解析 & 清缓存 ----------------------
private void handleEntry(CanalEntry.Entry entry) {
CanalEntry.RowChange rowChange;
try {
rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
log.warn("[Canal] parse RowChange fail, skip", e);
return;
}
String schema = entry.getHeader().getSchemaName();
String table = entry.getHeader().getTableName();
CanalEntry.EventType eventType = rowChange.getEventType();
// 只处理关心的数据变更事件
if (!shouldProcess(schema, table, eventType)) {
return;
}
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
Long id = resolveId(rowData);
if (id == null) {
// 极端情况:找不到主键列,需要兜底策略
log.warn("[Canal] cannot resolve PK for {}.{}, event={} — skip row-level evict",
schema, table, eventType);
continue;
}
String key = CacheKeyBuilder.of(table, id);
try {
Boolean deleted = redisTemplate.delete(key);
log.debug("[Canal] evict cache key={} (event={}, existsBefore={})",
key, eventType, deleted);
} catch (Exception redisEx) {
// Redis 挂了:记日志 + 可写补偿表
log.error("[Canal] REDIS FAIL evict key={}, YOU MAY NEED MANUAL EVICT OR WAIT TTL", key, redisEx);
}
}
}
/**
* 是否需要处理这张表
*/
private boolean shouldProcess(String schema, String table, CanalEntry.EventType et) {
if (props.getSchema() != null && !props.getSchema().equalsIgnoreCase(schema)) {
return false;
}
// 只关心增/改/删(QUERY / GTID_LOG_EVENT 之类跳过)
return et == CanalEntry.EventType.INSERT
|| et == CanalEntry.EventType.UPDATE
|| et == CanalEntry.EventType.DELETE;
}
/**
* 从 before/after columns 里提取主键(假设 PK 列名叫 "id")
*/
private Long resolveId(CanalEntry.RowData rowData) {
// UPDATE: afterColumns 有最新值;INSERT: afterColumns;DELETE: beforeColumns
List<CanalEntry.Column> cols = !rowData.getAfterColumnsList().isEmpty()
? rowData.getAfterColumnsList()
: rowData.getBeforeColumnsList();
for (CanalEntry.Column c : cols) {
if ("id".equalsIgnoreCase(c.getName()) || "ID".equals(c.getName())) {
try {
return Long.valueOf(c.getValue());
} catch (NumberFormatException ignore) {}
}
}
return null;
}
// ---------------------- 连接管理 ----------------------
private void ensureConnected() {
if (connector != null) {
try {
// 轻量检测:发一次空 get 看连接是否还活着
connector.getWithoutAck(1);
return;
} catch (Exception e) {
log.warn("[Canal] connection lost, reconnecting...", e);
safeDisconnect();
}
}
InetSocketAddress addr = new InetSocketAddress(props.getServerHost(), props.getServerPort());
connector = CanalConnectors.newSingleConnector(
addr,
props.getDestination(),
props.getUsername(),
props.getPassword()
);
connector.connect();
connector.subscribe(); // 不传 filter 则使用 instance.properties 里的 regex
log.info("[Canal] connected to {}:{} destination={}",
props.getServerHost(), props.getServerPort(), props.getDestination());
}
private void safeDisconnect() {
try {
if (connector != null) connector.disconnect();
} catch (Exception ignore) {}
connector = null;
}
private void sleepQuietly(long ms) {
try { TimeUnit.MILLISECONDS.sleep(ms); } catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
}
这段直连代码的关键设计点
| 点 | 为什么这么做 |
|---|---|
getWithoutAck + ack(batchId) 成功才确认 |
至少一次(at-least-once) 语义:处理失败走 rollback 重放,不会丢事件 |
innerLoop 里 catch → rollback |
避免"处理到一半炸了但已经 ack"导致漏删缓存 |
ensureConnected() 带重连 |
Canal Server 重启后自动恢复,不需要人工介入 |
AtomicBoolean running + DisposableBean.destroy() |
Spring 关闭时优雅退出,不丢最后一批 |
resolveId 兼容 INSERT/UPDATE/DELETE |
无论哪种事件都能拿到 PK |
| Redis delete 失败只打日志 | 这是"最终一致"的容忍窗口——但你要有 TTL兜底 和可选的补偿表 |
⑥ Redis 配置(Boot 2.7 默认 Lettuce 即可)
package com.demo.canal.config;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.impl.LaissezFaireSubTypeValidator;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@Configuration
public class RedisConfig {
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(factory);
StringRedisSerializer keySerializer = new StringRedisSerializer();
Jackson2JsonRedisSerializer<Object> valSerializer =
new Jackson2JsonRedisSerializer<>(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.activateDefaultTyping(
LaissezFaireSubTypeValidator.instance,
ObjectMapper.DefaultTyping.NON_FINAL);
valSerializer.configure(om);
template.setKeySerializer(keySerializer);
template.setValueSerializer(valSerializer);
template.setHashKeySerializer(keySerializer);
template.setHashValueSerializer(valSerializer);
template.afterPropertiesSet();
return template;
}
}
⑦ 启动类
@SpringBootApplication
public class CanalDirectApplication {
public static void main(String[] args) {
SpringApplication.run(CanalDirectApplication.class, args);
}
}
启动后观察日志
[Canal] connected to ...,然后对your_db任意表做一次UPDATE/INSERT/DELETE,对应cache:表名:id就会被删掉。
✅ 方案二:Canal → Kafka → Spring Boot 消费 · 完整落地
适合:多实例部署、需要横向扩展、需要持久化事件流、需要死信队列。生产环境首推此方案。
架构
MySQL 8.1
│ binlog
▼
Canal Server(serverMode=kafka)
│ 序列化 binlog → JSON / FlatMessage
▼
Kafka Topic: canal-binlog
│
├── consumer-group: cache-evict(你的 Spring Boot)
│ └── 删 Redis 缓存
└── consumer-group: audit / es-sync / 其他(随意扩展)
Step ① Canal Server 切到 Kafka 模式
编辑 conf/canal.properties:
# ---- conf/canal.properties ----
canal.serverMode = kafka
# Kafka 集群
canal.mq.servers = 127.0.0.1:9092
# topic(可按 instance 区分,也可以统一)
canal.mq.topic = canal-binlog
# 分区数:如果要保证同表/同行有序 → 必须用 1 分区 或 自定义 partitionBy PK
canal.mq.partitionsNum = 1
canal.mq.partitionHash = .*\\..*:$pk$
# flatMessage=true → 输出为 JSON,而不是 protobuf 二进制
canal.mq.flatMessage = true
flatMessage = true很关键——这样 Kafka 里存的是 JSON,你的 Spring Boot 用普通StringDeserializer就能消费,调试也方便。
然后重启 Canal:
sh bin/stop.sh
sh bin/startup.sh
确认 topic 已创建(Canal 会自动建;如你手动建也行):
kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list
Step ② Spring Boot Kafka 消费者
pom.xml 新增
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
application.yml
spring:
redis:
host: 127.0.0.1
port: 6379
database: 0
timeout: 3s
lettuce:
pool:
max-active: 8
kafka:
bootstrap-servers: 127.0.0.1:9092
consumer:
group-id: cache-evict-group
auto-offset-reset: latest # 首次上线从最新开始;如需全量回溯改成 earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 手动 ack —— 处理成功才 commit
enable-auto-commit: false
listener:
ack-mode: manual_immediate
canal:
schema: your_db
kafka:
topic: canal-binlog
logging:
level:
com.demo.canal.kafka: debug
KafkaConsumer 代码
package com.demo.canal.kafka;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.demo.canal.cache.CacheKeyBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import java.util.HashSet;
import java.util.Set;
/**
* Canal flatMessage=true 时,每条 Kafka 消息是一个 JSON,结构大致:
* {
* "database": "your_db",
* "table": "product",
* "type": "UPDATE",
* "ts": 1710000000,
* "data": [ {"id":"1","name":"xx"} ],
* "old": [ {"name":"yy"} ]
* }
*
* 注意:canal.mq.flatMessage=true 输出有两种形态(取决于版本/配置)
* —— 标准 FlatMessage 就是上面这种 JSON;
* —— 如果遇到 data 里不是数组而是对象,代码中也做了兼容。
*/
@Slf4j
@Component
public class CanalKafkaConsumer {
@Value("${canal.schema:}")
private String concernSchema;
private final RedisTemplate<String, Object> redisTemplate;
public CanalKafkaConsumer(RedisTemplate<String, Object> redisTemplate) {
this.redisTemplate = redisTemplate;
}
@KafkaListener(topics = "${canal.kafka.topic}", groupId = "cache-evict-group")
public void consume(ConsumerRecord<String, String> record, Acknowledgment ack) {
String json = record.value();
if (json == null || json.isBlank()) {
ack.acknowledge();
return;
}
try {
JSONObject msg = JSON.parseObject(json);
String database = msg.getString("database");
String table = msg.getString("table");
String type = msg.getString("type"); // INSERT / UPDATE / DELETE
if (concernSchema != null && !concernSchema.equalsIgnoreCase(database)) {
ack.acknowledge();
return;
}
// 只有数据变更事件有意义
if (!("INSERT".equals(type) || "UPDATE".equals(type) || "DELETE".equals(type))) {
ack.acknowledge();
return;
}
// ---- 提取 PK 集合 ----
Set<String> keysToEvict = extractKeys(msg, table);
if (!keysToEvict.isEmpty()) {
try {
Long delCount = redisTemplate.delete(keysToEvict);
log.debug("[Canal-Kafka] evict {} keys ({}), redis actualDel={}",
keysToEvict.size(), keysToEvict, delCount);
} catch (Exception redisEx) {
// ★ 关键:Redis 挂了不能 silently skip,否则缓存永远不清理
log.error("[Canal-Kafka] REDIS FAIL evict keys={}, NEED COMPENSATION", keysToEvict, redisEx);
// TODO: 写入补偿表 / DLQ,让定时任务兜底清理
// 如果严重到必须停消费:throw 让 Kafka 重试
}
}
// 成功处理才 commit offset
ack.acknowledge();
} catch (Exception parseEx) {
log.error("[Canal-Kafka] parse fail, msg={}, WILL SKIP & ACK to avoid stuck", json, parseEx);
// 无法解析的消息直接 ACK 跳过(否则死循环);但建议写入 dead-letter
ack.acknowledge();
}
}
private Set<String> extractKeys(JSONObject msg, String table) {
Set<String> keys = new HashSet<>();
Object dataObj = msg.get("data");
if (dataObj == null) return keys;
// data 可能是数组 [ {...}, {...} ] 也可能是单个对象 {...}
if (dataObj instanceof JSONArray) {
for (Object row : (JSONArray) dataObj) {
if (row instanceof JSONObject) {
addKeyIfHasId(keys, table, (JSONObject) row);
}
}
} else if (dataObj instanceof JSONObject) {
addKeyIfHasId(keys, table, (JSONObject) dataObj);
}
return keys;
}
private void addKeyIfHasId(Set<String> keys, String table, JSONObject row) {
Object idVal = row.get("id");
if (idVal == null) idVal = row.get("ID");
if (idVal != null) {
keys.add(CacheKeyBuilder.of(table, idVal));
} else {
log.warn("[Canal-Kafka] row missing 'id' column in table={}, row={}", table, row);
}
}
}
enable-auto-commit: false+ack-mode: manual_immediate是关键——保证:Redis 真正删成功了才提交 offset;Redis 宕机时报异常 → offset 不提交 → Kafka 会重投(你可以结合死信队列控制重试上限)。
关于 Canal flatMessage JSON 格式的一点补充说明
不同 Canal 版本在 flatMessage=true 时字段名略有差异,常见两种:
| 字段 | 含义 |
|---|---|
database / schema |
库名 |
table |
表名 |
type |
INSERT / UPDATE / DELETE |
data |
变更后的行数据([{id:"1",name:"xx"}]) |
old |
UPDATE 时旧值快照 |
建议你 先发一条测试消息到 Kafka,用 kafka-console-consumer 看一下真实 JSON 结构,然后微调上面的 extractKeys 即可。
8. 生产级加固 & 踩坑清单
① 必须给缓存设 TTL 兜底
即使 Canal / Kafka / Consumer 全挂了,TTL 能保证缓存不会永远脏:
// 写缓存时永远带 TTL
redisTemplate.opsForValue().set(key, value, Duration.ofMinutes(30));
② 补偿机制
| 故障场景 | 对策 |
|---|---|
| Redis 全挂,DEL 连续失败 | consumer 捕获异常后把 (table, id, timestamp) 写本地补偿表;定时任务扫描 > 5min 仍未清掉的 key,重试或告警 |
| Canal Server 宕机期间发生的变更 | Canal 重启后会从 show master status 或 meta 位点继续 dump,不丢(前提是 binlog 没过期) |
| binlog 过期被 MySQL 清掉 | 监控 binlog 保留时长;Canal 报 BinlogMissing 时必须有告警,人工重建位点 |
③ binlog 保留时间
-- 别让 expire_logs_days=0(可能很快清掉)
SET GLOBAL expire_logs_days = 7;
-- 或 MySQL 8.0 用 binary log expiration
-- SET BINARY LOGS EXPIRE AFTER 604800;
④ 多表 / 多 PK 列
如果某些表主键不叫 id(如 product_id、order_no),把 resolveId / addKeyIfHasId 扩展成一个小映射:
private static final Map<String, String> TABLE_PK = Map.of(
"product", "product_id",
"orders", "order_id",
"user", "id"
);
⑤ 并发读穿透问题(进阶)
在「缓存被 Canal 删掉 → 下一读请求回查 DB 回填」这个窗口,如果瞬间高并发读全部 miss → 全部打到 DB。
解法:对热点 key 加 分布式锁 / Redisson RLock / Bloom + 单飞(singleflight) 让回查只走一次。这属于 Cache Aside 的经典优化,不归 Canal 管。
9. 两种方案怎么选
| 直连 Canal(方案一) | Canal → Kafka(方案二) | |
|---|---|---|
| 复杂度 | 低,上手快 | 中高(多一个 Kafka) |
| 可靠性 | 客户端挂 = 暂停消费(但有重连) | Kafka 持久化事件,consumer 可独立扩缩 |
| 有序性 | 单线程自然有序 | partition=1 或 partitionBy $pk$ 才保单行有序 |
| 多消费者 | ❌ 只能一个 connector 跑(需 ZK HA 抢主) | ✅ 天然多实例 + 多 consumer-group |
| 推荐场景 | 小项目/内部系统/快速落地 | 生产核心链路、需要审计/ES同步等多下游 |
我的建议:能扛得住运维 Kafka 就选方案二;想最快见效、组件最少就先用方案一,后续 Kafka 迁移也很平滑(把"删缓存"逻辑从直连 handler 搬到 Kafka consumer 即可,核心
extractKeys()不变)。
更多推荐
所有评论(0)