Real-Time Data Collection with Flink CDC (Part 4): Implementing the Sink Interface
Writes to HDFS, StarRocks, MySQL, and Kafka are currently supported; each writer extends the common AbstractSink base class.
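The AbstractSink base class was introduced in an earlier part of this series and is not repeated here; judging from the methods the concrete sinks override below, it roughly has the following shape (a sketch for orientation, not the original source; Task is the job-description class from the earlier parts):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.types.logical.LogicalType;

public abstract class AbstractSink<T> {

    /** Attaches the concrete sink to the stream produced by the CDC source. */
    public abstract void addSink(DataStream<T> stream, Task task, LogicalType[] logicalTypes, String[] fields);

    /** Identifier matched against the task's targetType, e.g. "hdfs" or "starrocks". */
    public abstract String getSinkType();
}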
HDFS Sink
/**
 * {
 *   "targetType": "hdfs",
 *   "targetConfig": {
 *     "targetPath": "xxxxxxxxxx",
 *     "partition": true
 *   }
 * }
 *
 * @author xiaohf7
 * @date 2022/4/12 9:37
 * @Description
 */
public class HdfsSink extends AbstractSink<RowData> {

    @Override
    public void addSink(DataStream<RowData> stream, Task task, LogicalType[] logicalTypes, String[] fields) {
        JSONObject targetConfig = task.getTargetConfig();
        String targetPath = targetConfig.getString("targetPath");
        // Treat a missing "partition" flag as false to avoid unboxing a null Boolean
        boolean isPartition = Boolean.TRUE.equals(targetConfig.getBoolean("partition"));
        if (StringUtils.isEmpty(targetPath)) {
            throw new RuntimeException("targetConfig.targetPath must not be empty");
        }
        ParquetWriterFactory<RowData> factory = ParquetRowDataBuilder.createWriterFactory(
                getRowType(logicalTypes, fields), new Configuration(), true);
        Path basePath = new Path(HAUtils.switchPath(targetPath));
        FileSink<RowData> fileSink = FileSink.forBulkFormat(basePath, factory)
                .withBucketAssigner(new DateTimeBucketAssigner<RowData>("yyyyMMdd") {
                    @Override
                    public String getBucketId(RowData element, Context context) {
                        if (isPartition) {
                            // Partitioned: write into a date partition directory, e.g. ds=20220412
                            return "ds=" + super.getBucketId(element, context);
                        }
                        // Not partitioned: write directly into the base path
                        return "";
                    }
                })
                .build();
        stream.sinkTo(fileSink).name(task.getTargetType() + "->" + task.getTableName());
    }

    @Override
    public String getSinkType() {
        return "hdfs";
    }

    /**
     * Builds the row type.
     *
     * @param logicalTypes field types
     * @param fields       field names
     * @return the schema of one row of data
     */
    public RowType getRowType(LogicalType[] logicalTypes, String[] fields) {
        return RowType.of(logicalTypes, fields);
    }
}
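One caveat worth noting for this sink: with a bulk format such as Parquet, FileSink can only roll and commit part files on successful checkpoints, so the job that uses HdfsSink must have checkpointing enabled or no files will ever be finalized. A minimal sketch (the interval is only an example, not a value taken from this project):

// Bulk-format (Parquet) FileSink finalizes part files only when a checkpoint completes
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000L); // e.g. every 60 seconds; the interval is an assumption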
- The company runs two Hadoop clusters, and writing to the other cluster's HDFS kept failing with an "unknown cluster" exception, so I wrote a helper that looks up the currently active NameNode of the target nameservice and rewrites the path to point at it.
public class HAUtils {

    public static String switchPath(String path) {
        try {
            URI uri = new URI(path);
            if ("hdfs".equals(uri.getScheme()) && uri.getPort() == -1) {
                // The path addresses a nameservice (no explicit port), so resolve its active NameNode
                Configuration conf = new Configuration();
                conf.set("fs.defaultFS", "hdfs://nameservice1");
                conf.set("dfs.nameservices", "nameservice1");
                conf.set("dfs.ha.namenodes.nameservice1", "nn1,nn2");
                conf.set("dfs.namenode.rpc-address.nameservice1.nn1", "ip1:8020");
                conf.set("dfs.namenode.rpc-address.nameservice1.nn2", "ip2:8020");
                conf.set("dfs.client.failover.proxy.provider.nameservice1",
                        "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
                FileSystem fileSystem = FileSystem.get(conf);
                InetSocketAddress addressOfActive = HAUtil.getAddressOfActive(fileSystem);
                fileSystem.close();
                // Replace the nameservice authority with the active NameNode's host:port
                return path.replace(uri.getAuthority(),
                        addressOfActive.getHostString() + ":" + addressOfActive.getPort());
            }
            return path;
        } catch (Exception e) {
            // Fail fast instead of returning null, which would only surface later as an NPE in new Path(null)
            throw new RuntimeException("Failed to resolve active NameNode for path: " + path, e);
        }
    }
}
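For illustration, if nn1 (ip1:8020 in the configuration above) happens to be the active NameNode, a call with a hypothetical path gets its nameservice authority rewritten to the active node's address:

// Hypothetical input path; assumes nn1 (ip1:8020) is currently active
String resolved = HAUtils.switchPath("hdfs://nameservice1/warehouse/ods/t_order");
// resolved -> "hdfs://ip1:8020/warehouse/ods/t_order"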
StarRocks Sink
/**
 * {
 *   "targetType": "starrocks",
 *   "targetConfig": {
 *     "jdbcUrl": "jdbc:mysql://ip:port",
 *     "loadUrl": "ip:port,ip:port",
 *     "user": "",
 *     "password": "",
 *     "tableName": "",
 *     "dbName": ""
 *   }
 * }
 *
 * @author xiaohf7
 * @date 2022/4/12 9:56
 * @Description
 */
public class StarrocksSink extends AbstractSink<RowData> {

    @Override
    public void addSink(DataStream<RowData> stream, Task task, LogicalType[] logicalTypes, String[] fields) {
        SinkFunction<RowData> sink = StarRocksSink.sink(
                getTableSchema(logicalTypes, fields),
                getSinkOptions(task),
                new RowDataStarRocksSinkRowBuilder(logicalTypes
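The getSinkOptions(task) helper referenced above builds the connector options from the task's targetConfig. As a rough sketch, it could map the JSON keys from the Javadoc onto the StarRocksSinkOptions builder of flink-connector-starrocks; the property names below are the ones documented for that connector, while the method itself, its placement inside StarrocksSink, and the exact mapping are assumptions rather than the author's code:

// Hypothetical helper inside StarrocksSink: builds StarRocksSinkOptions from targetConfig.
// StarRocksSinkOptions comes from flink-connector-starrocks; its package path varies by connector version.
public StarRocksSinkOptions getSinkOptions(Task task) {
    JSONObject cfg = task.getTargetConfig();
    return StarRocksSinkOptions.builder()
            .withProperty("jdbc-url", cfg.getString("jdbcUrl"))
            .withProperty("load-url", cfg.getString("loadUrl"))
            .withProperty("username", cfg.getString("user"))
            .withProperty("password", cfg.getString("password"))
            .withProperty("database-name", cfg.getString("dbName"))
            .withProperty("table-name", cfg.getString("tableName"))
            .build();
}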