SparkSQL reading Elasticsearch (X-Pack) data and running computations — Xiaolong's Blog

  • Post category: java


Example of using SparkSQL to read data from ES, register an intermediate temporary table, and run SQL for statistical analysis. If ES has authentication enabled, you need to set es.net.http.auth.user and es.net.http.auth.pass. Since this uses the REST port 9200 rather than the TCP port 9300, no SSL certificate needs to be configured.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

import java.util.HashMap;
import java.util.Map;

/**
 * @Author: zxl
 * @Date: 2019/2/12 17:59
 * @Description:
 */
public class SparkSqlTest {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("test")
                //.setMaster("spark://192.168.207.28:7077");
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);

        Map<String, String> esOptions = new HashMap<>(5);
        esOptions.put("es.nodes", "192.168.207.32,192.168.207.33");
        esOptions.put("es.port", "9200");
        esOptions.put("es.net.http.auth.user", "elastic");
        esOptions.put("es.net.http.auth.pass", "XA&YtoOverseas");
        // Return date fields as plain strings instead of rich date objects.
        esOptions.put("es.mapping.date.rich", "false");

        Dataset<Row> esData = sqlContext.read().format("org.elasticsearch.spark.sql")
                .options(esOptions)
                .load("logstash-mysql-es-waybill_detail_info/doc");
        esData.show();

        // registerTempTable is deprecated since Spark 2.0;
        // createOrReplaceTempView is the current equivalent.
        esData.createOrReplaceTempView("ods_customs_waybill");
        Dataset<Row> result = sqlContext.sql(
                "select client_id, customs_id, count(logistics_no) "
                + "from ods_customs_waybill group by client_id, customs_id");
        result.show();
    }
}
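As a plain-Java illustration of what the group-by query above computes (counting logistics_no per (client_id, customs_id) pair), here is a minimal sketch using hypothetical sample rows — these are made-up values, not real ES documents:

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupCountDemo {

    // Plain-Java equivalent of:
    //   select client_id, customs_id, count(logistics_no)
    //   from ods_customs_waybill group by client_id, customs_id
    // Each row is {client_id, customs_id, logistics_no}.
    static Map<List<String>, Long> groupCounts(List<String[]> rows) {
        return rows.stream().collect(
                Collectors.groupingBy(r -> Arrays.asList(r[0], r[1]),
                        LinkedHashMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        // Hypothetical sample waybills (not real data).
        List<String[]> rows = Arrays.asList(
                new String[]{"c1", "HZ", "w001"},
                new String[]{"c1", "HZ", "w002"},
                new String[]{"c1", "SH", "w003"},
                new String[]{"c2", "HZ", "w004"});
        groupCounts(rows).forEach((k, v) -> System.out.println(k + " -> " + v));
    }
}
```

The difference, of course, is that SparkSQL pushes this aggregation out across the cluster, while the stream version runs on a single JVM.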

To run in standalone mode, package the program as a jar and submit it to the Spark cluster:

spark-submit --master spark://192.168.207.28:7077 --class SparkSqlTest /opt/app/spark-2.4.0-bin-hadoop2.6/yto_examples/spark-yto-1.0.0.jar

Note that elasticsearch-spark-20_2.11-6.4.3.jar must be placed in the jars directory under SPARK_HOME; otherwise a ClassNotFoundException is thrown.

[Screenshot: spark-submit output]

Viewing the Spark job: [screenshot]

Viewing the worker execution result: [screenshot]

Computation result displayed by show(): [screenshot]



Copyright notice: this is an original article by weixin_42517466, released under the CC 4.0 BY-SA license. Please include a link to the original source and this notice when reposting.