hadoop基础教程(二) MapReduce 单词统计

  • Post author:
  • Post category:其他



1.这是hadoop基础系列教程,适合入门者学习。


2.MapReduce是一种分布式计算模型,解决海量数据问题,由两个阶段组成,map()和reduce()。本文不讲解原理,下面实际操作利用MapReduce实现一个统计单词。

首先准备一个单词源文件,我们要统计每个单词出现的次数

新建word.txt文件,内容如下:

hadluo jeadong flash
flash
jea hadluo hadluo
read flash

MapReduce计算后的结果文件应该是:

单词  次数
hadluo 3
jeadong 1
flash 3
jea 1
read 1

将word.txt 上传到hdfs文件系统的根目录下

hadoop fs -put words.txt路径 hdfs://

新建测试java工程

maven配置

        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-core -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-core</artifactId>
            <version>0.20.2</version>
        </dependency>

        <dependency>
            <groupId>jdk.tools</groupId>
            <artifactId>jdk.tools</artifactId>
            <version>1.7</version>
            <scope>system</scope>
            <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
        </dependency>

    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

新建java类hadoop.WordCount.java

package hadoop;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        System.err.println("输入:" + args[0] + "  输出:" + args[1]);
        Job job = new Job(new Configuration());
        //设置main函数的类
        job.setJarByClass(WordCount.class);
        //设置map的class
        job.setMapperClass(WCMapper.class);
        // map 的输出key类型
        job.setMapOutputKeyClass(Text.class);
        // map 的 输出 value类型
        job.setMapOutputValueClass(LongWritable.class);
        // 设置  单词文件的路径
        FileInputFormat.setInputPaths(job, 
                new Path(args[0]));

        //设置 reduce 的 class
        job.setReducerClass(MCReducer.class);
        //设置reduce的 输出key
        job.setOutputKeyClass(Text.class);
        //设置reduce的 输入 value
        job.setOutputValueClass(LongWritable.class);
        //设置输出文件的 路径
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        //提交任务
        job.waitForCompletion(true); // true 打印进度详情
        System.err.println("启动MapReduce,  输入文件:" + args[0] + "  输出:" + args[1]);
    }
}

/***
 * Map过程,
 * 输入key:字符偏移量    LongWritable类型
 * 输入value: 文件一行的内容   Text类型
 * 输出key: 一个单词    Text类型
 * 输出value: 整形值 1   LongWritable类型
 * 
 * LongWritable 对应long ,在hadoop中序列化了
 * Text 对应String ,在hadoop中序列化了
 * @author HadLuo
 * @since JDK1.7
 * @history 2017年12月27日 新建
 */
class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException,
            InterruptedException {
        // 得到输入文件的一行数据
        String line = value.toString();
        //以空格分隔 ,这样得到了每个单词
        String[] words = line.split(" ");
        for (String word : words) {
            // 输出到reduce 阶段进行计算统计
            context.write(new Text(word), new LongWritable(1));
        }

    }
}
/***
 * Reduce过程,
 * 输入key:一个单词    Text类型
 * 输入value: 整形值 1    LongWritable类型
 * 输出key: 一个单词    Text类型
 * 输出value: 整形值(总单词个数)   LongWritable类型
 * 
 * @author HadLuo
 * @since JDK1.7
 * @history 2017年12月27日 新建
 */
class MCReducer extends Reducer<Text, LongWritable, Text, LongWritable>{

    /**
     *key : map阶段 分组后的 单词,
     *values : 迭代器,map输出的 所有值
     **/
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values,
            Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException,
            InterruptedException {
        long wordSum = 0;
        for(LongWritable l : values){
            wordSum += l.get();
        }
        context.write(key, new LongWritable(wordSum));
    }
}


代码分析:

源文件word.txt

hadluo jeadong flash
flash
jea hadluo hadluo
read flash

先经过map()函数

map()的输入如下:

(0,hadluo jeadong flash)
(20,flash)
(26,jea hadluo hadluo)
(44,read flash)

输入key是字符偏移量(包括换行符),map函数并不需要此信息,将其忽略, map函数的功能只以空格提取每个单词,并输出如下:

(hadluo,1)
(jeadong,1)
(flash,1)
(flash,1)
(jea,1)
(hadluo,1)
(hadluo,1)
(read,1)
(flash,1)

然后经过MapReduce框架处理后,将以上map的输出进行排序和重组,得到如下:

( hadluo,[1,1,1] )
( jeadong,[1] )
( flash,[1,1,1] )
( jea,[1] )
( read,[1] )

以上就是reduce函数的输入,然后经过reduce函数,我们的reduce函数逻辑如下:

    // values: 就是上面我们的 [1,1,1...]
    //key: 就是我们上面的单词
    protected void reduce(Text key, Iterable<LongWritable> values,
            Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException,
            InterruptedException {
           // 求和
        long wordSum = 0;
        for(LongWritable l : values){
            wordSum += l.get();
        }
        context.write(key, new LongWritable(wordSum));
    }

最后一步就是提交作业,也就是我们main函数的内容,已经有注释了不再多解释。


代码写好之后,在包上右键export,导出jarFile,得到wc.jar,并提交到Linux文件系统 /home/wc.jar


运行hadoop命令

>> hadoop jar /home/wc.jar hadoop.WordCount /words.txt /words_out

hadoop.WordCount : 为jar文件main类的全路径
/words.txt :  为hdfs的源单词文件
/words_out : 经过MapReduce后的结果输出文件(在hdfs上)

将结果文件words_out 下载到linux文件系统上

hadoop fs -get /words_out ./words_out

我们看到words_out有两个文件,

-rw-r--r--. 1 root root 43 1228 14:51 part-r-00000
-rw-r--r--. 1 root root  0 1228 14:51 _SUCCESS

vim part-r-00000就是我们统计单词的结果

flash   3
hadluo  3
jea     1
jeadong 1
read    1

今天的单词统计过程和代码分析就到这里,谢谢大家!


老生常谈:深圳有爱好音乐的会打鼓(吉他,键盘,贝斯等)的程序员和其它职业可以一起交流加入我们乐队一起嗨。我的QQ:657455400 表演视频实例:

https://v.qq.com/x/page/f0517awx0x4.html



版权声明:本文为LuoZheng4698729原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。