🏗️ Spring Boot 2.7.3 + MySQL 8.1 + Canal 实现缓存双写一致性 · 全量落地指南

包含:理论剖析 → MySQL 配置 → Canal 部署 → 直连 Canal 版(完整代码)→ Kafka 版(完整代码)→ 生产级加固方案,可直接照抄落地。


目录

  1. 1. 双写一致性:为什么传统方案必翻车
  2. 2. Canal 方案的核心理念
  3. 3. 环境版本一览
  4. Step 0 · MySQL 8.1 侧准备
  5. Step 1 · Canal Server 部署 & 基础配置
  6. ✅ 方案一:直连 Canal 模式 · 完整落地
  7. ✅ 方案二:Canal → Kafka → Spring Boot 消费 · 完整落地
  8. 8. 生产级加固 & 踩坑清单
  9. 9. 两种方案怎么选

1. 双写一致性:为什么传统方案必翻车

缓存和数据库是两个独立存储,无法包进同一个本地事务

经典翻车矩阵

策略 翻车场景 后果
先更新 DB → 再删缓存 DB 更新成功,但删缓存网络超时 / Redis 宕机 缓存永远存旧值,直到 TTL 过期
先删缓存 → 再更新 DB 删完瞬间,并发读未命中 → 查 DB(旧值)→ 回填缓存 缓存被"旧数据污染",可能长期不一致
先改缓存 → 再改 DB 缓存改了,DB 写入失败但缓存无法回滚 缓存永久脏数据

业界唯一靠谱的底层共识

数据库是唯一 Source of Truth。 缓存只是派生数据。正确姿势是 Cache Aside

  • :缓存 hit 直接返;miss 则从 DB 加载 → 回填缓存
  • :更新 DB 后,令缓存失效(DELETE),而不是 UPDATE 缓存

但问题是:业务代码里的「删缓存」仍然可能失败(进程崩、网络闪断)。

Canal 解决的是什么?

把"缓存失效"这件事从业务代码中剥离,挂靠到 MySQL 自己的 binlog 这条最可靠的事件源上

App ──只写──▶ MySQL ──(binlog, ROW, 事务提交后才写)──▶ Canal 感知 ──▶ 删 Redis 缓存
  • binlog 在事务 commit 之后才写 → 绝不会把"没提交的数据"同步出去
  • binlog 是 全局有序 的 → 同一行不会乱序
  • DB 写成功 → binlog 必有 → Canal 必能感知(只要 Canal 活着)

本质:让缓存失效变成最终一致的、事件驱动的后置动作,而不是业务链路上的同步步骤


2. Canal 方案的核心理念

┌──────────────┐     binlog(ROW)      ┌──────────────┐
│  MySQL 8.1    │═════════════════════▶│  Canal Server │
│  (主库/从库)   │  dump 协议,伪装Slave  │  (deployer)   │
└──────────────┘                       └──────┬───────┘
                                              │
                    ┌─────────────────────────┼─────────────────────────┐
                    ▼                         ▼                          ▼
             直连 Canal Client          Canal → Kafka →         Canal Adapter
             (Simple/HA模式)             Consumer Group          (ES/JDBC等)
                    │                         │
                    ▼                         ▼
              删 Redis 缓存             删 Redis 缓存

策略选择:Canal 侧一律做 DEL key(失效),而不是 SET 新值。理由——

如果 Canal 去 SET 缓存值,万一因为网络乱序导致旧事件覆盖新事件,反而造脏数据。DEL 是幂等的,下次读请求走 Cache Aside 自然从 DB 加载最新值,安全得多。


3. 环境版本一览

组件 版本 备注
Spring Boot 2.7.3 Java 8+
MySQL 8.1 8.0+ 通用,注意认证插件
Canal 1.1.7 / 1.1.8 推荐 1.1.7 稳定;1.1.8 也可
Redis 客户端 Lettuce(Boot 自带) 无需额外依赖
Kafka(方案二) 2.8+ Canal 原生支持发到 Kafka
canal.client com.alibaba.otter:canal.client:1.1.8 直连模式用

Step 0 · MySQL 8.1 侧准备

① my.cnf / my.ini

