最近在做一个实时流计算的项目,采用的是Spark Steaming,主要是对接Spark方便,一个 Streaming Application 往往需要7*24不间断的跑,所以需要有抵御意外的能力(比如机器或者系统挂掉,JVM crash等)。为了让这成为可能,Spark Streaming需要 checkpoint 足够多信息至一个具有容错设计的存储系统才能让 Application 从失败中恢复。Spark Streaming 会 checkpoint 两种类型的数据。
1、Metadata(元数据) checkpointing – 保存定义了 Streaming 计算逻辑至类似 HDFS 的支持容错的存储系统。用来恢复 driver,元数据包括:
配置 – 用于创建该 streaming application 的所有配置
DStream 操作 – DStream 一些列的操作
未完成的 batches – 那些提交了 job 但尚未执行或未完成的 batches
2、Data checkpointing – 保存已生成的RDDs至可靠的存储。这在某些 stateful 转换中是需要的,在这种转换中,生成 RDD 需要依赖前面的 batches,会导致依赖链随着时间而变长。为了避免这种没有尽头的变长,要定期将中间生成的 RDDs 保存到可靠存储来切断依赖链。
3、总结下:
metadata
元数据的checkpoint是用来恢复当驱动程序失败的场景下,
而数据本身或者RDD的checkpoint通常是用来容错有状态的数据处理失败的场景
import org.apache.log4j.{Level, Logger}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
/**
* Created by csw on 2017/7/13.
*/
object CheckPointTest {
Logger.getLogger("org").setLevel(Level.WARN)
val conf = new SparkConf().setAppName("Spark shell")
val sc = new SparkContext(conf)
//设置时间间隔
val batchDuration=2
// 设置Metadata在HDFS上的checkpoint目录
val dir = "hdfs://master:9000/csw/tmp/test3"
// 通过函数来创建或者从已有的checkpoint里面构建StreamingContext
def functionToCreatContext(): StreamingContext = {
val ssc = new StreamingContext(sc, Seconds(batchDuration))
ssc.checkpoint(dir)
val fileStream: DStream[String] = ssc.textFileStream("hdfs://master:9000/csw/tmp/testStreaming")
//设置通过间隔时间,定时持久checkpoint到hdfs上
fileStream.checkpoint(Seconds(batchDuration*5))
fileStream.foreachRDD(x => {
val collect: Array[String] = x.collect()
collect.foreach(x => println(x))
})
ssc
}
def main(args: Array[String]) {
val context: StreamingContext = StreamingContext.getOrCreate(dir, functionToCreatContext _)
context.start()
context.awaitTermination()
}
}
(1)处理的逻辑必须写在functionToCreateContext函数中,你要是直接写在main方法中,在首次启动后,kill关闭,再启动就会报错
17/07/13 10:57:10 INFO WriteAheadLogManager for Thread: Reading from the logs:
hdfs://master:9000/csw/tmp/test3/receivedBlockMetadata/log-1499914584482-1499914644482
17/07/13 10:57:10 ERROR streaming.StreamingContext: Error starting the context, marking it as stopped
org.apache.spark.SparkException: org.apache.spark.streaming.dstream.MappedDStream@4735d6e5 has not been initialized
at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:323)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
这个错误因为处理逻辑没放在函数中,全部放在main函数中,虽然能正常运行,也能记录checkpoint数据,但是再次启动先报上面
的错误
解决方案:将逻辑写在函数中,不要写main方法中
(2)打包编译重新上传服务器运行,会发现依旧报错,这次的错误和上面的不一样了
17/07/13 11:26:45 ERROR util.Utils: Exception encountered
java.lang.ClassNotFoundException: streaming.CheckPointTest$$anonfun$functionToCreatContext$1
....
17/07/13 11:26:45 WARN streaming.CheckpointReader: Error reading checkpoint from file hdfs://master:9000/csw/tmp/test3/checkpoint-1499916310000
java.io.IOException: java.lang.ClassNotFoundException: streaming.CheckPointTest$$anonfun$functionToCreatContext$1
......
问题就出在checkpoint上,因为checkpoint的元数据会记录jar的序列化的二进制文件,因为你改动过代码,然后重新编译,新的序列化jar文件,在checkpoint的记录中并不存在,所以就导致了上述错误,如何解决:
也非常简单,删除checkpoint开头的的文件即可,不影响数据本身的checkpoint
hadoop fs -rm /csw/tmp/test3/checkpoint*
然后再次启动,发现一切ok,能从checkpoint恢复数据,然后kill掉又一次启动
就能正常工作了。
但是要注意的是,虽然数据可靠性得到保障了,但是要谨慎的设置刷新间隔,这可能会影响吞吐量,因为每隔固定时间都要向HDFS上写入checkpoint数据,spark streaming官方推荐checkpoint定时持久的刷新间隔一般为批处理间隔的5到10倍是比较好的一个方式。
版权声明:本文为ZMC921原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。