Hadoop中的KeyValueInputFormat

  • Post author:
  • Post category:其他



一:背景

有时候,我们可以不以偏移量和行文本内容来作为数据源到MapTask的输入格式,而使用键值对的形式,使用KeyValueInputFormat就可以完成这种需求。


二:技术实现

数据源如下

技术分享

操作代码如下:

public class MyKeyValueTextInputFormat {  
        // 定义输入路径  
        private static final String INPUT_PATH = "hdfs://liaozhongmin:9000/hello";  
        // 定义输出路径  
        private static final String OUT_PATH = "hdfs://liaozhongmin:9000/out";  
  
        public static void main(String[] args) {  
  
            try {  
                // 创建配置信息  
                Configuration conf = new Configuration();  
                //设置行的分隔符,这里是制表符,第一个制表符前面的是Key,第一个制表符后面的内容都是value  
                conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, "\t");  
                /**********************************************/  
                //对Map端输出进行压缩  
                /*conf.setBoolean("mapred.compress.map.output", true); 
                //设置map端输出使用的压缩类 
                conf.setClass("mapred.map.output.compression.codec", GzipCodec.class, CompressionCodec.class); 
                //对reduce端输出进行压缩 
                conf.setBoolean("mapred.output.compress", true); 
                //设置reduce端输出使用的压缩类 
                conf.setClass("mapred.output.compression.codec", GzipCodec.class, CompressionCodec.class);*/  
                // 添加配置文件(我们可以在编程的时候动态配置信息,而不需要手动去改变集群)  
                /* 
                 * conf.addResource("classpath://hadoop/core-site.xml");  
                 * conf.addResource("classpath://hadoop/hdfs-site.xml"); 
                 * conf.addResource("classpath://hadoop/hdfs-site.xml"); 
                 */  
  
                // 创建文件系统  
                FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);  
                // 如果输出目录存在,我们就删除  
                if (fileSystem.exists(new Path(OUT_PATH))) {  
                    fileSystem.delete(new Path(OUT_PATH), true);  
                }  
  
                // 创建任务  
                Job job = new Job(conf, MyKeyValueTextInputFormat.class.getName());  
  
                //1.1   设置输入目录和设置输入数据格式化的类  
                FileInputFormat.setInputPaths(job, INPUT_PATH);  
                job.setInputFormatClass(KeyValueTextInputFormat.class);  
  
                //1.2   设置自定义Mapper类和设置map函数输出数据的key和value的类型  
                job.setMapperClass(MyKeyValueInputFormatMapper.class);  
                job.setMapOutputKeyClass(Text.class);  
                job.setMapOutputValueClass(LongWritable.class);  
  
                //1.3   设置分区和reduce数量(reduce的数量,和分区的数量对应,因为分区为一个,所以reduce的数量也是一个)  
                job.setPartitionerClass(HashPartitioner.class);  
                job.setNumReduceTasks(1);  
  
                //1.4   排序、分组  
                //1.5   归约  
                //2.1   Shuffle把数据从Map端拷贝到Reduce端。  
                //2.2   指定Reducer类和输出key和value的类型  
                job.setReducerClass(MyKeyValueInputFormatReducer.class);  
                job.setOutputKeyClass(Text.class);  
                job.setOutputValueClass(LongWritable.class);  
  
                //2.3   指定输出的路径和设置输出的格式化类  
                FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));  
                job.setOutputFormatClass(TextOutputFormat.class);  
  
                // 提交作业 退出  
                System.exit(job.waitForCompletion(true) ? 0 : 1);  
              
            } catch (Exception e) {  
                e.printStackTrace();  
            }  
        }  
      
        /** 
         * 自定义Mapper类 
         * @author 廖钟民 
         * time : 2015年1月15日下午8:00:01 
         * @version 
         */  
    public static class MyKeyValueInputFormatMapper extends Mapper<Text, Text, Text, LongWritable>{  
  
        /** 
         * 输入数据是 
         * hello    you 
         * hello    me 
         * you  me  love 
         *  
         * 进入map的键值对应该是<hello,you> <hello,me> <you,me love>每个键值对分别调用map()函数 
         */  
        protected void map(Text key, Text value, Mapper<Text, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException {  
            //把key和value都当做key写出去  
            context.write(key, new LongWritable(1));  
            context.write(value, new LongWritable(1));  
        }  
    }  
    /** 
     * map()函数的输出结果为: 
     *<hello,1> <you,1> <hello,1> <me,1> <you,1> <me love,1> 
     *排序分组后的结果为: 
     *<hello,{1,1}> <me,{1}> <me love,{1}> <you,{1,1}> 
     */  
    /** 
     * 自定义Reducer类 
     * @author 廖钟民 
     * time : 2015年1月15日下午8:00:12 
     * @version 
     */  
    public static class MyKeyValueInputFormatReducer extends Reducer<Text, LongWritable, Text, LongWritable>{  
        @Override  
        protected void reduce(Text key, Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException,  
                InterruptedException {  
              
            int sum = 0;  
            //遍历统计  
            for (LongWritable s : values){  
                sum += s.get();  
            }  
              
            context.write(key, new LongWritable(sum));  
        }  
    }  
}  

程序运行结果:

技术分享



版权声明:本文为asd1456732891原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。