1.这是hadoop基础系列教程,适合入门者学习。
2.MapReduce是一种分布式计算模型,解决海量数据问题,由两个阶段组成,map()和reduce()。本文不讲解原理,下面实际操作利用MapReduce实现一个统计单词。
首先准备一个单词源文件,我们要统计每个单词出现的次数
新建word.txt文件,内容如下:
hadluo jeadong flash
flash
jea hadluo hadluo
read flash
MapReduce计算后的结果文件应该是:
单词 次数
hadluo 3
jeadong 1
flash 3
jea 1
read 1
将word.txt 上传到hdfs文件系统的根目录下
hadoop fs -put words.txt路径 hdfs://
新建测试java工程
maven配置
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-core -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>0.20.2</version>
</dependency>
<dependency>
<groupId>jdk.tools</groupId>
<artifactId>jdk.tools</artifactId>
<version>1.7</version>
<scope>system</scope>
<systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
</plugins>
</build>
新建java类hadoop.WordCount.java
package hadoop;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
System.err.println("输入:" + args[0] + " 输出:" + args[1]);
Job job = new Job(new Configuration());
//设置main函数的类
job.setJarByClass(WordCount.class);
//设置map的class
job.setMapperClass(WCMapper.class);
// map 的输出key类型
job.setMapOutputKeyClass(Text.class);
// map 的 输出 value类型
job.setMapOutputValueClass(LongWritable.class);
// 设置 单词文件的路径
FileInputFormat.setInputPaths(job,
new Path(args[0]));
//设置 reduce 的 class
job.setReducerClass(MCReducer.class);
//设置reduce的 输出key
job.setOutputKeyClass(Text.class);
//设置reduce的 输入 value
job.setOutputValueClass(LongWritable.class);
//设置输出文件的 路径
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//提交任务
job.waitForCompletion(true); // true 打印进度详情
System.err.println("启动MapReduce, 输入文件:" + args[0] + " 输出:" + args[1]);
}
}
/***
* Map过程,
* 输入key:字符偏移量 LongWritable类型
* 输入value: 文件一行的内容 Text类型
* 输出key: 一个单词 Text类型
* 输出value: 整形值 1 LongWritable类型
*
* LongWritable 对应long ,在hadoop中序列化了
* Text 对应String ,在hadoop中序列化了
* @author HadLuo
* @since JDK1.7
* @history 2017年12月27日 新建
*/
class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
@Override
protected void map(LongWritable key, Text value,
Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException,
InterruptedException {
// 得到输入文件的一行数据
String line = value.toString();
//以空格分隔 ,这样得到了每个单词
String[] words = line.split(" ");
for (String word : words) {
// 输出到reduce 阶段进行计算统计
context.write(new Text(word), new LongWritable(1));
}
}
}
/***
* Reduce过程,
* 输入key:一个单词 Text类型
* 输入value: 整形值 1 LongWritable类型
* 输出key: 一个单词 Text类型
* 输出value: 整形值(总单词个数) LongWritable类型
*
* @author HadLuo
* @since JDK1.7
* @history 2017年12月27日 新建
*/
class MCReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
/**
*key : map阶段 分组后的 单词,
*values : 迭代器,map输出的 所有值
**/
@Override
protected void reduce(Text key, Iterable<LongWritable> values,
Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException,
InterruptedException {
long wordSum = 0;
for(LongWritable l : values){
wordSum += l.get();
}
context.write(key, new LongWritable(wordSum));
}
}
代码分析:
源文件word.txt
hadluo jeadong flash
flash
jea hadluo hadluo
read flash
先经过map()函数
map()的输入如下:
(0,hadluo jeadong flash)
(20,flash)
(26,jea hadluo hadluo)
(44,read flash)
输入key是字符偏移量(包括换行符),map函数并不需要此信息,将其忽略, map函数的功能只以空格提取每个单词,并输出如下:
(hadluo,1)
(jeadong,1)
(flash,1)
(flash,1)
(jea,1)
(hadluo,1)
(hadluo,1)
(read,1)
(flash,1)
然后经过MapReduce框架处理后,将以上map的输出进行排序和重组,得到如下:
( hadluo,[1,1,1] )
( jeadong,[1] )
( flash,[1,1,1] )
( jea,[1] )
( read,[1] )
以上就是reduce函数的输入,然后经过reduce函数,我们的reduce函数逻辑如下:
// values: 就是上面我们的 [1,1,1...]
//key: 就是我们上面的单词
protected void reduce(Text key, Iterable<LongWritable> values,
Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException,
InterruptedException {
// 求和
long wordSum = 0;
for(LongWritable l : values){
wordSum += l.get();
}
context.write(key, new LongWritable(wordSum));
}
最后一步就是提交作业,也就是我们main函数的内容,已经有注释了不再多解释。
代码写好之后,在包上右键export,导出jarFile,得到wc.jar,并提交到Linux文件系统 /home/wc.jar
运行hadoop命令
>> hadoop jar /home/wc.jar hadoop.WordCount /words.txt /words_out
hadoop.WordCount : 为jar文件main类的全路径
/words.txt : 为hdfs的源单词文件
/words_out : 经过MapReduce后的结果输出文件(在hdfs上)
将结果文件words_out 下载到linux文件系统上
hadoop fs -get /words_out ./words_out
我们看到words_out有两个文件,
-rw-r--r--. 1 root root 43 12月 28 14:51 part-r-00000
-rw-r--r--. 1 root root 0 12月 28 14:51 _SUCCESS
vim part-r-00000就是我们统计单词的结果
flash 3
hadluo 3
jea 1
jeadong 1
read 1
今天的单词统计过程和代码分析就到这里,谢谢大家!
老生常谈:深圳有爱好音乐的会打鼓(吉他,键盘,贝斯等)的程序员和其它职业可以一起交流加入我们乐队一起嗨。我的QQ:657455400 表演视频实例:
https://v.qq.com/x/page/f0517awx0x4.html