[mysqld]
server-id        = 1
log-bin           = mysql-bin
binlog-format     = ROW
binlog_row_image  = FULL
expire_logs_days  = 7
# 如果你的 mysql-bin.index 路径有权限问题,可显式指定:
# log_bin_index = /var/lib/mysql/mysql-bin.index

修改后 重启 MySQL

-- 验证
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';
SHOW VARIABLES LIKE 'binlog_row_image';
-- log_bin=ON / binlog_format=ROW / binlog_row_image=FULL

② 创建 Canal 专用账号(⚠️ MySQL 8.x 关键坑)

MySQL 8.x 默认认证插件是 caching_sha2_passwordCanal 的老版本 JDBC 握手会报 Auth failed。必须为 canal 用户强制指定 mysql_native_password

CREATE USER 'canal'@'%'
  IDENTIFIED WITH mysql_native_password BY 'Canal#2024';

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';

FLUSH PRIVILEGES;

验证:

SELECT host, user, plugin
FROM mysql.user
WHERE user = 'canal';
-- plugin 必须是 mysql_native_password

如果不想改认证插件,Canal 1.1.6+ 理论上可通过 canal.instance.rsaPublicKeyFile 走 RSA 公钥方式兼容 caching_sha2_password,但实测坑多。生产上强烈建议用上面 mysql_native_password 方式,简单稳妥。

③ 确认 binlog 当前位点(后面 Canal 配置要用)

SHOW MASTER STATUS;
-- 记下 File(如 mysql-bin.000003) 和 Position(如 157)

Step 1 · Canal Server 部署 & 基础配置

下载解压

wget https://github.com/alibaba/canal/releases/download/canal-1.1.7/canal.deployer-1.1.7.tar.gz
tar -zxf canal.deployer-1.1.7.tar.gz -C /opt/canal
cd /opt/canal

目录结构:

canal/
  ├── bin/
  ├── conf/
  │   ├── canal.properties          ← 全局配置
  │   └── example/
  │       └── instance.properties   ← 实例级配置(连哪个MySQL、监听哪些表)
  ├── lib/
  └── logs/

全局配置 conf/canal.properties

两种模式只需要改一处开关:

模式 canal.serverMode 说明
直连 tcp (默认) Canal 开 11111 TCP,Client 直连拉取
Kafka kafka binlog 事件序列化后推 Kafka

先按 直连模式 把基础配好(Kafka 模式在后文单独标出差异):

# ---- conf/canal.properties ----
canal.serverMode = tcp

canal.destinations = example

# 监听端口(直连客户端连这个)
canal.port = 11111

# HA / 单机都行,单机忽略 zk
# canal.zkServers =

# binlog解析工作线程
canal.instance.parser.parallel = true

实例配置 conf/example/instance.properties

# ---- conf/example/instance.properties ----

# 标记(Canal 1.1.7+ 可自动生成,不设也行)
# canal.instance.mysql.slaveId = 1234

# MySQL 地址
canal.instance.master.address = 127.0.0.1:3306

# 【可选】指定从某个 binlog 位点开始(首次可不设,让 Canal 自动从最新位点接)
# canal.instance.master.journal.name = mysql-bin.000003
# canal.instance.master.position     = 157

# 认证(MySQL 8.1 的 canal 账号)
canal.instance.dbUsername = canal
canal.instance.dbPassword = Canal#2024

canal.instance.connectionCharset = UTF-8

# 监听表过滤(按需改,这里是 your_db 下所有表)
canal.instance.filter.regex = your_db\\..*

# 黑名单(一般不需要)
# canal.instance.filter.black.regex =

# 心跳频率(秒),让 binlog 流不断
canal.instance.detecting.enable = true
canal.instance.detecting.sql = SELECT 1
canal.instance.detecting.interval.time = 3

启动 & 验活

sh bin/startup.sh
tail -f logs/canal/canal.log
# 看到 "start successful..." 且无 Auth failed 即可

如果日志出现 caching_sha2_password Auth failed → 回到 Step 0 确认 canal 用户的 plugin=mysql_native_password


✅ 方案一:直连 Canal 模式 · 完整落地

Canal Server 运行在 tcp 模式(11111),Spring Boot 作为 Canal Client 长连接拉取 binlog 事件,解析后删 Redis 缓存。

方案一方针

