Requirement:
/*
 * Join the order table with the product table
 */
Order table:
id date pid amount
1001 20150710 P0001 2
1002 20150710 P0001 3
1002 20150710 P0002 3
Product table:
id pname category_id price
P0001 小米5 1000 2000
P0002 锤子T1 1000 3000
The output should look like this, for example:
1002 20150710 p0001 3 小米5 1000 2000 total: 6000 (the total is not computed for now)
1002 20150710 p0002 3 锤子T1 1000 3000 total: 9000
Use the pid from the order table as k2, and the whole table row as v2 (the resulting key/value pairs are listed after the sample files below).
The data exported from the database files has the following format:
order.text
1001,20150710,p0001,2
1002,20150710,p0002,3
1002,20150710,p0003,3
product.txt
p0001,小米5,1000,2000
p0002,锤子T1,1000,3000
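For illustration, with the pid as k2 and the whole line as v2 as described above, the key/value pairs emitted by the mapper for these two sample files would be:
p0001 -> 1001,20150710,p0001,2
p0002 -> 1002,20150710,p0002,3
p0003 -> 1002,20150710,p0003,3
p0001 -> p0001,小米5,1000,2000
p0002 -> p0002,锤子T1,1000,3000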
Approach 1 (reduce-side join): in both files, use the product id as the key (k2) and send the records to the reducer.
Mapper class:
package com.wxj.mrreducejoin;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;
/*
* Created by wxj on 2019/8/16 0016 12:28
*/
public class MJoinMapper extends Mapper<LongWritable, Text, Text, Text> {
    Text text = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Get the input split so we know which of the two files this record came from
        FileSplit inputSplit = (FileSplit) context.getInputSplit();
        // One option: branch on the file name
        String name = inputSplit.getPath().getName();//public Path getPath() { return this.file; }
        // Another option: branch on the line content itself (used below)
        String s = value.toString();
        if (StringUtils.isNotEmpty(s)) {
            if (s.startsWith("p")) { // product record
                String[] split = s.split(",");
                String s1 = split[0];
                // With large data volumes, constantly creating new Text objects is wasteful,
                // so the Text instance is hoisted out into a field
                // context.write(new Text(s1),value);
                text.set(s1);
                context.write(text, value);
            } else { // order record
                String[] split = s.split(",");
                String s2 = split[2];
                text.set(s2);
                context.write(text, value);
            }
        }
    }
}
Reducer class:
package com.wxj.mrreducejoin;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/*
 * Created by wxj on 2019/8/16 0016 12:28
 */
public class MJoinReducer extends Reducer<Text, Text, Text, Text> {
    Text text = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        String firstPart = "";
        String secPart = "";
        // Again branch on the first character: values starting with "p" are product rows
        // and go into the second part, everything else is an order row
        for (Text value : values) {
            if (value.toString().startsWith("p")) {
                secPart = value.toString();
            } else {
                firstPart = value.toString();
            }
        }
        text.set(firstPart + "\t" + secPart);
        context.write(key, text);
    }
}
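The expected output above also lists a total (amount × price) that the reducer skips. A minimal sketch of how it could be appended, assuming the comma-separated layouts shown above (amount is the fourth order field, price the fourth product field); the helper name withTotal is made up for illustration:

// Hypothetical helper, not part of the original code: append amount * price to the joined record
private static String withTotal(String orderPart, String productPart) {
    if (orderPart.isEmpty() || productPart.isEmpty()) {
        return orderPart + "\t" + productPart; // unmatched key, nothing to multiply
    }
    long amount = Long.parseLong(orderPart.split(",")[3]);   // e.g. 1002,20150710,p0002,3  -> 3
    long price  = Long.parseLong(productPart.split(",")[3]); // e.g. p0002,锤子T1,1000,3000 -> 3000
    return orderPart + "\t" + productPart + "\t" + (amount * price);
}

// In reduce(), the last two lines would then become:
// text.set(withTotal(firstPart, secPart));
// context.write(key, text);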
Main class:
package com.wxj.mrreducejoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/*
 * Created by wxj on 2019/8/16 0016 12:27
 */
public class MJoinMain extends Configured implements Tool {
    @Override
    public int run(String[] strings) throws Exception {
        // Get the job
        Job job = Job.getInstance(super.getConf(), "rjoin");

        // Configure the input format and input path
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("E:\\*****\\input"));

        // Set the mapper class
        job.setMapperClass(MJoinMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // Set the reducer class
        job.setReducerClass(MJoinReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Set the output format and output path
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("E:\\*****\\myrjoinout"));

        // Submit the job
        boolean b = job.waitForCompletion(true);
        // Return the exit code
        return b ? 0 : 1;
    }

    static {
        try {
            // Load the Windows native library (local development only)
            System.load("D:\\soft\\hadoop-2.6.0-cdh5.14.0\\bin\\hadoop.dll");
        } catch (UnsatisfiedLinkError e) {
            System.err.println("Native code library failed to load.\n" + e);
            System.exit(1);
        }
    }

    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new Configuration(), new MJoinMain(), args);
        System.exit(run);
    }
}
Output (p0003 appears only in the order file, so its product part is empty):
p0001 1001,20150710,p0001,2 p0001,小米5,1000,2000
p0002 1002,20150710,p0002,3 p0002,锤子T1,1000,3000
p0003 1002,20150710,p0003,3
Approach 2 (map-side join): the join is done entirely in the map phase. The smaller of the two files is uploaded to the distributed file system ahead of time, where it acts as a shared (cached) file that every map task can read.
Mapper class:
package com.wxj.mrmapjoin;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
/*
* Created by wxj on 2019/8/16 0016 20:15
*/
public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    Map<String, String> map = new HashMap<String, String>();

    /**
     * Initialization method, called once before any map() calls.
     * This is where we can read the cached file.
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        Configuration configuration = context.getConfiguration();
        // Get the list of cache files
        URI[] cacheFiles = DistributedCache.getCacheFiles(configuration);
        // Get the distributed file system; there is only one cache file, so take the first entry
        FileSystem fileSystem = FileSystem.get(cacheFiles[0], configuration);
        // Open the cached file as a stream
        FSDataInputStream inputStream = fileSystem.open(new Path(cacheFiles[0]));
        // Read the stream line by line with a BufferedReader
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
        String line = null;
        while ((line = bufferedReader.readLine()) != null) {
            // e.g. p0001,小米5,1000,2000
            String[] split = line.split(",");
            map.put(split[0], line);
        }
        IOUtils.closeQuietly(inputStream);
        fileSystem.close();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] split = line.split(",");
        // Look up the product record for this order's pid in the in-memory map
        String product_line = map.get(split[2]);
        context.write(new Text(product_line + "\t" + value.toString()), NullWritable.get());
    }
}
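Note that map.get(split[2]) returns null when an order's pid is missing from the cached product file (p0003 in the sample data), so the joined line would contain the literal string "null". A sketch of a map() variant that handles this case, not part of the original code:

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    String[] split = value.toString().split(",");
    // Look up the product record; it may be absent from the cache
    String product_line = map.get(split[2]);
    if (product_line == null) {
        // No matching product: emit the order line by itself (or simply return to drop it)
        context.write(new Text(value.toString()), NullWritable.get());
        return;
    }
    context.write(new Text(product_line + "\t" + value.toString()), NullWritable.get());
}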
Main class:
package com.wxj.mrmapjoin;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.net.URI;
/*
* Created by wxj on 2019/8/16 0016 20:14
*/
public class MapJoinMain extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = super.getConf();
        // Add the HDFS file to the distributed cache
        DistributedCache.addCacheFile(new URI("hdfs://node01:8020/product_cache/pdts.txt"), conf);

        Job job = Job.getInstance(conf, "mapJoin");
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("file:///E:\\*****\\input"));
        job.setMapperClass(MapJoinMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("file:///E:\\****\\mymapjoinout"));

        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new Configuration(), new MapJoinMain(), args);
        System.exit(run);
    }
}
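DistributedCache is deprecated in Hadoop 2.x; the same wiring can be done with the Job/Context cache API instead. A rough sketch, assuming the same pdts.txt path on HDFS:

// In the driver, after Job.getInstance(...):
job.addCacheFile(new URI("hdfs://node01:8020/product_cache/pdts.txt"));

// In the mapper's setup(), read the cached file via the context instead of DistributedCache:
URI[] cacheFiles = context.getCacheFiles();
FileSystem fileSystem = FileSystem.get(cacheFiles[0], context.getConfiguration());
FSDataInputStream inputStream = fileSystem.open(new Path(cacheFiles[0]));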