SpringBoot项目集成Canal技术实现数据库与缓存一致

  • Post author:
  • Post category:其他


一、Canal

1、介绍

Canal是阿里巴巴旗下的一款开源项目,纯Java开发,基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持MySQL。

2、工作原理

Canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议,mysql master收到dump请求,开始推送binary log给slave(也就是Canal),Canal解析binary log对象(原始为byte流)。

3、Canal主要实现的业务

(1)数据同步

(2)数据库实时备份

(3)实现缓存与数据库数据一致

…….

二、集成使用

1、mysql

(1)首先mysql开启binlog日志,在my.ini或my.cnf文件中配置

log‐bin=mysql‐bin     #添加这一行就ok
binlog‐format=ROW     #选择row模式
server‐id=1           #配置mysql replaction需要定义,不能和canal的slaveId重复

(2)配置完成后重启mysql服务,然后查看是否开启成功 SHOW VARIABLES LIKE ‘%log_%’;

(3)使用root账户创建mysql的canal用户并赋予权限

1 create user canal identified by 'canal';
2
3 GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
4
5 FLUSH PRIVILEGES;

2、canal

canal安装(windows服务器为例,linux环境可以使用同样的canal安装包)

(1)下载地址:

Releases · alibaba/canal · GitHub

(2)创建文件夹,解压

(3)修改conf\example文件夹下instance.properties配置文件

(4)启动canal服务(windows下是bat,linux是sh)

(5)查看canal启动日志,如下图表示启动成功

3、项目中集成

(1)添加pom依赖

<!-- canal -->
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.6</version>
</dependency>
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.protocol</artifactId>
    <version>1.1.6</version>
</dependency>

(2)application配置文件添加canal配置,ip改为canal部署的服务器的ip

# canal
canal:
  server:
    ip: 127.0.0.1
    port: 11111
  promotion:
    destination: example

(3)创建canal监听类

/**
 * @author Aruioooo
 * @date 2023-05-15 10:02
 * @description canal监听类
 */
@Slf4j
@Component
@Order(1)
public class CanalListener implements CommandLineRunner {

    @Value("${canal.server.ip}")
    private String canalServerIp;

    @Value("${canal.server.port}")
    private int canalServerPort;

    @Value("${canal.promotion.destination}")
    private String destination;

    @Override
    public void run(String... args) throws Exception {
        CanalConnector conn = CanalConnectors.newSingleConnector(
                new InetSocketAddress(canalServerIp, canalServerPort), destination, "", "");
        int batchSize = 1000;
        while (true) {
            conn.connect();
            //订阅实例中所有的数据库和表
            /*
             这里注意下:
               conn.subscribe(".*\\..*"); 会导致服务端的canal.instance.filter.regex=.*\\..* 失效。
               更严重的是canal会一直向你的example.log日志文件写入日志,
               测了一下大概12小时会写入20M大小的日志。
            */
            //conn.subscribe(".*\\..*");
            // 回滚到未进行ack的地方
            conn.rollback();
            // 获取数据 每次获取一千条改变数据
            Message message = conn.getWithoutAck(batchSize);
            //获取这条消息的id
            long id = message.getId();
            int size = message.getEntries().size();
            if (id != -1 && size > 0) {
                // 数据处理
                dataHandle(message.getEntries());
            }else {
                //暂停1秒防止重复链接数据库
                Thread.sleep(1000);
            }
            // 确认消费完成这条消息
            conn.ack(id);
            // 关闭连接
            conn.disconnect();
        }
    }

