Spark Streaming如何使用checkpoint容错

  • Post author:
  • Post category:其他





最近在做一个实时流计算的项目,采用的是Spark Steaming,主要是对接Spark方便,一个 Streaming Application 往往需要7*24不间断的跑,所以需要有抵御意外的能力(比如机器或者系统挂掉,JVM crash等)。为了让这成为可能,Spark Streaming需要 checkpoint 足够多信息至一个具有容错设计的存储系统才能让 Application 从失败中恢复。Spark Streaming 会 checkpoint 两种类型的数据。

1、Metadata(元数据) checkpointing – 保存定义了 Streaming 计算逻辑至类似 HDFS 的支持容错的存储系统。用来恢复 driver,元数据包括:


配置 – 用于创建该 streaming application 的所有配置


DStream 操作 – DStream 一些列的操作


未完成的 batches – 那些提交了 job 但尚未执行或未完成的 batches
2、Data checkpointing – 保存已生成的RDDs至可靠的存储。这在某些 stateful 转换中是需要的,在这种转换中,生成 RDD 需要依赖前面的 batches,会导致依赖链随着时间而变长。为了避免这种没有尽头的变长,要定期将中间生成的 RDDs 保存到可靠存储来切断依赖链。

3、总结下:







metadata

元数据的checkpoint是用来恢复当驱动程序失败的场景下,






而数据本身或者RDD的checkpoint通常是用来容错有状态的数据处理失败的场景
import org.apache.log4j.{Level, Logger}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by csw on 2017/7/13.
  */
object CheckPointTest {
  Logger.getLogger("org").setLevel(Level.WARN)
  val conf = new SparkConf().setAppName("Spark shell")
  val sc = new SparkContext(conf)
  //设置时间间隔
  val batchDuration=2
  // 设置Metadata在HDFS上的checkpoint目录
  val dir = "hdfs://master:9000/csw/tmp/test3"
  // 通过函数来创建或者从已有的checkpoint里面构建StreamingContext
  def functionToCreatContext(): StreamingContext = {
    val ssc = new StreamingContext(sc, Seconds(batchDuration))
    ssc.checkpoint(dir)
    val fileStream: DStream[String] = ssc.textFileStream("hdfs://master:9000/csw/tmp/testStreaming")
    //设置通过间隔时间,定时持久checkpoint到hdfs上
    fileStream.checkpoint(Seconds(batchDuration*5))
    fileStream.foreachRDD(x => {
      val collect: Array[String] = x.collect()
      collect.foreach(x => println(x))
    })
    ssc
  }

  def main(args: Array[String]) {
    val context: StreamingContext = StreamingContext.getOrCreate(dir, functionToCreatContext _)
    context.start()
    context.awaitTermination()
  }
}



(1)处理的逻辑必须写在functionToCreateContext函数中,你要是直接写在main方法中,在首次启动后,kill关闭,再启动就会报错



17/07/13 10:57:10 INFO WriteAheadLogManager  for Thread: Reading from the logs:
hdfs://master:9000/csw/tmp/test3/receivedBlockMetadata/log-1499914584482-1499914644482
17/07/13 10:57:10 ERROR streaming.StreamingContext: Error starting the context, marking it as stopped
org.apache.spark.SparkException: org.apache.spark.streaming.dstream.MappedDStream@4735d6e5 has not been initialized
	at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:323)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)


这个错误因为处理逻辑没放在函数中,全部放在main函数中,虽然能正常运行,也能记录checkpoint数据,但是再次启动先报上面

的错误



解决方案:将逻辑写在函数中,不要写main方法中


(2)打包编译重新上传服务器运行,会发现依旧报错,这次的错误和上面的不一样了
17/07/13 11:26:45 ERROR util.Utils: Exception encountered
java.lang.ClassNotFoundException: streaming.CheckPointTest$$anonfun$functionToCreatContext$1
....
17/07/13 11:26:45 WARN streaming.CheckpointReader: Error reading checkpoint from file hdfs://master:9000/csw/tmp/test3/checkpoint-1499916310000
java.io.IOException: java.lang.ClassNotFoundException: streaming.CheckPointTest$$anonfun$functionToCreatContext$1
......

问题就出在checkpoint上,因为checkpoint的元数据会记录jar的序列化的二进制文件,因为你改动过代码,然后重新编译,新的序列化jar文件,在checkpoint的记录中并不存在,所以就导致了上述错误,如何解决:




也非常简单,删除checkpoint开头的的文件即可,不影响数据本身的checkpoint










hadoop fs -rm /csw/tmp/test3/checkpoint* 


然后再次启动,发现一切ok,能从checkpoint恢复数据,然后kill掉又一次启动




就能正常工作了。








但是要注意的是,虽然数据可靠性得到保障了,但是要谨慎的设置刷新间隔,这可能会影响吞吐量,因为每隔固定时间都要向HDFS上写入checkpoint数据,spark streaming官方推荐checkpoint定时持久的刷新间隔一般为批处理间隔的5到10倍是比较好的一个方式。













版权声明:本文为ZMC921原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。