Canal
An introduction to basic Canal usage in TCP mode and in Kafka mode.
Versions
- canal: 1.1.2
- MySQL: 5.7
- ZooKeeper: 3.4.6
- Kafka: 2.13-2.8.1 (Scala 2.13 build of Kafka 2.8.1)
- Kafka Manager: latest
Prerequisites
1. Enable binlog in MySQL
In my.cnf, enable binlog and specify the database(s) to log.
MySQL must be restarted for the change to take effect.
[client]
port = 3306
socket = /var/lib/mysql/data/mysql.sock
[mysqld]
# Workaround for GROUP BY clause errors in MySQL 5.7
sql_mode='STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION'
# General options
basedir = /var/lib/mysql
datadir = /var/lib/mysql/data
port = 3306
socket = /var/lib/mysql/data/mysql.sock
lc-messages-dir = /usr/share/mysql # be sure to set this; otherwise a failed SQL statement shows only an error code, not the error message
character-set-server=utf8mb4
back_log = 300
max_connections = 3000
max_connect_errors = 50
table_open_cache = 4096
max_allowed_packet = 32M
#binlog_cache_size = 4M
max_heap_table_size = 128M
read_rnd_buffer_size = 16M
sort_buffer_size = 16M
join_buffer_size = 16M
thread_cache_size = 16
query_cache_size = 64M
query_cache_limit = 4M
ft_min_word_len = 8
thread_stack = 512K
#tx_isolation = READ-COMMITTED
tmp_table_size = 64M
#log-bin=mysql-bin
long_query_time = 6
server_id=1
innodb_buffer_pool_size = 1024M
innodb_thread_concurrency = 16
innodb_log_buffer_size = 16M
wait_timeout= 31536000
interactive_timeout= 31536000
lower_case_table_names = 1
log-bin=mysql-bin # enable binlog
binlog_format=row # use ROW format
# database(s) to write to the binlog, e.g. the test database
binlog-do-db=test
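The settings Canal relies on here are log-bin, binlog_format=row and server_id. As a quick check before restarting MySQL, the hypothetical BinlogConfigCheck below reads the [mysqld] section of an option file and lists any of the three that are missing. It is only a sketch: it strips everything after '#', so a '#' inside a quoted value would be mishandled, and it ignores !include directives.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: checks the binlog-related [mysqld] options Canal needs.
class BinlogConfigCheck {

    // Collects key=value pairs from [mysqld]; '-' becomes '_' because MySQL treats them alike.
    static Map<String, String> readMysqld(Reader in) {
        Map<String, String> options = new HashMap<>();
        BufferedReader reader = new BufferedReader(in);
        boolean inMysqld = false;
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                int hash = line.indexOf('#');
                if (hash >= 0) {
                    line = line.substring(0, hash); // drop full-line and trailing comments
                }
                line = line.trim();
                if (line.isEmpty()) {
                    continue;
                }
                if (line.startsWith("[")) {
                    inMysqld = line.equals("[mysqld]");
                    continue;
                }
                if (inMysqld) {
                    int eq = line.indexOf('=');
                    String key = (eq < 0 ? line : line.substring(0, eq)).trim().replace('-', '_');
                    String value = eq < 0 ? "" : line.substring(eq + 1).trim();
                    options.put(key, value);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return options;
    }

    // Lists the Canal prerequisites that the parsed options do not satisfy.
    static List<String> problems(Map<String, String> options) {
        List<String> problems = new ArrayList<>();
        if (!options.containsKey("log_bin")) {
            problems.add("log-bin is not set");
        }
        if (!"row".equalsIgnoreCase(options.get("binlog_format"))) {
            problems.add("binlog_format is not ROW");
        }
        if (!options.containsKey("server_id")) {
            problems.add("server_id is not set");
        }
        return problems;
    }

    public static void main(String[] args) throws IOException {
        // Assumption: pass your my.cnf path; /etc/my.cnf is only a common default.
        String path = args.length > 0 ? args[0] : "/etc/my.cnf";
        try (Reader in = Files.newBufferedReader(Paths.get(path))) {
            List<String> problems = problems(readMysqld(in));
            System.out.println(problems.isEmpty() ? "binlog settings look OK" : "Missing: " + problems);
        }
    }
}
```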
2. Check that binlog is enabled (log_bin must be ON)
mysql> show variables like 'log_%';
+----------------------------------------+-------------------------------------+
| Variable_name | Value |
+----------------------------------------+-------------------------------------+
| log_bin | ON |
| log_bin_basename | /var/lib/mysql/data/mysql-bin |
| log_bin_index | /var/lib/mysql/data/mysql-bin.index |
| log_bin_trust_function_creators | OFF |
| log_bin_use_v1_row_events | OFF |
| log_builtin_as_identified_by_password | OFF |
| log_error | stderr |
| log_error_verbosity | 3 |
| log_output | FILE |
| log_queries_not_using_indexes | OFF |
| log_slave_updates | OFF |
| log_slow_admin_statements | OFF |
| log_slow_slave_statements | OFF |
| log_statements_unsafe_for_binlog | ON |
| log_syslog | OFF |
| log_syslog_facility | daemon |
| log_syslog_include_pid | ON |
| log_syslog_tag | |
| log_throttle_queries_not_using_indexes | 0 |
| log_timestamps | UTC |
| log_warnings | 2 |
+----------------------------------------+-------------------------------------+
21 rows in set (0.12 sec)
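The output above shows log_bin; Canal also needs binlog_format to be ROW. A sketch that reads both over JDBC, assuming a MySQL JDBC driver is on the classpath; the URL is a placeholder and the canal/canal credentials are the ones created in the next step.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;

// Sketch: query the server variables Canal depends on.
class BinlogStatus {

    // Canal needs binlog switched on and written in ROW format.
    static boolean readyForCanal(Map<String, String> vars) {
        return "ON".equalsIgnoreCase(vars.get("log_bin"))
                && "ROW".equalsIgnoreCase(vars.get("binlog_format"));
    }

    // Reads log_bin, binlog_format and server_id into a name -> value map.
    static Map<String, String> fetch(Connection conn) throws SQLException {
        Map<String, String> vars = new HashMap<>();
        try (Statement st = conn.createStatement();
             ResultSet rs = st.executeQuery(
                     "SHOW VARIABLES WHERE Variable_name IN ('log_bin', 'binlog_format', 'server_id')")) {
            while (rs.next()) {
                vars.put(rs.getString(1), rs.getString(2));
            }
        }
        return vars;
    }

    public static void main(String[] args) throws SQLException {
        String url = "jdbc:mysql://127.0.0.1:3306/test"; // placeholder, adjust host and database
        try (Connection conn = DriverManager.getConnection(url, "canal", "canal")) {
            Map<String, String> vars = fetch(conn);
            System.out.println(vars + " -> ready for Canal: " + readyForCanal(vars));
        }
    }
}
```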
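3. Create a MySQL account and grant privileges
The canal user only needs read and replication privileges; INSERT, UPDATE and DELETE are not required.
CREATE USER `canal`@`%` IDENTIFIED BY 'canal';
GRANT Replication Client, Replication Slave, Select ON *.* TO `canal`@`%`;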
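To confirm the grants took effect, run SHOW GRANTS FOR 'canal'@'%' and compare the result with the three privileges above. The hypothetical CanalGrantCheck does that comparison on the returned lines. It only counts global grants (ON *.*), because REPLICATION SLAVE and REPLICATION CLIENT are global privileges; the sample line in main is illustrative, not captured output.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Sketch: report which privileges required by Canal are missing from SHOW GRANTS output.
class CanalGrantCheck {

    static final List<String> REQUIRED = List.of("SELECT", "REPLICATION SLAVE", "REPLICATION CLIENT");

    static List<String> missing(List<String> grantLines) {
        List<String> missing = new ArrayList<>();
        for (String privilege : REQUIRED) {
            boolean found = false;
            for (String line : grantLines) {
                String upper = line.toUpperCase(Locale.ROOT);
                int on = upper.indexOf(" ON *.* ");
                if (on < 0) {
                    continue; // database- or table-level grant: ignored here
                }
                String privileges = upper.substring(0, on);
                if (privileges.contains("ALL PRIVILEGES") || privileges.contains(privilege)) {
                    found = true;
                    break;
                }
            }
            if (!found) {
                missing.add(privilege);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        List<String> grants = List.of("GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'");
        System.out.println("Missing privileges: " + missing(grants)); // Missing privileges: []
    }
}
```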
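Downloading and starting Canal
1. Download canal.deployer-1.1.2.tar.gz from the releases page of the alibaba/canal repository on GitHub
2. Create an install directory and extract the archive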
mkdir -p /opt/module/canal
tar -zxvf canal.deployer-1.1.2.tar.gz -C /opt/module/canal
3. Edit the configuration of the example instance (the server mode defaults to TCP)
cd /opt/module/canal/
vim conf/example/instance.properties
# must differ from the MySQL server_id
canal.instance.mysql.slaveId=20
# address of the MySQL server
canal.instance.master.address=120.79.196.173:3306
# the MySQL user created above (root also works)
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
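A sketch that loads instance.properties with java.util.Properties and checks the values edited above: slaveId differs from the MySQL server_id (1 in the my.cnf above), master.address has the form host:port, and a username is set. InstanceConfigCheck is a hypothetical helper, and the relative path in main assumes it is run from /opt/module/canal.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;

// Sketch: sanity-check conf/example/instance.properties before starting Canal.
class InstanceConfigCheck {

    static Properties load(Reader in) {
        Properties props = new Properties();
        try {
            props.load(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Returns "OK" or a description of the first problem found.
    static String check(Properties props, long mysqlServerId) {
        String slaveId = props.getProperty("canal.instance.mysql.slaveId", "").trim();
        if (!slaveId.isEmpty() && Long.parseLong(slaveId) == mysqlServerId) {
            return "canal.instance.mysql.slaveId must differ from the MySQL server_id";
        }
        String address = props.getProperty("canal.instance.master.address", "").trim();
        int colon = address.lastIndexOf(':');
        if (colon <= 0 || !address.substring(colon + 1).matches("\\d+")) {
            return "canal.instance.master.address must look like host:port";
        }
        if (props.getProperty("canal.instance.dbUsername", "").trim().isEmpty()) {
            return "canal.instance.dbUsername is empty";
        }
        return "OK";
    }

    public static void main(String[] args) throws IOException {
        try (Reader in = Files.newBufferedReader(Paths.get("conf/example/instance.properties"))) {
            System.out.println(check(load(in), 1L)); // 1 = server_id from my.cnf
        }
    }
}
```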
TCP mode (Java client)
pom.xml
<dependencies>
<!-- https://mvnrepository.com/artifact/com.alibaba.otter/canal.client -->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>
</dependencies>
Main class
package org.example;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.net.InetSocketAddress;
import java.util.List;
/**
 * @author eddie.lee
 * @description Prints row changes of the test database received from a Canal server (TCP mode)
 */
public class CanalClient {
    public static void main(String[] args)
            throws InterruptedException, InvalidProtocolBufferException {
        // Create the connector: Canal server address (default port 11111), instance name, username, password
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
        try {
            // Connect and subscribe once, before the polling loop
            connector.connect();
            // Subscribe to all tables of the test database (regex on "schema.table")
            connector.subscribe("test\\..*");
            while (true) {
                // Fetch up to 100 entries; get() does not wait for a full batch and acks automatically
                Message message = connector.get(100);
                List<Entry> entries = message.getEntries();
                // Nothing to process: wait a moment, then poll again
                if (entries.isEmpty()) {
                    System.out.println("No data for now, sleeping...");
                    Thread.sleep(1000);
                    continue;
                }
                // Parse the entries one by one
                for (Entry entry : entries) {
                    // 1. Table name
                    String tableName = entry.getHeader().getTableName();
                    // 2. Entry type: TRANSACTIONBEGIN, ROWDATA, TRANSACTIONEND, ...
                    EntryType entryType = entry.getEntryType();
                    // 3. Only ROWDATA entries carry row changes
                    if (!EntryType.ROWDATA.equals(entryType)) {
                        System.out.println("Current entry type: " + entryType);
                        continue;
                    }
                    // 4. Deserialize the payload
                    ByteString storeValue = entry.getStoreValue();
                    RowChange rowChange = RowChange.parseFrom(storeValue);
                    // 5. Operation type: INSERT, UPDATE, DELETE, ...
                    EventType eventType = rowChange.getEventType();
                    // 6. Walk the changed rows
                    for (RowData rowData : rowChange.getRowDatasList()) {
                        // 7. Row image before the change (empty for INSERT)
                        JSONObject beforeData = new JSONObject();
                        for (Column column : rowData.getBeforeColumnsList()) {
                            beforeData.put(column.getName(), column.getValue());
                        }
                        // 8. Row image after the change (empty for DELETE)
                        JSONObject afterData = new JSONObject();
                        for (Column column : rowData.getAfterColumnsList()) {
                            afterData.put(column.getName(), column.getValue());
                        }
                        // Note: the "EventType" label prints entryType (always ROWDATA here), as in the
                        // console output below; print eventType instead to see INSERT/UPDATE/DELETE
                        System.out.println("Table:" + tableName +
                                ", EventType: " + entryType +
                                ", Before: " + beforeData +
                                ", After: " + afterData);
                    }
                }
            }
        } finally {
            connector.disconnect();
        }
    }
}
Run the main method and leave it running; do not stop it!
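The string passed to connector.subscribe() is a regular expression matched against "schema.table" names. The sketch below approximates that check with java.util.regex so a filter can be tried before use. It assumes comma-separated patterns, each matched against the whole name; Canal has its own filter implementation, which may differ in details such as case handling. SubscribeFilterDemo is a hypothetical name.

```java
import java.util.regex.Pattern;

// Approximation of a Canal subscribe filter such as "test\\..*" using java.util.regex.
class SubscribeFilterDemo {

    // True if any comma-separated pattern matches the full "schema.table" name.
    static boolean matches(String filter, String schema, String table) {
        String name = schema + "." + table;
        for (String regex : filter.split(",")) {
            if (Pattern.matches(regex.trim(), name)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        String filter = "test\\..*"; // same string as in connector.subscribe(...)
        System.out.println(matches(filter, "test", "user_info")); // true
        System.out.println(matches(filter, "mysql", "user"));     // false
        System.out.println(matches(filter, "test2", "t"));        // false: the "." after test is escaped
    }
}
```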
Start the Canal server
[root@VM-0-3-centos canal]# pwd
/opt/module/canal
[root@VM-0-3-centos canal]# bin/startup.sh
[root@VM-0-3-centos canal]# jps
20745 CanalLauncher
20761 Jps
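Besides jps, you can confirm that the server accepts connections on the port CanalClient uses (11111, Canal's default canal.port). A minimal sketch with a plain socket; the host and the one-second timeout are assumptions to adjust.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Sketch: check whether something is listening on the Canal server port.
class CanalPortCheck {

    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false; // refused, unreachable or timed out
        }
    }

    public static void main(String[] args) {
        System.out.println("Canal port 11111 open: " + isListening("127.0.0.1", 11111, 1000));
    }
}
```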
Insert and update rows in the test database
Insert a row:
INSERT INTO user_info VALUES ('1002','lisi','female');
A GUI client works just as well; you don't have to type the SQL by hand.
Output in the IDEA console:
Table:user_info, EventType: ROWDATA, Before: {"sex":"female","id":"1002","NAME":"abc"}, After: {"sex":"female","id":"1002","NAME":"lisi"}
Current entry type: TRANSACTIONEND
No data for now, sleeping...
No data for now, sleeping...
Update a row:
UPDATE `test`.`user_info` SET `NAME` = 'wangwu' WHERE `id` = '1002' AND `NAME` = 'lisi' AND `sex` = 'female' LIMIT 1;
Output in the IDEA console:
No data for now, sleeping...
Current entry type: TRANSACTIONBEGIN
Table:user_info, EventType: ROWDATA, Before: {"sex":"female","id":"1002","NAME":"lisi"}, After: {"sex":"female","id":"1002","NAME":"wangwu"}
Current entry type: TRANSACTIONEND
No data for now, sleeping...
No data for now, sleeping...
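Each row change carries a full before image and after image. If only the changed columns matter, a small helper can compare the two. The hypothetical RowDiff below works on plain maps such as the Before/After objects built in CanalClient, so it runs without Canal on the classpath.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;

// Sketch: list the columns whose value differs between the before and after images.
class RowDiff {

    // Returns column -> "old -> new" for every changed column, in sorted column order.
    static Map<String, String> changedColumns(Map<String, String> before, Map<String, String> after) {
        Set<String> columns = new TreeSet<>(before.keySet());
        columns.addAll(after.keySet());
        Map<String, String> changes = new LinkedHashMap<>();
        for (String column : columns) {
            String oldValue = before.get(column);
            String newValue = after.get(column);
            if (!Objects.equals(oldValue, newValue)) {
                changes.put(column, oldValue + " -> " + newValue);
            }
        }
        return changes;
    }

    public static void main(String[] args) {
        // Values taken from the UPDATE example above
        Map<String, String> before = Map.of("id", "1002", "NAME", "lisi", "sex", "female");
        Map<String, String> after = Map.of("id", "1002", "NAME", "wangwu", "sex", "female");
        System.out.println(changedColumns(before, after)); // {NAME=lisi -> wangwu}
    }
}
```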
Kafka mode (no client code)
Edit the configuration files
Edit canal.properties
[root@VM-0-3-centos canal]# vim conf/canal.properties
# change tcp to kafka
# tcp, kafka, RocketMQ
#canal.serverMode = tcp
canal.serverMode = kafka
##################################################
######### MQ #############
##################################################
# set to the Kafka broker address
#canal.mq.servers = 127.0.0.1:6667
canal.mq.servers = 127.0.0.1:9092
Edit instance.properties
[root@VM-0-3-centos canal]# vim conf/example/instance.properties
# set the MQ topic
# mq config
canal.mq.topic=canal_kafka
canal.mq.partition=0
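After this change the Java client in IDEA can no longer fetch data, because the Canal server now runs in Kafka mode instead of TCP mode.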
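Because serverMode lives in canal.properties and the topic in instance.properties, it is easy to end up with a consumer reading the wrong topic. The hypothetical KafkaModeCheck below checks the keys edited above and prints the topic to pass to --topic; falling back to tcp when canal.serverMode is missing is an assumption based on Canal's default mode.

```java
import java.util.Properties;

// Sketch: verify the Kafka-mode settings and report the topic a consumer should read.
class KafkaModeCheck {

    static String check(Properties canal, Properties instance) {
        String mode = canal.getProperty("canal.serverMode", "tcp").trim();
        if (!"kafka".equalsIgnoreCase(mode)) {
            return "canal.serverMode is " + mode + ", expected kafka";
        }
        String servers = canal.getProperty("canal.mq.servers", "").trim();
        if (servers.isEmpty()) {
            return "canal.mq.servers is empty";
        }
        String topic = instance.getProperty("canal.mq.topic", "").trim();
        if (topic.isEmpty()) {
            return "canal.mq.topic is empty";
        }
        return "OK: consume topic " + topic + " from " + servers;
    }

    public static void main(String[] args) {
        // Values from the two files edited above
        Properties canal = new Properties();
        canal.setProperty("canal.serverMode", "kafka");
        canal.setProperty("canal.mq.servers", "127.0.0.1:9092");
        Properties instance = new Properties();
        instance.setProperty("canal.mq.topic", "canal_kafka");
        System.out.println(check(canal, instance)); // OK: consume topic canal_kafka from 127.0.0.1:9092
    }
}
```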
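Start a Kafka console consumer
## enter the container
[root@VM-0-3-centos ~]# docker exec -it kafka bash
## start the consumer; --topic must match canal.mq.topic in instance.properties
root@a9ea8b546c2d:/# kafka-console-consumer.sh --bootstrap-server kafka:9092 --from-beginning --topic canal_kafka
## insert, update and delete a few rows in the database and check that the messages are consumed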
{"data":[{"id":"1003","NAME":"lisi","sex":"male"}],"database":"test","es":1656645220000,"id":6,"isDdl":false,"mysqlType":{"id":"varchar(255)","NAME":"varchar(255)","sex":"varchar(255)"},"old":null,"sql":"","sqlType":{"id":12,"NAME":12,"sex":12},"table":"user_info","ts":1656645220451,"type":"INSERT"}
{"data":[{"id":"1003","NAME":"lisi","sex":"female"}],"database":"test","es":1656645272000,"id":7,"isDdl":false,"mysqlType":{"id":"varchar(255)","NAME":"varchar(255)","sex":"varchar(255)"},"old":[{"sex":"male"}],"sql":"","sqlType":{"id":12,"NAME":12,"sex":12},"table":"user_info","ts":1656645272803,"type":"UPDATE"}
{"data":[{"id":"1003","NAME":"lisi","sex":"female"}],"database":"test","es":1656645388000,"id":8,"isDdl":false,"mysqlType":{"id":"varchar(255)","NAME":"varchar(255)","sex":"varchar(255)"},"old":null,"sql":"","sqlType":{"id":12,"NAME":12,"sex":12},"table":"user_info","ts":1656645388216,"type":"DELETE"}
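Each message above is a flat JSON object whose top-level fields include database, table and type (INSERT, UPDATE, DELETE). The sketch below pulls such string fields out with a regular expression. That is enough for these samples but is not a JSON parser; in real code a JSON library (for example fastjson, which the TCP client already uses) is the safer choice. FlatMessageFields is a hypothetical name.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch: extract the first "name":"value" string field from a Canal flat message.
class FlatMessageFields {

    static String field(String json, String name) {
        Matcher m = Pattern.compile("\"" + Pattern.quote(name) + "\":\"([^\"]*)\"").matcher(json);
        return m.find() ? m.group(1) : null;
    }

    public static void main(String[] args) {
        String msg = "{\"data\":[{\"id\":\"1003\",\"NAME\":\"lisi\",\"sex\":\"male\"}],"
                + "\"database\":\"test\",\"table\":\"user_info\",\"type\":\"INSERT\"}";
        System.out.println(field(msg, "database") + "." + field(msg, "table") + " " + field(msg, "type"));
        // test.user_info INSERT
    }
}
```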
Q&A
1. Error message
found canal.pid , Please run stop.sh first ,then startup.sh
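canal.pid records the PID of the running Canal process, and stop.sh uses it to kill that process. The message usually means Canal was not stopped with stop.sh, so run stop.sh first. If stop.sh has been run but canal.pid is still there (the file exists although no Canal process is running), simply delete canal.pid and start Canal again.
Reference: https://www.jianshu.com/p/83929b8e6712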
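Whether canal.pid is stale can be checked before deleting it. A sketch for Java 9+ using ProcessHandle; CanalPidCheck is a hypothetical helper, and the default path /opt/module/canal/bin/canal.pid is an assumption based on the install directory used in this article, so pass your own path as the first argument if it differs.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Sketch (Java 9+): tell a live canal.pid from a stale one.
class CanalPidCheck {

    // True only if the file exists and holds the PID of a process that is still alive.
    static boolean isRunning(Path pidFile) {
        if (!Files.exists(pidFile)) {
            return false;
        }
        try {
            String text = new String(Files.readAllBytes(pidFile), StandardCharsets.UTF_8).trim();
            long pid = Long.parseLong(text);
            return ProcessHandle.of(pid).map(ProcessHandle::isAlive).orElse(false);
        } catch (NumberFormatException e) {
            return false; // unreadable content: treat as stale
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Assumption: default location under this article's install directory
        Path pidFile = Paths.get(args.length > 0 ? args[0] : "/opt/module/canal/bin/canal.pid");
        if (!Files.exists(pidFile)) {
            System.out.println("No pid file: startup.sh can be run");
        } else if (isRunning(pidFile)) {
            System.out.println("Canal is running: run stop.sh first");
        } else {
            System.out.println("Stale pid file: delete " + pidFile + ", then run startup.sh");
        }
    }
}
```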
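Copyright notice: this is an original article by eddielee9217, licensed under CC 4.0 BY-SA. When reposting, please include a link to the original article and this notice.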