mysql+canal+rabbitMq+SpringCloud 实现数据库数据同步监听

  • Post author:
  • Post category:mysql




前言

项目开发中,数据同步一直是一个令人头疼的问题。在业务量小,场景不多,数据量不大的情况下我们可能会选择在项目中直接写一些定时任务手动处理数据,例如从多个表将数据查出来,再汇总处理,再插入到相应的地方。但是随着业务量增大,数据量变多以及各种复杂场景下的分库分表的实现,使数据同步变得越来越困难。

以前用过阿里的Datax,一款离线数据同步工具,通过在项目中定时调用指定的shell脚本,来实现在各种异构数据源之间的数据同步。刚好最近又用到了阿里的canal,于是就抱着学习的态度在本地搭了一套模拟环境。虽然网上也有很多教程,但是总有那么些坑隐藏在其中,因此顺手记录一下也给初学者一点帮助。

二话不说,开干!



mysql

版本: 8.0

MySQL的安装就没啥好说的了,基本上一路下一步就行了。但是这里有一个坑,MySQL8.0好像会在解压的时候自动给你安装一个

MySQL80

的服务。
image-20200907182848440

Windows下的压缩包里面可能没有

my.ini

文件,需要自己配置一个。然后参照canal的QuickStart来配置就好了。无非就是开始MySQL的binlog模式,为canal服务创建一个具有特定权限的账号密码。


canal 快速开始地址

这里贴上我自己的配置文件全图:
image-20200907184614219

配置完了之后就是重启一下Mysql服务了,这时候坑来了。在我用命令行重启MySQL的时候死活启动不了,在ERR文件中看到报错日志一直提示端口被占用,手动Kill掉端口之后

net start mysql

还是一启动立马挂掉。原来系统中MySQL80这个服务是默认安装的时候自动给你注册上去的MySQL服务,我们自定义的MySQL服务根本没有启动。于是在系统中选择将MySQL80服务直接设为禁止启动,将MySQL设置系统服务开机自启,这样才成功将MySQL启动起来。进入MySQL的用户表中,看到我们的账户也创建好了。
image-20200907185550399

同时使用


show variables like '%log_bin%';


查看是否成功启用MySQL的binlog功能。
image-20200907185900468



RabbitMQ

版本:
image-20200907192237285

RabbitMQ安装也没啥好说的,教程一大堆。

安装完成后使用默认guest登录,新建一个Exchange和Queues,并将它们绑定到一起去。

  • 新建Exchange

image-20200907192747481

  • 新建Queues

    同时新建路由键routing key备用。

  • 创建连接MQ的账号密码,同时顺手一起创建一个消费者账号

    canalConsumer

    用于SpringCloud项目中连接MQ所用。

    image-20200907193217904

    小提示:创建好账号密码后一定要给这两个账号密码赋予相应的Vhost权限。



canal

版本:1.1.5

canal是从1.1.1版本之后开始支持将数据直接投递到MQ。虽然github还是只重点写了kafka和RocketMQ,但是看最近的版本迭代和代码,似乎也开始支持RabbitMQ。所以直接下了最新的1.1.5版本。一看,插件包里果然有三种MQ的jar包了
image-20200907191346486

然后就是设置配置文件启动了。大部分配置是默认设置,不需要动,主要就是一些连接配置了,canal与数据库的连接,canal与RabbitMQ的连接。

  • canal.properties

    #模式设置为rabbitMq模式
    canal.serverMode = rabbitMQ
    
    ##################################################
    ######### 		    RabbitMQ	     #############
    ##################################################
    #地址,不需要端口
    rabbitmq.host = 127.0.0.1
    #当前Vhost
    rabbitmq.virtual.host = /
    #刚才配置的交换机
    rabbitmq.exchange = cannal-exchange
    #刚才配置的账号密码
    rabbitmq.username = cannal
    rabbitmq.password = cannal
    
  • instance.properties

    #指定binlog的同步位置(github有个issues问道如果不配置的话怎么同步,回答是:通过show master status,最新的位置开始binlog获取)
    canal.instance.master.journal.name= binlog.000025
    canal.instance.master.position= 556
    # username/password 连接,其实已经配好了
    canal.instance.dbUsername=canal
    canal.instance.dbPassword=canal
    canal.instance.connectionCharset = UTF-8
    # mq config, 指定 rabbitmq 设置绑定的路由, 详见"配置rabbitmq"步骤里的第三步配置的`Routing key`
    canal.mq.topic=cannal-exchange-routing
    

