一、Camus configuration (camus.properties)
camus.job.name=Camus Job
# HDFS destination path
etl.destination.path=/user/hive/warehouse/binlog.db
# Path for offsets, error logs, and count files
etl.execution.base.path=/camus/exec
# Output path for completed jobs
etl.execution.history.path=/camus/exec/history
# Kafka message decoder; the messages written by canal are JSON
camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.JsonStringMessageDecoder
# Same as the fs.defaultFS parameter in core-site.xml
fs.defaultFS=hdfs://nameservice1
# Writer provider used when landing data on HDFS; Avro, SequenceFile and plain strings are supported by default
# Here we use a custom WriterProvider; its code is given in section 二 below
#etl.record.writer.provider.class=com.linkedin.camus.etl.kafka.common.StringRecordWriterProvider
etl.record.writer.provider.class=com.linkedin.camus.etl.kafka.common.CanalBinlogRecordWriterProvider
camus.message.timestamp.field=es
# Format of the timestamp field
camus.message.timestamp.format=unix_milliseconds
# Type and format of the time partition; hourly and daily partitioning are supported by default, and custom partitioners can also be used
etl.partitioner.class=com.linkedin.camus.etl.kafka.partitioner.TimeBasedPartitioner
etl.destination.path.topic.sub.dirformat='pt_hour'=YYYYMMddHH
# max hadoop tasks to use, each task can pull multiple topic partitions
mapred.map.tasks=30
# max historical time that will be pulled from each partition based on event timestamp
kafka.max.pull.hrs=-1
# events with a timestamp older than this will be discarded.
kafka.max.historical.days=3
# Max minutes for each mapper to pull messages (-1 means no limit)
kafka.max.pull.minutes.per.task=-1
# Topic blacklist/whitelist
kafka.blacklist.topics=Report2,Trade,caojia,ddmgServer,default-flume-topic,ddmg_biz.test_canal1,ddmg_biz.test_canal
kafka.whitelist.topics=ddmg_biz.test_canal2
log4j.configuration=true
# Client (connection) name
kafka.client.name=camus
# Kafka broker addresses
kafka.brokers=xxxx
etl.run.tracking.post=false
kafka.monitor.tier=
etl.counts.path=
kafka.monitor.time.granularity=10
etl.hourly=hourly
etl.daily=daily
etl.ignore.schema.errors=false
# configure output compression for deflate or snappy. Defaults to deflate
mapred.output.compress=false
#etl.output.codec=deflate
#etl.deflate.level=6
#etl.output.codec=snappy
etl.default.timezone=Asia/Shanghai
#etl.output.file.time.partition.mins=60
#etl.keep.count.files=false
#etl.execution.history.max.of.quota=.8
# Configures a custom reporter which extends BaseReporter to send etl data
#etl.reporter.class
mapred.map.max.attempts=1
kafka.client.buffer.size=20971520
#kafka.client.buffer.size=20480
kafka.client.so.timeout=60000
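With this configuration, Camus parses each Kafka message as JSON and uses the es field (epoch milliseconds, per unix_milliseconds) as the event timestamp driving the hourly pt_hour partitioning. The snippet below is only an illustration of what these two properties configure, not the actual JsonStringMessageDecoder source; it parses a trimmed-down canal message (taken from the results section later) with fastjson, the same JSON library the custom writer in section 二 uses:

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

public class TimestampFieldDemo {
    public static void main(String[] args) {
        // Abbreviated version of one flat message produced by canal (see the results section).
        String msg = "{\"database\":\"ddmg_biz\",\"table\":\"test_canal2\","
                + "\"es\":1585814771000,\"ts\":1585814771154,\"type\":\"INSERT\"}";
        JSONObject record = JSON.parseObject(msg);
        // camus.message.timestamp.field=es with camus.message.timestamp.format=unix_milliseconds:
        // the value is already epoch milliseconds and is used directly as the partition timestamp.
        long timestamp = record.getLongValue("es");
        // Camus buckets this timestamp into an hourly pt_hour=YYYYMMddHH directory (Asia/Shanghai).
        System.out.println(timestamp);
    }
}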
二、Custom Camus parsing code
package com.linkedin.camus.etl.kafka.common;

import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Map.Entry;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.parser.Feature;
import com.linkedin.camus.coders.CamusWrapper;
import com.linkedin.camus.etl.IEtlKey;
import com.linkedin.camus.etl.RecordWriterProvider;
import com.linkedin.camus.etl.kafka.mapred.EtlMultiOutputFormat;

public class CanalBinlogRecordWriterProvider implements RecordWriterProvider {
    protected String recordDelimiter = null;
    public static final String ETL_OUTPUT_RECORD_DELIMITER = "etl.output.record.delimiter";
    public static final String DEFAULT_RECORD_DELIMITER = "\n";
    private boolean isCompressed = false;
    private CompressionCodec codec = null;
    private String extension = "";

    public CanalBinlogRecordWriterProvider(TaskAttemptContext context) {
        Configuration conf = context.getConfiguration();
        if (recordDelimiter == null) {
            recordDelimiter = conf.get(ETL_OUTPUT_RECORD_DELIMITER, DEFAULT_RECORD_DELIMITER);
        }
        isCompressed = FileOutputFormat.getCompressOutput(context);
        if (isCompressed) {
            Class<? extends CompressionCodec> codecClass = null;
            if ("snappy".equals(EtlMultiOutputFormat.getEtlOutputCodec(context))) {
                codecClass = SnappyCodec.class;
            } else if ("gzip".equals(EtlMultiOutputFormat.getEtlOutputCodec(context))) {
                codecClass = GzipCodec.class;
            } else {
                codecClass = DefaultCodec.class;
            }
            codec = ReflectionUtils.newInstance(codecClass, conf);
            extension = codec.getDefaultExtension();
        }
    }

    static class CanalBinlogRecordWriter extends RecordWriter<IEtlKey, CamusWrapper> {
        private DataOutputStream outputStream;
        private String fieldDelimiter;
        private String rowDelimiter;

        public CanalBinlogRecordWriter(DataOutputStream outputStream, String fieldDelimiter, String rowDelimiter) {
            this.outputStream = outputStream;
            this.fieldDelimiter = fieldDelimiter;
            this.rowDelimiter = rowDelimiter;
        }

        @Override
        public void write(IEtlKey key, CamusWrapper value) throws IOException, InterruptedException {
            if (value == null) {
                return;
            }
            String recordStr = (String) value.getRecord();
            // Keep the original JSON field order so the output column order stays stable.
            JSONObject record = JSON.parseObject(recordStr, Feature.OrderedField);
            // Skip DDL events; only row data is written out.
            if ("true".equals(record.getString("isDdl"))) {
                return;
            }
            JSONArray data = record.getJSONArray("data");
            for (int i = 0; i < data.size(); i++) {
                JSONObject obj = data.getJSONObject(i);
                if (obj != null) {
                    // Fixed columns first: binlog id, es, ts and event type ...
                    StringBuilder fieldsBuilder = new StringBuilder();
                    fieldsBuilder.append(record.getLong("id"));
                    fieldsBuilder.append(fieldDelimiter);
                    fieldsBuilder.append(record.getLong("es"));
                    fieldsBuilder.append(fieldDelimiter);
                    fieldsBuilder.append(record.getLong("ts"));
                    fieldsBuilder.append(fieldDelimiter);
                    fieldsBuilder.append(record.getString("type"));
                    // ... followed by every column value of the changed row.
                    for (Entry<String, Object> entry : obj.entrySet()) {
                        fieldsBuilder.append(fieldDelimiter);
                        fieldsBuilder.append(entry.getValue());
                    }
                    fieldsBuilder.append(rowDelimiter);
                    outputStream.write(fieldsBuilder.toString().getBytes());
                }
            }
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            outputStream.close();
        }
    }

    @Override
    public String getFilenameExtension() {
        return "";
    }

    @Override
    public RecordWriter<IEtlKey, CamusWrapper> getDataRecordWriter(
            TaskAttemptContext context,
            String fileName,
            CamusWrapper data,
            FileOutputCommitter committer) throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        String rowDelimiter = conf.get("etl.output.record.delimiter", "\n");
        Path path = new Path(committer.getWorkPath(),
                EtlMultiOutputFormat.getUniqueFile(context, fileName, getFilenameExtension()));
        FileSystem fs = path.getFileSystem(conf);
        FSDataOutputStream outputStream = fs.create(path, false);
        // Columns are tab separated; rows use the configured record delimiter.
        return new CanalBinlogRecordWriter(outputStream, "\t", rowDelimiter);
    }
}
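To make the output format concrete, the standalone sketch below re-implements the same flattening inline (the inner CanalBinlogRecordWriter is package-private, so this is an illustration rather than a call into the class) and runs it on the last INSERT message from the results section; it prints the same tab-separated row that the Camus job writes to HDFS:

import java.util.Map;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.parser.Feature;

public class FlattenDemo {
    public static void main(String[] args) {
        // The last INSERT message from the results section (sql/mysqlType/sqlType omitted for brevity).
        String msg = "{\"data\":[{\"id\":\"5\",\"osr\":\"dsf\",\"tesposr\":\"aaa\"}],"
                + "\"database\":\"ddmg_biz\",\"es\":1585814771000,\"id\":263,\"isDdl\":false,"
                + "\"table\":\"test_canal2\",\"ts\":1585814771154,\"type\":\"INSERT\"}";
        JSONObject record = JSON.parseObject(msg, Feature.OrderedField);
        JSONArray data = record.getJSONArray("data");
        for (int i = 0; i < data.size(); i++) {
            JSONObject row = data.getJSONObject(i);
            StringBuilder sb = new StringBuilder();
            // Fixed columns: binlog id, es, ts, type.
            sb.append(record.getLong("id")).append('\t')
              .append(record.getLong("es")).append('\t')
              .append(record.getLong("ts")).append('\t')
              .append(record.getString("type"));
            // Then the changed row's column values in their original order.
            for (Map.Entry<String, Object> entry : row.entrySet()) {
                sb.append('\t').append(entry.getValue());
            }
            System.out.println(sb); // 263  1585814771000  1585814771154  INSERT  5  dsf  aaa
        }
    }
}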
三、Canal main configuration (canal.properties)
# tcp bind ip
canal.ip =
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
# ZooKeeper addresses
canal.zkServers = xxx
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, RocketMQ
canal.serverMode = kafka
# flush meta cursor/parse position to file
canal.file.data.dir = /opt/canal/data
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024
## memory store gets mode, MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true
# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size = 1024
# fallback interval (seconds) when mysql reconnects to a new master
canal.instance.fallbackIntervalInSeconds = 60
# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30
# filter settings
# binlog filter config
#canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = true
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
# binlog format/image check
#canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.format = ROW
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB
# binlog ddl isolation
canal.instance.get.ddl.isolation = false
# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256
# Names of the instance sub-directories under the conf directory
canal.destinations = example
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#### MQ settings
canal.mq.servers = <MQ/kafka broker addresses>
canal.mq.retries = 1
canal.mq.batchSize = 4096
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 10
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
#canal.mq.properties. =
#canal.mq.producerGroup = test # not meaningful for kafka
# Set this value to "cloud", if you want open message trace feature in aliyun.
#canal.mq.accessChannel = local # not meaningful for kafka
# aliyun mq namespace
#canal.mq.namespace =
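With canal.serverMode = kafka and canal.mq.flatMessage = true, the server publishes one flat JSON message per binlog event. Besides the kafka-console-consumer command used in the results section, the topic can be spot-checked with a plain Java consumer; this is a minimal sketch (the group id is made up for the example, the broker list is the one used later with the console consumer):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CanalTopicCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Broker list taken from the console-consumer command in the results section.
        props.put("bootstrap.servers", "cdh6:9092,cdh7:9092,cdh8:9092");
        props.put("group.id", "canal-topic-check");   // illustrative group id
        props.put("auto.offset.reset", "earliest");   // same effect as --from-beginning
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("ddmg_biz.test_canal2"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                // Each value is one flat canal message like the samples shown below.
                System.out.println(record.value());
            }
        }
    }
}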
四、Canal instance configuration (instance.properties)
## mysql serverId , v1.0.26+ will autoGen
canal.instance.mysql.slaveId=1234
# enable gtid use true/false
canal.instance.gtidon=false
# position info
canal.instance.master.address= <mysql host:port>
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# username/password for the source database
canal.instance.dbUsername=xxx
canal.instance.dbPassword=xxx
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# table regex
canal.instance.filter.regex=ddmg_biz\\..*
# table black regex
canal.instance.filter.black.regex=
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
# mq config
canal.mq.topic=ddmg_biz
# dynamic topic route by schema or table regex
canal.mq.dynamicTopic=.*\\..*
canal.mq.partition=0
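canal.instance.filter.regex subscribes to every table in the ddmg_biz schema, and canal.mq.dynamicTopic=.*\\..* routes each matched table to its own <database>.<table> topic, which is why the test table shows up as the topic ddmg_biz.test_canal2 whitelisted in camus.properties. A small sketch with plain java.util.regex, purely to illustrate how the two patterns match (not canal's internal routing code):

import java.util.regex.Pattern;

public class TopicRoutingDemo {
    public static void main(String[] args) {
        String database = "ddmg_biz";
        String table = "test_canal2";
        String schemaAndTable = database + "." + table;

        // canal.instance.filter.regex=ddmg_biz\\..* -> only tables in the ddmg_biz schema are parsed.
        Pattern filter = Pattern.compile("ddmg_biz\\..*");
        System.out.println(filter.matcher(schemaAndTable).matches());   // true

        // canal.mq.dynamicTopic=.*\\..* -> every matching table gets its own <database>.<table> topic;
        // anything that does not match falls back to canal.mq.topic (ddmg_biz).
        Pattern dynamicTopic = Pattern.compile(".*\\..*");
        String topic = dynamicTopic.matcher(schemaAndTable).matches() ? schemaAndTable : "ddmg_biz";
        System.out.println(topic);   // ddmg_biz.test_canal2, the topic whitelisted in camus.properties
    }
}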
五、Results
Consume the topic with kafka-console-consumer:
kafka-console-consumer --bootstrap-server cdh6:9092,cdh7:9092,cdh8:9092 --from-beginning --topic ddmg_biz.test_canal2
The messages produced by canal look like this:
{"data":null,"database":"ddmg_biz","es":1585796772000,"id":144,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"CREATE TABLE `ddmg_biz`.`test_canal2` (\r\n `id` int(0) NOT NULL,\r\n `osr` varchar(255) NULL,\r\n `tesposr` varchar(255) NULL,\r\n PRIMARY KEY (`id`)\r\n)","sqlType":null,"table":"test_canal2","ts":1585796772708,"type":"CREATE"}
{"data":[{"id":"1","osr":"dfs","tesposr":"bcc"}],"database":"ddmg_biz","es":1585806475000,"id":178,"isDdl":false,"mysqlType":{"id":"int","osr":"varchar(255)","tesposr":"varchar(255)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"osr":12,"tesposr":12},"table":"test_canal2","ts":1585806476174,"type":"INSERT"}
{"data":[{"id":"2","osr":"dfs","tesposr":"dcxv"}],"database":"ddmg_biz","es":1585809008000,"id":234,"isDdl":false,"mysqlType":{"id":"int","osr":"varchar(255)","tesposr":"varchar(255)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"osr":12,"tesposr":12},"table":"test_canal2","ts":1585809008337,"type":"INSERT"}
{"data":[{"id":"3","osr":"dsdsd","tesposr":"dsfs"}],"database":"ddmg_biz","es":1585809856000,"id":238,"isDdl":false,"mysqlType":{"id":"int","osr":"varchar(255)","tesposr":"varchar(255)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"osr":12,"tesposr":12},"table":"test_canal2","ts":1585809856890,"type":"INSERT"}
{"data":[{"id":"3","osr":"aaa","tesposr":"dsfs"}],"database":"ddmg_biz","es":1585809936000,"id":239,"isDdl":false,"mysqlType":{"id":"int","osr":"varchar(255)","tesposr":"varchar(255)"},"old":[{"osr":"dsdsd"}],"pkNames":["id"],"sql":"","sqlType":{"id":4,"osr":12,"tesposr":12},"table":"test_canal2","ts":1585809936902,"type":"UPDATE"}
{"data":[{"id":"4","osr":"dfs","tesposr":"dddd"}],"database":"ddmg_biz","es":1585809945000,"id":240,"isDdl":false,"mysqlType":{"id":"int","osr":"varchar(255)","tesposr":"varchar(255)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"osr":12,"tesposr":12},"table":"test_canal2","ts":1585809945419,"type":"INSERT"}
{"data":[{"id":"2","osr":"dfs","tesposr":"dcxv"}],"database":"ddmg_biz","es":1585809949000,"id":241,"isDdl":false,"mysqlType":{"id":"int","osr":"varchar(255)","tesposr":"varchar(255)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"osr":12,"tesposr":12},"table":"test_canal2","ts":1585809949729,"type":"DELETE"}
{"data":[{"id":"5","osr":"dsf","tesposr":"aaa"}],"database":"ddmg_biz","es":1585814771000,"id":263,"isDdl":false,"mysqlType":{"id":"int","osr":"varchar(255)","tesposr":"varchar(255)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"osr":12,"tesposr":12},"table":"test_canal2","ts":1585814771154,"type":"INSERT"}
Run the Camus job as follows; the parsed result (the last INSERT message above, flattened into a tab-separated row) is shown below the command:
hadoop jar camus-example-0.1.0-SNAPSHOT-shaded.jar com.linkedin.camus.etl.kafka.CamusJob -P camus.properties
263 1585814771000 1585814771154 INSERT 5 dsf aaa