模拟spark源码的心跳机制
- worker注册到Master, Master完成注册,并回复worker注册成功
- worker定时发送心跳
- Master接收到worker心跳后,要更新该worker的最近一次发送心跳的时间
- 给Master启动定时任务,定时检测注册的worker有哪些没有更新心跳,并将其从hashmap中删除
这个项目总体分三部分(三个模块)
- worker向master完成注册
- worker定时向master发送心跳信息
- master启动定时任务,定时检测worker是否已经挂了,如果挂了就从注册信息中删除
代码注释写得非常详细,这里就不再赘述,直接看注释即可。
SparkMaster
import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
import scala.collection.mutable
// 定义sparkMaster节点
/**
 * Master node of the simulated Spark heartbeat protocol.
 *
 * Handles the following messages:
 *  - `"start"`: announces start-up and triggers the periodic timeout check.
 *  - `RegisterWorkerInfo`: records a new worker and replies with `RegisteredWorkerInfo`.
 *  - `HeartBeat`: refreshes the last-heartbeat timestamp of a registered worker.
 *  - `StartTimeOutWorker`: schedules `RemoveTimeOutWorker` to be sent to this actor periodically.
 *  - `RemoveTimeOutWorker`: drops every worker whose last heartbeat is too old.
 */
class SparkMaster extends Actor {

  // Registry of the currently known workers, keyed by worker id.
  private val workers: mutable.Map[String, WorkerInfo] = mutable.Map[String, WorkerInfo]()

  // A worker is considered dead when its last heartbeat is older than this many milliseconds.
  private val heartbeatTimeoutMillis: Long = 6000L

  // Interval between two consecutive timeout checks.
  private val timeoutCheckInterval: FiniteDuration = 9000.millis

  override def receive: Receive = {
    case "start" =>
      println("master服务器启动了。。。")
      // Kick off the periodic check for workers whose heartbeat has timed out.
      self ! StartTimeOutWorker

    case RegisterWorkerInfo(id, cpu, ram) =>
      // Only the first registration of an id is recorded and acknowledged.
      if (!workers.contains(id)) {
        workers += (id -> new WorkerInfo(id, cpu, ram))
        sender() ! RegisteredWorkerInfo
      }

    case HeartBeat(id) =>
      // A heartbeat may arrive for an id that is not (or no longer) registered, for example
      // from a worker that was already removed by the timeout check. Looking it up with
      // `workers(id)` would throw NoSuchElementException and fail the actor, so unknown ids
      // are reported and ignored instead.
      workers.get(id) match {
        case Some(info) =>
          info.lastHeartBeat = System.currentTimeMillis()
          println(s"master更新了 $id 心跳时间...")
        case None =>
          println(s"master收到未注册worker $id 的心跳,已忽略")
      }

    case StartTimeOutWorker =>
      scheduleTimeoutCheck()

    case RemoveTimeOutWorker =>
      removeTimedOutWorkers()
  }

  /** Schedules `RemoveTimeOutWorker` to be sent to this actor immediately and then periodically. */
  private def scheduleTimeoutCheck(): Unit = {
    // Brings the implicit ExecutionContext required by the scheduler into scope.
    import context.dispatcher
    println("开始了定时检测worker心跳的任务")
    // Arguments: initial delay (none), interval, receiver (this actor), message to send.
    context.system.scheduler.schedule(0.millis, timeoutCheckInterval, self, RemoveTimeOutWorker)
  }

  /** Removes every worker whose last heartbeat is older than the timeout and prints the survivor count. */
  private def removeTimedOutWorkers(): Unit = {
    val now = System.currentTimeMillis()
    // Collect the expired ids first so the map is not modified while it is being traversed.
    val expiredIds = workers.collect {
      case (workerId, info) if now - info.lastHeartBeat > heartbeatTimeoutMillis => workerId
    }
    workers --= expiredIds
    println(s"当前有 ${workers.size} 个worker存活的")
  }
}
/**
 * Command-line entry point that boots the master's ActorSystem.
 *
 * Expects exactly three arguments: host, port and the name of the master actor,
 * e.g. `127.0.0.1 10005 SparkMaster`.
 */
object SparkMasterApp {
  def main(args: Array[String]): Unit = {
    args match {
      case Array(host, port, name) =>
        // Remoting configuration binding the ActorSystem to the requested host and port.
        val remotingConfig = ConfigFactory.parseString(
          s"""
             |akka.actor.provider="akka.remote.RemoteActorRefProvider"
             |akka.remote.netty.tcp.hostname=${host}
             |akka.remote.netty.tcp.port=${port}
             |""".stripMargin)
        val system = ActorSystem("SparkMasterApp", remotingConfig)
        // Create the master actor under the requested name and tell it to start.
        val master = system.actorOf(Props[SparkMaster], name)
        master ! "start"

      case _ =>
        // Wrong number of arguments: print the usage hint and terminate.
        println("请输入参数 host port sparkMasterActor名字")
        sys.exit()
    }
  }
}
SparkWorker
import java.util.UUID
import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
/**
 * Worker node of the simulated Spark heartbeat protocol.
 *
 * On `"start"` it registers itself with the master. Once the master answers with
 * `RegisteredWorkerInfo`, it schedules `SendHeartBeat` to itself every 3 seconds and
 * forwards a `HeartBeat` carrying its id to the master on each tick.
 *
 * @param masterHost host of the master's ActorSystem
 * @param masterPort port of the master's ActorSystem
 * @param masterName name of the master actor under `/user`
 */