其上,上面两个配置文件一个是canal配置文件,一个是实例的配置文件。看配置文件还支持集群,但是我们单机下配置其实还是挺少的。在配置过程中,难免或多或少会遇到一些问题,一开始总是找不到方向去网上查看别人的配置方法,结果还是各种不行,连接报错,canal启动不了。最后发现在log目录下会有各自的log日志文件:
image-20200907194904356

其实会告诉你各种连接不上启动不了的原因,跟着日志去一个个解决,比你自己在网上找的那些教程要靠谱多了,因为每个人遇到的情况总是不一样的,要根据实际情况去解决对应的错误。下面看看这些错误日志:

2020-08-31 10:43:59.017 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2020-08-31 10:43:59.388 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 
2020-08-31 10:43:59.394 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$
2020-08-31 10:43:59.394 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^mysql\.slave_.*$
2020-08-31 10:43:59.407 [destination = example , address = /127.0.0.1:3306 , EventParser] ERROR c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - dump address /127.0.0.1:3306 has an error, retrying. caused by 
com.alibaba.otter.canal.parse.exception.CanalParseException: java.io.IOException: connect /127.0.0.1:3306 failure
Caused by: java.io.IOException: connect /127.0.0.1:3306 failure
	at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.connect(MysqlConnector.java:83) ~[canal.parse.driver-1.1.5-SNAPSHOT.jar:na]
	at com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.connect(MysqlConnection.java:89) ~[canal.parse-1.1.5-SNAPSHOT.jar:na]
	at com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser.preDump(MysqlEventParser.java:86) ~[canal.parse-1.1.5-SNAPSHOT.jar:na]
	at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$3.run(AbstractEventParser.java:183) ~[canal.parse-1.1.5-SNAPSHOT.jar:na]
	at java.lang.Thread.run(Unknown Source) [na:1.8.0_261]
Caused by: java.io.IOException: caching_sha2_password Auth failed
	at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.negotiate(MysqlConnector.java:257) ~[canal.parse.driver-1.1.5-SNAPSHOT.jar:na]
	at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.connect(MysqlConnector.java:80) ~[canal.parse.driver-1.1.5-SNAPSHOT.jar:na]
	... 4 common frames omitted
2020-08-31 10:43:59.410 [destination = example , address = /127.0.0.1:3306 , EventParser] ERROR com.alibaba.otter.canal.common.alarm.LogAlarmHandler - destination:example[com.alibaba.otter.canal.parse.exception.CanalParseException: java.io.IOException: connect /127.0.0.1:3306 failure
Caused by: java.io.IOException: connect /127.0.0.1:3306 failure
	at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.connect(MysqlConnector.java:83)
	at com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.connect(MysqlConnection.java:89)
	at com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser.preDump(MysqlEventParser.java:86)
	at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$3.run(AbstractEventParser.java:183)
	at java.lang.Thread.run(Unknown Source)
Caused by: java.io.IOException: caching_sha2_password Auth failed
	at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.negotiate(MysqlConnector.java:257)
	at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.connect(MysqlConnector.java:80)
	... 4 more
]

例如MySQL连接失败,权限不足等问题,跟着错误日志去解决问题,是最快的方法了。

两个配置文件配置完成后启动bin目录下startup.bat文件,可以看到canal成功启动起来了。

image-20200908102419048

同时观察log目录下canal的日志文件文件与example的日志文件,发现均已成功启动,没有报错,那差不多canal就已经启动成功并且连接上RabbitMQ了。

canal的Log:

