Integrating canal, Kafka and Camus





1. Camus configuration (camus.properties)

camus.job.name=Camus Job

# HDFS destination path
etl.destination.path=/user/hive/warehouse/binlog.db
# Where offsets, error logs, and count files are stored
etl.execution.base.path=/camus/exec
# Output path for completed jobs
etl.execution.history.path=/camus/exec/history

# Message decoder for the Kafka payload; canal delivers its messages as JSON strings
camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.JsonStringMessageDecoder
# Same as the fs.defaultFS value in core-site.xml
fs.defaultFS=hdfs://nameservice1


# Writer used when landing data on HDFS; Avro, SequenceFile and plain strings are supported out of the box
# Here a custom WriterProvider is used; its code is shown in section 2
#etl.record.writer.provider.class=com.linkedin.camus.etl.kafka.common.StringRecordWriterProvider
etl.record.writer.provider.class=com.linkedin.camus.etl.kafka.common.CanalBinlogRecordWriterProvider

# JSON field that holds the event timestamp (canal's "es" field)
camus.message.timestamp.field=es
# Format of the timestamp field
camus.message.timestamp.format=unix_milliseconds

# Time-based partitioner; hourly and daily partitioning are supported by default, custom formats are possible
etl.partitioner.class=com.linkedin.camus.etl.kafka.partitioner.TimeBasedPartitioner
etl.destination.path.topic.sub.dirformat='pt_hour'=YYYYMMddHH



# max hadoop tasks to use, each task can pull multiple topic partitions
mapred.map.tasks=30
# max historical time that will be pulled from each partition based on event timestamp
kafka.max.pull.hrs=-1
# events with a timestamp older than this will be discarded.
kafka.max.historical.days=3
# Max minutes for each mapper to pull messages (-1 means no limit)
kafka.max.pull.minutes.per.task=-1

# Topic blacklist / whitelist
kafka.blacklist.topics=Report2,Trade,caojia,ddmgServer,default-flume-topic,ddmg_biz.test_canal1,ddmg_biz.test_canal
kafka.whitelist.topics=ddmg_biz.test_canal2
log4j.configuration=true

# Kafka client name
kafka.client.name=camus

# Kafka broker list
kafka.brokers=xxxx


etl.run.tracking.post=false
kafka.monitor.tier=
etl.counts.path=
kafka.monitor.time.granularity=10

etl.hourly=hourly
etl.daily=daily

etl.ignore.schema.errors=false

# configure output compression for deflate or snappy. Defaults to deflate
mapred.output.compress=false
#etl.output.codec=deflate
#etl.deflate.level=6
#etl.output.codec=snappy

etl.default.timezone=Asia/Shanghai
#etl.output.file.time.partition.mins=60
#etl.keep.count.files=false
#etl.execution.history.max.of.quota=.8

# Configures a customer reporter which extends BaseReporter to send etl data
#etl.reporter.class

mapred.map.max.attempts=1

kafka.client.buffer.size=20971520
#kafka.client.buffer.size=20480
kafka.client.so.timeout=60000
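
As a sanity check on how camus.message.timestamp.field, the TimeBasedPartitioner, the sub.dirformat and etl.default.timezone fit together, the following throwaway sketch (my own class, not part of Camus) derives the pt_hour bucket for the es value of the last INSERT shown in the results section. The actual directory layout is produced by Camus itself; this only illustrates how the hour bucket is computed.

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

public class PartitionHourSketch {
    public static void main(String[] args) {
        // "es" field of a canal message, interpreted as unix_milliseconds
        // (camus.message.timestamp.field=es, camus.message.timestamp.format=unix_milliseconds)
        long es = 1585814771000L;

        // 'pt_hour'=YYYYMMddHH evaluated in etl.default.timezone=Asia/Shanghai
        SimpleDateFormat hourFormat = new SimpleDateFormat("yyyyMMddHH");
        hourFormat.setTimeZone(TimeZone.getTimeZone("Asia/Shanghai"));

        // Hour bucket created under <etl.destination.path>/<topic>/
        System.out.println("pt_hour=" + hourFormat.format(new Date(es)));  // pt_hour=2020040216
    }
}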



2. Custom Camus record writer code

package com.linkedin.camus.etl.kafka.common;

import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Map.Entry;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.parser.Feature;
import com.linkedin.camus.coders.CamusWrapper;
import com.linkedin.camus.etl.IEtlKey;
import com.linkedin.camus.etl.RecordWriterProvider;
import com.linkedin.camus.etl.kafka.mapred.EtlMultiOutputFormat;


public class CanalBinlogRecordWriterProvider implements RecordWriterProvider {
    protected String recordDelimiter = null;
    public static final String ETL_OUTPUT_RECORD_DELIMITER = "etl.output.record.delimiter";
    public static final String DEFAULT_RECORD_DELIMITER = "\n";
    private boolean isCompressed = false;
    private CompressionCodec codec = null;
    private String extension = "";

    public CanalBinlogRecordWriterProvider(TaskAttemptContext context) {
        Configuration conf = context.getConfiguration();
        if (recordDelimiter == null) {
            recordDelimiter = conf.get(ETL_OUTPUT_RECORD_DELIMITER, DEFAULT_RECORD_DELIMITER);
        }

        isCompressed = FileOutputFormat.getCompressOutput(context);

        if (isCompressed) {
            Class<? extends CompressionCodec> codecClass = null;
            if ("snappy".equals(EtlMultiOutputFormat.getEtlOutputCodec(context))) {
                codecClass = SnappyCodec.class;
            } else if ("gzip".equals(EtlMultiOutputFormat.getEtlOutputCodec(context))) {
                codecClass = GzipCodec.class;
            } else {
                codecClass = DefaultCodec.class;
            }
            codec = ReflectionUtils.newInstance(codecClass, conf);
            extension = codec.getDefaultExtension();
        }
    }
	
    static class CanalBinlogRecordWriter extends RecordWriter<IEtlKey, CamusWrapper> {
        private DataOutputStream outputStream;
        private String fieldDelimiter;
        private String rowDelimiter;

        public CanalBinlogRecordWriter(DataOutputStream outputStream, String fieldDelimiter, String rowDelimiter) {
            this.outputStream = outputStream;
            this.fieldDelimiter = fieldDelimiter;
            this.rowDelimiter = rowDelimiter;
        }

        @Override
        public void write(IEtlKey key, CamusWrapper value) throws IOException, InterruptedException {
            if (value == null) {
                return;
            }

            String recordStr = (String) value.getRecord();
            JSONObject record = JSON.parseObject(recordStr, Feature.OrderedField);
            // Skip DDL events; only row data is written out
            if ("true".equals(record.getString("isDdl"))) {
                return;
            }

            JSONArray data = record.getJSONArray("data");
            // Some events carry no row data at all
            if (data == null) {
                return;
            }
            for (int i = 0; i < data.size(); i++) {
                JSONObject obj = data.getJSONObject(i);
                if (obj != null) {
                    StringBuilder fieldsBuilder = new StringBuilder();
                    fieldsBuilder.append(record.getLong("id"));
                    fieldsBuilder.append(fieldDelimiter);
                    fieldsBuilder.append(record.getLong("es"));
                    fieldsBuilder.append(fieldDelimiter);
                    fieldsBuilder.append(record.getLong("ts"));
                    fieldsBuilder.append(fieldDelimiter);
                    fieldsBuilder.append(record.getString("type"));

                    for (Entry<String, Object> entry : obj.entrySet()) {
                        fieldsBuilder.append(fieldDelimiter);
                        fieldsBuilder.append(entry.getValue());
                    }
                    fieldsBuilder.append(rowDelimiter);

                    outputStream.write(fieldsBuilder.toString().getBytes());
                }
            }
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            outputStream.close();
        }
    }

    @Override
    public String getFilenameExtension() {
        return "";
    }

    @Override
    public RecordWriter<IEtlKey, CamusWrapper> getDataRecordWriter(
        TaskAttemptContext context,
        String fileName,
        CamusWrapper data,
        FileOutputCommitter committer
    ) throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        String rowDelimiter = conf.get("etl.output.record.delimiter", "\n");

        Path path = new Path(committer.getWorkPath(), EtlMultiOutputFormat.getUniqueFile(context, fileName, getFilenameExtension()));
        FileSystem fs = path.getFileSystem(conf);
        FSDataOutputStream outputStream = fs.create(path, false);

        return new CanalBinlogRecordWriter(outputStream, "\t", rowDelimiter);
    }
}
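
To see what CanalBinlogRecordWriter emits for a single message, here is a standalone sketch (class and method names are mine; only fastjson is needed) that applies the same field logic to one of the flat messages from the results section. It prints the same tab-separated line that ends up on HDFS.

import java.util.Map;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.parser.Feature;

public class WriterOutputSketch {
    public static void main(String[] args) {
        // One flat canal message (the INSERT for id=5 from the results section), trimmed to the fields used here
        String msg = "{\"data\":[{\"id\":\"5\",\"osr\":\"dsf\",\"tesposr\":\"aaa\"}],"
                + "\"database\":\"ddmg_biz\",\"es\":1585814771000,\"id\":263,\"isDdl\":false,"
                + "\"table\":\"test_canal2\",\"ts\":1585814771154,\"type\":\"INSERT\"}";

        JSONObject record = JSON.parseObject(msg, Feature.OrderedField);
        if ("true".equals(record.getString("isDdl"))) {
            return;  // DDL events are skipped, exactly as in the writer above
        }

        JSONArray data = record.getJSONArray("data");
        for (int i = 0; i < data.size(); i++) {
            JSONObject row = data.getJSONObject(i);
            StringBuilder line = new StringBuilder();
            line.append(record.getLong("id")).append('\t')   // binlog batch id
                .append(record.getLong("es")).append('\t')   // event time in MySQL
                .append(record.getLong("ts")).append('\t')   // time canal processed the event
                .append(record.getString("type"));           // INSERT / UPDATE / DELETE
            for (Map.Entry<String, Object> entry : row.entrySet()) {
                line.append('\t').append(entry.getValue());  // column values in table order
            }
            System.out.println(line);  // 263  1585814771000  1585814771154  INSERT  5  dsf  aaa
        }
    }
}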



3. Canal main configuration (canal.properties)

# tcp bind ip
canal.ip = 
# register ip to zookeeper
canal.register.ip = 
canal.port = 11111
canal.metrics.pull.port = 11112
# ZooKeeper servers
canal.zkServers = xxx
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, RocketMQ
canal.serverMode = kafka
# flush meta cursor/parse position to file
canal.file.data.dir = /opt/canal/data
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024 
## memory store get mode: MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true

# maximum supported transaction size; transactions larger than this are split into multiple deliveries
canal.instance.transaction.size =  1024
# seconds to fall back (rewind) when reconnecting to a new MySQL master
canal.instance.fallbackIntervalInSeconds = 60

# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30

# binlog filter config
#canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = true
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false

# binlog format/image check
#canal.instance.binlog.format = ROW,STATEMENT,MIXED 
canal.instance.binlog.format = ROW
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB

# binlog ddl isolation
canal.instance.get.ddl.isolation = false

# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256

# Instance names, i.e. sub-directories under the conf directory
canal.destinations = example
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5


canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml


#### Message queue (Kafka) settings
canal.mq.servers = <Kafka broker address>
canal.mq.retries = 1
canal.mq.batchSize = 4096 
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 10
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all


#canal.mq.properties. =
#canal.mq.producerGroup = test   # meaningless for Kafka
# Set this value to "cloud", if you want open message trace feature in aliyun.
#canal.mq.accessChannel = local  # meaningless for Kafka
# aliyun mq namespace
#canal.mq.namespace =
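
With canal.serverMode = kafka and canal.mq.flatMessage = true, each row change is published to Kafka as one flat JSON string, which is exactly what JsonStringMessageDecoder on the Camus side expects. Besides the kafka-console-consumer command used in the results section, a minimal Java consumer can verify delivery; the sketch below is only an illustration (broker list taken from the console-consumer command, group id is a placeholder), not part of canal.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CanalTopicCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "cdh6:9092,cdh7:9092,cdh8:9092"); // same brokers as the console consumer below
        props.put("group.id", "canal-topic-check");                      // placeholder group id
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("ddmg_biz.test_canal2"));
            for (int i = 0; i < 10; i++) {                               // poll a few times and print whatever arrives
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> r : records) {
                    System.out.println(r.value());                       // one flat canal JSON message per record
                }
            }
        }
    }
}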



4. Canal instance configuration (instance.properties)

## mysql serverId , v1.0.26+ will autoGen
canal.instance.mysql.slaveId=1234
# enable gtid use true/false
canal.instance.gtidon=false

# position info
canal.instance.master.address= <MySQL host:port>
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=


# username/password for the source database
canal.instance.dbUsername=xxx
canal.instance.dbPassword=xxx
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=ddmg_biz\\..*
# table black regex
canal.instance.filter.black.regex=
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
canal.mq.topic=ddmg_biz
# dynamic topic routing by schema or table regex
canal.mq.dynamicTopic=.*\\..* 
canal.mq.partition=0
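
The filter and dynamicTopic settings above are what make the Camus whitelist topic ddmg_biz.test_canal2 exist in the first place: canal.instance.filter.regex captures every table under ddmg_biz, and because canal.mq.dynamicTopic matches schema.table, each table's changes are routed to a topic named <database>.<table> (as seen in the results below) rather than the default canal.mq.topic. A tiny sketch of the two regex checks (the class name is mine):

import java.util.regex.Pattern;

public class TopicRoutingSketch {
    public static void main(String[] args) {
        String schemaAndTable = "ddmg_biz.test_canal2";

        // canal.instance.filter.regex=ddmg_biz\\..* -> every table in ddmg_biz is parsed
        System.out.println(Pattern.matches("ddmg_biz\\..*", schemaAndTable));  // true

        // canal.mq.dynamicTopic=.*\\..* -> the matched schema.table becomes the topic name,
        // so this table's changes land on the Kafka topic "ddmg_biz.test_canal2"
        System.out.println(Pattern.matches(".*\\..*", schemaAndTable));        // true
    }
}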



5. Results

Consume the topic with kafka-console-consumer:

kafka-console-consumer --bootstrap-server  cdh6:9092,cdh7:9092,cdh8:9092  --from-beginning --topic ddmg_biz.test_canal2

The messages look like this:

{"data":null,"database":"ddmg_biz","es":1585796772000,"id":144,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"CREATE TABLE `ddmg_biz`.`test_canal2`  (\r\n  `id` int(0) NOT NULL,\r\n  `osr` varchar(255) NULL,\r\n  `tesposr` varchar(255) NULL,\r\n  PRIMARY KEY (`id`)\r\n)","sqlType":null,"table":"test_canal2","ts":1585796772708,"type":"CREATE"}
{"data":[{"id":"1","osr":"dfs","tesposr":"bcc"}],"database":"ddmg_biz","es":1585806475000,"id":178,"isDdl":false,"mysqlType":{"id":"int","osr":"varchar(255)","tesposr":"varchar(255)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"osr":12,"tesposr":12},"table":"test_canal2","ts":1585806476174,"type":"INSERT"}
{"data":[{"id":"2","osr":"dfs","tesposr":"dcxv"}],"database":"ddmg_biz","es":1585809008000,"id":234,"isDdl":false,"mysqlType":{"id":"int","osr":"varchar(255)","tesposr":"varchar(255)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"osr":12,"tesposr":12},"table":"test_canal2","ts":1585809008337,"type":"INSERT"}
{"data":[{"id":"3","osr":"dsdsd","tesposr":"dsfs"}],"database":"ddmg_biz","es":1585809856000,"id":238,"isDdl":false,"mysqlType":{"id":"int","osr":"varchar(255)","tesposr":"varchar(255)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"osr":12,"tesposr":12},"table":"test_canal2","ts":1585809856890,"type":"INSERT"}
{"data":[{"id":"3","osr":"aaa","tesposr":"dsfs"}],"database":"ddmg_biz","es":1585809936000,"id":239,"isDdl":false,"mysqlType":{"id":"int","osr":"varchar(255)","tesposr":"varchar(255)"},"old":[{"osr":"dsdsd"}],"pkNames":["id"],"sql":"","sqlType":{"id":4,"osr":12,"tesposr":12},"table":"test_canal2","ts":1585809936902,"type":"UPDATE"}
{"data":[{"id":"4","osr":"dfs","tesposr":"dddd"}],"database":"ddmg_biz","es":1585809945000,"id":240,"isDdl":false,"mysqlType":{"id":"int","osr":"varchar(255)","tesposr":"varchar(255)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"osr":12,"tesposr":12},"table":"test_canal2","ts":1585809945419,"type":"INSERT"}
{"data":[{"id":"2","osr":"dfs","tesposr":"dcxv"}],"database":"ddmg_biz","es":1585809949000,"id":241,"isDdl":false,"mysqlType":{"id":"int","osr":"varchar(255)","tesposr":"varchar(255)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"osr":12,"tesposr":12},"table":"test_canal2","ts":1585809949729,"type":"DELETE"}
{"data":[{"id":"5","osr":"dsf","tesposr":"aaa"}],"database":"ddmg_biz","es":1585814771000,"id":263,"isDdl":false,"mysqlType":{"id":"int","osr":"varchar(255)","tesposr":"varchar(255)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"osr":12,"tesposr":12},"table":"test_canal2","ts":1585814771154,"type":"INSERT"}



Running the Camus job parses the messages into tab-separated rows on HDFS (columns: binlog id, es, ts, type, then the row's column values):

 hadoop jar camus-example-0.1.0-SNAPSHOT-shaded.jar  com.linkedin.camus.etl.kafka.CamusJob -P camus.properties
263	1585814771000	1585814771154	INSERT	5	dsf	aaa



Copyright notice: this is an original article by illbehere, licensed under CC 4.0 BY-SA. Please include a link to the original and this notice when reposting.