HDFS之利用API进行词频统计

  • Post author:
  • Post category:其他


HDFSWCApp01.java

package com.imooc.bigdata.hadoop.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/**
 * 使用HDFS API完成wordcount统计
 *
 * 需求:统计HDFS上的文件的wc,然后将统计结果输出到HDFS
 *
 * 功能拆解:
 * 1)读取HDFS上的文件 ==> HDFS API
 * 2)业务处理(词频统计):对文件中的每一行数据都要进行业务处理(按照分隔符分割) ==> Mapper
 * 3)将处理结果缓存起来   ==> Context
 * 4)将结果输出到HDFS ==> HDFS API
 *
 */
public class HDFSWCApp01 {

    public static void main(String[] args) throws Exception {

        // 1)读取HDFS上的文件 ==> HDFS API
        Path input = new Path("/hdfsapi/test/h.txt");


        // 获取要操作的HDFS文件系统
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.0.233:8020"), new Configuration(),"hadoop");

        RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(input, false);

        ImoocMapper mapper = new WordCountMapper();
        ImoocContext context = new ImoocContext();

        while(iterator.hasNext()) {
            LocatedFileStatus file = iterator.next();
            FSDataInputStream in = fs.open(file.getPath());
            BufferedReader reader = new BufferedReader(new InputStreamReader(in));

            String line = "";
            while ((line = reader.readLine()) != null) {

                // 2)业务处理(词频统计)   (hello,3)
                // TODO... 在业务逻辑完之后将结果写到Cache中去
                mapper.map(line, context);
            }

            reader.close();
            in.close();
        }


        //TODO... 3 将处理结果缓存起来  Map

        Map<Object, Object> contextMap = context.getCacheMap();

        // 4)将结果输出到HDFS ==> HDFS API
        Path output = new Path("/hdfsapi/output/");

        FSDataOutputStream out = fs.create(new Path(output, new Path("wc.out")));

        // TODO... 将第三步缓存中的内容输出到out中去
        Set<Map.Entry<Object, Object>> entries = contextMap.entrySet();
        for(Map.Entry<Object, Object> entry : entries) {
            out.write((entry.getKey().toString() + " \t " + entry.getValue() + "\n").getBytes());
        }

        out.close();
        fs.close();

        System.out.println("xuru的HDFS API统计词频运行成功....");

    }

}

ImoocContext.java

package com.imooc.bigdata.hadoop.hdfs;

import java.util.HashMap;
import java.util.Map;

/**
 * 自定义上下文,其实就是缓存
 */
public class ImoocContext {

    private Map<Object, Object> cacheMap = new HashMap<Object, Object>();

    public Map<Object, Object> getCacheMap() {
        return cacheMap;
    }

    /**
     * 写数据到缓存中去
     * @param key 单词
     * @param value 次数
     */
    public void write(Object key, Object value) {
        cacheMap.put(key, value);
    }

    /**
     * 从缓存中获取值
     * @param key 单词
     * @return  单词对应的词频
     */
    public Object get(Object key) {
        return cacheMap.get(key);
    }


}

ImoocMapper.java

package com.imooc.bigdata.hadoop.hdfs;

/**
 * 自定义Mapper
 */
public interface ImoocMapper {

    /**
     *
     * @param line  读取到到每一行数据
     * @param context  上下文/缓存
     */
    public void map(String line, ImoocContext context);
}

WordCountMapper.java

package com.imooc.bigdata.hadoop.hdfs;

/**
 * 自定义wc实现类
 */
public class WordCountMapper implements ImoocMapper {

    public void map(String line, ImoocContext context) {
        String[] words = line.split("\t");

        for(String word : words) {
            Object value = context.get(word);
            if(value == null) { // 表示没有出现过该单词
                context.write(word, 1);
            } else {
                int v = Integer.parseInt(value.toString());
                context.write(word, v+1);  // 取出单词对应的次数+1
            }
        }

    }
}

在这里插入图片描述



版权声明:本文为qq_45400755原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。