spark中master和worker之间的进程通讯(心跳机制)

  • Post author:
  • Post category:其他


模拟spark源码的心跳机制

  1. worker注册到Master, Master完成注册,并回复worker注册成功
  2. worker定时发送心跳
  3. Master接收到worker心跳后,要更新该worker的最近一次发送心跳的时间
  4. 给Master启动定时任务,定时检测注册的worker有哪些没有更新心跳,并将其从hashmap中删除

这个项目总体分三部分(三个模块)

  • worker向master完成注册
  • worker定时向master发送心跳信息
  • master启动定时任务,定时监测worker是否已经挂了,如果挂了就从注册信息中删除

代码注释写得非常详细,这里不再赘述,直接看注释即可。

SparkMaster


import akka.actor.{Actor, ActorSystem, Props}

import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
import scala.collection.mutable
/**
 * Master node of the simulated Spark heartbeat protocol.
 *
 * Keeps a registry of workers keyed by worker id, refreshes a worker's
 * `lastHeartBeat` when a `HeartBeat` arrives, and periodically evicts workers
 * whose last heartbeat is older than `workerTimeoutMillis`.
 *
 * Handled messages:
 *  - `"start"`: prints a banner and triggers `StartTimeOutWorker`.
 *  - `RegisterWorkerInfo`: stores a new `WorkerInfo` and replies with
 *    `RegisteredWorkerInfo`; a registration for an already known id is ignored.
 *  - `HeartBeat`: updates the heartbeat timestamp of a known worker; heartbeats
 *    from unknown ids are logged and ignored.
 *  - `StartTimeOutWorker`: starts the periodic timeout check (at most once).
 *  - `RemoveTimeOutWorker`: removes every worker whose heartbeat has expired.
 */
class SparkMaster extends Actor{
  // Registry of currently registered workers, keyed by worker id.
  private val workers: mutable.Map[String, WorkerInfo] = mutable.Map[String,WorkerInfo]()

  // A worker is considered dead when its last heartbeat is older than this.
  private val workerTimeoutMillis: Long = 6000L

  // Period of the timeout check. Because it is longer than the timeout, a dead
  // worker is evicted somewhere between 6 and 15 seconds after its last heartbeat.
  private val checkInterval: FiniteDuration = 9000.millis

  // Handle of the periodic timeout check, kept so it can be cancelled on stop
  // and so that a repeated "start" does not create a second timer.
  private var timeoutChecker: Option[akka.actor.Cancellable] = None

  /** Cancels the periodic timeout check so it stops firing once this actor is stopped. */
  override def postStop(): Unit = {
    timeoutChecker.foreach(_.cancel())
    timeoutChecker = None
  }

  override def receive: Receive = {
    case "start" =>
      println("master服务器启动了。。。")
      // Kick off the periodic check for workers whose heartbeat timed out.
      self ! StartTimeOutWorker

    case RegisterWorkerInfo(id, cpu, ram) =>
      if (!workers.contains(id)) {
        workers += (id -> new WorkerInfo(id, cpu, ram))
        // Acknowledge the successful registration to the worker.
        sender() ! RegisteredWorkerInfo
      }

    case HeartBeat(id) =>
      // The worker may be unknown (never registered, or already evicted by the
      // timeout check); looking it up with workers(id) would throw and fail the actor.
      workers.get(id) match {
        case Some(info) =>
          info.lastHeartBeat = System.currentTimeMillis()
          println(s"master更新了 $id 心跳时间...")
        case None =>
          println(s"master收到未注册worker ${id} 的心跳, 已忽略")
      }

    case StartTimeOutWorker =>
      if (timeoutChecker.isEmpty) {
        import context.dispatcher // implicit ExecutionContext required by the scheduler
        println("开始了定时检测worker心跳的任务")
        // No initial delay; afterwards send RemoveTimeOutWorker to ourselves every checkInterval.
        timeoutChecker = Some(
          context.system.scheduler.schedule(Duration.Zero, checkInterval, self, RemoveTimeOutWorker))
      }

    case RemoveTimeOutWorker =>
      val now = System.currentTimeMillis()
      // Materialize the expired ids first so the map is not modified while it is traversed.
      val expiredIds = workers.collect {
        case (workerId, info) if now - info.lastHeartBeat > workerTimeoutMillis => workerId
      }.toList
      workers --= expiredIds
      println(s"当前有 ${workers.size} 个worker存活的")
  }
}