2020-09-07 15:23:49.862 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
2020-09-07 15:23:49.898 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2020-09-07 15:23:50.044 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
2020-09-07 15:23:50.238 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[172.19.46.76(172.19.46.76):11111]
2020-09-07 15:23:50.944 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......
2020-09-07 16:43:33.695 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
2020-09-07 16:43:33.729 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2020-09-07 16:43:33.881 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
2020-09-07 16:43:34.080 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[172.19.46.76(172.19.46.76):11111]
2020-09-07 16:43:34.811 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......

example的Log:

2020-09-07 16:43:34.570 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2020-09-07 16:43:34.573 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2020-09-07 16:43:34.716 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 
2020-09-07 16:43:34.721 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$
2020-09-07 16:43:34.721 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^mysql\.slave_.*$
2020-09-07 16:43:34.753 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2020-09-07 16:43:34.807 [main] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
2020-09-07 16:43:34.823 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just last position
 {"identity":{"slaveId":-1,"sourceAddress":{"address":"activate.navicat.com","port":3306}},"postion":{"gtid":"","included":false,"journalName":"binlog.000025","position":2909,"serverId":1,"timestamp":1599463353000}}
2020-09-07 16:43:34.861 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=binlog.000025,position=2909,serverId=1,gtid=,timestamp=1599463353000] cost : 70ms , the next step is binlog dump

至此环境就已经搭建的差不多了,然后尝试着修改数据库表数据,观察RabbitMQ的Overview,发现果然有一条数据被推送了进来,一阵莫名的激动。
Canal



SpringCloud接受MQ消息

经过上面的环境搭建,canal已经将解析的binlog日志推送到了MQ中。然后接下来就是我们从MQ中取出相应的数据,来经行相应的操作了。

首先,添加RabbitMQ依赖

		<!--添加rabbitMq依赖-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <!--添加rabbitMq依赖-->

然后,yml文件配置RabbitMQ连接

  rabbitmq:
    addresses: amqp://localhost:5672
    username: canalConsumer
    password: canalConsumer
  cloud:
    stream:
      #设置默认的binders
      default-binder: rabbit
      binders:
        rabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                # 连接到指定Vhost
                virtual-host: /
      bindings:
        canal_data_input:
          binder: rabbit
          destination: cannal-exchange
          group: canal_data_input_group
      rabbit:
        bindings:
          canal_data_input:
            consumer:
              exchangeType: direct
              binding-routing-key: cannal-exchange-routing
              autoBindDlq: true
              republishToDlq: true
              deadLetterExchange: CANAL_DATA_DLX

最后使用注解方式监听通道获取信息:

@Slf4j
@EnableBinding(CanalMessageSink.class)
@MessageEndpoint(value = "canalMessageConsumer")
public class MqMesgConsumer {

    @StreamListener(CanalMessageSink.CANAL_DATA_INPUT)
    public void acceptUpdateGoodsMQ(Message<String> message) {
        System.out.println("CANAL接收MQ消息 ->"+JSONObject.toJSONString(message));
    }
}
public interface CanalMessageSink {

    /**
     * 接收canal的mq消息
     */
    String CANAL_DATA_INPUT = "canal_data_input";

    /**
     * 接收canal的mq消息
     *
     * @return
     */
    @Input(CanalMessageSink.CANAL_DATA_INPUT)
    SubscribableChannel canalDataInput();
}

启动消费者,然后在数据库里面修改一条数据,直接在项目中拿到了MQ投递过来的消息。

image-20200908112907150

紧接着,我们来看看投递过来的数据到底是什么样子,将JSON数据格式化之后,庐山真面目终于被揭开。

image-20200908113120737

数据中有数据库名database,表名table,操作类型type,时间等等关键信息,拥有了这些关键信息,那么对于数据我们自己想怎么玩就怎么玩了。



结束

那么这次整个环境的配置搭建到此就告一段落,大致也了解了一下canal同步数据是如何使用的。学习一个新的工具的使用给我的感觉就跟学开车一样,二话不说先开起来,先学会怎么开车,车开的溜了发现车跑得慢要升级,再去研究车的构造。没有谁学开车的时候,教练二话不说,啪的打开车头盖给你说讲发动机原理吧,那TM的也太硬核了。。。。

所以,兄弟们,赶紧搭一个玩起来吧~



版权声明:本文为u011232863原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。