Spring Boot 启动时 → 建一个守护线程
  → CanalConnector.connect()
    → 循环 getWithoutAck(batchSize)
      → 解析 Entry → 提取 库名/表名/主键值
        → redisTemplate.delete(cacheKey)
      → connector.ack(batchId)
    → 异常 → 退避重连

① pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
                             http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.3</version>
        <relativePath/>
    </parent>

    <groupId>com.demo</groupId>
    <artifactId>canal-direct-demo</artifactId>
    <version>1.0.0</version>
    <description>Canal Direct Client Demo</description>

    <properties>
        <java.version>1.8</java.version>
        <canal.version>1.1.8</canal.version>
    </properties>

    <dependencies>
        <!-- Web(可去掉,留着方便健康检查) -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Redis -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

        <!-- Canal 官方 Client -->
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>${canal.version}</version>
            <!-- 排除可能的旧日志冲突 -->
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- protobuf(canal.protocol 传递进来的,一般跟随 canal.client 的传递依赖即可)
             如遇 NoClassDefFound: com/google/protobuf/xxx,
             显式加:
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>3.21.12</version>
        </dependency>
        -->

        <!-- JSON(用于打日志/调试) -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

② application.yml

server:
  port: 8080

spring:
  redis:
    host: 127.0.0.1
    port: 6379
    password:
    database: 0
    timeout: 3s
    lettuce:
      pool:
        max-active: 8
        max-idle: 8
        min-idle: 1

canal:
  # Canal Server 地址
  server-host: 127.0.0.1
  server-port: 11111
  destination: example
  username: ""
  password: ""
  batch-size: 1000
  # 每次空轮询休眠 ms
  idle-interval-ms: 500
  # 连接断开后退避初始 ms
  retry-initial-delay-ms: 2000
  # 最大退避 ms
  retry-max-delay-ms: 30000
  # 关心的 schema(用于过滤)
  schema: your_db

logging:
  level:
    com.demo.canal: debug

③ 配置属性绑定

package com.demo.canal.config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@Data
@Component
@ConfigurationProperties(prefix = "canal")
public class CanalProperties {
    private String serverHost;
    private int serverPort = 11111;
    private String destination = "example";
    private String username = "";
    private String password = "";
    private int batchSize = 1000;
    private long idleIntervalMs = 500;
    private long retryInitialDelayMs = 2000;
    private long retryMaxDelayMs = 30000;
    private String schema;
}

④ Cache Key 构建器(统一缓存 key 规范)

package com.demo.canal.cache;

/**
 * 你们项目的缓存 key 约定
 * 例:cache:product:123 、 cache:user:456
 */
public final class CacheKeyBuilder {

    private static final String PREFIX = "cache";

    public static String of(String tableName, Object id) {
        return PREFIX + ":" + tableName + ":" + id;
    }

    public static String ofProduct(Long productId) {
        return PREFIX + ":product:" + productId;
    }

    public static String ofUser(Long userId) {
        return PREFIX + ":user:" + userId;
    }

    private CacheKeyBuilder() {}
}

⑤ ⭐ 核心:Canal 直连客户端(带重连 / ACK / 优雅关闭)

package com.demo.canal.client;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.demo.canal.cache.CacheKeyBuilder;
import com.demo.canal.config.CanalProperties;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

@Slf4j
@Component
public class CanalDirectClient implements ApplicationRunner, DisposableBean {

    @Autowired
    private CanalProperties props;

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    /** 单线程串行消费 binlog(保证同表内有序) */
    private final ExecutorService executor = Executors.newSingleThreadExecutor(
            r -> {
                Thread t = new Thread(r, "canal-direct-client");
                t.setDaemon(true);
                return t;
            });

    private final AtomicBoolean running = new AtomicBoolean(false);
    private volatile CanalConnector connector;

    // ---------------------- 启停 ----------------------

    @Override
    public void run(ApplicationArguments args) {
        running.set(true);
        executor.submit(this::loopWithRetry);
    }

    @Override
    public void destroy() {
        running.set(false);
        executor.shutdownNow();
        safeDisconnect();
        log.info("[Canal] client destroyed");
    }

    // ---------------------- 核心循环 ----------------------

