数据采集工具之maxwell

  • Post author:
  • Post category:其他

1、安装maxwell

网站

https://maxwells-daemon.io/changelog/

选择v1.29.2

(v130.0以后的全是jdk11 ,就不是jdk8了)

选择maxwell-1.29.2.tar.gz点一下就下载了

安装就是把安装包扔到虚拟机的指定位置

tar -zxvf maxwell-1.29.2.tar.gz -C /usr/local/

我没配置环境变量,其实也可去配置环境变量

2、mysql的环境配置

1)修改mysql的配置

 vim /etc/my.cnf
 
[mysqld]
server_id=1
log-bin=mysql-bin
binlog_format=row
binlog-do-db=test_maxwell

binlog-do-db就是监听的数据库

systemctl restart mysqld
mysql -uroot -p'@Mmforu45'
 show variables like '%binlog%';
 #查看
 binlog_format | ROW

查看真实的binlog

 cd /var/lib/mysql
 ll
  mysqlbin.000001
  mysqlbin.index

2)初始化元数据库

mysql -uroot -p'@Mmforu45'
mysql> CREATE DATABASE maxwell;
mysql> set global validate_password_length=4;
mysql> set global validate_password_policy=0;
mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'%' IDENTIFIED BY '@Mmforu45';
mysql> GRANT SELECT ,REPLICATION SLAVE , REPLICATION CLIENT ON *.* TO maxwell@'%';
mysql> flush privileges;

3、Maxwell的启动方式

3.1第一种启动方式

启动一下maxwell,但是maxwell和mysql一定是在同一个电脑里面!

cd /usr/local/maxwell-1.29.2
bin/maxwell --user='maxwell' --password='@Mmforu45' --host='qianfeng03' --producer=stdout

因为maxwell是前台打印,所以就关闭就直接ctrl+c

3.2 第二种启动方式

就是修改配置,然后启动配置

cd /usr/local/maxwell-1.29.2
cp  config.properties.example config.properties

修改

# tl;dr config
log_level=info

producer=stdout
kafka.bootstrap.servers=localhost:9092

# mysql login info
host=qianfeng03
user=maxwell
password=@Mmforu45

启动

bin/maxwell --config ./config.properties

打印在控制台的操作

bin/maxwell --config ./config.properties

在mysql中

4、案例实操

4.1 案例1

mysql -uroot -p'@Mmforu45'
use test_maxwell;
create table test01(
id tinyint,
name varchar(24)
);
insert into test01 value(1,'aaaa'),(2,'bbbbb'),(3,'cccc');

回看一下前台打印日志

[root@qianfeng03 maxwell-1.29.2]# bin/maxwell --config ./config.properties
Using kafka version: 1.0.0
23:25:22,490 INFO  Maxwell - Starting Maxwell. maxMemory: 652738560 bufferMemory                                                                               Usage: 0.25
23:25:22,538 INFO  Maxwell - Maxwell v1.29.2 is booting (StdoutProducer), starti                                                                               ng at Position[BinlogPosition[mysql-bin.000001:77135], lastHeartbeat=16573794609                                                                               62]
23:25:22,632 INFO  MysqlSavedSchema - Restoring schema id 1 (last modified at Po                                                                               sition[BinlogPosition[mysql-bin.000001:5598], lastHeartbeat=0])
23:25:22,730 INFO  BinlogConnectorReplicator - Setting initial binlog pos to: my                                                                               sql-bin.000001:77135
23:25:22,740 INFO  BinaryLogClient - Connected to qianfeng03:3306 at mysql-bin.0                                                                               00001/77135 (sid:6379, cid:68)
23:25:22,740 INFO  BinlogConnectorReplicator - Binlog connected.
23:32:59,024 INFO  AbstractSchemaStore - storing schema @Position[BinlogPosition                                                                               [mysql-bin.000001:108271], lastHeartbeat=1657380774724] after applying "create t                                                                               able test01( id tinyint, name varchar(24) )" to test_maxwell, new schema id is 2
{"database":"test_maxwell","table":"test01","type":"insert","ts":1657380895,"xid                                                                               ":2227,"xoffset":0,"data":{"id":1,"name":"aaaa"}}
{"database":"test_maxwell","table":"test01","type":"insert","ts":1657380895,"xid                                                                               ":2227,"xoffset":1,"data":{"id":2,"name":"bbbbb"}}
{"database":"test_maxwell","table":"test01","type":"insert","ts":1657380895,"xid                                                                               ":2227,"commit":true,"data":{"id":3,"name":"cccc"}}

