Hadoop源码分析(三)

  • Post author:
  • Post category:其他




2021SC@SDUSC



研究内容简略介绍

上周我们分析了org.apache.hadoop.mapreduce.Job中的的剩余代码,本周将开始对

org.apache.hadoop.mapreduce.Cluster

进行分析。

在这里插入图片描述



hadoop.mapreduce.Cluster源码文件



Cluster构造方法

先来看下Cluster类的成员信息。

package org.apache.hadoop.mapreduce;

import ...
/**
 * Provides a way to access information about the map/reduce cluster.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class Cluster {

  @InterfaceStability.Evolving
  public static enum JobTrackerStatus {INITIALIZING, RUNNING};
  private ClientProtocolProvider clientProtocolProvider;  //客户端通信协议提供者
  private ClientProtocol client;        //客户端通信协议实例
  private UserGroupInformation ugi;     //用户组信息
  private Configuration conf;           //配置信息
  private FileSystem fs = null;         //文件系统实例
  private Path sysDir = null;           //系统路径
  private Path stagingAreaDir = null;   //作业资源存放路径
  private Path jobHistoryDir = null;    //作业历史路径
  private static final Log LOG = LogFactory.getLog(Cluster.class);  //日志
  //客户端通信协议提供者加载器
  private static ServiceLoader<ClientProtocolProvider> frameworkLoader =
      ServiceLoader.load(ClientProtocolProvider.class);

  static {
    ConfigUtil.loadResources();
  }

  public Cluster(Configuration conf) throws IOException {
    this(null, conf);
  }

  public Cluster(InetSocketAddress jobTrackAddr, Configuration conf) 
      throws IOException {
    this.conf = conf;  //设置配置信息
    this.ugi = UserGroupInformation.getCurrentUser(); //获取当前用户
    initialize(jobTrackAddr, conf);  //完成初始化
  }

  ...
}

官方在源码的开头给出了对cluster类的粗略注释


Provides a way to access information about the map/reduce cluster.


也就是说这个类为我们提供了对map/reduce集群获取信息的方法。

Cluster最重要的两个成员变量是客户端通信协议提供者ClientProtocolProvider实例clientProtocolProvider和客户端通信协议

ClientProtocol实例client,而后者是依托前者的create()方法生成的。

Cluster类提供了两个构造函数。


public Cluster(Configuration conf)

需要传入参数为Configuration 类的对象,我们将在稍后介绍Configuration 的相关信息。这里面的conf,返回的就是各种${HADOOP_HOME}/etc/hadoop里面的配置文件,最后是以XML的集合形式存放。

第二个构造方法

public Cluster(InetSocketAddress jobTrackAddr, Configuration conf)

额外需要一个InetSocketAddress类对象作为参数传入,查询官方api可知,公共类InetSocketAddress扩展了类SocketAddress。这个类实现了一个IP Socket Address(IP地址+端口号),也可以是一对(主机名+端口号),在这种情况下会尝试解析主机名。如果解析失败,则该地址被称为未解析,但仍可在某些情况下使用,例如通过代理连接。它提供了一个由套接字用于绑定、连接或作为返回值的不可变对象。

两个方法的区别在于第二种构造方法除了为集群设置配置信息外,还能够获取当前用户,并借助jobTrackAddr完成初始化—初始化了集群的信息,包括配置文件。



方法initProviderList

private void initProviderList() {
    if (providerList == null) {
      synchronized (frameworkLoader) {
        if (providerList == null) {
          List<ClientProtocolProvider> localProviderList =
              new ArrayList<ClientProtocolProvider>();
          try {
            for (ClientProtocolProvider provider : frameworkLoader) {
              localProviderList.add(provider);
            }
          } catch(ServiceConfigurationError e) {
            LOG.info("Failed to instantiate ClientProtocolProvider, please "
                         + "check the /META-INF/services/org.apache."
                         + "hadoop.mapreduce.protocol.ClientProtocolProvider "
                         + "files on the classpath", e);
          }
          providerList = localProviderList;
        }
      }
    }
  }

initProviderList负责初始化cluster的基本信息。

代码第四行判断用户是否提交配置信息,如果没有提交,则下面开始初始化LocalProviderList,这也就是如果本地测试运行时,clientProtocol为什么会返回

org.apache.hadoop.LocalJobRunner@XXX

的原因。首先初始化一个数组,遍历ClientProtocolProvider ,依次存入数组localProviderList中,若产生异常,则抛出具体错误信息。

当然,若providerList有东西,说明是提交到集群上,并且拿到了相关配置信息,我们就可以直接退出这个方法了。



方法initialize

private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
      throws IOException {

    initProviderList();
    final IOException initEx = new IOException(
        "Cannot initialize Cluster. Please check your configuration for "
            + MRConfig.FRAMEWORK_NAME
            + " and the correspond server addresses.");
    if (jobTrackAddr != null) {
      LOG.info(
          "Initializing cluster for Job Tracker=" + jobTrackAddr.toString());
    }
    for (ClientProtocolProvider provider : providerList) {
      LOG.debug("Trying ClientProtocolProvider : "
          + provider.getClass().getName());
      ClientProtocol clientProtocol = null;
      try {
        if (jobTrackAddr == null) {
          clientProtocol = provider.create(conf);
        } else {
          clientProtocol = provider.create(jobTrackAddr, conf);
        }

        if (clientProtocol != null) {
          clientProtocolProvider = provider;
          client = clientProtocol;
          LOG.debug("Picked " + provider.getClass().getName()
              + " as the ClientProtocolProvider");
          break;
        } else {
          LOG.debug("Cannot pick " + provider.getClass().getName()
              + " as the ClientProtocolProvider - returned null protocol");
        }
      } catch (Exception e) {
        final String errMsg = "Failed to use " + provider.getClass().getName()
            + " due to error: ";
        initEx.addSuppressed(new IOException(errMsg, e));
        LOG.info(errMsg, e);
      }
    }

    if (null == clientProtocolProvider || null == client) {
      throw initEx;
    }
  }

initialize是cluster在初始化时真正调用的方法,代码第四行调用了我们上面提到的initProviderList方法来初始化集群基本信息。

在这里插入图片描述

for循环中依次取出每个ClientProtocolProvider,通过其create()方法构造ClientProtocol实例。

在这里插入图片描述

接下来判断jobTrackAddr是否为空。我们通过jobTrackAddr是否存在来决定配置文件有没有配置YARN信息。

如果配置文件没有配置YARN信息,则构建LocalRunner,MR任务本地运行。如果配置文件有配置YARN信息,则构建YarnRunner,MR任务在YARN集群上运行。

在这里插入图片描述

最后设置clientProtocolProvider 和client信息,并且退出循环。不难看出其余代码部分主要是处理抛出的异常,这里就不做进一步分析。至此,cluster初始化完成。

上面create()方法时提到了两种ClientProtocolProvider实现类。

MapReduce中,ClientProtocolProvider抽象类的实现共有YarnClientProtocolProvider、LocalClientProtocolProvider两种,前者为Yarn模式,而后者为Local模式。

在这里插入图片描述

为了进一步了解两者的区别,我们分别点进对应方法查看源码。



ClientProtocolProvider实现类LocalClientProtocolProvider

我们先看下看下Local模式,LocalClientProtocolProvider的create()方法,代码如下:

package org.apache.hadoop.mapred;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;

@InterfaceAudience.Private
public class LocalClientProtocolProvider extends ClientProtocolProvider {

  @Override
  public ClientProtocol create(Configuration conf) throws IOException {
    //两个常量:"mapreduce.framework.name","local"
    String framework =
        conf.get(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
    //若framework是local,则返回LocalJobRunner,并且设置Map任务数1;否则返回null
    if (!MRConfig.LOCAL_FRAMEWORK_NAME.equals(framework)) {
      return null;
    }
    conf.setInt(JobContext.NUM_MAPS, 1);
    return new LocalJobRunner(conf);
  }

  @Override
  public ClientProtocol create(InetSocketAddress addr, Configuration conf) {
    return null; // LocalJobRunner doesn't use a socket
  }

  @Override
  public void close(ClientProtocol clientProtocol) {
    // no clean up required
  }

}

由上可知,MapReduce需要看参数mapreduce.framework.name确定连接模式,但默认是Local模式的。



ClientProtocolProvider实现类YarnClientProtocolProvider

再来看Yarn模式,看下YarnClientProtocolProvider的create()方法:

package org.apache.hadoop.mapred;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;
public class YarnClientProtocolProvider extends ClientProtocolProvider {

  @Override
  public ClientProtocol create(Configuration conf) throws IOException {
    //若参数mapreduce.framework.name配置为Yarn,则构造一个YARNRunner实例并返回,否则返回null
    if (MRConfig.YARN_FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME))) {
      return new YARNRunner(conf);
    }
    return null;
  }

  @Override
  public ClientProtocol create(InetSocketAddress addr, Configuration conf)
      throws IOException {
    return create(conf);
  }

  @Override
  public void close(ClientProtocol clientProtocol) throws IOException {
    if (clientProtocol instanceof YARNRunner) {
      ((YARNRunner)clientProtocol).close();
    }
  }
}

到了这里,我们就能够知道一个很重要的信息,Cluster中客户端通信协议ClientProtocol实例,要么是Yarn模式下的YARNRunner,要么就是Local模式下的LocalJobRunner。



小结

本次我们对org.apache.hadoop.mapreduce.Cluster的相关源码进行了分析。可以看出,cluster类并没有太多的特殊方法,只是提供给其他类的一些get方法。因此我们把重心放在了cluster的构造方法和初始化上。根据是否配置yarn信息,cluster类的初始化方法会决定MapReduce任务是在本地还是集群上运行。通过对cluster类的分析,对底层又有了进一步了解。



版权声明:本文为LittleBiscu1t原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。