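Flink: secondary development of connectors by inheriting their source classes
Note: the same approach applies to other connectors such as JDBC and Kafka.
Recommendation: companies that build internal platforms on Flink often have to modify connector source code to support internal scenarios and requirements. Patching the source directly, however, makes every Flink version upgrade expensive and slow. This approach instead subclasses the original classes, adds the enhancements in the subclass, and packages the result so that it overrides the original connector classes.
1. Create a module project in IDEA
2. POM file notes (using elasticsearch6 as an example)
2.1 Base the setup on the official connector package
Using elasticsearch6 as the example, part of the POM follows the official connector package, and the version numbers must match: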
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-elasticsearch6_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
2.2 maven-shade-plugin
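- Purpose: build the jar with maven-shade-plugin.
- Copy the entire shade configuration of flink-sql-connector-elasticsearch6_${scala.binary.version}; configure the rest according to your own needs.
2.3 POM configuration for the elasticsearch6 extension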
<properties>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<flink.version>1.13.1</flink.version>
<scala.binary.version>2.11</scala.binary.version>
<slf4j.version>1.7.15</slf4j.version>
<sql.driver.version>8.0.21</sql.driver.version>
<fastjson.version>1.2.75</fastjson.version>
<google.guava.version>20.0</google.guava.version>
<apache.commons.version>3.11</apache.commons.version>
<cn.hutool.all.version>5.5.2</cn.hutool.all.version>
<macasaet.version>1.5.0</macasaet.version>
<!-- compile,provided -->
<scope>provided</scope>
</properties>
<dependencies>
<!-- flink -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- kafka -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>${scope}</scope>
<exclusions>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
<scope>${scope}</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
<scope>${scope}</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>${scope}</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>${scope}</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>${scope}</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>${scope}</scope>
</dependency>
<!-- log -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
<scope>${scope}</scope>
</dependency>
<!-- other jar -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>${cn.hutool.all.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>${apache.commons.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${google.guava.version}</version>
</dependency>
<dependency>
<groupId>com.macasaet.fernet</groupId>
<artifactId>fernet-java8</artifactId>
<version>${macasaet.version}</version>
</dependency>
<dependency>
<groupId>com.pupu.bigdata</groupId>
<artifactId>pupu-flink-connector-common</artifactId>
<version>1.0.0.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.10</version>
<optional>true</optional>
<scope>${scope}</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.3.0</version>
<executions>
<execution>
<id>shade-flink</id>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<shadeTestJar>false</shadeTestJar>
<artifactSet>
<includes>
<include>*:*</include>
</includes>
<excludes>
<exclude>com.carrotsearch:hppc</exclude>
<exclude>com.tdunning:t-digest</exclude>
<exclude>joda-time:joda-time</exclude>
<exclude>net.sf.jopt-simple:jopt-simple</exclude>
<exclude>org.elasticsearch:jna</exclude>
<exclude>org.hdrhistogram:HdrHistogram</exclude>
<exclude>org.yaml:snakeyaml</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<artifact>cn.hutool:hutool-all</artifact>
<excludes>
<exclude>META-INF/**</exclude>
</excludes>
</filter>
<filter>
<artifact>com.alibaba:fastjson</artifact>
<excludes>
<exclude>META-INF/**</exclude>
</excludes>
</filter>
<filter>
<artifact>org.elasticsearch:elasticsearch</artifact>
<excludes>
<exclude>config/**</exclude>
<exclude>modules.txt</exclude>
<exclude>plugins.txt</exclude>
<exclude>org/joda/**</exclude>
</excludes>
</filter>
<filter>
<artifact>org.elasticsearch.client:elasticsearch-rest-high-level-client</artifact>
<excludes>
<exclude>forbidden/**</exclude>
</excludes>
</filter>
<filter>
<artifact>org.apache.httpcomponents:httpclient</artifact>
<excludes>
<exclude>mozilla/**</exclude>
</excludes>
</filter>
<filter>
<artifact>org.apache.lucene:lucene-analyzers-common</artifact>
<excludes>
<exclude>org/tartarus/**</exclude>
</excludes>
</filter>
<filter>
<artifact>*:*</artifact>
<excludes>
<!-- exclude Java 9 specific classes as otherwise the shade-plugin crashes -->
<exclude>META-INF/versions/**</exclude>
<exclude>META-INF/services/com.fasterxml.**</exclude>
<exclude>META-INF/services/org.apache.lucene.**</exclude>
<exclude>META-INF/services/org.elasticsearch.**</exclude>
<exclude>META-INF/LICENSE.txt</exclude>
</excludes>
</filter>
</filters>
<relocations>
<!-- Force relocation of all Elasticsearch dependencies. -->
<relocation>
<pattern>org.apache.commons</pattern>
<shadedPattern>org.apache.flink.elasticsearch6.shaded.org.apache.commons</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.http</pattern>
<shadedPattern>org.apache.flink.elasticsearch6.shaded.org.apache.http</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.lucene</pattern>
<shadedPattern>org.apache.flink.elasticsearch6.shaded.org.apache.lucene</shadedPattern>
</relocation>
<relocation>
<pattern>org.elasticsearch</pattern>
<shadedPattern>org.apache.flink.elasticsearch6.shaded.org.elasticsearch</shadedPattern>
</relocation>
<relocation>
<pattern>com.fasterxml.jackson</pattern>
<shadedPattern>org.apache.flink.elasticsearch6.shaded.com.fasterxml.jackson</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
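After `mvn package`, it can be worth checking that the relocations above actually took effect. The following is a minimal stdlib-only sketch (the class name `ShadedJarInspector` is hypothetical, not part of Flink or Maven) that lists the entries of the built jar under a given path prefix; the prefixes used in `main` are taken from the `shadedPattern` configured above.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

// Hypothetical helper (not part of Flink or Maven): inspects the jar built by maven-shade-plugin.
class ShadedJarInspector {

    // Returns the names of all jar entries whose path starts with the given prefix.
    static List<String> entriesWithPrefix(String jarPath, String prefix) throws IOException {
        List<String> matches = new ArrayList<>();
        // try-with-resources closes the jar even if reading fails
        try (JarFile jar = new JarFile(jarPath)) {
            Enumeration<JarEntry> entries = jar.entries();
            while (entries.hasMoreElements()) {
                String name = entries.nextElement().getName();
                if (name.startsWith(prefix)) {
                    matches.add(name);
                }
            }
        }
        return matches;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: ShadedJarInspector <path-to-shaded-jar>");
            return;
        }
        // Entries moved under the shadedPattern configured for org.elasticsearch
        int relocated = entriesWithPrefix(args[0],
                "org/apache/flink/elasticsearch6/shaded/org/elasticsearch/").size();
        // Entries still under the original package; a large count suggests the relocation did not apply
        int original = entriesWithPrefix(args[0], "org/elasticsearch/").size();
        System.out.println("relocated: " + relocated + ", not relocated: " + original);
    }
}
```

Run it with the path of the shaded jar (from the module's target directory) as the only argument.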
3 Add a custom connector my-elasticsearch-6
3.1 Create a factory class that extends Elasticsearch6DynamicSinkFactory
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch6DynamicSinkFactory;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.factories.FactoryUtil;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@Slf4j
@Internal
public class PupuElasticsearch6DynamicSinkFactory extends Elasticsearch6DynamicSinkFactory {
    // Value of the 'connector' option in the SQL DDL that selects this factory
    public static final String IDENTIFIER = "my-elasticsearch-6";
    @Override
    public String factoryIdentifier() {
        return IDENTIFIER;
    }
    // Add extra options that must be validated here; copy the parent's set instead of modifying it
    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        Set<ConfigOption<?>> options = new HashSet<>(super.requiredOptions());
        return options;
    }
    // Special handling of options can be done here before delegating to the parent factory
    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        // Helper for reading and validating options, e.g. helper.getOptions()
        final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        // Environment parameters set on tableEnv; in Flink 1.13 this is normally the TableConfig's Configuration, so the cast succeeds
        final Configuration envConfig = (Configuration) context.getConfiguration();
        // Options from the WITH (...) clause of the table DDL
        Map<String, String> configMap = context.getCatalogTable().getOptions();
        return super.createDynamicTableSink(context);
    }
}
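3.2 Setting environment parameters on tableEnv
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// Get the configuration of tableEnv
Configuration tabConfig = tableEnv.getConfig().getConfiguration();
// Set a parameter; "my.custom.key" is a hypothetical key, use whatever your connector expects
tabConfig.setString("my.custom.key", "my-value");
- The connector factory reads these environment parameters while the job is being submitted, i.e. when the SQL statement is planned.
3.3 Reading the tableEnv environment parameters
@Override
public DynamicTableSink createDynamicTableSink(Context context) {
    ...
    // Environment parameters set on tableEnv in 3.2
    final Configuration envConfig = (Configuration) context.getConfiguration();
    ...
    return super.createDynamicTableSink(context);
}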
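If the environment parameters are meant to act as defaults for table options, one possible way to combine the two maps is sketched below. The prefix rule, the class name `OptionMerger` and the example keys are all hypothetical; inside the factory, the environment map could come from `envConfig.toMap()` and the table map from `context.getCatalogTable().getOptions()`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: environment parameters supply defaults, DDL options win.
class OptionMerger {

    // Copies every environment key that starts with envPrefix (with the prefix stripped),
    // then overlays the table options so that values from WITH (...) take precedence.
    static Map<String, String> merge(Map<String, String> envConfig,
                                     Map<String, String> tableOptions,
                                     String envPrefix) {
        Map<String, String> merged = new HashMap<>();
        for (Map.Entry<String, String> e : envConfig.entrySet()) {
            if (e.getKey().startsWith(envPrefix)) {
                merged.put(e.getKey().substring(envPrefix.length()), e.getValue());
            }
        }
        // DDL options overwrite any environment default with the same key
        merged.putAll(tableOptions);
        return merged;
    }
}
```

The parent factory still reads its options from the catalog table in the context, so a merged map like this only affects logic written in the subclass itself.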
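4 Register the factory in org.apache.flink.table.factories.Factory
Reason: Flink discovers table factories through Java SPI. When it resolves the connector of a table, it loads every META-INF/services/org.apache.flink.table.factories.Factory file on the classpath, iterates over the registered factories, and picks the one whose factoryIdentifier() matches the 'connector' option in the SQL. In the module, create src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory and put the fully qualified name (package plus class name) of PupuElasticsearch6DynamicSinkFactory on its own line.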
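To check which factories a jar or classpath actually registers (for example, whether the shaded jar still contains your entry), the services files can be read directly. A minimal stdlib-only sketch, assuming the standard ServiceLoader provider-file format (UTF-8, one class name per line, `#` starts a comment); `FactoryServiceFileLister` is a hypothetical name.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;

// Hypothetical helper: lists the factory classes declared in every Flink Factory services file.
class FactoryServiceFileLister {

    static final String SERVICE_FILE = "META-INF/services/org.apache.flink.table.factories.Factory";

    // Reads all services files visible to the loader and returns the declared class names.
    static List<String> declaredFactories(ClassLoader loader) throws IOException {
        List<String> classNames = new ArrayList<>();
        Enumeration<URL> files = loader.getResources(SERVICE_FILE);
        while (files.hasMoreElements()) {
            URL url = files.nextElement();
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(url.openStream(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    // Strip comments and surrounding whitespace, skip blank lines
                    int hash = line.indexOf('#');
                    if (hash >= 0) {
                        line = line.substring(0, hash);
                    }
                    line = line.trim();
                    if (!line.isEmpty()) {
                        classNames.add(line);
                    }
                }
            }
        }
        return classNames;
    }

    public static void main(String[] args) throws IOException {
        for (String name : declaredFactories(Thread.currentThread().getContextClassLoader())) {
            System.out.println(name);
        }
    }
}
```

Running `main` with the shaded jar on the classpath should print the custom factory class among the registered names.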
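5. Writing a main-class unit test
Omitted.
6. Making the connector take effect
- Package the module and copy the .jar into the lib directory of the Flink cluster.
- Alternatively, add this package as a dependency in the application's pom so the code takes effect.
- Note: check whether the application package also depends on flink-sql-connector-elasticsearch6_2.11, and remove it if so; likewise check whether the Flink cluster's lib directory contains flink-sql-connector-elasticsearch6_2.11, and remove it. Otherwise your changes may not take effect, because the JVM may end up loading the classes from the official jar.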
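One way to check for the duplicates described in the note is to ask the class loader for every copy of a class file on the classpath. A minimal stdlib-only sketch; `DuplicateClassFinder` is a hypothetical name, and the default class checked is the official `Elasticsearch6DynamicSinkFactory`, which both the official SQL connector jar and the shaded jar built above contain.

```java
import java.io.IOException;
import java.net.URL;
import java.util.Collections;
import java.util.List;

// Hypothetical helper: finds every copy of a class file visible to a class loader.
class DuplicateClassFinder {

    // Converts a class name to its resource path and returns all matching URLs.
    static List<URL> copiesOf(String className, ClassLoader loader) throws IOException {
        String resource = className.replace('.', '/') + ".class";
        return Collections.list(loader.getResources(resource));
    }

    public static void main(String[] args) throws IOException {
        String name = args.length > 0 ? args[0]
                : "org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch6DynamicSinkFactory";
        List<URL> copies = copiesOf(name, Thread.currentThread().getContextClassLoader());
        copies.forEach(System.out::println);
        if (copies.size() > 1) {
            System.out.println("WARNING: " + copies.size() + " copies of " + name + " on the classpath");
        }
    }
}
```

More than one URL means the class exists in several jars; which copy is actually loaded then depends on classpath order and Flink's class-loading settings.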
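Copyright notice: this is an original article by qq_27242695, published under the CC 4.0 BY-SA license. When reposting, please include a link to the original source and this notice.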