4.2 案例2

有关kafka

start-all.sh
zkServer.sh start #每一个虚拟机都需要
kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties  #每一个虚拟机都需要

在windows可以安装一个kafak Tool 2.0.8 去监控kafka的状态

启动

 bin/maxwell --user='maxwell' --password='@Mmforu45' --host='qianfeng03' --producer=kafka --kafka.bootstrap.servers=qianfeng03:9092 --kafka_topic=maxwell

其实上面这个kafka的主题可以不存在,就是直接创建

数据库的操作

use test_maxwell;
insert into test01 value(3,'zs'),(4,'ls');

查看kafka的主题 (qianfeng03)

kafka-topics.sh \
--zookeeper qianfeng01:2181,qianfeng02:2181,qianfeng03:2181/kafka \
--list
[root@qianfeng03 ~]# kafka-topics.sh \
> --zookeeper qianfeng01:2181,qianfeng02:2181,qianfeng03:2181/kafka \
> --list
MiPhone
__consumer_offsets
flink-kafka
food
maxwell
pet
studen
student

上面是生成了主题

下面演示一下在公司生产环境中一个kafka是如何监控多个库的

maxwell 监控多个 mysql 库的数据,然后将这些数据发往 kafka 的一个主题 Topic,并且这个主题也肯定是多分区的,为了提高并发度。

修改一下配置文件

producer=kafka
kafka.bootstrap.servers=qianfeng03:9092

# mysql login info
host=qianfeng03
user=maxwell
password=@Mmforu45
...

#       *** kafka ***

# list of kafka brokers
#kafka.bootstrap.servers=hosta:9092,hostb:9092

# kafka topic to write to
# this can be static, e.g. 'maxwell', or dynamic, e.g. namespace_%{database}_%{table}
# in the latter case 'database' and 'table' will be replaced with the values for the row being processed
kafka_topic=maxwell3 #解除注释,maxwell加了一个3
...

#           *** partitioning ***

# What part of the data do we partition by?
#producer_partition_by=database # [database, table, primary_key, transaction_id, thread_id, column]
producer_partition_by=database
# specify what fields to partition by when using producer_partition_by=column
# column separated list.
#producer_partition_columns=id,foo,bar

# when using producer_partition_by=column, partition by this when
# the specified column(s) don't exist.
#producer_partition_by_fallback=database

kafka

kafka-topics.sh --zookeeper qianfeng01:2181,qianfeng02:2181,qianfeng03:2181/kafka --create --replication-factor 2 --partitions 3 --topic maxwell3

启动一下maxwell

bin/maxwell --config ./config.properties

4.3 案例3 输入的过滤(就是指定输入)

cd maxwell-1.29.2
bin/maxwell --user='maxwell' --password='@Mmforu45' --host='qianfeng03' --filter 'exclude: *.*,include:test_maxwell.test01' --producer=stdout

在mysql这边

mysql -uroot -p'@Mmforu45'
use test_maxwell;
insert into test01 value(45,'asad'),(34,'2144');

在qianfeng03的前程进程


