import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class WordCount {
/**
 * By default, MapReduce uses TextInputFormat to split the input and hand the splits to the Mapper.
 * TextInputFormat:
 * key -> byte offset of the current line within the file
 * value -> contents of the current line
 * Mapper type parameters (the basic Java types are wrapped):
 * input key type: LongWritable (long)
 * input value type: Text (String)
 * output key type: Text (the word)
 * output value type: LongWritable (the count)
 * Because MapReduce must serialize data when transferring it, it wraps the basic types
 * in Writable classes that implement their own serialization.
 */
public static class WordCountMapper extends Mapper<LongWritable,Text,Text,LongWritable>{
//reusable count of 1, emitted once for every word
LongWritable one = new LongWritable(1);
/**
 * Split each line of input into words and emit (word, 1) pairs.
 */
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
//split the line into individual words on whitespace
String[] wordArr = line.split("\\s+");
for (String word : wordArr) {
//emit each word with a count of 1
context.write(new Text(word),one);
}
}
}
/**
 * Performs the global aggregation.
 * Reducer type parameters (again wrapped in Writable types):
 * input:
 * key: Text (the word), value: LongWritable (the count)
 * output:
 * key: Text, value: LongWritable
 */
public static class WordCountReducer extends Reducer<Text,LongWritable,Text,LongWritable>{
/**
 * Globally aggregate the map output for each key.
 * key: the word, values: the counts emitted for that word, e.g. [1,1,1], context: the job context
 */
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
long sum = 0L;
for (LongWritable value:values) {
//accumulate the occurrences of this word
sum += value.get();
}
//emit the final (word, count) pair
context.write(key,new LongWritable(sum));
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//create and configure the Job
Configuration conf = new Configuration();
//point the job at the HDFS NameNode (adjust the address to your cluster)
conf.set("fs.defaultFS","hdfs://192.168.43.20:9000");
Job job = Job.getInstance(conf,"word-count");
//the job must be packaged as a jar before it can run on the cluster
job.setJarByClass(WordCount.class);
//1.specify the input path
//the path is passed in as a command-line argument
FileInputFormat.addInputPath(job,new Path(args[0]));
//2.configure the mapper logic
job.setMapperClass(WordCountMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
//3.shuffle phase
// todo
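//(Assumed optimization, not part of the original job) A combiner could be set here to
//pre-aggregate counts on the map side before the shuffle, reducing the data sent to the
//reducers; WordCountReducer can double as the combiner because its input and output types match.
//job.setCombinerClass(WordCountReducer.class);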
//4.configure the reducer logic
job.setReducerClass(WordCountReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//5.specify the output directory
FileOutputFormat.setOutputPath(job,new Path(args[1]));
//6.run the job and wait for completion
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
Package the project into a jar with Maven and upload it to the server.
Run it with: bin/hadoop jar share/hadoop/mapreduce/<jar name>.jar <package>.WordCount <input dir> <output dir>
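Note: the output directory must not already exist, or FileOutputFormat will fail the job at submission time. After the job finishes, the result can be inspected with something like the following (assuming the default single reducer, whose output file is named part-r-00000):
bin/hadoop fs -cat <output dir>/part-r-00000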