Real-Time Data Ingestion with Flink CDC (Part 4): Implementing the Sink Interface




Four write targets are currently supported: HDFS, StarRocks, MySQL, and Kafka. Each writer implements the AbstractSink contract.
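
The AbstractSink base class itself is not shown in this post; below is a minimal sketch of its likely shape, inferred from how the concrete sinks further down use it (the signatures are assumptions, not taken from the source):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.types.logical.LogicalType;

// Hypothetical reconstruction of AbstractSink, inferred from the subclasses below.
// Task is the author's job-description class from earlier parts of this series.
public abstract class AbstractSink<T> {

    /** Attach this sink to the stream, configured from the task's targetConfig JSON. */
    public abstract void addSink(DataStream<T> stream, Task task,
                                 LogicalType[] logicalTypes, String[] fields);

    /** Identifier matched against the task's targetType, e.g. "hdfs" or "starrocks". */
    public abstract String getSinkType();
}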



HDFS Sink

import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.flink.formats.parquet.row.ParquetRowDataBuilder;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;

/**
 * Writes RowData to HDFS as Parquet files via Flink's FileSink. Example task config:
 * {
 *     "targetType":"hdfs",
 *     "targetConfig":{
 *         "targetPath":"xxxxxxxxxx",
 *         "partition":true
 *     }
 * }
 * @author xiaohf7
 * @date 2022/4/12 9:37
 */
public class HdfsSink extends AbstractSink<RowData> {

    @Override
    public void addSink(DataStream<RowData> stream, Task task, LogicalType[] logicalTypes, String[] fields) {

        JSONObject targetConfig = task.getTargetConfig();
        String targetPath = targetConfig.getString("targetPath");
        // getBoolean may return null when "partition" is absent; unbox defensively
        boolean isPartition = Boolean.TRUE.equals(targetConfig.getBoolean("partition"));
        if (StringUtils.isEmpty(targetPath)) {
            throw new RuntimeException("targetConfig.targetPath must not be empty");
        }

        ParquetWriterFactory<RowData> factory = ParquetRowDataBuilder.createWriterFactory(
                getRowType(logicalTypes, fields), new Configuration(), true);
        Path basePath = new Path(HAUtils.switchPath(targetPath));
        FileSink<RowData> fileSink = FileSink.forBulkFormat(basePath, factory)
                .withBucketAssigner(new DateTimeBucketAssigner<RowData>("yyyyMMdd") {

                    @Override
                    public String getBucketId(RowData element, Context context) {
                        if (isPartition) {
                            // Partitioned: write under a date partition directory, e.g. ds=20220412
                            return "ds=" + super.getBucketId(element, context);
                        }
                        // Not partitioned: write directly under the base path
                        return "";
                    }
                }).build();
        stream.sinkTo(fileSink).name(task.getTargetType() + "->" + task.getTableName());
    }

    @Override
    public String getSinkType() {
        return "hdfs";
    }

    /**
     * Builds the row type.
     * @param logicalTypes field types
     * @param fields field names
     * @return the schema of each row
     */
    public RowType getRowType(LogicalType[] logicalTypes, String[] fields) {
        return RowType.of(logicalTypes, fields);
    }
}
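
One operational note on FileSink: bulk-format (Parquet) files are only finalized when a checkpoint completes, so the job must run with checkpointing enabled or the output stays in in-progress files forever. A minimal sketch (the 60-second interval is an illustrative value, not from the source):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// FileSink commits Parquet part files on checkpoint completion only;
// the interval below is illustrative, tune it to your latency/file-size needs.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000L);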
  • The company runs two clusters, and this job needs to write to the HDFS of the other one; we kept hitting an "unknown cluster" exception, so I wrote a helper that looks up the active NameNode of the target nameservice and rewrites the path to point at it
import java.net.InetSocketAddress;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.HAUtil;

public class HAUtils {

    /**
     * Rewrites an HA-style hdfs:// path (authority is a nameservice, no port)
     * so that its authority becomes the rpc-address of the currently active
     * NameNode. This avoids "unknown cluster" errors when the client is not
     * carrying the remote cluster's HA configuration.
     */
    public static String switchPath(String path) {
        try {
            URI uri = new URI(path);
            if ("hdfs".equals(uri.getScheme()) && uri.getPort() == -1) {
                // Minimal HA client config for the remote nameservice
                Configuration conf = new Configuration();
                conf.set("fs.defaultFS", "hdfs://nameservice1");
                conf.set("dfs.nameservices", "nameservice1");
                conf.set("dfs.ha.namenodes.nameservice1", "nn1,nn2");
                conf.set("dfs.namenode.rpc-address.nameservice1.nn1", "ip1:8020");
                conf.set("dfs.namenode.rpc-address.nameservice1.nn2", "ip2:8020");
                conf.set("dfs.client.failover.proxy.provider.nameservice1",
                        "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
                FileSystem fileSystem = FileSystem.get(conf);
                InetSocketAddress addressOfActive = HAUtil.getAddressOfActive(fileSystem);
                fileSystem.close();
                return path.replace(uri.getAuthority(),
                        addressOfActive.getHostString() + ":" + addressOfActive.getPort());
            }
            return path;
        } catch (Exception e) {
            // Fail fast instead of returning null, which would NPE later in new Path(...)
            throw new RuntimeException("Failed to resolve active NameNode for " + path, e);
        }
    }

}
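
For illustration, a hypothetical call (the path is made up; ip1/ip2 are the placeholder NameNode hosts from the config above):

// Hypothetical usage: the logical HA authority is swapped for the active
// NameNode's address, e.g. when nn1 is the active node:
String resolved = HAUtils.switchPath("hdfs://nameservice1/warehouse/ods/orders");
// resolved => "hdfs://ip1:8020/warehouse/ods/orders"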



StarRocks Sink

/**
 * Writes RowData to StarRocks via the StarRocks Flink connector. Example task config:
 * {
 *     "targetType":"starrocks",
 *     "targetConfig":{
 *         "jdbcUrl":"jdbc:mysql://ip:port",
 *         "loadUrl":"ip:port,ip:port",
 *         "user":"",
 *         "password":"",
 *         "tableName":"",
 *         "dbName":""
 *     }
 * }
 * @author xiaohf7
 * @date 2022/4/12 9:56
 */
public class StarrocksSink extends AbstractSink<RowData> {

    @Override
    public void addSink(DataStream<RowData> stream, Task task, LogicalType[] logicalTypes, String[] fields) {

        SinkFunction<RowData> sink = StarRocksSink.sink(
                getTableSchema(logicalTypes, fields),
                getSinkOptions(task),
                // RowDataStarRocksSinkRowBuilder is the author's RowData-to-StarRocks-row
                // mapper; the original snippet breaks off mid-call, so the remainder of
                // this method is reconstructed following the same pattern as HdfsSink
                new RowDataStarRocksSinkRowBuilder(logicalTypes));
        stream.addSink(sink).name(task.getTargetType() + "->" + task.getTableName());
    }

    @Override
    public String getSinkType() {
        return "starrocks";
    }
}
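
The post breaks off before getTableSchema and getSinkOptions are shown. As a sketch, getSinkOptions presumably maps the targetConfig keys above onto the connector's property names; the key mapping below is inferred, not from the source, though StarRocksSinkOptions.builder() and withProperty are the connector's actual API:

import com.alibaba.fastjson.JSONObject;
// Package path of StarRocksSinkOptions varies by connector version
import com.starrocks.connector.flink.table.StarRocksSinkOptions;

// Plausible sketch of getSinkOptions: translate the task's targetConfig JSON
// into StarRocks connector properties. The field-to-property mapping is an assumption.
private StarRocksSinkOptions getSinkOptions(Task task) {
    JSONObject cfg = task.getTargetConfig();
    return StarRocksSinkOptions.builder()
            .withProperty("jdbc-url", cfg.getString("jdbcUrl"))
            .withProperty("load-url", cfg.getString("loadUrl"))
            .withProperty("username", cfg.getString("user"))
            .withProperty("password", cfg.getString("password"))
            .withProperty("table-name", cfg.getString("tableName"))
            .withProperty("database-name", cfg.getString("dbName"))
            .build();
}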



Copyright notice: This is an original article by IT_xhf, licensed under CC 4.0 BY-SA. When reposting, please include a link to the original and this notice.