Maxwell 的使用

  • Post author:
  • Post category:其他


Maxwell 的使用

1.Maxwell介绍

Maxwell介绍Maxwell是由美国zendesk开源,用java编写的Mysql实时抓取软件,其抓取的原理也是基于binlog。

2.Maxwell 和canal工具对比

➢Maxwell没有canal那种server+client模式,只有一个server把数据发送到消息队列或redis。如果需要多个实例,通过指定不同配置文件启动多个进程。

➢Maxwell有一个亮点功能,就是canal只能抓取最新数据,对已存在的历史数据没有办法处理。而Maxwell有一个bootstrap功能,可以直接引导出完整的历史数据用于初始化,非常好用。

➢Maxwell不能直接支持HA,但是它支持断点还原,即错误解决后重启继续上次点儿读取数据。

➢Maxwell只支持json格式,而Canal如果用Server+client模式的话,可以自定义格式。

➢Maxwell比Canal更加轻量级。

3.安装

安装Maxwell

[root@hadoop01 module]$ tar -zxvf /opt/software/maxwell-1.25.0.tar.gz -C /opt/module/3.4使用前准备工作

➢在数据库中建立一个maxwell库用于存储Maxwell的元数据[root@hadoop01 module]$ mysql -uroot -p123456mysql> CREATE DATABASE maxwell ;

➢分配一个账号可以操作该数据库mysql> GRANT ALL ON maxwell.* TO ‘maxwell’@’%’ IDENTIFIED BY ‘123456’;

➢分配这个账号可以监控其他数据库的权限mysql> GRANT SELECT,REPLICATION SLAVE , REPLICATION CLIENT ON

.

TO maxwell@’%’;

4.使用Maxwell监控抓取MySQL数据

