spark读写 es

  • Post author:
  • Post category:其他

spark读写es

官方文档:https://www.elastic.co/guide/en/elasticsearch/hadoop/6.7/configuration.html

mvn依赖

我这里使用的版本如下:

名称 版本
spark 2.3.0
es 6.7.0
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-spark-20_2.11</artifactId>
    <version>6.7.0</version>
</dependency>

读写ES

1.在spark配置文件中添加如下

spark.es.nodes=es-cn-st21owhm5000fbt5e.elasticsearch.aliyuncs.com
spark.es.port=9200
#spark.es.nodes.wan.only=true
spark.es.net.http.auth.user=xx
spark.es.net.http.auth.pass=xx

2.读取
读取es时支持读取前过滤,可以避免把数据都搂过来再再spark里过滤

import org.elasticsearch.spark._

//将记录解析为Map
def load(index: String, query: String): RDD[collection.Map[String, AnyRef]] = {
  sc.esRDD(s"$index/_doc", query).map(_._2)
}

//直接读取json记录
def loadJson(index: String, query: String): RDD[String] = {
  sc.esJsonRDD(s"$index/_doc", query).map(_._2)
}

3.写es
写es时可以支持一次向多个索引写入,具体哪个索引有记录中的某个字段决定,文档的key也可以由记录中的字段指定。

如以下代码将数据写入索引abc,id由数据中的id字段指定

val meidaRdd= dataRdd.map(m+("media_type" ->"abc") + ("es_key" -> m("id")))
EsSpark.saveToEs(mediaRdd, "{media_type}/_doc", Map("es.mapping.id" -> "es_key"))

可选配置

读的分区数

默认读es时默认会打开scroll slice(分片)读取,每个分片默认最多读10万条,RDD的每个分区对应一个分片。

spark.es.input.use.sliced.partitions=true
spark.es.input.max.docs.per.partition=100000
假设索引有1亿条数据,按照默认配置,分区数=10^8/10^5=1000

当关闭分片读取时,分区数=索引shard数。
数据量不大时建议关闭分片读取。

写的配置

#写es时es底层是用bulk写的,默认1000
spark.es.batch.size.entries=5000
#默认是true,bulk写入后是refresh (refresh指写入后刷新保证之后的读立即可见)
spark.es.batch.write.refresh=false
#默认是index,直接覆盖
spark.es.write.operation=upsert

更多配置还是建议阅读文档。


版权声明:本文为Cool0原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。