一、Canal
1、介绍
Canal是阿里巴巴旗下的一款开源项目,纯Java开发,基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持MySQL。
2、工作原理
Canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议,mysql master收到dump请求,开始推送binary log给slave(也就是Canal),Canal解析binary log对象(原始为byte流)。
3、Canal主要实现的业务
(1)数据同步
(2)数据库实时备份
(3)实现缓存与数据库数据一致
…….
二、集成使用
1、mysql
(1)首先mysql开启binlog日志,在my.ini或my.cnf文件中配置
log‐bin=mysql‐bin #添加这一行就ok
binlog‐format=ROW #选择row模式
server‐id=1 #配置mysql replaction需要定义,不能和canal的slaveId重复
(3)使用root账户创建mysql的canal用户并赋予权限
1 create user canal identified by 'canal';
2
3 GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
4
5 FLUSH PRIVILEGES;
2、canal
canal安装(windows服务器为例,linux环境可以使用同样的canal安装包)
(1)下载地址:
Releases · alibaba/canal · GitHub
(2)创建文件夹,解压
(3)修改conf\example文件夹下instance.properties配置文件
(4)启动canal服务(windows下是bat,linux是sh)
(5)查看canal启动日志,如下图表示启动成功
3、项目中集成
(1)添加pom依赖
<!-- canal -->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.6</version>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.protocol</artifactId>
<version>1.1.6</version>
</dependency>
(2)application配置文件添加canal配置,ip改为canal部署的服务器的ip
# canal
canal:
server:
ip: 127.0.0.1
port: 11111
promotion:
destination: example
(3)创建canal监听类
/**
* @author Aruioooo
* @date 2023-05-15 10:02
* @description canal监听类
*/
@Slf4j
@Component
@Order(1)
public class CanalListener implements CommandLineRunner {
@Value("${canal.server.ip}")
private String canalServerIp;
@Value("${canal.server.port}")
private int canalServerPort;
@Value("${canal.promotion.destination}")
private String destination;
@Override
public void run(String... args) throws Exception {
CanalConnector conn = CanalConnectors.newSingleConnector(
new InetSocketAddress(canalServerIp, canalServerPort), destination, "", "");
int batchSize = 1000;
while (true) {
conn.connect();
//订阅实例中所有的数据库和表
/*
这里注意下:
conn.subscribe(".*\\..*"); 会导致服务端的canal.instance.filter.regex=.*\\..* 失效。
更严重的是canal会一直向你的example.log日志文件写入日志,
测了一下大概12小时会写入20M大小的日志。
*/
//conn.subscribe(".*\\..*");
// 回滚到未进行ack的地方
conn.rollback();
// 获取数据 每次获取一千条改变数据
Message message = conn.getWithoutAck(batchSize);
//获取这条消息的id
long id = message.getId();
int size = message.getEntries().size();
if (id != -1 && size > 0) {
// 数据处理
dataHandle(message.getEntries());
}else {
//暂停1秒防止重复链接数据库
Thread.sleep(1000);
}
// 确认消费完成这条消息
conn.ack(id);
// 关闭连接
conn.disconnect();
}
}
/**
* 数据处理
*
* @param entries
*/
private void dataHandle(List<CanalEntry.Entry> entries) {
for (CanalEntry.Entry entry : entries) {
// 解析binlog
CanalEntry.RowChange rowChange;
try {
rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("数据处理出现异常 data:" + entry.toString(), e);
}
if (rowChange != null) {
// 获取操作类型
CanalEntry.EventType eventType = rowChange.getEventType();
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
dataDetails(rowData.getBeforeColumnsList(), rowData.getAfterColumnsList(), eventType, entry);
}
}
}
}
/**
* 解析数据
*
* @param beforeColumns 操作前的数据
* @param afterColumns 操作后的数据
* @param eventType 操作类型(INSERT,UPDATE,DELETE)
* @param entry
*/
private void dataDetails(List<CanalEntry.Column> beforeColumns, List<CanalEntry.Column> afterColumns,
CanalEntry.EventType eventType, CanalEntry.Entry entry) {
log.info("数据库:" + entry.getHeader().getSchemaName());
log.info("表名:" + entry.getHeader().getTableName());
log.info("操作时间:" + entry.getHeader().getExecuteTime());
log.info("操作类型:" + eventType);
if (CanalEntry.EventType.INSERT.equals(eventType)) {
// log.info("新增数据:"+afterColumns);
insertHandle(entry);
} else if (CanalEntry.EventType.DELETE.equals(eventType)) {
// log.info("删除数据:"+beforeColumns);
deleteHandle(entry);
} else {
// log.info("更新数据:更新前数据--"+beforeColumns);
// log.info("更新数据:更新后数据--"+afterColumns);
updateHandle(entry);
}
}
/**
* 新增数据操作
*
* @param entry
*/
private void insertHandle(CanalEntry.Entry entry) {
try {
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
for (CanalEntry.RowData rowData : rowDatasList) {
List<CanalEntry.Column> columnList = rowData.getAfterColumnsList();
columnList.forEach(c ->{
log.info("字段名:"+c.getName()+",字段值:"+c.getValue());
});
// StringBuffer sql = new StringBuffer("insert into " + entry.getHeader().getTableName() + " (");
// for (int i = 0; i < columnList.size(); i++) {
// sql.append(columnList.get(i).getName());
// if (i != columnList.size() - 1) {
// sql.append(",");
// }
// }
// sql.append(") VALUES (");
// for (int i = 0; i < columnList.size(); i++) {
// sql.append("'" + columnList.get(i).getValue() + "'");
// if (i != columnList.size() - 1) {
// sql.append(",");
// }
// }
// sql.append(")");
// 插入sql语句
// log.info(sql.toString());
}
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
}
/**
* 删除数据操作
*
* @param entry
*/
private void deleteHandle(CanalEntry.Entry entry) {
try {
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
for (CanalEntry.RowData rowData : rowDatasList) {
List<CanalEntry.Column> columnList = rowData.getBeforeColumnsList();
columnList.forEach(column ->{
if (column.getIsKey()){
log.info("删除主键"+column.getName()+"值为"+column.getValue()+"的记录");
}
log.info("删除前"+column.getName()+"字段值为:"+column.getValue());
});
// StringBuffer sql = new StringBuffer("delete from " + entry.getHeader().getTableName() + " where ");
// for (CanalEntry.Column column : columnList) {
// if (column.getIsKey()) {
// //暂时只支持单一主键
// sql.append(column.getName() + "=" + column.getValue());
// break;
// }
// }
// 删除sql语句
// log.info(sql.toString());
}
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
}
/**
* 更新数据操作
*
* @param entry
*/
private void updateHandle(CanalEntry.Entry entry) {
try {
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
for (CanalEntry.RowData rowData : rowDatasList) {
List<CanalEntry.Column> newColumnList = rowData.getAfterColumnsList();
List<CanalEntry.Column> oldColumnList = rowData.getBeforeColumnsList();
oldColumnList.forEach(oldColumn ->{
log.info("更新前"+oldColumn.getName()+"值为:"+oldColumn.getValue());
});
log.info("----------------------------------------------------------------------------------");
newColumnList.forEach(newColumn ->{
log.info("更新后"+newColumn.getName()+"值为:"+newColumn.getValue());
});
// StringBuffer sql = new StringBuffer("update " + entry.getHeader().getTableName() + " set ");
// for (int i = 0; i < newColumnList.size(); i++) {
// sql.append(" " + newColumnList.get(i).getName()
// + " = '" + newColumnList.get(i).getValue() + "'");
// if (i != newColumnList.size() - 1) {
// sql.append(",");
// }
// }
// sql.append(" where ");
// for (CanalEntry.Column column : oldColumnList) {
// if (column.getIsKey()) {
// //暂时只支持单一主键
// sql.append(column.getName() + "=" + column.getValue());
// break;
// }
// }
// 更新sql语句
// log.info(sql.toString());
}
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
}
}
(4)测试,修改数据库数据,canal监听到数据库数据操作记录