    private void loopWithRetry() {
        long retryDelay = props.getRetryInitialDelayMs();

        while (running.get()) {
            try {
                ensureConnected();
                retryDelay = props.getRetryInitialDelayMs(); // 成功连上就重置退避
                innerLoop();

            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                break;

            } catch (Exception e) {
                log.error("[Canal] connector error, will retry after {}ms", retryDelay, e);
                safeDisconnect();
                sleepQuietly(retryDelay);
                // 指数退避
                retryDelay = Math.min(retryDelay * 2, props.getRetryMaxDelayMs());
            }
        }
    }

    /** 单次连接后的正常消费循环 */
    private void innerLoop() throws InterruptedException {
        while (running.get() && connector != null) {
            Message message = connector.getWithoutAck(props.getBatchSize());
            long batchId = message.getId();

            if (batchId == -1 || message.getEntries().isEmpty()) {
                connector.ack(-1); // ack 空批次
                sleepQuietly(props.getIdleIntervalMs());
                continue;
            }

            try {
                for (CanalEntry.Entry entry : message.getEntries()) {
                    if (entry.getEntryType() != CanalEntry.EntryType.ROWDATA) continue;
                    handleEntry(entry);
                }
                // 全部处理完才 ack —— 保证 at-least-once 语义
                connector.ack(batchId);

            } catch (Exception handleEx) {
                // 处理某条 binlog 事件失败:rollback 让 Canal 下次重发同一批
                log.error("[Canal] handleEntry failed, rollback batchId={}", batchId, handleEx);
                connector.rollback(batchId);
                sleepQuietly(1000);
            }
        }
    }

    // ---------------------- 解析 & 清缓存 ----------------------

    private void handleEntry(CanalEntry.Entry entry) {
        CanalEntry.RowChange rowChange;
        try {
            rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
        } catch (Exception e) {
            log.warn("[Canal] parse RowChange fail, skip", e);
            return;
        }

        String schema = entry.getHeader().getSchemaName();
        String table  = entry.getHeader().getTableName();
        CanalEntry.EventType eventType = rowChange.getEventType();

        // 只处理关心的数据变更事件
        if (!shouldProcess(schema, table, eventType)) {
            return;
        }

        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
            Long id = resolveId(rowData);
            if (id == null) {
                // 极端情况:找不到主键列,需要兜底策略
                log.warn("[Canal] cannot resolve PK for {}.{}, event={} — skip row-level evict",
                        schema, table, eventType);
                continue;
            }
            String key = CacheKeyBuilder.of(table, id);
            try {
                Boolean deleted = redisTemplate.delete(key);
                log.debug("[Canal] evict cache key={} (event={}, existsBefore={})",
                        key, eventType, deleted);
            } catch (Exception redisEx) {
                // Redis 挂了:记日志 + 可写补偿表
                log.error("[Canal] REDIS FAIL evict key={}, YOU MAY NEED MANUAL EVICT OR WAIT TTL", key, redisEx);
            }
        }
    }

    /**
     * 是否需要处理这张表
     */
    private boolean shouldProcess(String schema, String table, CanalEntry.EventType et) {
        if (props.getSchema() != null && !props.getSchema().equalsIgnoreCase(schema)) {
            return false;
        }
        // 只关心增/改/删(QUERY / GTID_LOG_EVENT 之类跳过)
        return et == CanalEntry.EventType.INSERT
            || et == CanalEntry.EventType.UPDATE
            || et == CanalEntry.EventType.DELETE;
    }

    /**
     * 从 before/after columns 里提取主键(假设 PK 列名叫 "id")
     */
    private Long resolveId(CanalEntry.RowData rowData) {
        // UPDATE: afterColumns 有最新值;INSERT: afterColumns;DELETE: beforeColumns
        List<CanalEntry.Column> cols = !rowData.getAfterColumnsList().isEmpty()
                ? rowData.getAfterColumnsList()
                : rowData.getBeforeColumnsList();

        for (CanalEntry.Column c : cols) {
            if ("id".equalsIgnoreCase(c.getName()) || "ID".equals(c.getName())) {
                try {
                    return Long.valueOf(c.getValue());
                } catch (NumberFormatException ignore) {}
            }
        }
        return null;
    }

    // ---------------------- 连接管理 ----------------------

    private void ensureConnected() {
        if (connector != null) {
            try {
                // 轻量检测:发一次空 get 看连接是否还活着
                connector.getWithoutAck(1);
                return;
            } catch (Exception e) {
                log.warn("[Canal] connection lost, reconnecting...", e);
                safeDisconnect();
            }
        }
        InetSocketAddress addr = new InetSocketAddress(props.getServerHost(), props.getServerPort());
        connector = CanalConnectors.newSingleConnector(
                addr,
                props.getDestination(),
                props.getUsername(),
                props.getPassword()
        );
        connector.connect();
        connector.subscribe(); // 不传 filter 则使用 instance.properties 里的 regex
        log.info("[Canal] connected to {}:{} destination={}",
                props.getServerHost(), props.getServerPort(), props.getDestination());
    }

    private void safeDisconnect() {
        try {
            if (connector != null) connector.disconnect();
        } catch (Exception ignore) {}
        connector = null;
    }

    private void sleepQuietly(long ms) {
        try { TimeUnit.MILLISECONDS.sleep(ms); } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
    }
}

