通过Map/Reduce将HDFS数据写入ES,ES数据写入HDFS

  • Post author:
  • Post category:其他




环境准备

系统 centos 7

java 1.8

hadoop 2.7

ES 7.15.2 (ES单机版本安装可以参考:https://blog.csdn.net/weixin_36340771/article/details/121389741)



准备hadoop本地运行环境



获得Hadoop文件

链接:https://pan.baidu.com/s/1MGriraZ8ekvzsJyWdPssrw

提取码:u4uc



配置HADOOP_HOME

解压上述文件,然后配置HADOOP_HOME,注意修改地址。

在这里插入图片描述



获得工程代码

https://github.com/BaoPiao/elasticsearch_demo



读取HDFS写入ES

​ 通过FileInputFormat读取HDFS数据,然后通过Mapper将数据转换为MapWritable数据类型,最后通过EsOutputFormat将数据写入ES。



参数说明



基本参数



网络参数


es.nodes

连接的es集群信息


es.port

连接es集群的端口号,默认是9200


es.nodes.path.prefix

连接的前缀信息,默认是空(例如每次都想写入到es.node:9200/custom/path/prefix,那么这里就可以写为/custom/path/prefix)



写入参数


es.resource

指定es索引

es.resource = twitter/tweet #索引叫twitter 类型是tweet


es.resource.write

默认取值es.resource

​ 支持根据数据,写入不同的type中(media_type是字段)

es.resource.write = my-collection/{media_type}

​ 根据数据,写入不同的索引中,此外还支持日期类型用于滚动生成日志索引,具体请见参考资料2。

es.resource.write = my-collection_{media_type}



文档相关参数


es.input.json

默认是false,输入文件格式为JSON


es.write.operation

默认是index



index(default)

如果已经存在会进行替换



create

如果已经存在会报异常



update

更新已经存在的,如果不存在报异常



upsert

如果不存在就是插入,如果存在就是更新


es.output.json

默认是false,输出文件格式是否为JSON


es.ingest.pipeline

默认是none,指定处理管道


es.mapping.id

指定文档id,这里填写的是字段名称,默认为空


es.mapping.parent

指定父文档,这里填写的是字段名称或填写一个常量


es.mapping.version

指定版本号,这里填写的是字段名称或填写一个常量


es.mapping.include

指定那些字段写入ES


es.mapping.exclude

指定哪些字段不写入ES



代码参考

这里只贴出driver代码,其余部分请参考github:https://github.com/BaoPiao/elasticsearch_demo

public class ElasticsearchHadoop {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        conf.setBoolean("mapred.map.tasks.speculative.execution", false);
        conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
        conf.set("es.nodes", "192.168.10.108:9200");
        conf.set("es.resource", "/artists_2");
        conf.set("es.write.operation", "upsert");
        conf.set("es.mapping.include", "id");
        conf.set("es.mapping.id", "id");
        Job job = Job.getInstance(conf);
        job.setJarByClass(ElasticsearchHadoop.class);
        job.setOutputFormatClass(EsOutputFormat.class);
        job.setMapperClass(EsMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(MapWritable.class);
        FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.10.108:9000/test"));
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);

    }
}



读取ES写入HDFS

​ 通过EsInputFormat读取ES数据,返回数据为<Text,MapWritable>,其中Text为key,MapWritable包含字段和字段值信息,最后编写自己的Reduce写到HDFS。



参数说明



元数据信息


es.read.metadata

默认是flase,是否返回原数据信息,例如文档id、版本号等


es.read.metadata.field

默认是_metadata字段,当且仅当es.read.metadata设置为true,返回元数据信息,数据由map封装


es.read.metadata.version

默认是false,当且仅当es.read.metadata设置为true时,该值设置为true才生效返回版本号。



查询参数


es.resource

指定es索引

es.resource = twitter/tweet #索引叫twitter 类型是tweet


es.query

默认为空

  1. uri 查询方式
  2. dsl 查询方式(推荐使用)
  3. 读取文件方式
# uri (or parameter) query
es.query = ?q=costinl

# query dsl
es.query = { "query" : { "term" : { "user" : "costinl" } } }

# external resource
es.query = org/mypackage/myquery.json


es.resource.read

默认值是es.resource


es.resource.read

默认值是es.resource



代码参考

这里只贴出driver部分,具体请参考github:https://github.com/BaoPiao/elasticsearch_demo

public class ReadElasticsearch {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        conf.set("es.nodes", "192.168.10.108:9200");
        conf.set("es.resource", "artists_2");
        conf.set("es.read.metadata", "true");
        conf.set("es.read.metadata.field", "_metadata");
        conf.set("es.query", "{\n" +
                "    \"query\":{\n" +
                "        \"match_all\":{}\n" +
                "    }\n" +
                "}");
        org.apache.hadoop.mapreduce.Job job = Job.getInstance(conf);
        job.setInputFormatClass(EsInputFormat.class);

        job.setReducerClass(EsReduce.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(MapWritable.class);

        FileOutputFormat.setOutputPath(job, new Path("D:\\hadoop\\outputES"));
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}



最后

还有很多参数没详细列出来,例如写入批次大小、写入重试次数、超时时间等;读取scroll设置等,具体参考资料



参考资料

https://www.elastic.co/guide/en/elasticsearch/hadoop/current/mapreduce.html

https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html#_essential_settings



版权声明:本文为weixin_36340771原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。