/**
 * Command-line entry point that starts the master actor system.
 *
 * Expected arguments: `host port sparkMasterActorName`,
 * for example `127.0.0.1 10005 SparkMaster`.
 */
object SparkMasterApp{
  /**
   * Validates the arguments, creates the "SparkMasterApp" actor system bound to
   * `host:port`, creates the [[SparkMaster]] actor under the given name and
   * sends it `"start"`.
   *
   * @param args host, port and actor name of the master, in that order
   */
  def main(args: Array[String]): Unit = {
    if (args.length != 3) {
      // Example: 127.0.0.1 10005 SparkMaster
      println("请输入参数 host port sparkMasterActor名字")
      // Invalid usage is an error, so report a non-zero exit status.
      sys.exit(1)
    }
    val host = args(0)
    val port = args(1)
    val name = args(2)

    // Fail early with a clear message instead of a configuration error deep inside Akka.
    if (scala.util.Try(port.toInt).isFailure) {
      println(s"port 必须是整数: ${port}")
      sys.exit(1)
    }

    // Remoting configuration for the master's actor system.
    val config = ConfigFactory.parseString(
      s"""
         |akka.actor.provider="akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname=${host}
         |akka.remote.netty.tcp.port=${port}
         |""".stripMargin)
    val actorSystem = ActorSystem("SparkMasterApp",config)
    // Create the master actor and start it.
    val sparkMasterRef = actorSystem.actorOf(Props[SparkMaster],s"${name}")
    sparkMasterRef ! "start"
  }
}

SparkWorker



import java.util.UUID

import akka.actor.{Actor, ActorSelection, ActorSystem, Props}

import com.typesafe.config.ConfigFactory

import scala.concurrent.duration._


/**
 * Worker node of the simulated Spark heartbeat protocol.
 *
 * On `"start"` it registers itself with the master; once the master answers
 * with `RegisteredWorkerInfo` it starts a timer that sends `SendHeartBeat` to
 * itself every `heartBeatInterval`, and each `SendHeartBeat` is forwarded to the
 * master as `HeartBeat(id)`.
 *
 * @param masterHost host of the master's actor system
 * @param masterPort port of the master's actor system
 * @param masterName name of the master actor under `/user`
 */
class SparkWorker(masterHost:String,masterPort:Int,masterName:String) extends Actor{
  // Selection pointing at the remote master actor; resolved in preStart.
  var masterPorxy : ActorSelection = _
  // Identity and resources of this worker, sent to the master on registration.
  val id: String = UUID.randomUUID().toString
  // Number of processors available to the JVM.
  val cupSize: Int = Runtime.getRuntime.availableProcessors()
  // Total memory currently held by the JVM (not the machine's physical memory).
  private val memorySize: Long = Runtime.getRuntime.totalMemory()

  // Period between two heartbeats.
  private val heartBeatInterval: FiniteDuration = 3000.millis

  // Handle of the heartbeat timer, kept so it can be cancelled on stop and so
  // that a repeated RegisteredWorkerInfo does not start a second timer.
  private var heartBeatTask: Option[akka.actor.Cancellable] = None

  /** Builds the selection for the master actor from the constructor arguments. */
  override def preStart(): Unit = {
    masterPorxy = context.actorSelection(s"akka.tcp://SparkMasterApp@${masterHost}:${masterPort}/user/${masterName}")
    println("masterPorxy: "+masterPorxy)
  }

  /** Cancels the heartbeat timer so it stops firing once this actor is stopped. */
  override def postStop(): Unit = {
    heartBeatTask.foreach(_.cancel())
    heartBeatTask = None
  }

  override def receive: Receive = {
    case "start" =>
      println("worker启动了。。。")
      // Send the registration request to the master.
      masterPorxy ! RegisterWorkerInfo(id,cupSize,memorySize)

    case RegisteredWorkerInfo =>
      println("worker注册成功: "+id)
      if (heartBeatTask.isEmpty) {
        import context.dispatcher // implicit ExecutionContext required by the scheduler
        // No initial delay; afterwards send SendHeartBeat to ourselves every heartBeatInterval.
        heartBeatTask = Some(
          context.system.scheduler.schedule(Duration.Zero, heartBeatInterval, self, SendHeartBeat))
      }

    case SendHeartBeat =>
      println(s"worker = ${id}给master发送心跳")
      masterPorxy ! HeartBeat(id)
  }
}

