I. Basic usage
1. ES 8 client dependency
<dependency>
    <groupId>co.elastic.clients</groupId>
    <artifactId>elasticsearch-java</artifactId>
    <version>8.5.2</version>
</dependency>
2. Sink class (holds the connection)
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.core.IndexRequest;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;

public class ES8Writer extends RichSinkFunction<MyEntity> {
    // The clients are created in open(), so they are never serialized with the sink.
    private transient RestClient restClient;
    private transient ElasticsearchClient client;

    @Override
    public void open(Configuration parameters) throws Exception {
        // One HttpHost per configured node, plain HTTP on port 9200.
        HttpHost[] httpHosts = new HttpHost[Constraints.esHosts.length];
        for (int i = 0; i < Constraints.esHosts.length; i++) {
            httpHosts[i] = new HttpHost(Constraints.esHosts[i], 9200, "http");
        }
        restClient = RestClient.builder(httpHosts).build();
        ElasticsearchTransport transport = new RestClientTransport(
                restClient, new JacksonJsonpMapper());
        client = new ElasticsearchClient(transport);
    }

    @Override
    public void close() throws Exception {
        // client.shutdown() only returns the node-shutdown API client and closes
        // nothing, so the connections are released by closing the RestClient.
        if (restClient != null) {
            restClient.close();
        }
    }

    @Override
    public void invoke(MyEntity value, Context context) throws Exception {
        // Index one document per record, using the entity id as the document id.
        IndexRequest<MyEntity> indexRequest = new IndexRequest.Builder<MyEntity>()
                .index("index")
                .id(value.getId())
                .document(value)
                .build();
        client.index(indexRequest);
    }
}
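The sink reads its host list from a Constraints class that this post does not show. A minimal sketch of what it might look like is below; the hostnames are hypothetical placeholders, and the real class may well load them from configuration instead.

```java
// Hypothetical holder for the Elasticsearch host names used by ES8Writer.
// Shown package-private so the snippet compiles as a single file.
final class Constraints {
    // Placeholder host names (assumption); replace with the nodes of your cluster.
    static final String[] esHosts = {"es-node-1", "es-node-2", "es-node-3"};

    // Constants only, so instantiation is disallowed.
    private Constraints() {
    }
}
```

Each entry is combined with port 9200 and the "http" scheme in open(), so the array should contain bare host names or IP addresses, not full URLs.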
3. Usage
stream.addSink(new ES8Writer()).name("ElasticSearch");
II. Pitfalls
1. java.lang.NoSuchMethodError: org.apache.http.client.utils.URLEncodedUtils.formatSegments
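This is an httpclient version problem: the httpclient on the classpath does not contain this method.
Add the dependency explicitly:
<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>4.5.13</version>
</dependency>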
2. com.fasterxml.jackson.databind.JsonMappingException: No serializer found for xxxx
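The entity class passed to ES (MyEntity in the code above) must declare private fields with public getter and setter methods; otherwise Jackson finds no properties to serialize.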
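As an illustration, an entity of the following shape should satisfy Jackson, which by default discovers properties through public getters. This is only a sketch: the post does not show MyEntity, so the name field is a hypothetical example, and only getId() is known from the sink code.

```java
import java.io.Serializable;

// Sketch of an entity that Jackson can serialize: private fields, a no-arg
// constructor, and a public getter and setter for every field.
// Shown package-private so the snippet compiles as a single file; in a real
// project it would normally be a public class in its own file.
class MyEntity implements Serializable {
    private static final long serialVersionUID = 1L;

    private String id;    // used as the document id by the sink
    private String name;  // hypothetical payload field (assumption)

    public MyEntity() {
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}
```

A class with only private fields and no getters gives Jackson nothing to discover, which is what produces the "No serializer found" error.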
3. java.lang.NoSuchMethodError: com.fasterxml.jackson.core.JsonParser.currentToken()
This is a Jackson version problem: the jackson-core on the classpath does not provide JsonParser.currentToken().
Add the dependencies explicitly:
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-core</artifactId>
    <version>2.9.0</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.9.0</version>
</dependency>
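Both NoSuchMethodError cases above come from an unexpected library version being loaded. One way to confirm which jar actually supplies a class is to print its code source. The helper below is a sketch that uses only the JDK; the JarLocator name is made up for this example, and on a cluster it would have to run inside the job (for instance from open()) to reflect the classpath that Flink really uses.

```java
import java.net.URL;
import java.security.CodeSource;

// Hypothetical diagnostic helper: reports where a class was loaded from.
final class JarLocator {
    private JarLocator() {
    }

    // Returns the jar or directory that supplies the class, or a short note
    // when the class is missing or has no code source.
    static String locate(String className) {
        try {
            // Load without initializing, so static initializers are not run.
            Class<?> cls = Class.forName(className, false, JarLocator.class.getClassLoader());
            CodeSource source = cls.getProtectionDomain().getCodeSource();
            URL location = (source == null) ? null : source.getLocation();
            // Classes that ship with the JDK typically have no code source.
            return (location == null) ? "(no code source)" : location.toString();
        } catch (ClassNotFoundException | LinkageError e) {
            return "(not on classpath)";
        }
    }

    public static void main(String[] args) {
        // The two classes named in the error messages above.
        System.out.println(locate("org.apache.http.client.utils.URLEncodedUtils"));
        System.out.println(locate("com.fasterxml.jackson.core.JsonParser"));
    }
}
```

If the printed path points at a jar other than the version declared in the pom, another dependency is probably pulling in the conflicting version.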
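Copyright notice: This is an original article by GrassEva, licensed under CC 4.0 BY-SA. When reposting, please include a link to the original source and this notice.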