如何优雅的关闭 Spark Streaming 程序(2种思路)

  • Post author:
  • Post category:其他



精选30+云产品,助力企业轻松上云!>>>
hot3.png



点击蓝色“


大数据每日哔哔


”关注我



加个“


星标


”,



第一时间获取



大数据架构,实战经验




背景


Spark Streaming 程序,一般情况下是不会人为关闭的,但是有些时候需要停服修改逻辑(下线调整)。


假设我们使用Yarn 来管理提交的任务,直接使用

yarn application -kill taskId 的话,显然非常暴力。可能会造成数据的丢失或重复计算(假设从 Kafka 消费数据,还没有计算完,这时候被 kill 掉了,下次程序启动之后可能会出现重复计算或数据丢失的情况。)



设计的程序的时候,我们需要尽可能的避免使用这种



暴力



的方式。


那该怎么办呢?带着这个疑问,我们先来看一个参数



spark.streaming.stopGracefullyOnShutdown,


如果为true,Spark将在JVM关闭时优雅地关闭StreamingContext,而不是立即关闭。



参数来源


第一种:发送信号到 Driver


(1)首先需要在程序中设置


spark.streaming.stopGracefullyOnShutdown  为 true 。


(2)启动程序一段时间后,如果想要停止了,这时候需要去 Spark UI 界面上找到这个任务对应的 Driver 在哪个节点启动的。


05512edde59fac20c9c0f5fa0a13164f863.jpg


552d9bdcb47d082a1c535b9854e41543beb.jpg


【注】:一般我是这样启动的:


nohup

spark-submit \

–master yarn \

–executor-cores

1

\

–executor-memory 3g \

–num-executors

6

\

–driver-memory 2g \

–class com.xx.xxx.xxx \

–jars

$

{submitJars} \

–queue xxxx \

–conf

“spark.driver.host=xxx.xx.xx.xx”

\

–driver-class-path

$

{rootDir}/conf \

–conf

“spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC”

\

–properties-file

$

{rootDir}/conf/spark.properties \


$

{rootDir}/lib/xxxx.jar

>


$


{rootDir}/cache/xxxxxx.txt


2


>&1

&

所以我提交任务的机器就是 Driver 节点。


(3)找到 Driver 进程的 PID, 执行 ps -ef | grep java | grep xxx,注意 xxx 一般是你提交的 Spark 应用的名字或 jar 包的名字。


(4)执行:


kill -SIGTERM <AM-PID>


当 Driver 进程收到 SIGTERM 信号之后,一般会打印下面的日志:

17/02/02 01:31:35 ERROR yarn.ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM17/02/02 01:31:35 INFO streaming.StreamingContext: Invoking stop(stopGracefully=true) from shutdown hook...17/02/02 01:31:45 INFO streaming.StreamingContext: StreamingContext stopped successfully17/02/02 01:31:45 INFO spark.SparkContext: Invoking stop() from shutdown hook...17/02/02 01:31:45 INFO spark.SparkContext: Successfully stopped SparkContext...17/02/02 01:31:45 INFO util.ShutdownHookManager: Shutdown hook called


不过,有一个陷阱。




默认情况下,spark.yarn.maxAppAttempts参数使用来自 yarn.resourcemanager.am.max taints in yarn的默认值。默认值为2。因此,在kill命令停止第一个 AM 后,YARN将自动启动另一个AM/驱动程序。你必须再杀第二个。您可以在spark提交过程中设置–conf spark.yarn.maxAppAttempts=1,但是这个参数如果设置成 1 ,就会出现 AM 是失败不重试的风险,容灾效果就变差了,谨慎选择这种方案。


所以上面这种方案,不推荐。



第二种:获取第三方系统做消息通知


思路很简单,就是定时的获取第三方系统(我这里用的是 Redis)的标识,如果需要停止,

调用StreamContext对象stop方法,自己优雅的终止自己。


核心代码:第 15 行代码以及第 17 行代码。

ssc.start()// 设置 redis 配置,第三方系统;val redisConf = Map("host" -> PropertiesUtils.get(s"$appName.redis.host"),                    "port" -> PropertiesUtils.get(s"$appName.redis.port"),                    "auth" -> PropertiesUtils.get(s"$appName.redis.auth"),                    "db" -> PropertiesUtils.get(s"$appName.redis.db")                   )// 下面是定时器:优雅的关掉流逝程序val batchDuration = PropertiesUtils.get(s"$appName.batchDuration").toLong * 1000Lval timer = new java.util.Timer()val task = new java.util.TimerTask {  override def run(): Unit = {    val redisDB = RedisDB.getInstance(redisConf)    // 定时判断,该应用是否需要关闭    if (redisDB.get(s"spark#streaming#$appName") == "shutdown") {      // 优雅的终止Spark-Streaming,保证数据不会丢失      ssc.stop(stopSparkContext = true, stopGracefully = true)      // 直接终止定时器,可能会导致数据丢失      Thread.sleep(batchDuration)      // 终止定时器      cancel()      timer.cancel()    }    redisDB.close()  }}// delay:用户调用 schedule() 方法后,要等待这么长的时间才可以第一次执行run() 方法// period:第一次调用之后,从第二次开始每隔多长的时间调用一次 run() 方法timer.schedule(task, batchDuration, batchDuration)ssc.awaitTermination()



触发方式很简单,只需要把 Redis 的 key 设置成 shutdown 即可。


愿景:这种与业务无关的技术,还是希望框架本身可以屏蔽掉,或者基础平台做好,很遗憾我们目前还没有人力去做。


参考文献

https://www.linkedin.com/pulse/how-shutdown-spark-streaming-job-gracefully-lan-jiang/


(完)

本文分享自微信公众号 – 大数据每日哔哔(bb-bigdata)。

如有侵权,请联系 support@oschina.cn 删除。

本文参与“

OSC源创计划

”,欢迎正在阅读的你也加入,一起分享。



版权声明:本文为qq_31975963原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。