Flink: an approach to customizing connectors by extending their source classes






Note: the same approach applies to other connectors such as JDBC and Kafka.

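Background: our company builds an internal platform on top of Flink, and internal scenarios and requirements often call for changes to connector source code. Patching the source directly, however, makes every Flink version upgrade costly and slow. This approach instead extends the original classes by inheritance, adds the enhancements in the subclass, and packages the result as a jar that replaces the official connector classes.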



1. Create a module project in IntelliJ IDEA




2. POM file explained (using elasticsearch6 as an example)



2.1 Start from the official connector package

Using elasticsearch6 as the example, part of the POM is taken from the official connector package; keep the version numbers aligned with it:

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-connector-elasticsearch6_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>



2.2 maven-shade-plugin

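  1. Purpose: build the jar with maven-shade-plugin.
  2. Copy the complete shade configuration from the POM of flink-sql-connector-elasticsearch6_${scala.binary.version}, then adjust the remaining settings to your own needs.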



2.3 POM for the customized elasticsearch6 connector

 <properties>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <flink.version>1.13.1</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
        <slf4j.version>1.7.15</slf4j.version>
        <sql.driver.version>8.0.21</sql.driver.version>
        <fastjson.version>1.2.75</fastjson.version>
        <google.guava.version>20.0</google.guava.version>
        <apache.commons.version>3.11</apache.commons.version>
        <cn.hutool.all.version>5.5.2</cn.hutool.all.version>
        <macasaet.version>1.5.0</macasaet.version>
        <!-- compile,provided -->
        <scope>provided</scope>
    </properties>


    <dependencies>

        <!-- flink -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- kafka -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${scope}</scope>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
            <scope>${scope}</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
            <scope>${scope}</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${scope}</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${scope}</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
            <scope>${scope}</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${scope}</scope>
        </dependency>

        <!-- log -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
            <scope>${scope}</scope>
        </dependency>

        <!-- other jar -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>

        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>${cn.hutool.all.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>${apache.commons.version}</version>
        </dependency>

        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>${google.guava.version}</version>
        </dependency>

        <dependency>
            <groupId>com.macasaet.fernet</groupId>
            <artifactId>fernet-java8</artifactId>
            <version>${macasaet.version}</version>
        </dependency>

        <dependency>
            <groupId>com.pupu.bigdata</groupId>
            <artifactId>pupu-flink-connector-common</artifactId>
            <version>1.0.0.0</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.10</version>
            <optional>true</optional>
            <scope>${scope}</scope>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.3.0</version>
                <executions>
                    <execution>
                        <id>shade-flink</id>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <shadeTestJar>false</shadeTestJar>
                            <artifactSet>
                                <includes>
                                    <include>*:*</include>
                                </includes>
                                <excludes>
                                    <exclude>com.carrotsearch:hppc</exclude>
                                    <exclude>com.tdunning:t-digest</exclude>
                                    <exclude>joda-time:joda-time</exclude>
                                    <exclude>net.sf.jopt-simple:jopt-simple</exclude>
                                    <exclude>org.elasticsearch:jna</exclude>
                                    <exclude>org.hdrhistogram:HdrHistogram</exclude>
                                    <exclude>org.yaml:snakeyaml</exclude>
                                </excludes>
                            </artifactSet>

                            <filters>
                                <filter>
                                    <artifact>cn.hutool:hutool-all</artifact>
                                    <excludes>
                                        <exclude>META-INF/**</exclude>
                                    </excludes>
                                </filter>

                                <filter>
                                    <artifact>com.alibaba:fastjson</artifact>
                                    <excludes>
                                        <exclude>META-INF/**</exclude>
                                    </excludes>
                                </filter>

                                <filter>
                                    <artifact>org.elasticsearch:elasticsearch</artifact>
                                    <excludes>
                                        <exclude>config/**</exclude>
                                        <exclude>modules.txt</exclude>
                                        <exclude>plugins.txt</exclude>
                                        <exclude>org/joda/**</exclude>
                                    </excludes>
                                </filter>
                                <filter>
                                    <artifact>org.elasticsearch.client:elasticsearch-rest-high-level-client</artifact>
                                    <excludes>
                                        <exclude>forbidden/**</exclude>
                                    </excludes>
                                </filter>
                                <filter>
                                    <artifact>org.apache.httpcomponents:httpclient</artifact>
                                    <excludes>
                                        <exclude>mozilla/**</exclude>
                                    </excludes>
                                </filter>
                                <filter>
                                    <artifact>org.apache.lucene:lucene-analyzers-common</artifact>
                                    <excludes>
                                        <exclude>org/tartarus/**</exclude>
                                    </excludes>
                                </filter>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <!-- exclude Java 9 specific classes as otherwise the shade-plugin crashes -->
                                        <exclude>META-INF/versions/**</exclude>
                                        <exclude>META-INF/services/com.fasterxml.**</exclude>
                                        <exclude>META-INF/services/org.apache.lucene.**</exclude>
                                        <exclude>META-INF/services/org.elasticsearch.**</exclude>
                                        <exclude>META-INF/LICENSE.txt</exclude>
                                    </excludes>
                                </filter>
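                            </filters>

                            <!-- Merge META-INF/services files so that both the inherited elasticsearch-6
                                 factory and the custom factory stay registered in the shaded jar -->
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            </transformers>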

                            <relocations>
                                <!-- Force relocation of all Elasticsearch dependencies. -->
                                <relocation>
                                    <pattern>org.apache.commons</pattern>
                                    <shadedPattern>org.apache.flink.elasticsearch6.shaded.org.apache.commons</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>org.apache.http</pattern>
                                    <shadedPattern>org.apache.flink.elasticsearch6.shaded.org.apache.http</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>org.apache.lucene</pattern>
                                    <shadedPattern>org.apache.flink.elasticsearch6.shaded.org.apache.lucene</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>org.elasticsearch</pattern>
                                    <shadedPattern>org.apache.flink.elasticsearch6.shaded.org.elasticsearch</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>com.fasterxml.jackson</pattern>
                                    <shadedPattern>org.apache.flink.elasticsearch6.shaded.com.fasterxml.jackson</shadedPattern>
                                </relocation>
                            </relocations>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>



3 Add the custom connector my-elasticsearch-6



3.1 Create a connector factory that extends Elasticsearch6DynamicSinkFactory

import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch6DynamicSinkFactory;
import org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.factories.FactoryUtil;
import pupu.flink.stream.connector.common.constants.Constant;
import pupu.flink.stream.connector.common.https.MateHttpUtils;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

@Slf4j
@Internal
public class PupuElasticsearch6DynamicSinkFactory extends Elasticsearch6DynamicSinkFactory {

    // Matched against the 'connector' option in the SQL DDL
    public static final String IDENTIFIER = "my-elasticsearch-6";

    @Override
    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    // Add extra options that must be present (and are validated) here.
    // Copy the parent's set first so the parent's own set is never modified in place.
    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        Set<ConfigOption<?>> options = new HashSet<>(super.requiredOptions());
        return options;
    }

    // Special handling of options can be done here before delegating to the parent
    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        // Helper for validating options or discovering formats, if needed
        final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);

        // Environment (job-level) parameters. The declared type is ReadableConfig;
        // the cast assumes the planner passes the TableConfig's Configuration.
        final Configuration envConfig = (Configuration) context.getConfiguration();

        // Options from the table's WITH clause in the DDL
        Map<String, String> configMap = context.getCatalogTable().getOptions();

        return super.createDynamicTableSink(context);
    }
}
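Any extra option the subclass accepts has to be declared in requiredOptions() or optionalOptions(). The sketch below is a plain-Java model, not Flink code: BaseSinkFactory stands in for the official factory, MySinkFactory for the subclass, and the option names are a small illustrative subset. It assumes, as in the Flink 1.13 factory, that the inherited createDynamicTableSink() validates the DDL through FactoryUtil's helper, which rejects missing required keys and any key not listed in either set. my.extra-option is a hypothetical custom option.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class OptionInheritanceDemo {

    /** Plain-Java stand-in for the official factory; NOT Flink's class. */
    static class BaseSinkFactory {
        public Set<String> requiredOptions() {
            return new HashSet<>(Arrays.asList("hosts", "index", "document-type"));
        }

        public Set<String> optionalOptions() {
            return new HashSet<>(Arrays.asList("format", "sink.bulk-flush.max-actions"));
        }

        /** Rough analogue of helper.validate(): reject missing and unknown keys. */
        public final void validate(Map<String, String> ddlOptions) {
            Set<String> missing = new TreeSet<>(requiredOptions());
            missing.removeAll(ddlOptions.keySet());
            if (!missing.isEmpty()) {
                throw new IllegalArgumentException("Missing required options: " + missing);
            }
            Set<String> unknown = new TreeSet<>(ddlOptions.keySet());
            unknown.removeAll(requiredOptions());
            unknown.removeAll(optionalOptions());
            if (!unknown.isEmpty()) {
                throw new IllegalArgumentException("Unsupported options: " + unknown);
            }
        }
    }

    /** Stand-in for the subclass: inherits validation, declares one extra option. */
    static class MySinkFactory extends BaseSinkFactory {
        @Override
        public Set<String> optionalOptions() {
            // Copy the parent's set rather than adding to it in place.
            Set<String> options = new HashSet<>(super.optionalOptions());
            options.add("my.extra-option"); // hypothetical custom option
            return options;
        }
    }
}
```

In the real factory the same idea means overriding optionalOptions() (or requiredOptions()) and returning a copy of the parent's set with your own ConfigOption added.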



3.2 Set environment parameters on tableEnv

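StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Environment parameters of the table environment
Configuration tabConfig = tableEnv.getConfig().getConfiguration();
// Set a parameter (key and value are placeholders for your own settings)
tabConfig.setString(key, value);
  • The connector factory reads these environment parameters while the job is being submitted.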



3.3 Read the tableEnv environment parameters in the factory

@Override
public DynamicTableSink createDynamicTableSink(Context context) {
    // ...
    // Environment parameters set on tableEnv in 3.2
    final Configuration envConfig = (Configuration) context.getConfiguration();
    // ...
    return super.createDynamicTableSink(context);
}
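With both the table options (context.getCatalogTable().getOptions()) and the environment parameters available, the factory can decide which one wins. The class below is a hypothetical helper, not part of Flink or of the original code: it assumes a rule where a non-empty value in the DDL's WITH clause overrides the job-level value set in 3.2. Inside createDynamicTableSink() it could be called as OptionResolver.resolve(key, configMap, envConfig.toMap()).

```java
import java.util.Map;
import java.util.Optional;

public final class OptionResolver {

    private OptionResolver() {
    }

    /**
     * Hypothetical precedence rule: a non-empty value from the table's WITH clause wins;
     * otherwise fall back to the job-level value set on the table environment.
     */
    public static Optional<String> resolve(String key,
                                           Map<String, String> tableOptions,
                                           Map<String, String> envOptions) {
        String fromTable = tableOptions.get(key);
        if (fromTable != null && !fromTable.isEmpty()) {
            return Optional.of(fromTable);
        }
        // May be empty when neither the DDL nor the environment sets the key
        return Optional.ofNullable(envOptions.get(key));
    }
}
```

Keeping the rule in one small static method makes it easy to unit test without starting a Flink job.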



4 Register the factory in org.apache.flink.table.factories.Factory

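Why: when a SQL job is planned, Flink loads the META-INF/services/org.apache.flink.table.factories.Factory files on the classpath (Java SPI) and iterates over each factory's factoryIdentifier() to find the connector named in the SQL. Add the fully qualified class name of PupuElasticsearch6DynamicSinkFactory to this file under src/main/resources/META-INF/services/.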
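The matching step can be modelled in a few lines of plain Java. This is a simplified sketch of the idea, not Flink's FactoryUtil code: in Flink the candidate list comes from ServiceLoader.load(Factory.class, classLoader), and the lookup likewise fails when no factory, or more than one factory, matches the identifier. FactoryLookup and its nested Factory interface are stand-ins defined here.

```java
import java.util.List;
import java.util.stream.Collectors;

public final class FactoryLookup {

    private FactoryLookup() {
    }

    /** Minimal stand-in for org.apache.flink.table.factories.Factory (plain Java, not Flink's type). */
    public interface Factory {
        String factoryIdentifier();
    }

    /** Returns the single candidate whose identifier equals the 'connector' value from the DDL. */
    public static Factory discover(List<Factory> candidates, String connector) {
        List<Factory> matches = candidates.stream()
                .filter(f -> f.factoryIdentifier().equals(connector))
                .collect(Collectors.toList());
        if (matches.isEmpty()) {
            throw new IllegalStateException("No factory found for connector '" + connector + "'");
        }
        if (matches.size() > 1) {
            throw new IllegalStateException("Multiple factories found for connector '" + connector + "'");
        }
        return matches.get(0);
    }
}
```

This is also why the new factory uses its own identifier, my-elasticsearch-6: the inherited elasticsearch-6 factory and the custom one can both be registered without an ambiguous match.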



5. Write a mainClass test class
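A mainClass test usually registers a sink table that uses the new identifier and then writes into it. The helper below is a hedged starting point that only builds the DDL string; the class name, table name, columns and the document-type value are hypothetical, while hosts, index and document-type are the options the Elasticsearch 6 SQL connector requires. In the main method the result would be passed to tableEnv.executeSql(...), followed by an INSERT INTO statement.

```java
public final class EsSinkDdl {

    // Must equal PupuElasticsearch6DynamicSinkFactory.IDENTIFIER
    private static final String CONNECTOR = "my-elasticsearch-6";

    private EsSinkDdl() {
    }

    /** Builds a CREATE TABLE statement for the custom connector (schema is illustrative). */
    public static String createSinkTable(String tableName, String hosts, String index) {
        return "CREATE TABLE " + tableName + " (\n"
                + "  user_id STRING,\n"
                + "  cnt BIGINT,\n"
                + "  PRIMARY KEY (user_id) NOT ENFORCED\n"
                + ") WITH (\n"
                + "  'connector' = '" + CONNECTOR + "',\n"
                + "  'hosts' = '" + hosts + "',\n"
                + "  'index' = '" + index + "',\n"
                + "  'document-type' = '_doc'\n"
                + ")";
    }
}
```

With a primary key declared, the Elasticsearch sink writes in upsert mode and uses the key to build document IDs.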



6. How to make the changes take effect

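  1. Package the module and copy the .jar into the lib directory of the Flink cluster.
  2. Reference this jar from the application's POM so the code takes effect.
  3. Note: check whether the application package also depends on flink-sql-connector-elasticsearch6_2.11 and remove it; check whether the Flink cluster's lib directory contains flink-sql-connector-elasticsearch6_2.11 and remove it as well. Otherwise the modified code may not take effect, because the JVM may load the officially provided classes instead.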
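When it is unclear which copy of a class the job actually uses, the class's code source shows the jar it came from. By default Flink resolves org.apache.flink. classes parent-first (the classloader.parent-first-patterns.default setting), so a copy in lib is preferred over one inside the job jar, and when two jars in lib contain the same class only one of them is used. The utility below is a small JDK-only sketch; WhichJar is a hypothetical name. Logging WhichJar.locate("org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch6DynamicSinkFactory", Thread.currentThread().getContextClassLoader()) inside the job shows whether the official jar is still being picked up.

```java
import java.net.URL;
import java.security.CodeSource;
import java.util.Optional;

public final class WhichJar {

    private WhichJar() {
    }

    /**
     * Returns the jar or directory a class was loaded from through the given class loader,
     * or Optional.empty() when there is no code source location (e.g. JDK core classes).
     */
    public static Optional<URL> locate(String className, ClassLoader loader) {
        try {
            // initialize = false: only load the class, do not run its static initializers
            Class<?> clazz = Class.forName(className, false, loader);
            CodeSource source = clazz.getProtectionDomain().getCodeSource();
            return source == null ? Optional.empty() : Optional.ofNullable(source.getLocation());
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("Class not found: " + className, e);
        }
    }
}
```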



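Copyright notice: this is an original article by qq_27242695, licensed under CC 4.0 BY-SA. When reposting, please include a link to the original source and this notice.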