使用canal同步mysql增量数据至rocketmq
组件 |
版本 |
canal |
1.1.5 |
rocketmq |
4.9.2 |
mysql |
5.7 |
zk |
3.5.6 |
canal使用参考地址:
https://github.com/alibaba/canal
主要配置如下:
canal.properties:
# tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = rocketMQ
#数据库名称
canal.destinations = rocketmq
#rocketmq连接地址:
canal.aliyun.accessKey = rocketmq2
canal.aliyun.secretKey = 12345678
canal.mq.servers= 127.0.0.1:9876
#zk连接地址
canal.zkServers: zookeeper1:2181,zookeeper2:2181,zookeeper3:2181
instance.properties:
#数据库连接地址
canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
#数据里匹配,数据库.表名,可使用正则匹配,详细配置参考github-wiki文档
canal.instance.filter.regex=数据库名.表名
#rocketmq的目标topic
canal.mq.topic=test01
#指定topic的队列数.也可针对不同topic配置不同队列数,例:canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
canal.mq.partitionsNum=4
#分区路由策略
canal.mq.partitionHash=数据库名.表名:pk1
#动态获取MQ服务端的分区数,如果设置为true之后会自动根据topic获取分区数替换canal.mq.partitionsNum的定义
canal.mq.enableDynamicQueuePartition=true
canal.instance.connectionCharset = UTF-8
详细配置参考地址:
Canal Kafka RocketMQ QuickStart · alibaba/canal Wiki · GitHub
配置完安装官方提示通过docker或者本地脚本启动服务即可
问题:
1,消费速度过慢问题:
如果数据库的update操作比较频繁,按照pk级做hash时的取模会导致update的数据落入到某些队列数据远远大于其他队列,这时的消费端消费会不均衡,导致积压量过大,严重的会
导致消费速度跟不上时间长了以后最终导致数据过期无法消费到数据,但一般不会出现3天还未消费完的情况
建议主要是从业务端限制频繁更新,限制不了的可更换路由策略,使用主键+频繁更新的字段作为主键亦可
2,cannal消费顺序性问题:
单topic单分区,可以严格保证和binlog一样的顺序性,缺点就是性能比较慢,单分区的性能写入大概在2~3k的TPS
多topic单分区,可以保证表级别的顺序性,一张表或者一个库的所有数据都写入到一个topic的单分区中,可以保证有序性,针对热点表也存在写入分区的性能问题
单topic、多topic的多分区,如果用户选择的是指定table的方式,那和第二部分一样,保障的是表级别的顺序性(存在热点表写入分区的性能问题),如果用户选择的是指定pk hash的方式,那只能保障的是一个pk的多次binlog顺序性** pk hash的方式需要业务权衡,这里性能会最好,但如果业务上有pk变更或者对多pk数据有顺序性依赖,就会产生业务处理错乱的情况. 如果有pk变更,pk变更前和变更后的值会落在不同的分区里,业务消费就会有先后顺序的问题,需要注意