真是好东西,Maxwell流式同步MySQL到kafka
Maxwell 介绍
1,Maxwell 是由美国Zendesk开源,用Java编写的MySQL实时抓取软件。 实时读取MySQL二进制日志Binlog,并生成 JSON 格式的消息,作为生产者发送给 Kafka,Kinesis、RabbitMQ、Redis、Google Cloud Pub/Sub、文件或其它平台的应用程序。
2,官网地址:http://maxwells-daemon.io/
maxwell源代码:https://github.com/zendesk/maxwell
3, Maxwell的工作原理
很简单,就是把自己伪装成slave,假装从master复制数据
4, MySQL的binlog
(1)什么是binlog
MySQL的二进制日志可以说MySQL最重要的日志了,它记录了所有的DDL和DML(除了数据查询语句)语句,以事件形式记录,还包含语句所执行的消耗的时间,MySQL的二进制日志是事务安全型的。
一般来说开启二进制日志大概会有1%的性能损耗。二进制有两个最重要的使用场景:
其一:MySQL Replication在Master端开启binlog,Master把它的二进制日志传递给slaves来达到master-slave数据一致的目的。
其二:自然就是数据恢复了,通过使用mysqlbinlog工具来使恢复数据。
二进制日志包括两类文件:二进制日志索引文件(文件名后缀为.index)用于记录所有的二进制文件,二进制日志文件(文件名后缀为.00000*)记录数据库所有的DDL和DML(除了数据查询语句)语句事件。
(2)binlog的开启
Linux: /etc/my.cnf
Windows: \my.ini
在mysql的配置文件下,修改配置
在[mysqld] 区块,设置/添加 log-bin=mysql-bin
这个表示binlog日志的前缀是mysql-bin,以后生成的日志文件就是 mysql-bin.123 的文件后面的数字按顺序生成,每次mysql重启或者到达单个文件大小的阈值时,新生一个文件,按顺序编号。
(3)binlog的分类设置
mysql binlog的格式有三种,分别是STATEMENT,MIXED,ROW。
在配置文件中可以选择配置 binlog_format= statement|mixed|row
三种格式的区别:
statement
语句级,binlog会记录每次一执行写操作的语句。
相对row模式节省空间,但是可能产生不一致性,比如
update tt set create_date=now()
如果用binlog日志进行恢复,由于执行时间不同可能产生的数据就不同。
优点: 节省空间
缺点: 有可能造成数据不一致。
row
行级, binlog会记录每次操作后每行记录的变化。
优点:保持数据的绝对一致性。因为不管sql是什么,引用了什么函数,他只记录执行后的效果。
缺点:占用较大空间。
mixed
statement的升级版,一定程度上解决了,因为一些情况而造成的statement模式不一致问题
默认还是statement,在某些情况下譬如:
当函数中包含 UUID() 时;
包含 AUTO_INCREMENT 字段的表被更新时;
执行 INSERT DELAYED 语句时;
用 UDF 时;
会按照 ROW的方式进行处理
优点:节省空间,同时兼顾了一定的一致性。
缺点:还有些极个别情况依旧会造成不一致,另外statement和mixed对于需要对binlog的监控的情况都不方便。
综合上面对比,Maxwell想做监控分析,选择row格式比较合适
安装
一:依赖MySQL
1,修改/etc/my.cnf文件
[axian@bd202 module]$ sudo vim /etc/my.cnf
server-id= 1
log-bin=mysql-bin
binlog_format=row
binlog-do-db=map
注:binlog-do-db根据自己具体要同步的数据库
2,重启MySQL使配置生效
sudo systemctl restart mysqld
二:安装Maxwell
下载两种方式:
1, wget https://github.com/zendesk/maxwell/releases/download/v1.19.5/maxwell-1.19.5.tar.gz
2,直接官网下载
[xian@bd202 module]$ tar -zxvf /bd/software/maxwell-1.25.0.tar.gz -C /bd/module/
3, 初始化Maxwell元数据库
在MySQL中建立一个maxwell库用于存储Maxwell的元数据
[xian@bd202 module]$ mysql -uroot -p123456
mysql> CREATE DATABASE maxwell ;
设置安全级别
mysql> set global validate_password_length=4;
mysql> set global validate_password_policy=0;
分配一个账号可以操作该数据库
mysql> `GRANT ALL ON maxwell.* TO 'maxwell'@'%' IDENTIFIED BY 'maxwell';`
分配这个账号可以监控其他数据库的权限
mysql> GRANT SELECT ,REPLICATION SLAVE , REPLICATION CLIENT ON *.* TO maxwell@ '%';
4,使用Maxwell监控抓取MySQL数据
拷贝配置文件
[xian@bd202 maxwell-1.25.0]$ cp config.properties.example config.properties
修改配置文件
producer=kafka
kafka.bootstrap.servers=bd202:9092,bd203:9092,bd204:9092
#需要添加
kafka_topic=ods_db
#mysql login info
host=bd202
user=maxwell
password=maxwell
#需要,初始化会用
client_id=maxwell_1
注:默认输出到指定Kafka主题的一个kafka分区,因为多分区并行可能会打乱binlog的顺序
如果要提高并行度,首先设置kafka的分区数>1,然后设置producer_partition_by属性
可选值producer_partition_by=database|table|primary_key|random| column
增量导入
1,启动maxwell
/bd/module/maxwell-1.25.0/bin/maxwell –config /opt/module/maxwell-1.25.0/config.properties >/dev/null 2>&1 &
2,可以同时看到kafka的消费,当然首先你的安装了kakfa:
bin/kafka-console-consumer.sh --bootstrap-server bd202:9092 --topic ods_db
3,一旦map库表有新增记录,则会同步消费到kafka
全量数据导入
1,例如:初始化用户表
bin/maxwell-bootstrap –user maxwell –password Maxwell –host bd202 –database map –table user_location –client_id maxwell_1
–user maxwell
数据库分配的操作maxwell数据库的用户名
–password maxwell
数据库分配的操作maxwell数据库的密码
–host
数据库主机名
–database
数据库名
–table
表名
–client_id
maxwell-bootstrap不具备将数据直接导入kafka或者hbase的能力,通过–client_id指定将数据交给哪个maxwell进程处理,在maxwell的conf.properties中配置
2,这样可以导入历史数据,甚至可以加where条件,方便好用
欢迎交流