这段直连代码的关键设计点

为什么这么做
getWithoutAck + ack(batchId) 成功才确认 至少一次(at-least-once) 语义:处理失败走 rollback 重放,不会丢事件
innerLoop 里 catch → rollback 避免"处理到一半炸了但已经 ack"导致漏删缓存
ensureConnected() 带重连 Canal Server 重启后自动恢复,不需要人工介入
AtomicBoolean running + DisposableBean.destroy() Spring 关闭时优雅退出,不丢最后一批
resolveId 兼容 INSERT/UPDATE/DELETE 无论哪种事件都能拿到 PK
Redis delete 失败只打日志 这是"最终一致"的容忍窗口——但你要有 TTL兜底 和可选的补偿表

⑥ Redis 配置(Boot 2.7 默认 Lettuce 即可)

package com.demo.canal.config;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.impl.LaissezFaireSubTypeValidator;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Configuration
public class RedisConfig {

    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(factory);

        StringRedisSerializer keySerializer = new StringRedisSerializer();

        Jackson2JsonRedisSerializer<Object> valSerializer =
                new Jackson2JsonRedisSerializer<>(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.activateDefaultTyping(
                LaissezFaireSubTypeValidator.instance,
                ObjectMapper.DefaultTyping.NON_FINAL);
        valSerializer.configure(om);

        template.setKeySerializer(keySerializer);
        template.setValueSerializer(valSerializer);
        template.setHashKeySerializer(keySerializer);
        template.setHashValueSerializer(valSerializer);
        template.afterPropertiesSet();
        return template;
    }
}

⑦ 启动类

@SpringBootApplication
public class CanalDirectApplication {
    public static void main(String[] args) {
        SpringApplication.run(CanalDirectApplication.class, args);
    }
}

启动后观察日志 [Canal] connected to ...,然后对 your_db 任意表做一次 UPDATE/INSERT/DELETE,对应 cache:表名:id 就会被删掉。


✅ 方案二:Canal → Kafka → Spring Boot 消费 · 完整落地

适合:多实例部署、需要横向扩展、需要持久化事件流、需要死信队列。生产环境首推此方案。

架构

MySQL 8.1
    │  binlog
    ▼
Canal Server(serverMode=kafka)
    │  序列化 binlog → JSON / FlatMessage
    ▼
Kafka Topic:  canal-binlog
    │
    ├── consumer-group: cache-evict(你的 Spring Boot)
    │       └── 删 Redis 缓存
    └── consumer-group: audit / es-sync / 其他(随意扩展)

Step ① Canal Server 切到 Kafka 模式

编辑 conf/canal.properties

# ---- conf/canal.properties ----
canal.serverMode = kafka

# Kafka 集群
canal.mq.servers = 127.0.0.1:9092

# topic(可按 instance 区分,也可以统一)
canal.mq.topic = canal-binlog

# 分区数:如果要保证同表/同行有序 → 必须用 1 分区 或 自定义 partitionBy PK
canal.mq.partitionsNum = 1
canal.mq.partitionHash = .*\\..*:$pk$

# flatMessage=true  → 输出为 JSON,而不是 protobuf 二进制
canal.mq.flatMessage = true

flatMessage = true 很关键——这样 Kafka 里存的是 JSON,你的 Spring Boot 用普通 StringDeserializer 就能消费,调试也方便。

然后重启 Canal:

