Multi-table queries with a map-side join (MapJoin)
customers table:

| userId | userName | age |
|---|---|---|
| 1 | zhangsan | 40 |
| 2 | lisi | 30 |
| 3 | wangwu | 20 |
orders table:

| orderId | userId | goodId | buyNum |
|---|---|---|---|
| 1 | 1 | 123 | 1 |
| 2 | 2 | 456 | 2 |
| 3 | 3 | 789 | 5 |
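Joining the two tables on userId gives one record per order, enriched with the matching customer's fields. With the sample data above, the expected result is roughly:

| orderId | userId | userName | age | goodId | buyNum |
|---|---|---|---|---|---|
| 1 | 1 | zhangsan | 40 | 123 | 1 |
| 2 | 2 | lisi | 30 | 456 | 2 |
| 3 | 3 | wangwu | 20 | 789 | 5 |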
CustomerOrder class:
import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// A map-side join is the recommended approach, but it only works when one of the
// two tables is small; here we assume the customers table is the small file.
public class CustomerOrder implements Writable {
    private String userId = "";
    private String userName = "";
    private String age = "";
    private String orderId = "";
    private String goodId = "";
    private String buyNum = "";

    // Serialize all fields in a fixed order
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(userId);
        out.writeUTF(userName);
        out.writeUTF(age);
        out.writeUTF(orderId);
        out.writeUTF(goodId);
        out.writeUTF(buyNum);
    }

    // Deserialize the fields in the same order they were written
    @Override
    public void readFields(DataInput in) throws IOException {
        this.userId = in.readUTF();
        this.userName = in.readUTF();
        this.age = in.readUTF();
        this.orderId = in.readUTF();
        this.goodId = in.readUTF();
        this.buyNum = in.readUTF();
    }

    // getters, setters, toString, and constructors omitted
}
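Because the job below keeps the default TextOutputFormat and writes CustomerOrder as the output key, each output line is whatever toString() returns. A minimal sketch of the omitted toString() (the field order and the comma separator here are assumptions, not the original author's code):

    // Sketch only: field order and separator are assumptions; adjust to the
    // output format you actually need.
    @Override
    public String toString() {
        return userId + "," + userName + "," + age + ","
                + orderId + "," + goodId + "," + buyNum;
    }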
MapJoinMapper:
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
public class MapJoinMapper extends Mapper<LongWritable, Text, CustomerOrder, NullWritable> {

    // In-memory copy of the small customers table: userId -> customer record
    Map<String, CustomerOrder> custs = new HashMap<>();

    // Runs once before any map() call
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Get the cache files distributed to this node
        URI[] files = context.getCacheFiles();
        // Make sure a cache file was actually registered
        if (null != files && files.length > 0) {
            String path = files[0].getPath();
            BufferedReader br = new BufferedReader(new FileReader(path));
            String line = "";
            while ((line = br.readLine()) != null) {
                // customers line format: userId,userName,age
                String[] infos = line.split(",");
                CustomerOrder co = new CustomerOrder();
                co.setUserId(infos[0]);
                co.setUserName(infos[1]);
                co.setAge(infos[2]);
                // Key the map by userId; the value is the partially filled CustomerOrder
                custs.put(co.getUserId(), co);
            }
            br.close();
        }
    }

    // Runs once per parsed input line (one order per line)
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // orders line format: orderId,userId,goodId,buyNum
        String[] infos = value.toString().split(",");
        // Look up the customer this order belongs to by its userId
        CustomerOrder co = custs.get(infos[1]);
        // Copy the customer fields into a fresh object so the cached one stays untouched
        CustomerOrder cpCo = new CustomerOrder();
        try {
            BeanUtils.copyProperties(cpCo, co);
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        } catch (InvocationTargetException e) {
            e.printStackTrace();
        }
        // Fill the order fields into the joined record
        cpCo.setOrderId(infos[0]);
        cpCo.setGoodId(infos[2]);
        cpCo.setBuyNum(infos[3]);
        // Emit the joined record as the key with a NullWritable value
        context.write(cpCo, NullWritable.get());
    }
}
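If an order refers to a userId that does not appear in the customers file, custs.get(...) returns null and BeanUtils.copyProperties will fail at runtime. A defensive variant of the lookup (a sketch, not part of the original code) simply skips such orders:

        // Sketch: skip orders whose userId has no matching customer record
        CustomerOrder co = custs.get(infos[1]);
        if (co == null) {
            return; // nothing to join with; drop (or count) this order
        }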
MapJoinJob:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
public class MapJoinJob {
    public static void main(String[] args) throws IOException, URISyntaxException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(MapJoinJob.class);

        // Delete the output directory if it already exists
        Path path = new Path("file:///e:/temp1/output");
        if (path.getFileSystem(conf).exists(path)) {
            path.getFileSystem(conf).delete(path, true);
        }

        // Register the small customers file as a cache file
        job.addCacheFile(new URI("file:///e:/temp1/read1/customers1.csv"));

        // Set the input (orders) and output paths
        FileInputFormat.setInputPaths(job, "file:///e:/temp1/read1/orders.csv");
        FileOutputFormat.setOutputPath(job, path);

        // Set the mapper and its output types; a map-side join needs no reducer
        job.setMapperClass(MapJoinMapper.class);
        job.setMapOutputKeyClass(CustomerOrder.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setNumReduceTasks(0);

        job.waitForCompletion(true);
    }
}
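With the sample data above, and assuming a toString() like the sketch shown earlier, the single map-only output file (e.g. e:/temp1/output/part-m-00000) would contain roughly:

1,zhangsan,40,1,123,1
2,lisi,30,2,456,2
3,wangwu,20,3,789,5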
Notes:
- A map-side join is the recommended approach, but it only works when one of the two tables is small.
- With a MapJoin you can omit the Reducer, but there is no variant that omits the Map phase and uses only a Reducer.
- When the job produces an empty output folder, the best approach is to debug: put a breakpoint at the entry of every method, first check whether the method is reached at all, then analyze from there.
- The encoding of the input files matters a great deal; it is best to open them in Notepad and check, because on Windows the wrong encoding easily causes the empty-output problem. For example, with one encoding the job produced an empty output folder, while with another it produced normal output. One way to make the read explicit is shown in the sketch after this list.
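If file encoding is the suspect, one option (a sketch, assuming the CSV is saved as UTF-8) is to read the cached customers file in setup() with an explicit charset instead of relying on the platform default used by FileReader:

// Sketch: replace "new FileReader(path)" in setup() with an explicit charset.
// Requires java.io.FileInputStream, java.io.InputStreamReader and java.nio.charset.StandardCharsets.
BufferedReader br = new BufferedReader(
        new InputStreamReader(new FileInputStream(path), StandardCharsets.UTF_8));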