HDFSWCApp01.java
package com.imooc.bigdata.hadoop.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.Map;
import java.util.Set;
/**
 * Word count implemented with the HDFS API
 *
 * Requirement: compute the word count (wc) of a file stored on HDFS,
 * then write the result back to HDFS
 *
 * Breakdown of the steps:
 * 1) Read the file from HDFS ==> HDFS API
 * 2) Business logic (word frequency count): process every line of the file (split on the separator) ==> Mapper
 * 3) Cache the intermediate results ==> Context
 * 4) Write the result to HDFS ==> HDFS API
 *
 */
public class HDFSWCApp01 {

    public static void main(String[] args) throws Exception {
        // 1) Read the file from HDFS ==> HDFS API
        Path input = new Path("/hdfsapi/test/h.txt");

        // Get a handle on the HDFS file system to operate on
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.0.233:8020"), new Configuration(), "hadoop");

        RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(input, false);
        ImoocMapper mapper = new WordCountMapper();
        ImoocContext context = new ImoocContext();

        while (iterator.hasNext()) {
            LocatedFileStatus file = iterator.next();
            FSDataInputStream in = fs.open(file.getPath());
            BufferedReader reader = new BufferedReader(new InputStreamReader(in));

            String line;
            while ((line = reader.readLine()) != null) {
                // 2) Business logic (word frequency count), e.g. (hello, 3);
                //    the mapper writes its results into the cache
                mapper.map(line, context);
            }

            reader.close();
            in.close();
        }

        // 3) The processing results are cached in a Map
        Map<Object, Object> contextMap = context.getCacheMap();

        // 4) Write the result to HDFS ==> HDFS API
        Path output = new Path("/hdfsapi/output/");
        FSDataOutputStream out = fs.create(new Path(output, new Path("wc.out")));

        // Write the content cached in step 3 to the output stream
        Set<Map.Entry<Object, Object>> entries = contextMap.entrySet();
        for (Map.Entry<Object, Object> entry : entries) {
            out.write((entry.getKey().toString() + " \t " + entry.getValue() + "\n").getBytes());
        }

        out.close();
        fs.close();

        System.out.println("xuru's HDFS API word count ran successfully....");
    }
}
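Note that reader.close() and in.close() above are only reached when mapper.map does not throw; on an exception both streams leak. A minimal sketch of the same read logic wrapped in try-with-resources, written as a helper method that could live in HDFSWCApp01 (the method name is made up, and it additionally assumes import java.io.IOException;):

    // Hypothetical helper: read one HDFS file line by line and feed every line
    // to the mapper; try-with-resources closes both streams even if map() throws
    private static void processFile(FileSystem fs, Path path,
                                    ImoocMapper mapper, ImoocContext context) throws IOException {
        try (FSDataInputStream in = fs.open(path);
             BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
            String line;
            while ((line = reader.readLine()) != null) {
                mapper.map(line, context);
            }
        }
    }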
ImoocContext.java
package com.imooc.bigdata.hadoop.hdfs;
import java.util.HashMap;
import java.util.Map;
/**
 * Custom context; it is essentially just a cache
 */
public class ImoocContext {

    private Map<Object, Object> cacheMap = new HashMap<Object, Object>();

    public Map<Object, Object> getCacheMap() {
        return cacheMap;
    }

    /**
     * Write data into the cache
     * @param key   the word
     * @param value the count
     */
    public void write(Object key, Object value) {
        cacheMap.put(key, value);
    }

    /**
     * Get a value from the cache
     * @param key the word
     * @return the count of that word
     */
    public Object get(Object key) {
        return cacheMap.get(key);
    }
}
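Since write is just a HashMap.put, writing an existing key replaces its previous value rather than accumulating it; that is why WordCountMapper first reads the current count with get and writes back count + 1. A tiny demo of that behavior in isolation (the class name is made up, same package assumed):

public class ImoocContextDemo {

    public static void main(String[] args) {
        ImoocContext context = new ImoocContext();

        context.write("hello", 1);
        context.write("hello", 2); // put() overwrites: no automatic accumulation

        System.out.println(context.get("hello"));   // 2
        System.out.println(context.get("welcome")); // null: key never written
    }
}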
ImoocMapper.java
package com.imooc.bigdata.hadoop.hdfs;
/**
 * Custom Mapper
 */
public interface ImoocMapper {

    /**
     * @param line    one line of data read from the file
     * @param context the context / cache
     */
    void map(String line, ImoocContext context);
}
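Because the counting logic sits behind this interface, a different splitting or normalization strategy only requires another implementation; HDFSWCApp01 would simply instantiate that class instead of WordCountMapper. A hypothetical case-insensitive variant, as a sketch (the class name is made up):

public class CaseIgnoreWordCountMapper implements ImoocMapper {

    @Override
    public void map(String line, ImoocContext context) {
        // Same tab-separated counting as WordCountMapper below, but words are
        // lower-cased first so "Hello" and "hello" count as the same word
        String[] words = line.toLowerCase().split("\t");
        for (String word : words) {
            Object value = context.get(word);
            if (value == null) {
                context.write(word, 1);
            } else {
                context.write(word, Integer.parseInt(value.toString()) + 1);
            }
        }
    }
}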
WordCountMapper.java
package com.imooc.bigdata.hadoop.hdfs;
/**
 * Custom word count (wc) implementation
 */
public class WordCountMapper implements ImoocMapper {

    @Override
    public void map(String line, ImoocContext context) {
        String[] words = line.split("\t");

        for (String word : words) {
            Object value = context.get(word);
            if (value == null) { // the word has not been seen yet
                context.write(word, 1);
            } else {
                // take the word's current count and add 1
                int v = Integer.parseInt(value.toString());
                context.write(word, v + 1);
            }
        }
    }
}
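The mapper and context have no dependency on HDFS, so the counting logic can be verified locally before running against the cluster. A small sketch of such a local check (class name and sample lines are made up; output order is not guaranteed because the cache is a HashMap):

public class WordCountLocalTest {

    public static void main(String[] args) {
        ImoocMapper mapper = new WordCountMapper();
        ImoocContext context = new ImoocContext();

        // Two tab-separated sample lines, the same format h.txt is expected to use
        mapper.map("hello\tworld\thello", context);
        mapper.map("hello\thadoop", context);

        // Expected counts: hello=3, world=1, hadoop=1
        for (java.util.Map.Entry<Object, Object> entry : context.getCacheMap().entrySet()) {
            System.out.println(entry.getKey() + "=" + entry.getValue());
        }
    }
}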