canal
简介
阿里巴巴旗下的一款开源项目,纯java开发, 基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了mysql
mysql主从复制原理:
- master将改变记录到二进制日志(binary log)中
- slave将master的binary log events拷贝到它的中继日志(relay log)
- slave重做中继日志中的事件,将改变反映它自己的数据
canal原理:
- canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
- mysql master收到dump请求,开始推送binary log给slave(也就是canal)
- canal解析binary log对象(原始为byte流)
准备
- mysql主数据库:地址、账密、建表
- mysql从数据库:地址、账密、建表
-
canal-developer下载:服务器,
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz
-
canal-adapter下载:客户端,
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.adapter-1.1.5.tar.gz
mysql修改
-
开启binlog功能,
vi /etc/my.cnf
增加以下代码
log_bin=mysql-bin
binlog_format=ROW
# 服务编号,与其它节点不冲突就可以
server-id=1
# 每次执行操作都与磁盘进行同步
sync-binlog=1
-
重启mysql,
systemctl restart mysqld
- 查看是否开启
[root@CompoCloud ~]# mysql -uroot -p
Enter password:
Welcome to the MySQL monitor. Commands end with ; or \g.
Your MySQL connection id is 71
Server version: 8.0.23 MySQL Community Server - GPL
Copyright (c) 2000, 2021, Oracle and/or its affiliates.
Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin | ON |
+---------------+-------+
1 row in set (0.01 sec)
mysql> show variables like 'binlog_format';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW |
+---------------+-------+
1 row in set (0.01 sec)
- 创建mysql canal用户并授权
mysql> create user canal identified by 'Canal@123';
Query OK, 0 rows affected (1.02 sec)
mysql> grant select, replication slave, replication client on *.* to 'canal'@'%' ;
Query OK, 0 rows affected, 1 warning (0.00 sec)
mysql> flush privileges;
Query OK, 0 rows affected (0.02 sec)
注意测试是否该账密是否能成功连接,保证账密无误
canal-deployer安装配置
-
解压,
tar -zxvf canal.deployer-1.1.5.tar.gz -C /opt/canal/canal-deployer
-
修改
canal-deployer/canal.properties
,去掉
canal.instance.parser.parallelBufferSize
前的注释
采坑:没有修改此处导致日志报错数据库连接不成功
# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256
-
修改
canal-deployer/conf/example/instance.properties
,配置主数据库地址和账密
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0
# enable gtid use true/false
canal.instance.gtidon=false
# position info
# 修改为主数据库的地址
canal.instance.master.address=[主数据库ip]:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# username/password
# 修改为主数据库的canal用户权限的账密
canal.instance.dbUsername=canal
canal.instance.dbPassword=Canal@123
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#################################################
- 启动服务
cd /opt/canal/canal-deployer/bin
./startup.sh
#停止
./stop.sh
#重启
./stop.sh
./startup.sh
- 可通过查看日志测试是否成功
[root@CompoCloud canal-deployer]# cat logs/canal/canal.log
......
......
2021-04-21 14:45:14.905 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
2021-04-21 14:45:14.969 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[172.26.223.190(172.26.223.190):11111]
2021-04-21 14:45:16.689 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......
[root@CompoCloud canal-deployer]# cat logs/example/example.log
......
......
2021-04-21 14:45:16.387 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2021-04-21 14:45:16.401 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$
2021-04-21 14:45:16.401 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^mysql\.slave_.*$
2021-04-21 14:45:16.621 [main] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
canal-adapter安装配置
-
解压,
tar -zxvf canal.adapter-1.1.5.tar.gz -C /opt/canal/canal-adapter
-
修改
canal-adapter/conf/application.yml
,
我需要同步同一个数据库的3张表,所以只需要1个groupId,但是需要配置3个rdb,每个rdb对应不同的key值,这样是为了与之后table的配置相对应
。
采坑:不同数据库可设置多个groupId,同一数据库设置多个容易无法正确对应,会导致虽然查看日志数据更新成功,但是从数据库却没同步的情况
server:
port: 8081
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
default-property-inclusion: non_null
canal.conf:
mode: tcp #tcp kafka rocketMQ rabbitMQ
#flatMessage: true
#zookeeperHosts:
syncBatchSize: 1000
retries: 0
timeout:
accessKey:
secretKey:
consumerProperties:
# canal tcp consumer
canal.tcp.server.host: 127.0.0.1:11111
#canal.tcp.zookeeper.hosts:
#canal.tcp.batch.size: 500
#canal.tcp.username:
#canal.tcp.password:
# kafka consumer
#kafka.bootstrap.servers: 127.0.0.1:9092
#kafka.enable.auto.commit: false
#kafka.auto.commit.interval.ms: 1000
#kafka.auto.offset.reset: latest
#kafka.request.timeout.ms: 40000
#kafka.session.timeout.ms: 30000
#kafka.isolation.level: read_committed
#kafka.max.poll.records: 1000
# rocketMQ consumer
#rocketmq.namespace:
#rocketmq.namesrv.addr: 127.0.0.1:9876
#rocketmq.batch.size: 1000
#rocketmq.enable.message.trace: false
#rocketmq.customized.trace.topic:
#rocketmq.access.channel:
#rocketmq.subscribe.filter:
# rabbitMQ consumer
#rabbitmq.host:
#rabbitmq.virtual.host:
#rabbitmq.username:
#rabbitmq.password:
#rabbitmq.resource.ownerId:
srcDataSources:
defaultDS:
url: jdbc:mysql://127.0.0.1:3306/test1?useUnicode=true # 主数据库地址
username: root # 主数据库用户名
password: root # 主数据库密码
canalAdapters:
- instance: example # canal instance Name or mq topic name
groups:
- groupId: g1
outerAdapters:
- name: logger
- name: rdb
key: mysql1
properties:
jdbc.driverClassName: com.mysql.jdbc.Driver
jdbc.url: jdbc:mysql://192.168.80.100:3306/test2?useUnicode=true # 从数据库地址
jdbc.username: root # 从数据库用户
jdbc.password: root # 从数据库密码
- name: rdb
key: mysql2
properties:
jdbc.driverClassName: com.mysql.jdbc.Driver
jdbc.url: jdbc:mysql://192.168.80.100:3306/test2?useUnicode=true
jdbc.username: root
jdbc.password: root
- name: rdb
key: mysql3
properties:
jdbc.driverClassName: com.mysql.jdbc.Driver
jdbc.url: jdbc:mysql://192.168.80.100:3306/test2?useUnicode=true
jdbc.username: root
jdbc.password: root
# - name: rdb
# key: oracle1
# properties:
# jdbc.driverClassName: oracle.jdbc.OracleDriver
# jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
# jdbc.username: mytest
# jdbc.password: m121212
# - name: rdb
# key: postgres1
# properties:
# jdbc.driverClassName: org.postgresql.Driver
# jdbc.url: jdbc:postgresql://localhost:5432/postgres
# jdbc.username: postgres
# jdbc.password: 121212
# threads: 1
# commitSize: 3000
# - name: hbase
# properties:
# hbase.zookeeper.quorum: 127.0.0.1
# hbase.zookeeper.property.clientPort: 2181
# zookeeper.znode.parent: /hbase
# - name: es
# hosts: 127.0.0.1:9300 # 127.0.0.1:9200 for rest mode
# properties:
# mode: transport # or rest
# # security.auth: test:123456 # only used for rest mode
# cluster.name: elasticsearch
# - name: kudu
# key: kudu
# properties:
# kudu.master.address: 127.0.0.1 # ',' split multi address
-
修改
canal-adapter/conf/rdb
目录下的.yml文件,
一个文件只能对应一张table,所以我们需要建立三个yml配置文件(文件名自定义就行),groupId不变,key需要与application.yml的key值对应
采坑:application配置同一个数据库的三个groupId,此处将所有表写在同一份yml文件中,也即同样配置了三个groupId,虽然查看日志数据更新成功,但是从数据库却没同步
mytest_user.yml
dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: mysql1 #注意对应
concurrent: true
dbMapping:
database: test1 # 主数据库database
table: user # 主数据库表名
targetTable: user # 从数据库表名
targetPk:
id: id # 主键,注意大小写等对应
mapAll: true # 主从数据库表结构相同时使用,否则通过targetColumns一一对应列名
#targetColumns:
# id:
# name:
# role_id:
# c_time:
# test1:
etlCondition: 'where c_time>={}'
commitBatch: 3000 # 批量提交的大小
## Mirror schema synchronize config
#dataSourceKey: defaultDS
#destination: example
#groupId: g1
#outerAdapterKey: mysql1
#concurrent: true
#dbMapping:
# mirrorDb: true
# database: mytest
mytest_company.yml
dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: mysql2
concurrent: true
dbMapping:
database: test1
table: company
targetTable: company
targetPk:
id: id
mapAll: true
#targetColumns:
# id:
# name:
# role_id:
# c_time:
# test1:
etlCondition: 'where c_time>={}'
commitBatch: 3000 # 批量提交的大小
## Mirror schema synchronize config
#dataSourceKey: defaultDS
#destination: example
#groupId: g1
#outerAdapterKey: mysql1
#concurrent: true
#dbMapping:
# mirrorDb: true
# database: mytest
mytest_project.yml
dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: mysql3
concurrent: true
dbMapping:
database: test1
table: project
targetTable: project
targetPk:
id: id
mapAll: true
#targetColumns:
# id:
# name:
# role_id:
# c_time:
# test1:
etlCondition: 'where c_time>={}'
commitBatch: 3000 # 批量提交的大小
## Mirror schema synchronize config
#dataSourceKey: defaultDS
#destination: example
#groupId: g1
#outerAdapterKey: mysql1
#concurrent: true
#dbMapping:
# mirrorDb: true
# database: mytest
- 启动服务
cd /opt/canal/canal-adapter/bin
./startup.sh
#停止
./stop.sh
#重启
./stop.sh
./startup.sh
- 可通过查看日志测试是否成功,以及报错信息
cat canal-adapter/logs/adapter/adapter.log
测试
对主数据库进行增删改操作,查看从数据库变化。若出现错误,多查日志。
遇到过的问题
canal caching_sha2_password Auth failed
cat logs/example/example.log
2021-08-20 15:07:00.541 [destination = example , address = /39.98.131.85:3306 , EventParser] ERROR com.alibaba.otter.canal.common.alarm.LogAlarmHandler - destination:example[com.alibaba.otter.canal.parse.exception.CanalParseException: java.io.IOException: connect /39.98.131.85:3306 failure
Caused by: java.io.IOException: connect /39.98.131.85:3306 failure
at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.connect(MysqlConnector.java:85)
at com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.connect(MysqlConnection.java:90)
at com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser.preDump(MysqlEventParser.java:86)
at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$1.run(AbstractEventParser.java:176)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: caching_sha2_password Auth failed
at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.negotiate(MysqlConnector.java:260)
at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.connect(MysqlConnector.java:82)
... 4 more
]
原因:
mysql版本为8.0,创建用户
canal
时默认的密码加密方式为
caching_sha2_password
,修改为
mysql_native_password
即可
解决
-
查看canal-deployer自己所配置的数据库用户名
cat /opt/canal/canal-deployer/conf/example/instance.properties ...... # username/password canal.instance.dbUsername=canal canal.instance.dbPassword=Canal@2021 canal.instance.connectionCharset = UTF-8 ......
2.mysql修改
mysql -u root -p
use mysql;
select host, user, authentication_string, plugin from user;
+-----------+------------------+------------------------------------------------------------------------+-----------------------+
| host | user | authentication_string | plugin |
+-----------+------------------+------------------------------------------------------------------------+-----------------------+
| % | canal | $A$005$ lHEV,d=
sC)~oHtEw9QVDdn5xJUGrbsI85EH30/vaIFAygRyZ38w0Sz5 | caching_sha2_password |
| % | root | *C432B98D4B9892F9998F0B6ECBCEC94936308E2A | mysql_native_password |
| localhost | mysql.infoschema | $A$005$THISISACOMBINATIONOFINVALIDSALTANDPASSWORDTHATMUSTNEVERBRBEUSED | caching_sha2_password |
| localhost | mysql.session | $A$005$THISISACOMBINATIONOFINVALIDSALTANDPASSWORDTHATMUSTNEVERBRBEUSED | caching_sha2_password |
| localhost | mysql.sys | $A$005$THISISACOMBINATIONOFINVALIDSALTANDPASSWORDTHATMUSTNEVERBRBEUSED | caching_sha2_password |
| localhost | root | *C432B98D4B9892F9998F0B6ECBCEC94936308E2A | mysql_native_password |
+-----------+------------------+------------------------------------------------------------------------+-----------------------+
ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'Canal@2021';
FLUSH PRIVILEGES;
版权声明:本文为lorogy原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。