Table of Contents
- Canal Installation and Deployment
1.1. Server preparation
1.2. Set hostnames and configure hosts
1.3. Passwordless SSH setup
1.4. Configure NTP time synchronization
1.5. Disable the firewall
1.6. Disable SELinux
1.7. Install the JDK
1.8. Install ZooKeeper
1.9. Install Scala
- Install Kafka
2.1. Extract
2.2. Configure environment variables
2.3. Edit the configuration files
2.4. Edit server.properties again
2.5. Create the log directory
2.6. Start and test the Kafka cluster
2.7. Send and consume topic data
2.8. Kafka cluster monitoring – KafkaOffsetMonitor (legacy approach)
2.9. Kafka cluster monitoring – KafkaCenter
2.9.1. Download
2.9.2. Initialization
2.9.3. Edit the application.properties file
2.9.4. Build and run
- Install MySQL
3.1. Remove the old MySQL installation
3.2. Create the canal account
3.3. Enable binlog writing
- Canal Quick Installation and Deployment
4.1. Machine preparation
4.2. Download Canal
4.3. Extract
4.4. Edit the configuration file
4.5. Create the example topic
4.6. Start the Canal service
4.7. Verify functionality
4.8. Prepare test data in the database
4.9. ERROR c.a.otter.canal.server.netty.handler.SessionHandler – something goes wrong with channel:[id: 0x106d73f2, /192.168.106.1:1312 :> /192.168.106.103:11111], exception=java.nio.channels.ClosedChannelException
4.10. Data monitoring microservice
- Canal Server + Canal Client HA
5.1. Machine preparation
5.2. Download Canal
5.3. Extract
5.4. Edit the configuration files
5.4.1. Edit canal.properties
5.4.2. Edit instance.properties
5.4.3. Configure the other Canal server
5.4.4. Start the ZooKeeper service
5.4.5. Start the Canal service (start both Canal servers)
5.4.6. Consume data from a client connection
- MySQL + Canal + Kafka Integration
6.1. Machine preparation
6.2. Download Canal
6.3. Extract
6.4. Edit the configuration files
6.4.1. Edit instance.properties
6.4.2. Edit canal.properties
6.5. Start the related services
6.5.1. Start the ZooKeeper service
6.5.2. Start the Kafka service
6.5.3. Open a Kafka consumer
6.5.4. Start the Canal service
6.5.5. Observe the Kafka consumer
1. Canal Installation and Deployment
1.1. Server preparation
IP | Hostname | OS | Components |
---|---|---|---|
192.168.106.103 | node1 | CentOS Linux release 7.4.1708 (Core) | ZooKeeper, Kafka (master), standalone Canal |
192.168.106.104 | node2 | CentOS Linux release 7.4.1708 (Core) | ZooKeeper, Kafka (slave), canal-ha (master) |
192.168.106.105 | node3 | CentOS Linux release 7.4.1708 (Core) | ZooKeeper, Kafka (slave), canal-ha (slave) |
1.2. Set hostnames and configure hosts
On each of the three machines run vim /etc/hostname and set the hostname to node1, node2, or node3 respectively.
Then configure hosts as follows:
[root@node1 ~]# vim /etc/hosts
192.168.106.103 node1
192.168.106.104 node2
192.168.106.105 node3
1.3. Passwordless SSH setup
Run the following on each of the three machines:
ssh-keygen -t rsa
ssh-copy-id node1
ssh-copy-id node2
ssh-copy-id node3
1.4. Configure NTP time synchronization
Reference: https://blog.csdn.net/tototuzuoquan/article/details/108900206
1.5. Disable the firewall
systemctl status firewalld.service # check the firewall status
systemctl stop firewalld.service # stop the firewall
systemctl disable firewalld.service # do not start at boot
systemctl is-enabled firewalld.service # check whether the firewall service is enabled at boot
1.6. Disable SELinux
Reference: https://www.linuxidc.com/Linux/2016-11/137723.htm
1.7. Install the JDK
Extract the JDK on each of the three machines, then configure the environment variables, for example:
export JAVA_HOME=/root/jdk1.8.0_161
export JRE_HOME=$JAVA_HOME/jre
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib/rt.jar
export PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
Then run on each machine: source /etc/profile
1.8. Install ZooKeeper
Reference: https://blog.csdn.net/tototuzuoquan/article/details/54003140
The zoo.cfg content is as follows:
# The number of milliseconds of each tick
tickTime=5000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/root/apache-zookeeper-3.6.2-bin/data
dataLogDir=/root/apache-zookeeper-3.6.2-bin/log
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
## Metrics Providers
#
# https://prometheus.io Metrics Exporter
#metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
#metricsProvider.httpPort=7000
#metricsProvider.exportJvmInfo=true
server.1=node1:2888:3888
server.2=node2:2888:3888
server.3=node3:2888:3888
Copy the ZooKeeper directory above to node1, node2, and node3.
On node1, node2, and node3, go into /root/apache-zookeeper-3.6.2-bin/data and run:
echo 1 > myid # run on node1
echo 2 > myid # run on node2
echo 3 > myid # run on node3
Then run the following on node1, node2, and node3:
# start zk
$ZOOKEEPER_HOME/bin/zkServer.sh start
# check zk status
$ZOOKEEPER_HOME/bin/zkServer.sh status
1.9. Install Scala
This part is omitted.
Configure the environment variables:
export SCALA_HOME=/root/scala-2.12.12
export PATH=$PATH:$SCALA_HOME/bin
2. Install Kafka
2.1. Extract
Extract the Kafka package with:
tar -zxvf kafka_2.12-2.6.0.tgz
Remove the Kafka tarball:
rm -rf kafka_2.12-2.6.0.tgz
2.2. Configure environment variables
The environment variables are as follows:
export SCALA_HOME=/root/scala-2.12.12
export PATH=$PATH:$SCALA_HOME/bin
export KAFKA_HOME=/root/kafka_2.12-2.6.0
export PATH=$PATH:$KAFKA_HOME/bin
Then run: source /etc/profile
2.3. Edit the configuration files
cd $KAFKA_HOME/config
1. Edit zookeeper.properties
[root@node1 config]# vim zookeeper.properties
# ZooKeeper data directory; keep it consistent with the ZooKeeper configuration
dataDir=/root/apache-zookeeper-3.6.2-bin/data
2. Edit consumer.properties
[root@node1 config]# vim consumer.properties
# ZooKeeper cluster connection address
zookeeper.connect=node1:2181,node2:2181,node3:2181
3. Edit producer.properties
[root@node1 config]# vim producer.properties
# Kafka cluster bootstrap address
bootstrap.servers=node1:9092,node2:9092,node3:9092
4. Edit server.properties
[root@node1 config]# vim server.properties
# ZooKeeper cluster address
zookeeper.connect=node1:2181,node2:2181,node3:2181
# Log (data) file directory
log.dirs=/tmp/kafka-logs # this path can be changed
Sync the Kafka directory to the other nodes (run on node1):
[root@node1 ~]# scp -r kafka_2.12-2.6.0 root@node2:$PWD
[root@node1 ~]# scp -r kafka_2.12-2.6.0 root@node3:$PWD
2.4. Edit server.properties again
Edit server.properties on each node:
# on node1
broker.id=1
# on node2
broker.id=2
# on node3
broker.id=3
2.5. Create the log directory
Run on all three machines:
mkdir -p /tmp/kafka-logs # /tmp/kafka-logs is the Kafka log directory configured above
2.6. Start and test the Kafka cluster
1. Start the ZooKeeper cluster (run on all 3 nodes)
$ZOOKEEPER_HOME/bin/zkServer.sh start
2. Start the Kafka cluster
# start kafka
cd $KAFKA_HOME
bin/kafka-server-start.sh -daemon config/server.properties
3. List topics
[root@node1 kafka_2.12-2.6.0]# bin/kafka-topics.sh --zookeeper localhost:2181 --list
4. Create a topic
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test --replication-factor 2 --partitions 2
Then list the topics again:
[root@node1 kafka_2.12-2.6.0]# bin/kafka-topics.sh --zookeeper localhost:2181 --list
test
5. Describe the topic
[root@node1 kafka_2.12-2.6.0]# bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test
(the output lists the partition and replica assignment for the test topic)
2.7. Send and consume topic data
1. Using the new API
Consume the topic on node2 with the built-in console consumer:
[root@node2 kafka_2.12-2.6.0]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
Produce data to the topic on node1 with the built-in console producer:
[root@node1 kafka_2.12-2.6.0]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
Consume the topic on node3 with the built-in console consumer (only new data):
[root@node3 kafka_2.12-2.6.0]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
Consume the topic from the beginning (here shown on node2):
[root@node2 kafka_2.12-2.6.0]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
Enter some test data in the producer, for example:
adfasdasfd
输入测试3
shuru
输入测试2
To inspect consumption details you must specify a consumer group. List the Kafka consumer groups with:
[root@node2 kafka_2.12-2.6.0]# bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
console-consumer-21382
Check per-partition consumption for a topic (note: the --new-consumer flag has been removed in newer Kafka versions, so with Kafka 2.6 use the second form):
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --group xxx --describe
bin/kafka-consumer-groups.sh --describe --bootstrap-server 127.0.0.1:9092 --group xxx
Field descriptions:
GROUP: the consumer group
TOPIC: the topic being (or previously) consumed
PARTITION: partition number
CURRENT-OFFSET: the last offset committed by the consumer group
LOG-END-OFFSET: the latest offset produced to the partition (log end offset)
LAG: the difference between the produced offset and the consumed offset; for example, if LOG-END-OFFSET is 120 and CURRENT-OFFSET is 100, LAG is 20
CONSUMER-ID: the id of the group member currently consuming the topic-partition
HOST: IP address of the consuming node
CLIENT-ID: client id
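Besides the console scripts, the same send/consume flow can be driven programmatically. The following is only a minimal sketch, assuming the kafka-clients 2.6.0 dependency is on the classpath; the topic name test mirrors the setup above, and the group id java-demo-group is an illustrative choice.
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class TopicSendConsumeDemo {
    private static final String BROKERS = "node1:9092,node2:9092,node3:9092";

    public static void main(String[] args) {
        // Produce one message to the test topic.
        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERS);
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            producer.send(new ProducerRecord<>("test", "hello from java"));
        }

        // Consume the topic with an explicit group id so that
        // kafka-consumer-groups.sh --describe can report its offsets and lag.
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERS);
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "java-demo-group");
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
            consumer.subscribe(Collections.singletonList("test"));
            for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(5))) {
                System.out.println(record.offset() + " : " + record.value());
            }
        }
    }
}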
2.8. Kafka cluster monitoring – KafkaOffsetMonitor (legacy approach)
KafkaOffsetMonitor is a tool for monitoring Kafka topics and consumer consumption. It runs as a single jar, so deployment is straightforward, and because it only provides monitoring it is fairly safe to use.
Features:
1) Monitor the Kafka cluster state and list topics and consumer groups.
2) Visualize the relationship between topics and consumers.
3) Visualize consumer offsets, lag, and related information.
1. Download
Download: https://github.com/quantifind/KafkaOffsetMonitor (a patched fork can also be used)
KafkaOffsetMonitor can currently only monitor consumers that use the old API; the new API is not yet supported.
2. Script parameter format
java -cp KafkaOffsetMonitor-assembly-0.2.1.jar \
com.quantifind.kafka.offsetapp.OffsetGetterWeb \
--zk node1:2181,node2:2181,node3:2181 \
--port 8090 \
--refresh 10.seconds \
--retain 2.days
zk: ZooKeeper cluster address
port: port for the web UI
refresh: refresh interval
retain: data retention period (seconds, minutes, hours, days)
3. Create the kafkamonitor.sh launch script
vi kafkamonitor.sh
#!/bin/sh
home=$(cd `dirname $0`; cd ..; pwd)
. ${home}/bin/common.sh
java -cp ${lib_home}KafkaOffsetMonitor-assembly-0.2.0.jar \
com.quantifind.kafka.offsetapp.OffsetGetterWeb \
--zk node1:2181,node2:2181,node3:2181 \
--port 8090 \
--refresh 10.seconds \
--retain 2.days > /dev/null 2>&1 &
4. Make the script executable
Grant execute permission to kafkamonitor.sh:
chmod u+x kafkamonitor.sh
5. Start the monitoring script
bin/kafkamonitor.sh
6. Web UI
Visit node1:8090
2.9. Kafka cluster monitoring – KafkaCenter
GitHub: https://github.com/xaecbd/KafkaCenter (download the KafkaCenter package from here)
Gitee mirror: https://gitee.com/newegg/KafkaCenter
2.9.1. Download
git clone https://github.com/xaecbd/KafkaCenter.git
2.9.2. Initialization
Execute the SQL script KafkaCenter-master\KafkaCenter-Core\sql\table_script.sql.
2.9.3. Edit the application.properties file
It is located at KafkaCenter/KafkaCenter-Core/src/main/resources/application.properties.
The main change is the database password.
2.9.4. Build and run
Note: make sure you have JDK 8 or later installed.
$ git clone https://github.com/xaecbd/KafkaCenter.git (already done above)
$ cd KafkaCenter
$ mvn clean package -Dmaven.test.skip=true
$ cd KafkaCenter\KafkaCenter-Core\target
$ java -jar KafkaCenter-Core-2.3.0-SNAPSHOT.jar
3. Install MySQL
3.1. Remove the old MySQL installation
MySQL can be installed following https://blog.csdn.net/tototuzuoquan/article/details/104210148.
3.2. Create the canal account
Run mysql -uroot -p and enter the password: 123456
Then execute:
mysql> create user 'canal' identified by 'canal';
Query OK, 0 rows affected (0.00 sec)
mysql> grant all privileges on *.* to 'canal'@'%' identified by 'canal';
Query OK, 0 rows affected (0.00 sec)
mysql> flush privileges;
Query OK, 0 rows affected (0.00 sec)
mysql>
3.3. Enable binlog writing
For a self-hosted MySQL, binlog writing must first be enabled and binlog-format set to ROW. Configure my.cnf as follows:
[root@node1 etc]# vim /etc/my.cnf
[mysqld]
log-bin=mysql-bin #enable the binlog
binlog-format=ROW #use ROW format
server_id=1 #required for MySQL replication; must not collide with canal's slaveId
Restart MySQL:
[root@node1 etc]# systemctl restart mysqld
And create the test database:
create database test default character set utf8;
4. Canal Quick Installation and Deployment
Project page: https://github.com/alibaba/canal
4.1. Machine preparation
Canal server: node1
MySQL: node1
4.2. Download Canal
Download page: https://github.com/alibaba/canal/releases
The main artifacts are:
https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.adapter-1.1.4.tar.gz
https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.admin-1.1.4.tar.gz
https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz (this is the one used here)
https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.example-1.1.4.tar.gz
https://github.com/alibaba/canal/archive/canal-1.1.4.zip
https://github.com/alibaba/canal/archive/canal-1.1.4.tar.gz
4.3. Extract
mkdir -p /root/canal
tar zxvf canal.deployer-1.1.4.tar.gz -C /root/canal
After extraction, enter /root/canal; you should see the following layout:
[root@node1 canal]# pwd
/root/canal
[root@node1 canal]# ls
bin conf lib logs
[root@node1 canal]#
4.4. Edit the configuration file
[root@node1 canal]# cd conf/example/
[root@node1 example]# ls
instance.properties
[root@node1 example]# vim instance.properties
The content is:
## mysql serverId , v1.0.26+ will autoGen
canal.instance.mysql.slaveId=1234
## position info; change this to your own database address
canal.instance.master.address=node1:3306
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
canal.instance.defaultDatabaseName=test #if this is omitted, all databases are captured
4.5. Create the example topic
[root@node1 example]# cd $KAFKA_HOME
[root@node1 kafka_2.12-2.6.0]# bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic example --replication-factor 1 --partitions 1
Created topic example.
4.6. Start the Canal service
cd /root/canal
bin/startup.sh
Watch the canal log:
[root@node1 canal]# cd /root/canal/logs/canal
[root@node1 canal]# tail -f canal.log
2020-12-18 22:50:52.994 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
2020-12-18 22:50:53.083 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2020-12-18 22:50:53.112 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
2020-12-18 22:50:53.192 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.106.103(192.168.106.103):11111]
2020-12-18 22:50:55.369 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......
4.7. Verify functionality
Download the canal source code (canal-canal-1.1.4.zip) and open it in IDEA.
Open the class com.alibaba.otter.canal.example.SimpleCanalClientPermanceTest, change the IP address to 192.168.106.103, and run it. A minimal standalone client is also enough to verify the setup, as shown below.
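If you only need a quick connectivity check rather than the performance test, the following sketch is sufficient. It is illustrative only and assumes the com.alibaba.otter:canal.client 1.1.4 dependency is on the classpath; it connects to the server started in 4.6, subscribes to all tables, and prints the size of each fetched batch.
import java.net.InetSocketAddress;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.Message;

public class SimpleCanalCheck {
    public static void main(String[] args) throws InterruptedException {
        // Connect to the canal server started in section 4.6.
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("192.168.106.103", 11111), "example", "", "");
        connector.connect();
        connector.subscribe(".*\\..*"); // subscribe to all databases and tables
        connector.rollback();           // start from the last acknowledged position
        while (true) {
            Message message = connector.getWithoutAck(100); // fetch up to 100 entries
            long batchId = message.getId();
            int size = message.getEntries().size();
            if (batchId == -1 || size == 0) {
                Thread.sleep(1000);     // nothing new yet, wait a bit
            } else {
                System.out.println("batchId=" + batchId + ", entries=" + size);
                connector.ack(batchId); // confirm that the batch has been processed
            }
        }
    }
}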
4.8. Prepare test data in the database
Import the stu table into the test database on the MySQL node, then insert, delete, or update rows in it. The table and sample data are:
create table `stu` (
`name` varchar (60),
`speciality` varchar (60)
);
insert into `stu` (`name`, `speciality`) values('张三','美术');
insert into `stu` (`name`, `speciality`) values('张三','音乐');
insert into `stu` (`name`, `speciality`) values('李四','篮球');
insert into `stu` (`name`, `speciality`) values('小明','美术');
insert into `stu` (`name`, `speciality`) values('李四','美术');
insert into `stu` (`name`, `speciality`) values('小明','音乐');
After inserting, updating, or deleting rows, watch how the captured data changes. (You can also view the changes through the "data monitoring microservice" described below.)
4.9.ERROR c.a.otter.canal.server.netty.handler.SessionHandler – something goes wrong with channel:[id: 0x106d73f2, /192.168.106.1:1312 :> /192.168.106.103:11111], exception=java.nio.channels.ClosedChannelException
You may run into an error like the one above during this process; the fix is described in https://blog.csdn.net/woainimax/article/details/105991825.
4.10. Data monitoring microservice
When a user performs database operations, the binlog is captured by canal and parsed into structured data, which we can then feed into whatever business logic we need.
Here we use an open-source project that integrates Spring Boot with canal and is more convenient to use than the raw canal client:
https://github.com/chenqian56131/spring-boot-starter-canal
Before using it, install starter-canal into your local Maven repository (run mvn install in the starter-canal module).
Its bundled canal-test project can serve as a reference for the implementation.
(1) Create a module changgou_canal and add the dependency below to its pom (alternatively, write the code directly in the canal-test project and add the same dependency):
<dependency>
<groupId>com.xpand</groupId>
<artifactId>starter-canal</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
(2) Create the package com.changgou.canal and add a bootstrap class:
@SpringBootApplication
@EnableCanalClient
public class CanalApplication {
public static void main(String[] args) {
SpringApplication.run(CanalApplication.class, args);
}
}
(3) Add the configuration file application.properties:
# in canal-test this line is commented out by default
canal.client.instances.example.host=192.168.106.103
# in canal-test this is 2181
canal.client.instances.example.port=11111
canal.client.instances.example.batchSize=1000
# canal.client.instances.example.zookeeperAddress=192.168.0.59:8080,192.168.0.59:8081
# canal.client.instances.example.clusterEnabled=true
(4) Create the package com.changgou.canal.listener and add a listener class:
@CanalEventListener
public class BusinessListener {
@ListenPoint(schema = "test", table = {"stu"})
public void adUpdate(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
System.err.println("监听test库,stu表数据的变化");
rowData.getBeforeColumnsList().forEach((c) -> System.err.println("更改前数据: " + c.getName() + " :: " + c.getValue()));
rowData.getAfterColumnsList().forEach((c) -> System.err.println("更改后数据: " + c.getName() + " :: " + c.getValue()));
}
}
Test: start the data monitoring microservice, modify the stu table in the test database, and watch the console output; for each change the listener prints the before and after column values.
5. Canal Server + Canal Client HA
The high-availability scheme for both the Canal server and the Canal client relies on ZooKeeper: when a canal server or client starts, it reads and writes coordination information in ZooKeeper. Canal stores the following structure in ZooKeeper:
/otter
└── canal
    └── destinations
        └── example # canal instance name
            ├── 1001 # canal client info
            │   ├── cursor # the mysql binlog position currently consumed
            │   ├── filter # binlog filter condition
            │   └── running # the canal client currently running
            ├── cluster # list of canal servers
            │   └── ip:11111
            └── running # the canal server currently running
When they start, both the canal server and the canal client try to grab the corresponding running node in ZooKeeper, which guarantees that only one server and one client are active at a time; HA failover for both is driven by watching the running node.
5.1. Machine preparation
3-node ZooKeeper cluster: node1, node2, node3
2 Canal server nodes: node2, node3
MySQL node: node1
5.2. Download Canal
Omitted here.
5.3. Extract
[root@node2 ~]# mkdir /root/canal-ha (do the same on both node2 and node3)
[root@node2 ~]# tar -zxvf canal.deployer-1.1.4.tar.gz -C canal-ha/
Then on node2 run:
[root@node2 ~]# scp -r canal-ha root@node3:$PWD
[root@node2 canal-ha]# pwd
/root/canal-ha
[root@node2 canal-ha]# ls
bin conf lib logs
[root@node2 canal-ha]#
5.4. Edit the configuration files
5.4.1. Edit canal.properties
[root@node2 conf]# pwd
/root/canal-ha/conf
[root@node2 conf]# vim canal.properties
# ZooKeeper cluster address
canal.zkServers = node1:2181,node2:2181,node3:2181
# global spring component file
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
Notes:
default-instance.xml: the store uses in-memory mode, while the position management that the parser/sink depend on uses persistent mode; persistence is currently implemented by writing to ZooKeeper, so the positions are shared across the cluster.
Characteristics: supports HA.
Scenario: production, clustered deployment.
5.4.2. Edit instance.properties
# The slave id canal uses when impersonating a MySQL slave; it must not collide with the MySQL server or any other slave.
canal.instance.mysql.slaveId = 1234 (the two canal servers must use different values)
# address and port of the database to monitor
canal.instance.master.address=node1:3306
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
5.4.3. Configure the other Canal server
Same configuration as above.
Note: the instance directory name must be exactly the same on both machines, because HA mode manages instances by name; both servers must also use the default-instance.xml configuration.
5.4.4. Start the ZooKeeper service
Omitted here.
5.4.5. Start the Canal service (start both Canal servers)
Run the following on both nodes to start the canal service:
bin/startup.sh
After starting, check logs/example/example.log; the successful-startup log appears on only one of the two machines.
On node2 you can see:
[root@node2 logs]# pwd
/root/canal-ha/logs
[root@node2 logs]# ls
canal
[root@node2 logs]#
On node3 you can see:
[root@node3 logs]# pwd
/root/canal-ha/logs
[root@node3 logs]# ls
canal example
[root@node3 logs]# cd example/
[root@node3 example]# ls
example.log
[root@node3 example]# tail -f example.log -n 500
2020-12-27 03:42:07.860 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2020-12-27 03:42:07.910 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2020-12-27 03:42:08.708 [main] WARN o.s.beans.GenericTypeAwarePropertyDescriptor - Invalid JavaBean property 'connectionCharset' being accessed! Ambiguous write methods found next to actually used [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.lang.String)]: [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.nio.charset.Charset)]
2020-12-27 03:42:08.983 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2020-12-27 03:42:08.983 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2020-12-27 03:42:10.851 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2020-12-27 03:42:10.864 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$
2020-12-27 03:42:10.864 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter :
2020-12-27 03:42:10.907 [main] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
2020-12-27 03:42:11.634 [destination = example , address = node1/192.168.106.103:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2020-12-27 03:42:11.636 [destination = example , address = node1/192.168.106.103:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just show master status
2020-12-27 03:42:26.160 [destination = example , address = node1/192.168.106.103:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000001,position=3802,serverId=1,gtid=,timestamp=1608982052000] cost : 14301ms , the next step is binlog dump
You can also check ZooKeeper to see which node is currently active:
[root@node2 bin]# pwd
/root/apache-zookeeper-3.6.2-bin/bin
[root@node2 bin]# ./zkCli.sh
[zk: localhost:2181(CONNECTED) 6] get /otter/canal/destinations/example/running
{"active":true,"address":"192.168.106.105:11111"}
[zk: localhost:2181(CONNECTED) 8] ls /otter/canal/destinations/example/cluster
[192.168.106.104:11111, 192.168.106.105:11111]
5.4.6. Consume data from a client connection
You can point the client at the ZooKeeper address and the instance name; the canal client reads the running node in ZooKeeper to find the currently active server and connects to it (here we again use the official example). The class to modify is com.alibaba.otter.canal.example.ClusterCanalClientTest:
package com.alibaba.otter.canal.example;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
/**
 * Cluster-mode test example
*
* @version 1.0.4
*/
public class ClusterCanalClientTest extends AbstractCanalClientTest {
public ClusterCanalClientTest(String destination){
super(destination);
}
public static void main(String args[]) {
String destination = "example";
// Connect using a fixed list of canal server addresses; if one server crashes, failover is supported
// CanalConnector connector = CanalConnectors.newClusterConnector(
// Arrays.asList(new InetSocketAddress(
// AddressUtils.getHostIp(),
// 11111)),
// "stability_test", "", "");
// Discover the canal server address dynamically from ZooKeeper; if a server crashes, failover is supported
CanalConnector connector = CanalConnectors.newClusterConnector("192.168.106.104:2181", destination, "canal", "canal");
final ClusterCanalClientTest clientTest = new ClusterCanalClientTest(destination);
clientTest.setConnector(connector);
clientTest.start();
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
try {
logger.info("## stop the canal client");
clientTest.stop();
} catch (Throwable e) {
logger.warn("##something goes wrong when stopping canal:", e);
} finally {
logger.info("## canal client is down.");
}
}
});
}
}
The output looks like this:
****************************************************
* Batch Id: [1] ,count : [2] , memsize : [81] , Time : 2020-12-27 04:26:06
* Start : [mysql-bin.000001:3853:1608982052000(2020-12-26 19:27:32)]
* End : [mysql-bin.000001:3903:1608982052000(2020-12-26 19:27:32)]
****************************************************
----------------> binlog[mysql-bin.000001:3853] , name[test,stu] , eventType : INSERT , executeTime : 1608982052000(2020-12-26 19:27:32) , gtid : () , delay : 32314134 ms
name : 小明 type=varchar(60) update=true
speciality : 音乐 type=varchar(60) update=true
----------------
END ----> transaction id: 730
================> binlog[mysql-bin.000001:3903] , executeTime : 1608982052000(2020-12-26 19:27:32) , gtid : () , delay : 32314141ms
Once connected, the canal server records information about the active canal client, such as the client IP and port:
[zk: localhost:2181(CONNECTED) 19] get /otter/canal/destinations/example/1001/running
{"active":true,"address":"192.168.106.1:3222","clientId":1001}
[zk: localhost:2181(CONNECTED) 23] get /otter/canal/destinations/example/1001/cursor
{"@type":"com.alibaba.otter.canal.protocol.position.LogPosition","identity":{"slaveId":-1,"sourceAddress":{"address":"node1","port":3306}},"postion":{"gtid":"","included":false,"journalName":"mysql-bin.000001","position":4172,"serverId":1,"timestamp":1609019073000}}
6. MySQL + Canal + Kafka Integration
Reference: https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart
6.1. Machine preparation
ZooKeeper cluster: node1, node2, node3
Kafka cluster: node1, node2, node3
MySQL node: node1
Canal server: node1
6.2. Download Canal
Download: https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
6.3. Extract
[root@node1 ~]# pwd
/root
[root@node1 ~]# mkdir canal-kafka
[root@node1 ~]# tar -zxvf canal.deployer-1.1.4.tar.gz -C canal-kafka
After extraction, enter /root/canal-kafka:
[root@node1 ~]# cd canal-kafka/
[root@node1 canal-kafka]# ls
bin conf lib logs
[root@node1 canal-kafka]#
6.4. Edit the configuration files
6.4.1. Edit instance.properties
/root/canal-kafka/conf/example/instance.properties
# position info
canal.instance.master.address=192.168.106.103:3306
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# mq config
canal.mq.topic=test
6.4.2. Edit canal.properties
/root/canal-kafka/conf/canal.properties
# tcp, kafka, RocketMQ
canal.serverMode = kafka
# ZooKeeper cluster address
canal.zkServers = node1:2181,node2:2181,node3:2181
# Kafka cluster address
canal.mq.servers = node1:9092,node2:9092,node3:9092
6.5. Start the related services
6.5.1. Start the ZooKeeper service
source /etc/profile
$ZOOKEEPER_HOME/bin/zkServer.sh start
6.5.2. Start the Kafka service
# start kafka
cd $KAFKA_HOME
bin/kafka-server-start.sh -daemon config/server.properties
6.5.3. Open a Kafka consumer
List the Kafka topics:
cd $KAFKA_HOME
[root@node1 kafka_2.12-2.6.0]# bin/kafka-topics.sh --zookeeper node1:2181 -list
test
[root@node1 kafka_2.12-2.6.0]# bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic test
6.5.4. Start the Canal service
[root@node1 ~]# cd canal-kafka/
[root@node1 canal-kafka]# bin/startup.sh
cd to /root/canal-kafka/bin for workaround relative path
LOG CONFIGURATION : /root/canal-kafka/bin/../conf/logback.xml
canal conf : /root/canal-kafka/bin/../conf/canal.properties
CLASSPATH :/root/canal-kafka/bin/../conf:/root/canal-kafka/bin/../lib/zookeeper-3.4.5.jar:/root/canal-kafka/bin/../lib/zkclient-0.10.jar:/root/canal-kafka/bin/../lib/spring-tx-3.2.18.RELEASE.jar:/root/canal-kafka/bin/../lib/spring-orm-3.2.18.RELEASE.jar:/root/canal-kafka/bin/../lib/spring-jdbc-3.2.18.RELEASE.jar:/root/canal-kafka/bin/../lib/spring-expression-3.2.18.RELEASE.jar:/root/canal-kafka/bin/../lib/spring-core-3.2.18.RELEASE.jar:/root/canal-kafka/bin/../lib/spring-context-3.2.18.RELEASE.jar:/root/canal-kafka/bin/../lib/spring-beans-3.2.18.RELEASE.jar:/root/canal-kafka/bin/../lib/spring-aop-3.2.18.RELEASE.jar:/root/canal-kafka/bin/../lib/snappy-java-1.1.7.1.jar:/root/canal-kafka/bin/../lib/snakeyaml-1.19.jar:/root/canal-kafka/bin/../lib/slf4j-api-1.7.12.jar:/root/canal-kafka/bin/../lib/simpleclient_pushgateway-0.4.0.jar:/root/canal-kafka/bin/../lib/simpleclient_httpserver-0.4.0.jar:/root/canal-kafka/bin/../lib/simpleclient_hotspot-0.4.0.jar:/root/canal-kafka/bin/../lib/simpleclient_common-0.4.0.jar:/root/canal-kafka/bin/../lib/simpleclient-0.4.0.jar:/root/canal-kafka/bin/../lib/scala-reflect-2.11.12.jar:/root/canal-kafka/bin/../lib/scala-logging_2.11-3.8.0.jar:/root/canal-kafka/bin/../lib/scala-library-2.11.12.jar:/root/canal-kafka/bin/../lib/rocketmq-srvutil-4.5.2.jar:/root/canal-kafka/bin/../lib/rocketmq-remoting-4.5.2.jar:/root/canal-kafka/bin/../lib/rocketmq-logging-4.5.2.jar:/root/canal-kafka/bin/../lib/rocketmq-common-4.5.2.jar:/root/canal-kafka/bin/../lib/rocketmq-client-4.5.2.jar:/root/canal-kafka/bin/../lib/rocketmq-acl-4.5.2.jar:/root/canal-kafka/bin/../lib/protobuf-java-3.6.1.jar:/root/canal-kafka/bin/../lib/oro-2.0.8.jar:/root/canal-kafka/bin/../lib/netty-tcnative-boringssl-static-1.1.33.Fork26.jar:/root/canal-kafka/bin/../lib/netty-all-4.1.6.Final.jar:/root/canal-kafka/bin/../lib/netty-3.2.2.Final.jar:/root/canal-kafka/bin/../lib/mysql-connector-java-5.1.47.jar:/root/canal-kafka/bin/../lib/metrics-core-2.2.0.jar:/root/canal-kafka/bin/../lib/lz4-java-1.4.1.jar:/root/canal-kafka/bin/../lib/logback-core-1.1.3.jar:/root/canal-kafka/bin/../lib/logback-classic-1.1.3.jar:/root/canal-kafka/bin/../lib/kafka-clients-1.1.1.jar:/root/canal-kafka/bin/../lib/kafka_2.11-1.1.1.jar:/root/canal-kafka/bin/../lib/jsr305-3.0.2.jar:/root/canal-kafka/bin/../lib/jopt-simple-5.0.4.jar:/root/canal-kafka/bin/../lib/jctools-core-2.1.2.jar:/root/canal-kafka/bin/../lib/jcl-over-slf4j-1.7.12.jar:/root/canal-kafka/bin/../lib/javax.annotation-api-1.3.2.jar:/root/canal-kafka/bin/../lib/jackson-databind-2.9.6.jar:/root/canal-kafka/bin/../lib/jackson-core-2.9.6.jar:/root/canal-kafka/bin/../lib/jackson-annotations-2.9.0.jar:/root/canal-kafka/bin/../lib/ibatis-sqlmap-2.3.4.726.jar:/root/canal-kafka/bin/../lib/httpcore-4.4.3.jar:/root/canal-kafka/bin/../lib/httpclient-4.5.1.jar:/root/canal-kafka/bin/../lib/h2-1.4.196.jar:/root/canal-kafka/bin/../lib/guava-18.0.jar:/root/canal-kafka/bin/../lib/fastsql-2.0.0_preview_973.jar:/root/canal-kafka/bin/../lib/fastjson-1.2.58.jar:/root/canal-kafka/bin/../lib/druid-1.1.9.jar:/root/canal-kafka/bin/../lib/disruptor-3.4.2.jar:/root/canal-kafka/bin/../lib/commons-logging-1.1.3.jar:/root/canal-kafka/bin/../lib/commons-lang3-3.4.jar:/root/canal-kafka/bin/../lib/commons-lang-2.6.jar:/root/canal-kafka/bin/../lib/commons-io-2.4.jar:/root/canal-kafka/bin/../lib/commons-compress-1.9.jar:/root/canal-kafka/bin/../lib/commons-codec-1.9.jar:/root/canal-kafka/bin/../lib/commons-cli-1.2.jar:/root/canal-kafka/bin/../lib/commons-beanutils-1.8.2.jar:/root/canal-kafka/bin/../lib/canal.store-
1.1.4.jar:/root/canal-kafka/bin/../lib/canal.sink-1.1.4.jar:/root/canal-kafka/bin/../lib/canal.server-1.1.4.jar:/root/canal-kafka/bin/../lib/canal.protocol-1.1.4.jar:/root/canal-kafka/bin/../lib/canal.prometheus-1.1.4.jar:/root/canal-kafka/bin/../lib/canal.parse.driver-1.1.4.jar:/root/canal-kafka/bin/../lib/canal.parse.dbsync-1.1.4.jar:/root/canal-kafka/bin/../lib/canal.parse-1.1.4.jar:/root/canal-kafka/bin/../lib/canal.meta-1.1.4.jar:/root/canal-kafka/bin/../lib/canal.instance.spring-1.1.4.jar:/root/canal-kafka/bin/../lib/canal.instance.manager-1.1.4.jar:/root/canal-kafka/bin/../lib/canal.instance.core-1.1.4.jar:/root/canal-kafka/bin/../lib/canal.filter-1.1.4.jar:/root/canal-kafka/bin/../lib/canal.deployer-1.1.4.jar:/root/canal-kafka/bin/../lib/canal.common-1.1.4.jar:/root/canal-kafka/bin/../lib/aviator-2.2.1.jar:/root/canal-kafka/bin/../lib/aopalliance-1.0.jar:.:/root/jdk1.8.0_161/lib/dt.jar:/root/jdk1.8.0_161/lib/tools.jar:/root/jdk1.8.0_161/jre/lib/rt.jar
cd to /root/canal-kafka for continue
[root@node1 canal-kafka]#
6.5.5. Observe the Kafka consumer
The first time canal starts, any data already present in the MySQL binlog is collected straight into the Kafka cluster and printed on the Kafka consumer console.
[root@node1 kafka_2.12-2.6.0]# bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic test
{"data":null,"database":"`kafka_center`","es":1609021630000,"id":1,"isDdl":false,"mysqlType":null,"old":null,"pkNames":null,"sql":"/* ApplicationName=DBeaver 7.2.5 - SQLEditor <Script-14.sql> */ -- Dumping database structure for kafka_center\r\nCREATE DATABASE IF NOT EXISTS `kafka_center` /*!40100 DEFAULT CHARACTER SET utf8 COLLATE utf8_bin */","sqlType":null,"table":"","ts":1609079707068,"type":"QUERY"}
{"data":null,"database":"kafka_center","es":1609021630000,"id":1,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"/* ApplicationName=DBeaver 7.2.5 - SQLEditor <Script-14.sql> */ -- Dumping structure for table kafka_center.alert_group\r\nCREATE TABLE IF NOT EXISTS `alert_group` (\r\n `id` int(11) NOT NULL AUTO_INCREMENT,\r\n `cluster_id` int(11) NOT NULL,\r\n `topic_name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `consummer_group` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `consummer_api` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `threshold` int(11) DEFAULT NULL,\r\n `dispause` int(11) DEFAULT NULL,\r\n `mail_to` varchar(1000) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `webhook` varchar(1000) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `create_date` datetime DEFAULT NULL,\r\n `owner_id` int(11) DEFAULT NULL,\r\n `team_id` int(11) DEFAULT NULL,\r\n `disable_alerta` tinyint(1) DEFAULT 0,\r\n `enable` tinyint(1) NOT NULL DEFAULT 1,\r\n PRIMARY KEY (`id`)\r\n) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin","sqlType":null,"table":"alert_group","ts":1609079707071,"type":"CREATE"}
{"data":null,"database":"kafka_center","es":1609021630000,"id":1,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"/* ApplicationName=DBeaver 7.2.5 - SQLEditor <Script-14.sql> */ -- Data exporting was unselected.\r\n\r\n\r\n-- Dumping structure for table kafka_center.cluster_info\r\nCREATE TABLE IF NOT EXISTS `cluster_info` (\r\n `id` int(11) NOT NULL AUTO_INCREMENT,\r\n `name` varchar(255) COLLATE utf8_bin NOT NULL,\r\n `zk_address` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `broker` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `create_time` datetime DEFAULT NULL,\r\n `comments` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `enable` int(11) DEFAULT NULL,\r\n `broker_size` int(4) DEFAULT 0,\r\n `kafka_version` varchar(10) COLLATE utf8_bin DEFAULT '',\r\n `location` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `graf_addr` varchar(255) COLLATE utf8_bin DEFAULT '',\r\n PRIMARY KEY (`id`)\r\n) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin","sqlType":null,"table":"cluster_info","ts":1609079707071,"type":"CREATE"}
{"data":null,"database":"kafka_center","es":1609021630000,"id":1,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"/* ApplicationName=DBeaver 7.2.5 - SQLEditor <Script-14.sql> */ -- Data exporting was unselected.\r\n\r\n\r\n-- Dumping structure for table kafka_center.ksql_info\r\nCREATE TABLE IF NOT EXISTS `ksql_info` (\r\n `id` int(11) NOT NULL AUTO_INCREMENT,\r\n `cluster_id` int(11) DEFAULT NULL,\r\n `cluster_name` varchar(255) DEFAULT NULL,\r\n `ksql_url` varchar(255) DEFAULT NULL,\r\n `ksql_serverId` varchar(255) DEFAULT NULL,\r\n `version` varchar(255) DEFAULT NULL,\r\n PRIMARY KEY (`id`)\r\n) ENGINE=InnoDB DEFAULT CHARSET=utf8","sqlType":null,"table":"ksql_info","ts":1609079707071,"type":"CREATE"}
{"data":null,"database":"kafka_center","es":1609021630000,"id":1,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"/* ApplicationName=DBeaver 7.2.5 - SQLEditor <Script-14.sql> */ -- Data exporting was unselected.\r\n\r\n\r\n-- Dumping structure for table kafka_center.task_info\r\nCREATE TABLE IF NOT EXISTS `task_info` (\r\n `id` int(11) NOT NULL AUTO_INCREMENT,\r\n `cluster_ids` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `location` varchar(20) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `topic_name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `partition` int(11) DEFAULT NULL,\r\n `replication` int(11) DEFAULT NULL,\r\n `message_rate` int(50) DEFAULT NULL,\r\n `ttl` int(11) DEFAULT NULL,\r\n `owner_id` int(11) DEFAULT NULL,\r\n `team_id` int(11) DEFAULT NULL,\r\n `comments` varchar(1000) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `create_time` datetime DEFAULT NULL,\r\n `approved` int(11) DEFAULT NULL,\r\n `approved_id` int(11) DEFAULT NULL,\r\n `approved_time` datetime DEFAULT NULL,\r\n `approval_opinions` varchar(1000) COLLATE utf8_bin DEFAULT '',\r\n PRIMARY KEY (`id`)\r\n) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin","sqlType":null,"table":"task_info","ts":1609079707071,"type":"CREATE"}
{"data":null,"database":"kafka_center","es":1609021630000,"id":1,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"/* ApplicationName=DBeaver 7.2.5 - SQLEditor <Script-14.sql> */ -- Data exporting was unselected.\r\n\r\n\r\n-- Dumping structure for table kafka_center.team_info\r\nCREATE TABLE IF NOT EXISTS `team_info` (\r\n `id` int(11) NOT NULL AUTO_INCREMENT,\r\n `name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `own` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `alarm_group` varchar(255) COLLATE utf8_bin DEFAULT NULL,\r\n PRIMARY KEY (`id`)\r\n) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin","sqlType":null,"table":"team_info","ts":1609079707071,"type":"CREATE"}
{"data":null,"database":"kafka_center","es":1609021630000,"id":1,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"/* ApplicationName=DBeaver 7.2.5 - SQLEditor <Script-14.sql> */ -- Data exporting was unselected.\r\n\r\n\r\n-- Dumping structure for table kafka_center.topic_collection\r\nCREATE TABLE IF NOT EXISTS `topic_collection` (\r\n `id` int(11) unsigned NOT NULL AUTO_INCREMENT,\r\n `cluster_id` int(11) NOT NULL,\r\n `user_id` int(11) NOT NULL,\r\n `name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `type` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n PRIMARY KEY (`id`)\r\n) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin","sqlType":null,"table":"topic_collection","ts":1609079707072,"type":"CREATE"}
{"data":null,"database":"kafka_center","es":1609021630000,"id":1,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"/* ApplicationName=DBeaver 7.2.5 - SQLEditor <Script-14.sql> */ -- Data exporting was unselected.\r\n\r\n\r\n-- Dumping structure for table kafka_center.topic_info\r\nCREATE TABLE IF NOT EXISTS `topic_info` (\r\n `id` int(11) NOT NULL AUTO_INCREMENT,\r\n `cluster_id` int(11) NOT NULL,\r\n `topic_name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `partition` int(11) DEFAULT NULL,\r\n `replication` int(11) DEFAULT NULL,\r\n `ttl` bigint(11) DEFAULT NULL,\r\n `config` varchar(512) COLLATE utf8_bin DEFAULT NULL,\r\n `owner_id` int(11) DEFAULT NULL,\r\n `team_id` int(11) DEFAULT NULL,\r\n `comments` varchar(1000) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `create_time` datetime DEFAULT NULL,\r\n `file_size` bigint(20) NOT NULL DEFAULT -1,\r\n PRIMARY KEY (`id`)\r\n) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin","sqlType":null,"table":"topic_info","ts":1609079707072,"type":"CREATE"}
{"data":null,"database":"kafka_center","es":1609021630000,"id":1,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"/* ApplicationName=DBeaver 7.2.5 - SQLEditor <Script-14.sql> */ -- Data exporting was unselected.\r\n\r\n\r\n-- Dumping structure for table kafka_center.user_info\r\nCREATE TABLE IF NOT EXISTS `user_info` (\r\n `id` int(11) NOT NULL AUTO_INCREMENT,\r\n `name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `real_name` varchar(255) COLLATE utf8_bin DEFAULT '',\r\n `email` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `role` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '100',\r\n `create_time` datetime DEFAULT NULL,\r\n `password` varchar(255) COLLATE utf8_bin DEFAULT '',\r\n PRIMARY KEY (`id`)\r\n) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin","sqlType":null,"table":"user_info","ts":1609079707072,"type":"CREATE"}
{"data":null,"database":"kafka_center","es":1609021630000,"id":1,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"/* ApplicationName=DBeaver 7.2.5 - SQLEditor <Script-14.sql> */ -- Data exporting was unselected.\r\n\r\n\r\n-- Dumping structure for table kafka_center.user_team\r\nCREATE TABLE IF NOT EXISTS `user_team` (\r\n `id` int(11) NOT NULL AUTO_INCREMENT,\r\n `user_id` int(11) DEFAULT NULL,\r\n `team_id` int(11) DEFAULT NULL,\r\n PRIMARY KEY (`id`)\r\n) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin","sqlType":null,"table":"user_team","ts":1609079707072,"type":"CREATE"}
You can now insert, update, and delete data in MySQL, and the Kafka consumer console will show the binlog data in real time.
Insert a row into the stu table:
insert into `stu` (`name`, `speciality`) values('田七','语文');
In the consumer output, the new message looks like this:
{"data":[{"name":"田七","speciality":"语文"}],"database":"test","es":1609080224000,"id":2,"isDdl":false,"mysqlType":{"name":"varchar(60)","speciality":"varchar(60)"},"old":null,"pkNames":null,"sql":"","sqlType":{"name":12,"speciality":12},"table":"stu","ts":1609080224938,"type":"INSERT"}