/**
 * Command-line entry point that starts one worker actor system.
 *
 * Expected arguments:
 * `workerHost workerPort workerName masterHost masterPort masterName`.
 * Several workers can be started side by side to check that the master evicts
 * one of them after it dies, for example:
 * {{{
 * 127.0.0.1 10001 SparkWorker01 127.0.0.1 10005 SparkMaster
 * 127.0.0.1 10002 SparkWorker02 127.0.0.1 10005 SparkMaster
 * 127.0.0.1 10003 SparkWorker03 127.0.0.1 10005 SparkMaster
 * }}}
 */
object SparkWorkerApp{
  /**
   * Validates the arguments, creates the "SparkWorkerApp" actor system bound to
   * `workerHost:workerPort`, creates the [[SparkWorker]] actor under
   * `workerName` and sends it `"start"`.
   *
   * @param args worker host/port/name followed by master host/port/name
   */
  def main(args: Array[String]): Unit = {

    if (args.length != 6) {
      println("请输入参数 workerHost workerPort workerName masterHost masterPort masterName")
      // Invalid usage is an error, so report a non-zero exit status.
      sys.exit(1)
    }
    val workerHost = args(0)
    val workerPort = args(1)
    val workerName = args(2)
    val masterHost = args(3)
    val masterPort = args(4)
    val masterName = args(5)

    // Parse both ports up front. The Props creator below is evaluated lazily
    // inside the actor system, so converting there would turn a malformed port
    // into an actor initialization failure instead of a clear startup error.
    if (scala.util.Try(workerPort.toInt).isFailure) {
      println(s"workerPort 必须是整数: ${workerPort}")
      sys.exit(1)
    }
    val masterPortNumber: Int = scala.util.Try(masterPort.toInt).getOrElse {
      println(s"masterPort 必须是整数: ${masterPort}")
      sys.exit(1)
    }

    // Remoting configuration for this worker's actor system.
    val config = ConfigFactory.parseString(
      s"""
         |akka.actor.provider="akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname=${workerHost}
         |akka.remote.netty.tcp.port=${workerPort}
         |""".stripMargin)
    val sparkWorkerApp = ActorSystem("SparkWorkerApp",config)
    // Create the worker actor and start it.
    val sparkWorkerActorRef = sparkWorkerApp.actorOf(
      Props(new SparkWorker(masterHost, masterPortNumber, masterName)),
      s"${workerName}")
    sparkWorkerActorRef ! "start"
  }
}

MessageProtocol



/** Message protocol exchanged between the master and the workers.
 */
// 1. Registration protocol: a worker registers with the master and the master acknowledges it.
// Registration request sent by a worker, carrying its id, CPU count and memory size. (MessageProtocol.scala)
case class RegisterWorkerInfo(id: String, cpu: Int, ram: Long)
/**
 * Master-side record of one registered worker; instances are stored in the
 * master's worker map, keyed by `id`.
 *
 * @param id  unique id of the worker
 * @param cpu CPU count reported by the worker at registration
 * @param ram memory size reported by the worker at registration
 */
class WorkerInfo(val id: String, val cpu: Int, val ram: Long) {
  /** Time of the most recent heartbeat in epoch milliseconds; starts at construction time. */
  var lastHeartBeat : Long = System.currentTimeMillis()

  /** Renders the immutable registration data (the heartbeat time is not included). */
  override def toString: String = s"WorkerInfo [id=$id, cpu=$cpu, ram=$ram]"
}
// Acknowledgement the master sends back to a worker once its registration has succeeded.
case object RegisteredWorkerInfo

// 2. Heartbeat protocol: workers report to the master periodically.
// Tick a worker's timer sends to the worker itself at a fixed interval.
case object SendHeartBeat
// Heartbeat a worker sends to the master on every timer tick, identifying itself by id.
case class HeartBeat(id: String)

// 3. Timeout-detection protocol: the master periodically checks its workers.
// Message the master sends to itself to start the periodic timeout check.
case object StartTimeOutWorker
// Periodic message the master sends to itself to remove workers whose heartbeat has timed out.
case object RemoveTimeOutWorker



版权声明:本文为liyang7916原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。