[[root@hadoop01 maxwell-1.25.0]$ cp config.properties.example config.properties

➢修改配置文件

producer=kafka

kafka.bootstrap.servers=hadoop01:9092,hadoop02:9092,hadoop03:9092

#需要添加

kafka_topic=user_info_log

#mysql login info

host=hadoop01

user=maxwell

password=123456

#需要添加后续初始化会用

client_id=maxwell_1


注意:默认还是输出到指定Kafka主题的一个kafka分区,因为多个分区并行可能会打乱binlog的顺序如果要提高并行度,首先设置kafka的分区数>1,然后设置producer_partition_by属性可选值producer_partition_by=database|table|primary_key|random|column

➢在/home/atguigu/bin目录下编写maxwell.sh启动脚本

[root@hadoop01 maxwell-1.25.0]$ vim/home/root/bin/maxwell.sh

/opt/module/maxwell-1.25.0/bin/maxwell –config /opt/module/maxwell-1.25.0/config.properties >/dev/null 2>&1 &

➢授予执行权限

[root@hadoop01 maxwell-1.25.0]$ sudo chmod +x /home/root/bin/maxwell.sh

➢运行启动程序

[atguigu@hadoop202 maxwell-1.25.0]$ maxwell.sh

➢启动Kafka消费客户端,观察结果

[root@hadoop01 kafka]$ bin/kafka-console-consumer.sh –bootstrap-server hadoop01:9092 –topic user_info_log

5.Maxwell版本的ODS层处理

5.1执行不同操作,Maxwell和canal数据格式对比

➢执行insert测试语句

INSERT INTO z_user_info VALUES(30,‘zhang3’,‘13810001010’),(31,‘li4’,‘1389999999’);

  canal  

{“data”:[{“id”:“30”,“user_name”:“zhang3”,“tel”:“13810001010”},{“id”:“31”,“user_name”:“li4”,“tel”:“1389999999”}],“database”:“gmall-2020-04”,“es”:1589385314000,“id”:2,“isDdl”:false,“mysqlType”:{“id”:“bigint(20)”,“user_name”:“varchar(20)”,“tel”:“varchar(20)”},“old”:null,“pkNames”:[“id”],“sql”:””,“sqlType”:{“id”:-5,“user_name”:12,“tel”:12},“table”:“z_user_info”,“ts”:1589385314116,“type”:“INSERT”}

  maxwell

{“database”:“gmall-2020-04”,“table”:“z_user_info”,“type”:“insert”,“ts”:1589385314,“xid”:82982,“xoffset”:0,“data”:{“id”:30,“user_name”:“zhang3”,“tel”:“13810001010”}}

{“database”:“gmall202004”,“table”:“z_user_info”,“type”:“insert”,“ts”:1589385314,“xid”:82982,“commit”:true,“data”:{“id”:31,“user_name”:“li4”,“tel”:“1389999999”}}

5.2总结数据特点

➢日志结构

canal 每一条SQL会产生一条日志,如果该条Sql影响了多行数据,则已经会通过集合的方式归集在这条日志中。(即使是一条数据也会是数组结构)maxwell 以影响的数据为单位产生日志,即每影响一条数据就会产生一条日志。如果想知道这些日志是否是通过某一条sql产生的可以通过xid进行判断,相同的xid的日志来自同一sql。

➢数字类型

当原始数据是数字类型时,maxwell会尊重原始数据的类型不增加双引,变为字符串。canal一律转换为字符串。

➢带原始数据字段定义

canal数据中会带入表结构。maxwell更简洁。

5.3SparkStreaming对Topic分流业务代码

// An highlighted block
import com.alibaba.fastjson.{JSON, JSONArray, JSONObject}import com.atguigu.gmall.realtime.utils.{MyKafkaSink, MyKafkaUtil, OffsetManagerUtil}i
import  org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.spark.SparkConfimport org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
/** Desc: 基于Maxwell从Kafka中读取业务数据,进行分流*/
object BaseDBMaxwellApp {
def main(args: Array[String]): Unit = {
	val sparkConf: SparkConf = newSparkConf().setMaster("local[4]").setAppName("BaseDBCanalApp")
	val ssc = new StreamingContext(sparkConf, Seconds(5))
	val topic = "ods_order_info"
	val groupId = "base_db_maxwell_group"//从Redis中读取偏移量
	var recoredDStream: InputDStream[ConsumerRecord[String, String]] = null
			val kafkaOffsetMap:Map[TopicPartition,Long=OffsetManagerUtil.getOffset(topic,groupId)
	if(kafkaOffsetMap!=null &&kafkaOffsetMap.size >0){
	recoredDStream = MyKafkaUtil.getKafkaStream(topic,ssc,kafkaOffsetMap,groupId)
	}
		else{recoredDStream = MyKafkaUtil.getKafkaStream(topic,ssc,groupId)
		}
//获取当前采集周期中处理的数据对应的分区已经偏移量
	var offsetRanges: Array[OffsetRange] = Array.empty[OffsetRange]
	val offsetDStream: DStream[ConsumerRecord[String, String]] = 										recoredDStream.transform {
			rdd => {offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRangesrdd
	}
}
//将从kafka中读取到的recore数据进行封装为json对象
val jsonObjDStream: DStream[JSONObject] = offsetDStream.map {
		record => {//获取value部分的json字符串
			val jsonStr: String = record.value()//将json格式字符串转换为json对象
			val jsonObject: JSONObject = JSON.parseObject(jsonStr)jsonObject
	}
}//从json对象中获取table和data,发送到不同的kafka主题
jsonObjDStream.foreachRDD{
		rdd=>{
			rdd.foreach{
				jsonObj=>{
	val opType: String = jsonObj.getString("type")
	val dataJsonObj: JSONObject = jsonObj.getJSONObject("data")
	if(dataJsonObj!=null && !dataJsonObj.isEmpty && !"delete".equals(opType)){
	//获取更新的表名
	val tableName: String = jsonObj.getString("table")//拼接发送的主题
	var sendTopic = "ods_" + tableName//向kafka发送消息MyKafkaSink.send(sendTopic,dataJsonObj.toString())
					}
			}
	}	//修改Redis中Kafka的偏移量OffsetManagerUtil.saveOffset(topic,groupId,offsetRanges)
		}
	}
	ssc.start()
	ssc.awaitTermination()
		}
	}

5.4测试

➢启动Redis

➢启动Maxwel

l➢运行BaseDBMaxwellApp程序

➢查看kafka下的主题

[root@hadoop01 kafka]$ bin/kafka-console-consumer.sh –bootstrap-server hadoop01:9092 –topic ods_order_info

6.生产数据 查看情况



版权声明:本文为weixin_43847900原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。