Spark SQL 操作 ES 读取数据并生成中间表写 SQL 进行统计分析的示例:ES 如果已经开启 auth,需要设置 es.net.http.auth.user 和 es.net.http.auth.pass;因为使用的是 9200 HTTP 端口而非 TCP 9300 端口,所以不需要设置 SSL 证书。
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import java.util.HashMap;
import java.util.Map;
/**
* @Auther: zxl
* @Date: 2019/2/12 17:59
* @Description:
*/
public class SparkSqlTest {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName(“test”)
//.setMaster(“spark://192.168.207.28:7077”);
.setMaster(“local[*]”);
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
Map esOptions = new HashMap<>(5);
esOptions.put(“es.nodes”,”192.168.207.32,192.168.207.33″);
esOptions.put(“es.port”,”9200″);
esOptions.put(“es.net.http.auth.user”,”elastic”);
esOptions.put(“es.net.http.auth.pass”,”XA&YtoOverseas”);
esOptions.put(“es.mapping.date.rich”,”false”);
Dataset esData = sqlContext.read().format(“org.elasticsearch.spark.sql”)
.options(esOptions)
.load(“logstash-mysql-es-waybill_detail_info/doc”);
esData.show();
esData.registerTempTable(“ods_customs_waybill”);
Dataset result = sqlContext.sql(“select client_id, customs_id,count(logistics_no) from ods_customs_waybill group by client_id,customs_id”);
result.show();
}
}
如果以standalone模式运行,打成jar包在spark环境下运行:
spark-submit --master spark://192.168.207.28:7077 --class SparkSqlTest /opt/app/spark-2.4.0-bin-hadoop2.6/yto_examples/spark-yto-1.0.0.jar
注意需要将 elasticsearch-spark-20_2.11-6.4.3.jar 放入 SPARK_HOME 的 jars 目录下,否则报 ClassNotFoundException。
查看spark任务:
查看workder执行结果:
show显示的计算结果: