Requirement:
/*
 * Join the order table with the product table
 */
Order table:
id date pid amount
1001 20150710 P0001 2
1002 20150710 P0001 3
1002 20150710 P0002 3
Product table:
id pname category_id price
P0001 小米5 1000 2000
P0002 锤子T1 1000 3000
The output should look like this, for example (the total column, amount × price, is left uncomputed for now):
1002 20150710 p0001 3 小米5 1000 2000 total: 6000
1002 20150710 p0002 3 锤子T1 1000 3000 total: 9000
Use the order table's pid as k2 and the whole line of either table as v2 (see the illustration after the sample files below).
The data files exported from the database look like this:
order.txt
1001,20150710,p0001,2
1002,20150710,p0002,3
1002,20150710,p0003,3
product.txt
p0001,小米5,1000,2000
p0002,锤子T1,1000,3000
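With pid as k2 and the whole line as v2, the pairs the mapper emits for these sample files, and the grouped input the reducer then receives, look roughly like this (an illustration of the shuffle, not program output):
map output:
p0001 -> 1001,20150710,p0001,2        (from order.txt)
p0001 -> p0001,小米5,1000,2000        (from product.txt)
p0003 -> 1002,20150710,p0003,3        (from order.txt; no matching product line)
reduce input for key p0001:
p0001 -> [ 1001,20150710,p0001,2 , p0001,小米5,1000,2000 ]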
Method 1: a reduce-side join. For both files, the mapper uses the product id as the key (k2) and sends the whole line to the reducer, where the two sides are stitched together.
Mapper class:
package com.wxj.mrreducejoin;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

/*
 * Created by wxj on 2019/8/16 0016 12:28
 */
public class MJoinMapper extends Mapper<LongWritable, Text, Text, Text> {
    Text text = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Both input files go through this mapper; the split tells us which file the record came from
        FileSplit inputSplit = (FileSplit) context.getInputSplit();
        // Option 1: branch on the file name
        String name = inputSplit.getPath().getName();
        // Option 2 (used here): branch on the line content
        String s = value.toString();
        if (StringUtils.isNotEmpty(s)) {
            if (s.startsWith("p")) {
                // Product line: the product id is the first field
                String[] split = s.split(",");
                String s1 = split[0];
                // With large inputs, creating a new Text per record is wasteful, so reuse the field instead
                // context.write(new Text(s1), value);
                text.set(s1);
                context.write(text, value);
            } else {
                // Order line: the product id (pid) is the third field
                String[] split = s.split(",");
                String s2 = split[2];
                text.set(s2);
                context.write(text, value);
            }
        }
    }
}
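The map() method above notes that the file name could be used instead of the line prefix to tell the two tables apart. A minimal sketch of that alternative branch, assuming the input files are named order.txt and product.txt as above:

// alternative branching inside map(), keyed on the source file name
String name = inputSplit.getPath().getName();
String[] split = s.split(",");
if (name.startsWith("product")) {
    // product.txt: the join key is the first field (product id)
    text.set(split[0]);
} else {
    // order.txt: the join key is the third field (pid)
    text.set(split[2]);
}
context.write(text, value);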
Reducer class:
package com.wxj.mrreducejoin;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/*
 * Created by wxj on 2019/8/16 0016 12:28
 */
public class MJoinReducer extends Reducer<Text, Text, Text, Text> {
    Text text = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        String firstPart = "";
        String secPart = "";
        // Same rule as in the mapper: values starting with "p" are product lines, the rest are order lines
        for (Text value : values) {
            if (value.toString().startsWith("p")) {
                secPart = value.toString();
            } else {
                firstPart = value.toString();
            }
        }
        text.set(firstPart + "\t" + secPart);
        context.write(key, text);
    }
}
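Note that firstPart holds only a single order string, so when several orders share the same product id (as P0001 does in the table at the top) only the last order read survives. A minimal sketch of a reduce method that instead collects every order line and emits one joined row per order, using the same field names as the class above:

@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
    java.util.List<String> orderLines = new java.util.ArrayList<String>();
    String productLine = "";
    for (Text value : values) {
        // toString() copies the content; the framework reuses the Text object between iterations
        String v = value.toString();
        if (v.startsWith("p")) {
            productLine = v;              // the single product line for this id
        } else {
            orderLines.add(v);            // keep every order line, not just the last one
        }
    }
    // one output record per order that matched this product id
    for (String orderLine : orderLines) {
        text.set(orderLine + "\t" + productLine);
        context.write(key, text);
    }
}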
Main class:
package com.wxj.mrreducejoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/*
 * Created by wxj on 2019/8/16 0016 12:27
 */
public class MJoinMain extends Configured implements Tool {
    @Override
    public int run(String[] strings) throws Exception {
        // Create the job
        Job job = Job.getInstance(super.getConf(), "rjoin");
        // Input format and input path
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("E:\\*****\\input"));
        // Mapper
        job.setMapperClass(MJoinMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // Reducer
        job.setReducerClass(MJoinReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Output format and output path
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("E:\\*****\\myrjoinout"));
        // Submit the job and wait for it to finish
        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }

    // Load the Hadoop native library so the job can run locally on Windows
    static {
        try {
            System.load("D:\\soft\\hadoop-2.6.0-cdh5.14.0\\bin\\hadoop.dll");
        } catch (UnsatisfiedLinkError e) {
            System.err.println("Native code library failed to load.\n" + e);
            System.exit(1);
        }
    }

    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new Configuration(), new MJoinMain(), args);
        System.exit(run);
    }
}
Output (note that order p0003 has no matching product line, so its product columns are empty):
p0001 1001,20150710,p0001,2 p0001,小米5,1000,2000
p0002 1002,20150710,p0002,3 p0002,锤子T1,1000,3000
p0003 1002,20150710,p0003,3
The second approach does the join entirely in the map phase. The smaller file is uploaded to the (HDFS) file system beforehand and acts as a shared cache file that every map task can read.
Mapper class:
package com.wxj.mrmapjoin;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

/*
 * Created by wxj on 2019/8/16 0016 20:15
 */
public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    Map<String, String> map = new HashMap<String, String>();

    /**
     * setup() runs once before the first call to map();
     * this is where the cached product file is read into memory.
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        Configuration configuration = context.getConfiguration();
        // Get the list of distributed cache files
        URI[] cacheFiles = DistributedCache.getCacheFiles(configuration);
        // Only one file was cached, so the first entry is the one we want
        FileSystem fileSystem = FileSystem.get(cacheFiles[0], configuration);
        // Open the cached file as a stream
        FSDataInputStream inputStream = fileSystem.open(new Path(cacheFiles[0]));
        // Read the stream line by line with a BufferedReader
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
        String line = null;
        while ((line = bufferedReader.readLine()) != null) {
            // e.g. p0001,小米5,1000,2000 -> key p0001, value the whole line
            String[] split = line.split(",");
            map.put(split[0], line);
        }
        IOUtils.closeQuietly(inputStream);
        fileSystem.close();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] split = line.split(",");
        // Look up the product line by the order's product id; this is null for orders
        // whose product id does not appear in the product table (e.g. p0003)
        String product_line = map.get(split[2]);
        context.write(new Text(product_line + "\t" + value.toString()), NullWritable.get());
    }
}
Main class:
package com.wxj.mrmapjoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.net.URI;

/*
 * Created by wxj on 2019/8/16 0016 20:14
 */
public class MapJoinMain extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = super.getConf();
        // Register the product file on HDFS as a distributed cache file
        DistributedCache.addCacheFile(new URI("hdfs://node01:8020/product_cache/pdts.txt"), conf);
        Job job = Job.getInstance(conf, "mapJoin");
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("file:///E:\\*****\\input"));
        job.setMapperClass(MapJoinMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        // No reducer class is set, so the default identity reducer passes the map output through unchanged
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("file:///E:\\****\\mymapjoinout"));
        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new Configuration(), new MapJoinMain(), args);
        System.exit(run);
    }
}
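DistributedCache is marked deprecated in newer Hadoop releases. Below is a minimal, self-contained sketch of the same map-side join using the replacement calls Job.addCacheFile() and context.getCacheFiles(). The class name MapJoinNewApiSketch, the command-line input/output paths, the setNumReduceTasks(0) call (which skips the identity reducer so the mappers write the final output), and the null check for unmatched orders are additions for illustration, not part of the original code:

package com.wxj.mrmapjoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

public class MapJoinNewApiSketch {

    public static class CacheFileMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        private final Map<String, String> products = new HashMap<String, String>();
        private final Text outKey = new Text();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // context.getCacheFiles() replaces DistributedCache.getCacheFiles(conf)
            URI[] cacheFiles = context.getCacheFiles();
            FileSystem fs = FileSystem.get(cacheFiles[0], context.getConfiguration());
            BufferedReader reader = new BufferedReader(
                    new InputStreamReader(fs.open(new Path(cacheFiles[0]))));
            String line;
            while ((line = reader.readLine()) != null) {
                products.put(line.split(",")[0], line);   // key each product line by product id
            }
            reader.close();
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String productLine = products.get(value.toString().split(",")[2]);
            if (productLine != null) {                    // skip orders with no matching product
                outKey.set(productLine + "\t" + value.toString());
                context.write(outKey, NullWritable.get());
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "mapJoinNewApi");
        job.setJarByClass(MapJoinNewApiSketch.class);
        // Job.addCacheFile replaces DistributedCache.addCacheFile
        job.addCacheFile(new URI("hdfs://node01:8020/product_cache/pdts.txt"));
        job.setNumReduceTasks(0);                         // map-only: mappers write the final output
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path(args[0]));
        job.setMapperClass(CacheFileMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}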
Copyright notice: this is an original article by qq_35573689, released under the CC 4.0 BY-SA license; reprints must include a link to the original article and this notice.