    /**
     * 数据处理
     *
     * @param entries
     */
    private void dataHandle(List<CanalEntry.Entry> entries) {
        for (CanalEntry.Entry entry : entries) {
            // 解析binlog
            CanalEntry.RowChange rowChange;
            try {
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("数据处理出现异常 data:" + entry.toString(), e);
            }
            if (rowChange != null) {
                // 获取操作类型
                CanalEntry.EventType eventType = rowChange.getEventType();
                for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                    dataDetails(rowData.getBeforeColumnsList(), rowData.getAfterColumnsList(), eventType, entry);
                }
            }
        }
    }

    /**
     * 解析数据
     *
     * @param beforeColumns 操作前的数据
     * @param afterColumns  操作后的数据
     * @param eventType  操作类型(INSERT,UPDATE,DELETE)
     * @param entry
     */
    private void dataDetails(List<CanalEntry.Column> beforeColumns, List<CanalEntry.Column> afterColumns,
                                    CanalEntry.EventType eventType, CanalEntry.Entry entry) {
        log.info("数据库:" + entry.getHeader().getSchemaName());
        log.info("表名:" + entry.getHeader().getTableName());
        log.info("操作时间:" + entry.getHeader().getExecuteTime());
        log.info("操作类型:" + eventType);
        if (CanalEntry.EventType.INSERT.equals(eventType)) {
            // log.info("新增数据:"+afterColumns);
            insertHandle(entry);
        } else if (CanalEntry.EventType.DELETE.equals(eventType)) {
            // log.info("删除数据:"+beforeColumns);
            deleteHandle(entry);
        } else {
            // log.info("更新数据:更新前数据--"+beforeColumns);
            // log.info("更新数据:更新后数据--"+afterColumns);
            updateHandle(entry);
        }
    }

    /**
     * 新增数据操作
     *
     * @param entry
     */
    private void insertHandle(CanalEntry.Entry entry) {
        try {
            CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
            for (CanalEntry.RowData rowData : rowDatasList) {
                List<CanalEntry.Column> columnList = rowData.getAfterColumnsList();
                columnList.forEach(c ->{
                    log.info("字段名:"+c.getName()+",字段值:"+c.getValue());
                });
                // StringBuffer sql = new StringBuffer("insert into " + entry.getHeader().getTableName() + " (");
                // for (int i = 0; i < columnList.size(); i++) {
                //     sql.append(columnList.get(i).getName());
                //     if (i != columnList.size() - 1) {
                //         sql.append(",");
                //     }
                // }
                // sql.append(") VALUES (");
                // for (int i = 0; i < columnList.size(); i++) {
                //     sql.append("'" + columnList.get(i).getValue() + "'");
                //     if (i != columnList.size() - 1) {
                //         sql.append(",");
                //     }
                // }
                // sql.append(")");
                // 插入sql语句
                // log.info(sql.toString());
            }
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }
    }

    /**
     * 删除数据操作
     *
     * @param entry
     */
    private void deleteHandle(CanalEntry.Entry entry) {
        try {
            CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
            for (CanalEntry.RowData rowData : rowDatasList) {
                List<CanalEntry.Column> columnList = rowData.getBeforeColumnsList();
                columnList.forEach(column ->{
                    if (column.getIsKey()){
                        log.info("删除主键"+column.getName()+"值为"+column.getValue()+"的记录");
                    }
                    log.info("删除前"+column.getName()+"字段值为:"+column.getValue());
                });
                // StringBuffer sql = new StringBuffer("delete from " + entry.getHeader().getTableName() + " where ");
                // for (CanalEntry.Column column : columnList) {
                //     if (column.getIsKey()) {
                //         //暂时只支持单一主键
                //         sql.append(column.getName() + "=" + column.getValue());
                //         break;
                //     }
                // }
                // 删除sql语句
                // log.info(sql.toString());
            }
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }
    }

    /**
     * 更新数据操作
     *
     * @param entry
     */
    private void updateHandle(CanalEntry.Entry entry) {
        try {
            CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
            for (CanalEntry.RowData rowData : rowDatasList) {
                List<CanalEntry.Column> newColumnList = rowData.getAfterColumnsList();
                List<CanalEntry.Column> oldColumnList = rowData.getBeforeColumnsList();
                oldColumnList.forEach(oldColumn ->{
                    log.info("更新前"+oldColumn.getName()+"值为:"+oldColumn.getValue());
                });
                log.info("----------------------------------------------------------------------------------");
                newColumnList.forEach(newColumn ->{
                    log.info("更新后"+newColumn.getName()+"值为:"+newColumn.getValue());
                });
                // StringBuffer sql = new StringBuffer("update " + entry.getHeader().getTableName() + " set ");
                // for (int i = 0; i < newColumnList.size(); i++) {
                //     sql.append(" " + newColumnList.get(i).getName()
                //             + " = '" + newColumnList.get(i).getValue() + "'");
                //     if (i != newColumnList.size() - 1) {
                //         sql.append(",");
                //     }
                // }
                // sql.append(" where ");
                // for (CanalEntry.Column column : oldColumnList) {
                //     if (column.getIsKey()) {
                //         //暂时只支持单一主键
                //         sql.append(column.getName() + "=" + column.getValue());
                //         break;
                //     }
                // }
                // 更新sql语句
                // log.info(sql.toString());
            }
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }
    }

}

(4)测试,修改数据库数据,canal监听到数据库数据操作记录



版权声明:本文为Aruioooo原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。