[root@qianfeng03 maxwell-1.29.2]# bin/maxwell --user='maxwell' --password='@Mmforu45' --host='qianfeng03' --filter 'exclude: *.*,include:test_maxwell.test01' --producer=stdout
Using kafka version: 1.0.0
09:44:48,669 INFO  Maxwell - Starting Maxwell. maxMemory: 652738560 bufferMemoryUsage: 0.25
09:44:48,808 INFO  Maxwell - Maxwell v1.29.2 is booting (StdoutProducer), starting at Position[BinlogPosition[mysql-bin.000001:315977], lastHeartbeat=1657383903593]
09:44:49,007 INFO  MysqlSavedSchema - Restoring schema id 2 (last modified at Position[BinlogPosition[mysql-bin.000001:108271], lastHeartbeat=1657380774724])
09:44:49,149 INFO  MysqlSavedSchema - Restoring schema id 1 (last modified at Position[BinlogPosition[mysql-bin.000001:5598], lastHeartbeat=0])
09:44:49,200 INFO  MysqlSavedSchema - beginning to play deltas...
09:44:49,200 INFO  MysqlSavedSchema - played 1 deltas in 0ms
09:44:49,221 INFO  BinlogConnectorReplicator - Setting initial binlog pos to: mysql-bin.000001:315977
09:44:49,248 INFO  BinaryLogClient - Connected to qianfeng03:3306 at mysql-bin.000001/315977 (sid:6379, cid:17)
09:44:49,248 INFO  BinlogConnectorReplicator - Binlog connected.
{"database":"test_maxwell","table":"test01","type":"insert","ts":1657417604,"xid":418,"xoffset":0,"data":{"id":45,"name":"asad"}}
{"database":"test_maxwell","table":"test01","type":"insert","ts":1657417604,"xid":418,"commit":true,"data":{"id":34,"name":"2144"}}

4.4 Maxwell设置mysql的全量导入

就是对表进行初始化,然后再导入,其实maxwell和canal非常大的不同就是,maxwell是可以支持全量导入,而canal只能增量导入。

初始化数据库有两种方式

方式一

bin/maxwell-bootstrap --database fooDB --table barTable

方式二

在mysql的maxwell的元数据库中触发bootstrap表

mysql -uroot -p'@Mmforu45'
INSERT INTO maxwell.bootstrap(database_name,table_name) VALUES('test_maxwell','test01');

重新开启maxwell之后

[root@qianfeng03 maxwell-1.29.2]# bin/maxwell --user='maxwell' --password='@Mmfo                                                                               ru45' --host='qianfeng03' --filter 'exclude: *.*,include:test_maxwell.test01' --                                                                               producer=stdout
Using kafka version: 1.0.0
10:13:58,413 INFO  Maxwell - Starting Maxwell. maxMemory: 652738560 bufferMemory                                                                               Usage: 0.25
10:13:58,463 INFO  Maxwell - Maxwell v1.29.2 is booting (StdoutProducer), starti                                                                               ng at Position[BinlogPosition[mysql-bin.000002:408], lastHeartbeat=1657383903593                                                                               ]
10:13:58,557 INFO  MysqlSavedSchema - Restoring schema id 2 (last modified at Po                                                                               sition[BinlogPosition[mysql-bin.000001:108271], lastHeartbeat=1657380774724])
10:13:58,673 INFO  MysqlSavedSchema - Restoring schema id 1 (last modified at Po                                                                               sition[BinlogPosition[mysql-bin.000001:5598], lastHeartbeat=0])
10:13:58,719 INFO  MysqlSavedSchema - beginning to play deltas...
10:13:58,719 INFO  MysqlSavedSchema - played 1 deltas in 0ms
10:13:58,736 INFO  BinlogConnectorReplicator - Setting initial binlog pos to: my                                                                               sql-bin.000002:408
10:13:58,758 INFO  BinaryLogClient - Connected to qianfeng03:3306 at mysql-bin.0                                                                               00002/408 (sid:6379, cid:35)
10:13:58,758 INFO  BinlogConnectorReplicator - Binlog connected.
{"database":"test_maxwell","table":"test01","type":"bootstrap-start","ts":165741                                                                               9238,"data":{}}
10:13:58,766 INFO  SynchronousBootstrapper - bootstrapping started for test_maxw                                                                               ell.test01
{"database":"test_maxwell","table":"test01","type":"bootstrap-insert","ts":16574                                                                               19238,"data":{"id":1,"name":"aaaa"}}
{"database":"test_maxwell","table":"test01","type":"bootstrap-insert","ts":16574                                                                               19238,"data":{"id":2,"name":"bbbbb"}}
{"database":"test_maxwell","table":"test01","type":"bootstrap-insert","ts":16574                                                                               19238,"data":{"id":3,"name":"cccc"}}
{"database":"test_maxwell","table":"test01","type":"bootstrap-insert","ts":16574                                                                               19238,"data":{"id":3,"name":"zs"}}
{"database":"test_maxwell","table":"test01","type":"bootstrap-insert","ts":16574                                                                               19238,"data":{"id":4,"name":"ls"}}
{"database":"test_maxwell","table":"test01","type":"bootstrap-insert","ts":16574                                                                               19238,"data":{"id":6,"name":"fff"}}
{"database":"test_maxwell","table":"test01","type":"bootstrap-insert","ts":16574                                                                               19238,"data":{"id":45,"name":"asad"}}
{"database":"test_maxwell","table":"test01","type":"bootstrap-insert","ts":16574                                                                               19238,"data":{"id":34,"name":"2144"}}
{"database":"test_maxwell","table":"test01","type":"bootstrap-complete","ts":165                                                                               7419238,"data":{}}
10:13:58,787 INFO  SynchronousBootstrapper - bootstrapping ended for #1 test_max                                                                               well.test01

