Connecting Flink to Elasticsearch 8 (ES8), and the pitfalls I ran into



I. Basic usage

1. ES8 client dependency

<dependency>
    <groupId>co.elastic.clients</groupId>
    <artifactId>elasticsearch-java</artifactId>
    <version>8.5.2</version>
</dependency>

2. Sink class

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.core.IndexRequest;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;

public class ES8Writer extends RichSinkFunction<MyEntity> {
    private RestClient restClient;
    private ElasticsearchClient client;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Build one HttpHost per configured host name (port 9200, plain HTTP).
        HttpHost[] httpHosts = new HttpHost[Constraints.esHosts.length];
        for (int i = 0; i < Constraints.esHosts.length; i++) {
            httpHosts[i] = new HttpHost(Constraints.esHosts[i], 9200, "http");
        }
        restClient = RestClient.builder(httpHosts).build();
        ElasticsearchTransport transport = new RestClientTransport(
                restClient, new JacksonJsonpMapper());
        client = new ElasticsearchClient(transport);
    }

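    @Override
    public void close() throws Exception {
        // client.shutdown() only returns the node-shutdown API client and closes nothing;
        // closing the low-level RestClient is what releases the HTTP connections.
        if (restClient != null) {
            restClient.close();
        }
    }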

    @Override
    public void invoke(MyEntity value, Context context) throws Exception {
        // Index one document per record, using the entity's id as the document id.
        IndexRequest<MyEntity> indexRequest = new IndexRequest.Builder<MyEntity>()
                .index("index")
                .id(value.getId())
                .document(value)
                .build();
        client.index(indexRequest);
    }

}

3. Usage

stream.addSink(new ES8Writer()).name("ElasticSearch");

II. Pitfalls

1. java.lang.NoSuchMethodError: org.apache.http.client.utils.URLEncodedUtils.formatSegments

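This is an httpclient version conflict: an older httpclient on the classpath does not have the method that the Elasticsearch REST client calls.

Add the dependency explicitly: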

<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>4.5.13</version>
</dependency>
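
To confirm which jar actually supplies a conflicting class at runtime, a small diagnostic along these lines may help. This is only a sketch using the standard library; JarLocator and locate are hypothetical names, not part of Flink or the Elasticsearch client.

```java
import java.security.CodeSource;

// Hypothetical helper: reports where a class was loaded from.
class JarLocator {
    static String locate(String className) {
        try {
            // Load without initializing the class, using this class's own loader.
            Class<?> cls = Class.forName(className, false, JarLocator.class.getClassLoader());
            CodeSource src = cls.getProtectionDomain().getCodeSource();
            // JDK core classes typically have no code source.
            return src == null ? "(bootstrap or unknown source)" : src.getLocation().toString();
        } catch (ClassNotFoundException | LinkageError e) {
            return "(not on classpath)";
        }
    }
}
```

For example, logging JarLocator.locate("org.apache.http.client.utils.URLEncodedUtils") from open() should show the path of the jar that won the conflict, which makes it easier to tell whether the added dependency took effect.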

2. com.fasterxml.jackson.databind.JsonMappingException: No serializer found for xxxx

The entity class passed to ES (MyEntity in the code above) must declare its properties as private fields with getter and setter methods.
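
A minimal sketch of such an entity is shown here. The original post does not show MyEntity, so the name field is an illustrative assumption; only getId() is known from the sink code.

```java
// Hypothetical entity; the field names are illustrative assumptions.
// In a real project this would normally be a public class in MyEntity.java.
class MyEntity {
    private String id;    // used as the Elasticsearch document id
    private String name;  // assumed example field

    // No-argument constructor so the class can be instantiated reflectively.
    public MyEntity() {}

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
}
```

With public getters in place, Jackson can discover the properties id and name and no longer reports that no serializer was found.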

3. java.lang.NoSuchMethodError: com.fasterxml.jackson.core.JsonParser.currentToken()

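This is a Jackson version conflict: an older jackson-core on the classpath does not have JsonParser.currentToken().

Add the dependencies explicitly: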

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-core</artifactId>
    <version>2.9.0</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.9.0</version>
</dependency>



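Copyright notice: This is an original article by GrassEva, licensed under CC 4.0 BY-SA. When reposting, include a link to the original source and this notice.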