流式同步工具Maxwell 全量增量导入最强示例

  • Post author:
  • Post category:其他


真是好东西,Maxwell流式同步MySQL到kafka



Maxwell 介绍

1,Maxwell 是由美国Zendesk开源,用Java编写的MySQL实时抓取软件。 实时读取MySQL二进制日志Binlog,并生成 JSON 格式的消息,作为生产者发送给 Kafka,Kinesis、RabbitMQ、Redis、Google Cloud Pub/Sub、文件或其它平台的应用程序。

2,官网地址:http://maxwells-daemon.io/

maxwell源代码:https://github.com/zendesk/maxwell

3, Maxwell的工作原理

很简单,就是把自己伪装成slave,假装从master复制数据

4, MySQL的binlog

(1)什么是binlog

MySQL的二进制日志可以说MySQL最重要的日志了,它记录了所有的DDL和DML(除了数据查询语句)语句,以事件形式记录,还包含语句所执行的消耗的时间,MySQL的二进制日志是事务安全型的。

一般来说开启二进制日志大概会有1%的性能损耗。二进制有两个最重要的使用场景:

其一:MySQL Replication在Master端开启binlog,Master把它的二进制日志传递给slaves来达到master-slave数据一致的目的。

其二:自然就是数据恢复了,通过使用mysqlbinlog工具来使恢复数据。

二进制日志包括两类文件:二进制日志索引文件(文件名后缀为.index)用于记录所有的二进制文件,二进制日志文件(文件名后缀为.00000*)记录数据库所有的DDL和DML(除了数据查询语句)语句事件。

(2)binlog的开启

Linux: /etc/my.cnf

Windows: \my.ini

在mysql的配置文件下,修改配置

在[mysqld] 区块,设置/添加 log-bin=mysql-bin

这个表示binlog日志的前缀是mysql-bin,以后生成的日志文件就是 mysql-bin.123 的文件后面的数字按顺序生成,每次mysql重启或者到达单个文件大小的阈值时,新生一个文件,按顺序编号。

(3)binlog的分类设置

mysql binlog的格式有三种,分别是STATEMENT,MIXED,ROW。

在配置文件中可以选择配置 binlog_format= statement|mixed|row

三种格式的区别:

statement

语句级,binlog会记录每次一执行写操作的语句。

相对row模式节省空间,但是可能产生不一致性,比如

update tt set create_date=now()

如果用binlog日志进行恢复,由于执行时间不同可能产生的数据就不同。

优点: 节省空间

缺点: 有可能造成数据不一致。

row

行级, binlog会记录每次操作后每行记录的变化。

优点:保持数据的绝对一致性。因为不管sql是什么,引用了什么函数,他只记录执行后的效果。

缺点:占用较大空间。

mixed

statement的升级版,一定程度上解决了,因为一些情况而造成的statement模式不一致问题

默认还是statement,在某些情况下譬如:

当函数中包含 UUID() 时;

包含 AUTO_INCREMENT 字段的表被更新时;

执行 INSERT DELAYED 语句时;

用 UDF 时;

会按照 ROW的方式进行处理

优点:节省空间,同时兼顾了一定的一致性。

缺点:还有些极个别情况依旧会造成不一致,另外statement和mixed对于需要对binlog的监控的情况都不方便。

综合上面对比,Maxwell想做监控分析,选择row格式比较合适



安装

一:依赖MySQL

1,修改/etc/my.cnf文件

[axian@bd202 module]$ sudo vim /etc/my.cnf

server-id= 1

log-bin=mysql-bin

binlog_format=row

binlog-do-db=map

注:binlog-do-db根据自己具体要同步的数据库

2,重启MySQL使配置生效

sudo systemctl restart mysqld

二:安装Maxwell

下载两种方式:

1, wget https://github.com/zendesk/maxwell/releases/download/v1.19.5/maxwell-1.19.5.tar.gz

2,直接官网下载

[xian@bd202 module]$ tar -zxvf /bd/software/maxwell-1.25.0.tar.gz -C /bd/module/

3, 初始化Maxwell元数据库

在MySQL中建立一个maxwell库用于存储Maxwell的元数据

[xian@bd202 module]$ mysql -uroot -p123456

mysql> CREATE DATABASE maxwell ;

设置安全级别

mysql> set global validate_password_length=4;

mysql> set global validate_password_policy=0;

分配一个账号可以操作该数据库

mysql> `GRANT ALL   ON maxwell.* TO 'maxwell'@'%' IDENTIFIED BY 'maxwell';`

分配这个账号可以监控其他数据库的权限

mysql>   GRANT  SELECT ,REPLICATION SLAVE , REPLICATION CLIENT  ON  *.*  TO   maxwell@  '%';

4,使用Maxwell监控抓取MySQL数据

拷贝配置文件

[xian@bd202 maxwell-1.25.0]$ cp config.properties.example config.properties

修改配置文件

producer=kafka

kafka.bootstrap.servers=bd202:9092,bd203:9092,bd204:9092

#需要添加

kafka_topic=ods_db

#mysql login info

host=bd202

user=maxwell

password=maxwell

#需要,初始化会用

client_id=maxwell_1

注:默认输出到指定Kafka主题的一个kafka分区,因为多分区并行可能会打乱binlog的顺序

如果要提高并行度,首先设置kafka的分区数>1,然后设置producer_partition_by属性

可选值producer_partition_by=database|table|primary_key|random| column



增量导入

1,启动maxwell

/bd/module/maxwell-1.25.0/bin/maxwell –config /opt/module/maxwell-1.25.0/config.properties >/dev/null 2>&1 &

2,可以同时看到kafka的消费,当然首先你的安装了kakfa:

bin/kafka-console-consumer.sh --bootstrap-server bd202:9092 --topic ods_db

3,一旦map库表有新增记录,则会同步消费到kafka



全量数据导入

1,例如:初始化用户表

bin/maxwell-bootstrap –user maxwell –password Maxwell –host bd202 –database map –table user_location –client_id maxwell_1

–user maxwell

数据库分配的操作maxwell数据库的用户名

–password maxwell

数据库分配的操作maxwell数据库的密码

–host

数据库主机名

–database

数据库名

–table

表名

–client_id

maxwell-bootstrap不具备将数据直接导入kafka或者hbase的能力,通过–client_id指定将数据交给哪个maxwell进程处理,在maxwell的conf.properties中配置

2,这样可以导入历史数据,甚至可以加where条件,方便好用


欢迎交流



版权声明:本文为lexoning原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。