sh bin/stop.sh
sh bin/startup.sh

确认 topic 已创建(Canal 会自动建;如你手动建也行):

kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list

Step ② Spring Boot Kafka 消费者

pom.xml 新增

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

application.yml

spring:
  redis:
    host: 127.0.0.1
    port: 6379
    database: 0
    timeout: 3s
    lettuce:
      pool:
        max-active: 8

  kafka:
    bootstrap-servers: 127.0.0.1:9092
    consumer:
      group-id: cache-evict-group
      auto-offset-reset: latest          # 首次上线从最新开始;如需全量回溯改成 earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 手动 ack —— 处理成功才 commit
      enable-auto-commit: false
    listener:
      ack-mode: manual_immediate

canal:
  schema: your_db
  kafka:
    topic: canal-binlog

logging:
  level:
    com.demo.canal.kafka: debug

KafkaConsumer 代码

package com.demo.canal.kafka;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.demo.canal.cache.CacheKeyBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import java.util.HashSet;
import java.util.Set;

/**
 * Canal flatMessage=true 时,每条 Kafka 消息是一个 JSON,结构大致:
 * {
 *   "database": "your_db",
 *   "table": "product",
 *   "type": "UPDATE",
 *   "ts": 1710000000,
 *   "data": [ {"id":"1","name":"xx"} ],
 *   "old":  [ {"name":"yy"} ]
 * }
 *
 * 注意:canal.mq.flatMessage=true 输出有两种形态(取决于版本/配置)
 *   —— 标准 FlatMessage 就是上面这种 JSON;
 *   —— 如果遇到 data 里不是数组而是对象,代码中也做了兼容。
 */
@Slf4j
@Component
public class CanalKafkaConsumer {

    @Value("${canal.schema:}")
    private String concernSchema;

    private final RedisTemplate<String, Object> redisTemplate;