然后在mysql中插入

mysql> insert into test_maxwell.test01 values(12,'CCC'),(13,'DDDD');

就会看见前台会这样

10:13:58,787 INFO  SynchronousBootstrapper - bootstrapping ended for #1 test_max                                                                               well.test01
{"database":"test_maxwell","table":"test01","type":"insert","ts":1657419325,"xid":4322,"xoffset":0,"data":{"id":12,"name":"CCC"}}
{"database":"test_maxwell","table":"test01","type":"insert","ts":1657419325,"xid":4322,"commit":true,"data":{"id":13,"name":"DDDD"}}

其实数据库内容还是存在的,但是maxwell采集的数据就只有全量导入的一点点

mysql> select * from test_maxwell.test01;
+------+-------+
| id   | name  |
+------+-------+
|    1 | aaaa  |
|    2 | bbbbb |
|    3 | cccc  |
|    3 | zs    |
|    4 | ls    |
|    6 | fff   |
|   45 | asad  |
|   34 | 2144  |
|   12 | CCC   |
|   13 | DDDD  |
+------+-------+
10 rows in set (0.00 sec)

mysql>

把全量导入改成增量导入吧

就是在mysql中设置

DELETE FROM maxwell.bootstrap WHERE database_name = 'test_maxwell';

启动maxwell

bin/maxwell --user='maxwell' --password='@Mmforu45' --host='qianfeng03' --producer=stdout

mysql这边

insert into test_maxwell.test01 values(14,'eeee'),(15,'ffff');

maxwell的前台进程

10:24:44,133 INFO  BinlogConnectorReplicator - Binlog connected.
{"database":"test_maxwell","table":"test01","type":"insert","ts":1657419951,"xid":5789,"xoffset":0,"data":{"id":14,"name":"eeee"}}
{"database":"test_maxwell","table":"test01","type":"insert","ts":1657419951,"xid":5789,"commit":true,"data":{"id":15,"name":"ffff"}}

insert into test_maxwell.test01 values(14,'eeee'),(15,'ffff');

maxwell的前台进程

10:24:44,133 INFO  BinlogConnectorReplicator - Binlog connected.
{"database":"test_maxwell","table":"test01","type":"insert","ts":1657419951,"xid":5789,"xoffset":0,"data":{"id":14,"name":"eeee"}}
{"database":"test_maxwell","table":"test01","type":"insert","ts":1657419951,"xid":5789,"commit":true,"data":{"id":15,"name":"ffff"}}


版权声明:本文为weixin_45682261原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。