git地址:
阿里巴巴Canal的Git地址
Canal基于日志增量订阅和消费的业务包括:
- 数据库镜像、数据库实时备份
- 索引构建和实时维护(拆分异构索引、倒排索引)
- 业务cache刷新、带业务逻辑的增量数据处理
Mysql 的数据同步的架构图:
Canal是把自己伪装成一个从库:
Canal的优点
: 实时性好、分布式、ACK机制
Canal的缺点
:
- 只支持增量同步,不支持全量同步
- MYSQL -> ES、RDB
- 一个instance只能有一个消费端消费
- 单点压力大
Canal的组件
:
Cancal 和 ES 的数据同步的架构: Canal可以和Kafka无缝连接
Cancal配置:
- 开启mysql得bin-log日志
## mysql配置修改文件:
vim /etc/my.cnf
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
## 重启服务
## systemctl stop mariadb
## systemctl start mariadb
mysql -uroot -proot
show variables like '%log_bin%';
## 授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant
CREATE USER root IDENTIFIED BY 'root';
GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' IDENTIFIED BY 'root' WITH GRANT OPTION;
-- CREATE USER canal IDENTIFIED BY 'canal';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal' WITH GRANT OPTION;
-- GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
- Canal 配置
## 创建文件夹并 解压 canal
mkdir /usr/local/canal
tar -zxvf canal.deployer-1.1.4.tar.gz -C /usr/local/canal/
## 配置文件
vim /usr/local/canal/conf/canal.properties
## java程序连接端口
canal.port = 11111
vim /usr/local/canal/conf/example/instance.properties
## 不能与已有的mysql节点server-id重复
canal.instance.mysql.slaveId=1001
## mysql master的地址
canal.instance.master.address=192.168.11.31:3306
## 修改内容如下:
canal.instance.dbUsername=root #指定连接mysql的用户密码
canal.instance.dbPassword=root
canal.instance.connectionCharset = UTF-8 #字符集
## 启动canal
cd /usr/local/canal/bin
./startup.sh
## 验证服务
cat /usr/local/canal/logs/canal/canal.log
## 查看实例日志
tail -f -n 100 /usr/local/canal/logs/example/example.log
Canal 与 MQ 整合
:
官方文档:
Canal Kafka RocketMQ QuickStart
Java 操作 Canal例子
:
官方java例子: ClientSample.java
package com.alibaba.otter.canal.sample;
import java.net.InetSocketAddress;
import java.util.List;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
public class SimpleCanalClientExample {
public static void main(String args[]) {
// 创建链接
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
11111), "example", "", "");
int batchSize = 1000;
int emptyCount = 0;
try {
connector.connect();
connector.subscribe(".*\\..*");
connector.rollback();
int totalEmptyCount = 120;
while (emptyCount < totalEmptyCount) {
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
emptyCount++;
System.out.println("empty count : " + emptyCount);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
} else {
emptyCount = 0;
// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
printEntry(message.getEntries());
}
connector.ack(batchId); // 提交确认
// connector.rollback(batchId); // 处理失败, 回滚数据
}
System.out.println("empty too many times, exit");
} finally {
connector.disconnect();
}
}
private static void printEntry(List<Entry> entrys) {
for (Entry entry : entrys) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
continue;
}
RowChange rowChage = null;
try {
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
e);
}
EventType eventType = rowChage.getEventType();
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));
for (RowData rowData : rowChage.getRowDatasList()) {
if (eventType == EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
} else if (eventType == EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
} else {
System.out.println("-------> before");
printColumn(rowData.getBeforeColumnsList());
System.out.println("-------> after");
printColumn(rowData.getAfterColumnsList());
}
}
}
}
private static void printColumn(List<Column> columns) {
for (Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
}
版权声明:本文为weixin_43538215原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。