    public CanalKafkaConsumer(RedisTemplate<String, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    @KafkaListener(topics = "${canal.kafka.topic}", groupId = "cache-evict-group")
    public void consume(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String json = record.value();
        if (json == null || json.isBlank()) {
            ack.acknowledge();
            return;
        }

        try {
            JSONObject msg = JSON.parseObject(json);

            String database = msg.getString("database");
            String table    = msg.getString("table");
            String type     = msg.getString("type"); // INSERT / UPDATE / DELETE

            if (concernSchema != null && !concernSchema.equalsIgnoreCase(database)) {
                ack.acknowledge();
                return;
            }

            // 只有数据变更事件有意义
            if (!("INSERT".equals(type) || "UPDATE".equals(type) || "DELETE".equals(type))) {
                ack.acknowledge();
                return;
            }

            // ---- 提取 PK 集合 ----
            Set<String> keysToEvict = extractKeys(msg, table);

            if (!keysToEvict.isEmpty()) {
                try {
                    Long delCount = redisTemplate.delete(keysToEvict);
                    log.debug("[Canal-Kafka] evict {} keys ({}), redis actualDel={}",
                            keysToEvict.size(), keysToEvict, delCount);
                } catch (Exception redisEx) {
                    // ★ 关键:Redis 挂了不能 silently skip,否则缓存永远不清理
                    log.error("[Canal-Kafka] REDIS FAIL evict keys={}, NEED COMPENSATION", keysToEvict, redisEx);
                    // TODO: 写入补偿表 / DLQ,让定时任务兜底清理
                    // 如果严重到必须停消费:throw 让 Kafka 重试
                }
            }

            // 成功处理才 commit offset
            ack.acknowledge();

        } catch (Exception parseEx) {
            log.error("[Canal-Kafka] parse fail, msg={}, WILL SKIP & ACK to avoid stuck", json, parseEx);
            // 无法解析的消息直接 ACK 跳过(否则死循环);但建议写入 dead-letter
            ack.acknowledge();
        }
    }

    private Set<String> extractKeys(JSONObject msg, String table) {
        Set<String> keys = new HashSet<>();

        Object dataObj = msg.get("data");
        if (dataObj == null) return keys;

        // data 可能是数组 [ {...}, {...} ] 也可能是单个对象 {...}
        if (dataObj instanceof JSONArray) {
            for (Object row : (JSONArray) dataObj) {
                if (row instanceof JSONObject) {
                    addKeyIfHasId(keys, table, (JSONObject) row);
                }
            }
        } else if (dataObj instanceof JSONObject) {
            addKeyIfHasId(keys, table, (JSONObject) dataObj);
        }
        return keys;
    }

    private void addKeyIfHasId(Set<String> keys, String table, JSONObject row) {
        Object idVal = row.get("id");
        if (idVal == null) idVal = row.get("ID");
        if (idVal != null) {
            keys.add(CacheKeyBuilder.of(table, idVal));
        } else {
            log.warn("[Canal-Kafka] row missing 'id' column in table={}, row={}", table, row);
        }
    }
}

enable-auto-commit: false + ack-mode: manual_immediate 是关键——保证:Redis 真正删成功了才提交 offset;Redis 宕机时报异常 → offset 不提交 → Kafka 会重投(你可以结合死信队列控制重试上限)。


关于 Canal flatMessage JSON 格式的一点补充说明

不同 Canal 版本在 flatMessage=true 时字段名略有差异,常见两种:

字段 含义
database / schema 库名
table 表名
type INSERT / UPDATE / DELETE
data 变更后的行数据([{id:"1",name:"xx"}]
old UPDATE 时旧值快照

建议你 先发一条测试消息到 Kafka,用 kafka-console-consumer 看一下真实 JSON 结构,然后微调上面的 extractKeys 即可。


8. 生产级加固 & 踩坑清单

① 必须给缓存设 TTL 兜底

即使 Canal / Kafka / Consumer 全挂了,TTL 能保证缓存不会永远脏

// 写缓存时永远带 TTL
redisTemplate.opsForValue().set(key, value, Duration.ofMinutes(30));

② 补偿机制

故障场景 对策
Redis 全挂,DEL 连续失败 consumer 捕获异常后把 (table, id, timestamp) 写本地补偿表;定时任务扫描 > 5min 仍未清掉的 key,重试或告警
Canal Server 宕机期间发生的变更 Canal 重启后会从 show master status 或 meta 位点继续 dump,不丢(前提是 binlog 没过期)
binlog 过期被 MySQL 清掉 监控 binlog 保留时长;Canal 报 BinlogMissing 时必须有告警,人工重建位点

③ binlog 保留时间

-- 别让 expire_logs_days=0(可能很快清掉)
SET GLOBAL expire_logs_days = 7;
-- 或 MySQL 8.0 用 binary log expiration
-- SET BINARY LOGS EXPIRE AFTER 604800;

④ 多表 / 多 PK 列

如果某些表主键不叫 id(如 product_idorder_no),把 resolveId / addKeyIfHasId 扩展成一个小映射:

private static final Map<String, String> TABLE_PK = Map.of(
    "product", "product_id",
    "orders",  "order_id",
    "user",    "id"
);

⑤ 并发读穿透问题(进阶)

在「缓存被 Canal 删掉 → 下一读请求回查 DB 回填」这个窗口,如果瞬间高并发读全部 miss → 全部打到 DB。

解法:对热点 key 加 分布式锁 / Redisson RLock / Bloom + 单飞(singleflight) 让回查只走一次。这属于 Cache Aside 的经典优化,不归 Canal 管。


9. 两种方案怎么选

直连 Canal(方案一) Canal → Kafka(方案二)
复杂度 低,上手快 中高(多一个 Kafka)
可靠性 客户端挂 = 暂停消费(但有重连) Kafka 持久化事件,consumer 可独立扩缩
有序性 单线程自然有序 partition=1partitionBy $pk$ 才保单行有序
多消费者 ❌ 只能一个 connector 跑(需 ZK HA 抢主) ✅ 天然多实例 + 多 consumer-group
推荐场景 小项目/内部系统/快速落地 生产核心链路、需要审计/ES同步等多下游

我的建议:能扛得住运维 Kafka 就选方案二;想最快见效、组件最少就先用方案一,后续 Kafka 迁移也很平滑(把"删缓存"逻辑从直连 handler 搬到 Kafka consumer 即可,核心 extractKeys() 不变)。


Logo

CANN开发者社区旨在汇聚广大开发者,围绕CANN架构重构、算子开发、部署应用优化等核心方向,展开深度交流与思想碰撞,携手共同促进CANN开放生态突破!

更多推荐