MR(MapReduce)架构

  • Post author:
  • Post category:其他


一、什么是MR?

•  MapReduce将复杂的、运行于大规模集群上的并行计算过程高度地抽了两个函数:Map和Reduce。

二、为什么使用MR?(特点)

1、易于编程:只关心业务逻辑,它简单的实现一些接口,就可以完成一个分布式程序。

2、良好的扩展性:可动态的增加服务器,解决资源不够用问题。

3、高容错性:出错时转移到其他节点。

4、适合海量数据的计算。

MR框架
MR架构图

架构总结:

数据预处理在任务开始前调用类库,将输入数据分为多个分片(split);Mapper读取自己所属的文件分片(split),将输入数据转换为kv键值对;然后根据排序将相同数据放在相同分片上,聚合得到最终结果。

三、数据类型

四、编程规范

1、Mapper阶段

(1)用户自定义的Mapper要继承自己的父类

(2)Mapper输入的数据是k,v形式

(3)Mapper中的业务逻辑写在map中

(4)Mapper的输出数据是KV形式

(5)map()方法对每一次k,v调用一次

2、Reduce阶段

(1)用户自定义的reduce要继承自己的父类

(2)map输出的数据类型对应reduce输入的数据类型

(3)reduce的业务逻辑在reduce方法中

(4)reduceTadk进程对每一组相同k的<k,v>组调用一次reduce()方法

3、Driver阶段

相当于YARN集群的客户端,用于提交我们整个程序到YARN集群,提交的是封装了MapReduce程序相关运行参数的job对象

五、案例操作

1、mvn依赖

<dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.8.2</version>
        </dependency>
        <!-- 辅助模块包 -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.2</version>
        </dependency>
        <!-- 包含mr -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.2</version>
        </dependency>
        <!-- hdfs -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.2</version>
        </dependency>

    </dependencies>

2、log日志

log4j.rootLogger=INFO, stdout 
log4j.appender.stdout=org.apache.log4j.ConsoleAppender 
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout 
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n 
log4j.appender.logfile=org.apache.log4j.FileAppender 
log4j.appender.logfile.File=target/spring.log 
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout 
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

3、编写Mapper类

public class Mapper1 extends Mapper<LongWritable, Text, Text, IntWritable> {

    
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //获取一行字符串并用文件中所有的格式进行切割
        //试过\t和  |  切割都有问题  
        String [] s = value.toString().split(",");
        //遍历数组
        for(String  str:s){
            //输出
            context.write(new Text(str),new IntWritable(1))
);
        }
    }
}

4、编写Reducer

public class Reduce1 extends Reducer<Text, IntWritable,Text,IntWritable> {
   private  IntWritable  out=new IntWritable();
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum=0;
        for (IntWritable value : values) {
            //把IntWritable转成int类型
            sum+=value.get();
        }

        out.set(sum);
        //key  就是map传来对的key
        context.write(key,out);
    }
}

5、编写Driver类

public class Driver1 {
    @Test
    public  void test() throws IOException, ClassNotFoundException, InterruptedException {
        //获取job
        Configuration conf=new  Configuration();

        Job job = Job.getInstance(conf);
        // 设置jar包
        job.setJarByClass(Driver1.class);
        // 关联mapper和reduce
        job.setMapperClass(Mapper1.class);
        job.setReducerClass(Reduce1.class);
        // 设置map输出出的kv类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        //设置最终输出的kv类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        //设置输入路径和输出路径
        FileInputFormat.setInputPaths(job,new Path("D:\\hadoop\\input"));
        FileOutputFormat.setOutputPath(job,new Path("D:\\hadoop\\output"));
        //提交job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

6、集群测试(添加build)

<build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin </artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
     				 		<!--  这里是你自己的Driver的全路径   -->
                            <mainClass>com.lj.wordcount.DriverTest</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

7、打成jar包集群运行


[

root

@hadoop

hadoop




2.7.2


]

sbin

/

start



dfs

.


sh



[

root

@hadoop

hadoop




2.7.2


]

$ sbin

/

start



yarn

.


sh

[root@hadoop hadoop-2.7.2]hadoop fs -put word.txt /input


[

root

@hadoop

hadoop




2.7.2


]

$ hadoop jar wc

.


jar

com

.


lj


.


wordcount


.


Driver1


/

input

/

output



版权声明:本文为qq_66563605原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。