Data Synchronization: A First Look at Canal



Git repository:

Alibaba Canal on GitHub

Use cases built on Canal's log-based incremental subscription and consumption include:

  • Database mirroring and real-time database backup
  • Index construction and real-time maintenance (split heterogeneous indexes, inverted indexes)
  • Business cache refreshing, and incremental data processing with business logic

Architecture of MySQL's own data replication:

[Figure: MySQL master-slave replication architecture]

Canal works by disguising itself as a MySQL replica (slave):

[Figure: Canal acting as a MySQL slave]
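In other words, Canal implements the slave side of the MySQL replication protocol: it sends a binlog dump request, the master streams binlog events to it, and Canal parses them. You can observe the same dump protocol with the stock mysqlbinlog tool; the host, user, and binlog file name below are placeholders for this sketch:

    ## Stream a binlog from the master over the replication protocol,
    ## the same way a replica (or canal) fetches it. Placeholder host/user/file.
    mysqlbinlog --read-from-remote-server \
        --host=192.168.11.31 --user=canal --password \
        --base64-output=decode-rows --verbose mysql-bin.000001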


Advantages of Canal

  • Good real-time performance
  • Distributed
  • ACK mechanism for reliable delivery


Disadvantages of Canal

  • Supports only incremental sync, not full (initial) sync
  • Sync paths are limited to MySQL -> ES / RDB (the source must be MySQL)
  • One instance can be consumed by only one client
  • High pressure on a single node


Canal components

[Figure: Canal components]

Architecture for Canal-to-ES data synchronization (Canal plugs into Kafka seamlessly):

[Figure: Canal + Kafka + ES synchronization architecture]

Canal configuration:

  1. Enable the MySQL binlog
    ## Edit the MySQL configuration file:
    vim /etc/my.cnf
    
    [mysqld]
    log-bin=mysql-bin # enable binlog
    binlog-format=ROW # use ROW mode
    server_id=1 # required for MySQL replication; must not clash with canal's slaveId
    
    ## Restart the service
    ## systemctl stop mariadb  
    ## systemctl start mariadb
    mysql -uroot -proot
    show variables like '%log_bin%';
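    ## (Added sketch) also confirm the binlog format — canal needs ROW-based events
    show variables like 'binlog_format';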
    
    ## Grant the MySQL account that canal connects with the privileges of a MySQL slave; if the account already exists, just run GRANT
    CREATE USER root IDENTIFIED BY 'root';  
    GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' IDENTIFIED BY 'root' WITH GRANT OPTION;
    
    -- CREATE USER canal IDENTIFIED BY 'canal';  
    -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal' WITH GRANT OPTION;
    -- GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    
    FLUSH PRIVILEGES;
  2. Configure Canal
    ## Create a directory and unpack canal
    mkdir /usr/local/canal
    tar -zxvf canal.deployer-1.1.4.tar.gz -C /usr/local/canal/
    
    ## Server configuration file
    vim /usr/local/canal/conf/canal.properties
    ## Port that Java clients connect to
    canal.port = 11111
    
    vim /usr/local/canal/conf/example/instance.properties
    ## Must not clash with the server-id of any existing MySQL node
    canal.instance.mysql.slaveId=1001
    ## Address of the MySQL master
    canal.instance.master.address=192.168.11.31:3306
    
    ## Edit the following entries:
    canal.instance.dbUsername=root # MySQL user canal connects as
    canal.instance.dbPassword=root # and its password
    canal.instance.connectionCharset = UTF-8 # character set
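    ## (Optional sketch) table filter — .*\\..* subscribes to every schema and table;
    ## verify the exact key name against the instance.properties template in your release
    # canal.instance.filter.regex=.*\\..*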
    
    ## Start canal
    cd /usr/local/canal/bin
    ./startup.sh
    
    ## Verify the service
    cat /usr/local/canal/logs/canal/canal.log
    ## Tail the instance log
    tail -f -n 100 /usr/local/canal/logs/example/example.log
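Once both steps are done, a quick sanity check is to confirm the server is listening on the client port configured above (a sketch; adjust host and port to your setup):

    ## Is the canal server listening on its client port?
    ss -tnlp | grep 11111
    ## Or probe it from another machine
    telnet 192.168.11.31 11111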


Integrating Canal with MQ



Official documentation:

Canal Kafka RocketMQ QuickStart
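As a rough sketch of what the QuickStart covers for canal 1.1.4: switching from the default TCP mode to Kafka comes down to a handful of properties. The key names and the broker address below should be double-checked against the official document:

    ## canal.properties — serve messages to Kafka instead of TCP clients
    canal.serverMode = kafka
    canal.mq.servers = 192.168.11.32:9092
    
    ## conf/example/instance.properties — target topic/partition for this instance
    canal.mq.topic=example
    canal.mq.partition=0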


Using Canal from Java

The official Java example: ClientSample.java
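(To build it you need the canal client library on the classpath; for the 1.1.4 deployer above that is the com.alibaba.otter:canal.client:1.1.4 Maven artifact — adjust the version to match your server.)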

package com.alibaba.otter.canal.sample;
import java.net.InetSocketAddress;
import java.util.List;


import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;


public class SimpleCanalClientExample {


public static void main(String args[]) {
    // Create a connection to the canal server
    CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
                                                                                        11111), "example", "", "");
    int batchSize = 1000;
    int emptyCount = 0;
    try {
        connector.connect();
        connector.subscribe(".*\\..*"); // subscribe to all schemas and tables
        connector.rollback(); // hand back any fetched-but-unacked batches before starting
        int totalEmptyCount = 120;
        while (emptyCount < totalEmptyCount) {
            Message message = connector.getWithoutAck(batchSize); // fetch up to batchSize entries without auto-ack
            long batchId = message.getId();
            int size = message.getEntries().size();
            if (batchId == -1 || size == 0) {
                emptyCount++;
                System.out.println("empty count : " + emptyCount);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                }
            } else {
                emptyCount = 0;
                // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                printEntry(message.getEntries());
            }

            connector.ack(batchId); // confirm the batch as consumed
            // connector.rollback(batchId); // on processing failure, roll back for redelivery
        }

        System.out.println("empty too many times, exit");
    } finally {
        connector.disconnect();
    }
}

private static void printEntry(List<Entry> entries) {
    for (Entry entry : entries) {
        if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
            continue;
        }

        RowChange rowChange = null;
        try {
            rowChange = RowChange.parseFrom(entry.getStoreValue());
        } catch (Exception e) {
            throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                                       e);
        }

        EventType eventType = rowChange.getEventType();
        System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                                         entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                                         entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                                         eventType));

        for (RowData rowData : rowChange.getRowDatasList()) {
            if (eventType == EventType.DELETE) {
                printColumn(rowData.getBeforeColumnsList());
            } else if (eventType == EventType.INSERT) {
                printColumn(rowData.getAfterColumnsList());
            } else {
                System.out.println("-------> before");
                printColumn(rowData.getBeforeColumnsList());
                System.out.println("-------> after");
                printColumn(rowData.getAfterColumnsList());
            }
        }
        }
    }
}

private static void printColumn(List<Column> columns) {
    for (Column column : columns) {
        System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
    }
}

}
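Two things are worth noting about this sample. First, the delivery semantics behind the "ACK mechanism" listed among Canal's advantages: getWithoutAck(batchSize) fetches a batch without marking it consumed, ack(batchId) confirms it, and rollback(batchId) (commented out above) pushes it back for redelivery, so a client that crashes mid-processing can resume without losing events. Second, a batchId of -1 simply means the fetch came back empty; the loop gives up after 120 consecutive empty polls.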



Copyright notice: This is an original article by weixin_43538215, released under the CC 4.0 BY-SA license. Please include a link to the original source and this notice when republishing.