class SparkWorker(masterHost: String, masterPort: Int, masterName: String) extends Actor {

  // Selection pointing at the remote master actor; assigned in preStart.
  var masterPorxy: ActorSelection = _

  // Identity and resources reported to the master; kept as members so every handler can use them.
  val id = UUID.randomUUID().toString
  // Number of processors available to this JVM.
  val cupSize = Runtime.getRuntime.availableProcessors()
  // Total amount of memory currently held by this JVM.
  private val memorySize: Long = Runtime.getRuntime.totalMemory()

  override def preStart(): Unit = {
    val masterPath = s"akka.tcp://SparkMasterApp@${masterHost}:${masterPort}/user/${masterName}"
    masterPorxy = context.actorSelection(masterPath)
    println("masterPorxy: " + masterPorxy)
  }

  override def receive: Receive = {
    case "start"              => register()
    case RegisteredWorkerInfo => startHeartBeatTimer()
    case SendHeartBeat        => sendHeartBeat()
  }

  /** Announces start-up and sends the registration request to the master. */
  private def register(): Unit = {
    println("worker启动了。。。")
    masterPorxy ! RegisterWorkerInfo(id, cupSize, memorySize)
  }

  /** Called once registration is confirmed: starts the periodic heartbeat trigger. */
  private def startHeartBeatTimer(): Unit = {
    println(s"worker注册成功: $id")
    // Brings the implicit ExecutionContext required by the scheduler into scope.
    import context.dispatcher
    // Arguments: initial delay (none), interval (3 seconds), receiver (this actor), message to send.
    context.system.scheduler.schedule(0.millis, 3000.millis, self, SendHeartBeat)
  }

  /** Forwards one heartbeat carrying this worker's id to the master. */
  private def sendHeartBeat(): Unit = {
    println(s"worker = ${id}给master发送心跳")
    masterPorxy ! HeartBeat(id)
  }
}
/**
 * Command-line entry point that boots a worker's ActorSystem.
 *
 * Expects exactly six arguments:
 * `workerHost workerPort workerName masterHost masterPort masterName`.
 *
 * Several workers can be started side by side to verify that the master removes a
 * worker once it dies, for example:
 * {{{
 * 127.0.0.1 10001 SparkWorker01 127.0.0.1 10005 SparkMaster
 * 127.0.0.1 10002 SparkWorker02 127.0.0.1 10005 SparkMaster
 * 127.0.0.1 10003 SparkWorker03 127.0.0.1 10005 SparkMaster
 * }}}
 */
object SparkWorkerApp {
  def main(args: Array[String]): Unit = {
    if (args.length != 6) {
      println("请输入参数 workerHost workerPort workerName masterHost masterPort masterName")
      sys.exit()
    }
    val Array(workerHost, workerPort, workerName, masterHost, masterPortArg, masterName) = args

    // The master port must be an integer. Report a malformed value explicitly instead of
    // letting `toInt` abort the program with an uncaught NumberFormatException.
    val masterPort: Int = scala.util.Try(masterPortArg.toInt).getOrElse {
      println(s"masterPort 必须是整数: $masterPortArg")
      sys.exit(1)
    }

    // Remoting configuration binding the worker's ActorSystem to its own host and port.
    val config = ConfigFactory.parseString(
      s"""
         |akka.actor.provider="akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname=${workerHost}
         |akka.remote.netty.tcp.port=${workerPort}
         |""".stripMargin)
    val sparkWorkerApp = ActorSystem("SparkWorkerApp", config)
    // Create the worker actor under the requested name and tell it to start.
    val sparkWorkerActorRef =
      sparkWorkerApp.actorOf(Props(new SparkWorker(masterHost, masterPort, masterName)), workerName)
    sparkWorkerActorRef ! "start"
  }
}
MessageProtocol
/** Message protocol used for communication between the master and the workers.
*/
// 1. Registration protocol between master and worker (the worker registers with the master, the master confirms success).
// Registration request sent by a worker, carrying its id, cpu and ram values. (MessageProtocol.scala)
case class RegisterWorkerInfo(id: String, cpu: Int, ram: Long)
/**
 * Record describing one worker; intended to be stored in the master's map of workers.
 *
 * @param id  worker id
 * @param cpu cpu value reported by the worker
 * @param ram ram value reported by the worker
 */
class WorkerInfo(val id: String, val cpu: Int, val ram: Long) {

  // Time of the most recent heartbeat in epoch milliseconds; starts at construction time.
  var lastHeartBeat: Long = System.currentTimeMillis()

  override def toString: String = s"WorkerInfo [id=$id, cpu=$cpu, ram=$ram]"
}
// Reply the master sends back to a worker once its registration has succeeded.
case object RegisteredWorkerInfo
// 2. Protocol for the worker's periodic heartbeat.
// Message the worker's timer sends to the worker itself at a fixed interval.
case object SendHeartBeat
// Heartbeat message the worker sends to the master each time its timer fires; carries the worker id.
case class HeartBeat(id: String)
// 3. Protocol for the master's periodic worker check.
// Message the master sends to itself to start the check for timed-out workers.
case object StartTimeOutWorker
// Message the master sends to itself to detect workers whose heartbeat has timed out.
case object RemoveTimeOutWorker
版权声明:本文为liyang7916原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。