How to do joins in Hadoop


Requirement:

/*
 *  Join the order table and the product table
 */
Order table:
 id	    date	    pid	   amount
 1001	20150710	P0001	2
 1002	20150710	P0001	3
 1002	20150710	P0002	3

 Product table:
 id	    pname	category_id	   price
 P0001	小米5	1000	       2000
 P0002	锤子T1	1000	       3000


 The output should look like this, for example:
 1002  20150710   p0001  3   小米5    1000  2000     total: 6000  (we will not compute the total for now)
 1002  20150710   p0002  3   锤子T1   1000  3000     total: 9000

 Use the pid from the order table as k2, and the whole row of the table as v2.

The data exported from the database has the following format:

order.text

1001,20150710,p0001,2

1002,20150710,p0002,3

1002,20150710,p0003,3

product.txt

p0001,小米5,1000,2000

p0002,锤子T1,1000,3000

Approach 1: in both files, use the product id as the key (k2) and send each row to the reducer. For key p0001, for example, the reducer then receives both the order row 1001,20150710,p0001,2 and the product row p0001,小米5,1000,2000 in the same call.

Mapper class:

package com.wxj.mrreducejoin;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

/*
 * Created by wxj on 2019/8/16 0016 12:28
 */
public class MJoinMapper extends Mapper<LongWritable, Text,Text,Text> {
    Text text = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        //figure out which of the two input files this record comes from
        FileSplit inputSplit = (FileSplit) context.getInputSplit();
        //we could branch on the file name here
        String name = inputSplit.getPath().getName();//public Path getPath() { return this.file; }
        //or, as below, branch on the content of the line itself

        String s = value.toString();
        if(StringUtils.isNotEmpty(s)){

            if(s.startsWith("p")){//读取的商品信息表

                String[] split = s.split(",");
                String s1 = split[0];
                //if the data volume is large, creating a new Text object for every record is wasteful, so the Text is hoisted out to a field
               // context.write(new Text(s1),value);
                text.set(s1);
                context.write(text,value);
            }else{//this line comes from the order table
                String[] split = s.split(",");
                String s2 = split[2];
                text.set(s2);
                context.write(text,value);
            }

        }


    }
}
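
The mapper above fetches the split's file name but then branches on the first character of the line instead. A minimal sketch of the file-name based variant mentioned in the comment, assuming the product data sits in the file named product.txt:

        //hypothetical alternative to the startsWith("p") check, keyed off the file name
        String[] split = s.split(",");
        if(name.startsWith("product")){   //this split comes from product.txt
            text.set(split[0]);           //product id is the first field
        }else{                            //otherwise it is an order row
            text.set(split[2]);           //product id is the third field
        }
        context.write(text,value);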

Reducer class:

package com.wxj.mrreducejoin;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;


/*
 * Created by wxj on 2019/8/16 0016 12:28
 */
public class MJoinReducer extends Reducer<Text, Text, Text,Text> {
    Text text = new Text();
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
          String firstPart ="";
          String secPart="";
         //again use the leading "p" to tell the product row from the order row, and append the product row at the end
        for (Text value : values) {
           if(value.toString().startsWith("p")){
               secPart=value.toString();
           }else{
               firstPart=value.toString();
           }
        }
        text.set(firstPart+"\t"+secPart);
        context.write(key,text);
    }
}
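
The expected output at the top also lists a total (amount × price) that the post skips. A minimal sketch of how the reduce() body above could compute it, assuming the comma-separated field layouts of order.text and product.txt shown earlier:

        //hypothetical extension of reduce(): skip keys that matched only one side (such as p0003 in the sample data)
        if(!firstPart.isEmpty() && !secPart.isEmpty()){
            String[] orderFields = firstPart.split(",");    //id,date,pid,amount
            String[] productFields = secPart.split(",");    //pid,pname,category_id,price
            long total = Long.parseLong(orderFields[3]) * Long.parseLong(productFields[3]);
            text.set(firstPart+"\t"+secPart+"\t"+total);
            context.write(key,text);
        }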

Main class:

package com.wxj.mrreducejoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/*
 * Created by wxj on 2019/8/16 0016 12:27
 */
public class MJoinMain extends Configured implements Tool {

    @Override
    public int run(String[] strings) throws Exception {
        //get the job instance
        Job job = Job.getInstance(super.getConf(), "rjoin");
        //configure the input format and input path
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job,new Path("E:\\*****\\input"));
        //set the mapper class
        job.setMapperClass(MJoinMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        //set the reducer class
        job.setReducerClass(MJoinReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        //set the output format and output path
         job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job,new Path("E:\\*****\\myrjoinout"));
        //submit the job
        boolean b = job.waitForCompletion(true);

        //return value
        return b?0:1;

    }


    //load the Windows native Hadoop library so the job can run locally on Windows
    static {
        try {
            System.load("D:\\soft\\hadoop-2.6.0-cdh5.14.0\\bin\\hadoop.dll");
        } catch (UnsatisfiedLinkError e) {
            System.err.println("Native code library failed to load.\n" + e);
            System.exit(1);
        }
    }

    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new Configuration(), new MJoinMain(), args);
        System.exit(run);
    }
}
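
The driver above hard-codes local Windows paths. A minimal variant that instead takes the input and output paths from the command-line arguments ToolRunner passes into run():

        //hypothetical replacement for the two hard-coded paths in run() above
        TextInputFormat.addInputPath(job,new Path(strings[0]));
        TextOutputFormat.setOutputPath(job,new Path(strings[1]));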

Output:

p0001	1001,20150710,p0001,2	p0001,小米5,1000,2000
p0002	1002,20150710,p0002,3	p0002,锤子T1,1000,3000
p0003	1002,20150710,p0003,3	

Approach 2 does the join entirely in the map phase. Here the smaller of the two files is uploaded to the (HDFS) file system ahead of time, where it effectively acts as a shared file that every map task can read.

Mapper class:

package com.wxj.mrmapjoin;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

/*
 * Created by wxj on 2019/8/16 0016 20:15
 */
public class MapJoinMapper  extends Mapper<LongWritable,Text,Text,NullWritable> {
    Map<String,String> map  = new HashMap<String,String>();
    /**
     * Setup method, called once before any map() calls.
     * This is where we can read the distributed cache file.
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        Configuration configuration = context.getConfiguration();
        //get the list of cache files
        URI[] cacheFiles = DistributedCache.getCacheFiles(configuration);
        //get the file system; since there is only one cached file, taking the first URI is enough
        FileSystem fileSystem = FileSystem.get(cacheFiles[0], configuration);
        //open the cached file as an input stream
        FSDataInputStream inputStream = fileSystem.open(new Path(cacheFiles[0]));
        //read the input stream with a BufferedReader
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
        String line  = null;
        while((line = bufferedReader.readLine()) != null){
            //each cached line looks like: p0001,小米5,1000,2000
            String[] split = line.split(",");
            map.put(split[0],line);
        }
        IOUtils.closeQuietly(inputStream);
        fileSystem.close();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] split = line.split(",");
        //look up the product row for this order's pid in the in-memory map
        String product_line = map.get(split[2]);
        //guard against orders with no matching product row (for example p0003 in the sample data), which would otherwise print "null"
        if(product_line != null){
            context.write(new Text(product_line+"\t"+value.toString()),NullWritable.get());
        }

    }
}

Main class:

package com.wxj.mrmapjoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.net.URI;

/*
 * Created by wxj on 2019/8/16 0016 20:14
 */
public class MapJoinMain  extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = super.getConf();
        //add the file on HDFS to the distributed cache
        DistributedCache.addCacheFile(new URI("hdfs://node01:8020/product_cache/pdts.txt"),conf);

        Job job = Job.getInstance(conf, "mapJoin");

        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job,new Path("file:///E:\\*****\\input"));

        job.setMapperClass(MapJoinMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);


        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job,new Path("file:///E:\\****\\mymapjoinout"));
        boolean b = job.waitForCompletion(true);
        return b?0:1;
    }

    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new Configuration(), new MapJoinMain(), args);
        System.exit(run);


    }

}
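
DistributedCache is deprecated in the Hadoop 2.x mapreduce API. A minimal sketch of the newer equivalents, assuming the same cache file path as above:

        //in the driver's run(), instead of DistributedCache.addCacheFile(uri, conf):
        job.addCacheFile(new URI("hdfs://node01:8020/product_cache/pdts.txt"));

        //in the mapper's setup(), instead of DistributedCache.getCacheFiles(configuration):
        URI[] cacheFiles = context.getCacheFiles();
        //the rest of setup() (opening the file and filling the HashMap) stays the same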



Copyright notice: this is an original article by qq_35573689, released under the CC 4.0 BY-SA license; when reposting, please include a link to the original and this notice.