一次关于Spark广播变量broadcast variable的优化

  • Post author:
  • Post category:其他



如何通过配置广播变量broadcast variable去优化Spark application

今天,想和大家分享一个我在公司工作中遇到的有趣的Spark-sql问题:由广播变量引起的大量ETL jobs异常。上周,突然好几个同事反应有大批量的etl jobs由于广播连接timeout和Spark executor JVM OOM的异常从而导致spark application执行失败,导致一部分下游数据报表的delay。

# WARN MemoryStore: Not enough space to cache broadcast_6 in memory! (computed 4.9 GiB so far)
23/02/23 09:20:55 WARN BlockManager: Persisting block broadcast_6 to disk instead.
# java.lang.OutOfMemoryError: Java heap space

在本文中,我将深入探讨这些问题的原因,并提供一些优化建议,可以更好地利用集群资源。


直接原因:EMR集群规模缩小

由于公司内部架构调整,我们的ETL job需要从AWS的spark-sql on EMR迁移到Spark-sql on EKS。随着迁移工作的进行,EMR集群的规模从稳定的40个节点减少到了大约22个节点。此外,我们团队最近还发布了EMR Spot实例使用流程,以进一步提高instance的使用率。原集群上25%的on-demand节点切换到了Spot集群上。因此,原先的on-demand集群的实际规模已经减少到了大约16个节点。这种总体规模的变化对于广播变量的处理产生了负面影响,见下文解析。


Note


关于如何在EMR上利用spot instance来分类ETL jobs,提高云平台的资源利用率,节省云平台的cost,有感兴趣的朋友可以点赞关注评论,点赞多的话我就肝一篇关于分享如何在AWS EMR上去使用spot instance的经验贴,分享一下我踩过的坑。


根本原因:spark.sql.autoBroadcastJoinThreshold参数



[1]


广播变量 – Broadcast variable是Spark中一种优化性能的机制,它可以将小的数据集传输到所有的节点上,以便在执行操作时进行本地计算,从而减少数据的传输和处理时间。

spark.sql.autoBroadcastJoinThreshold参数指定了Spark SQL在执行join操作时自动将小表作为广播变量进行处理的阈值。当一个表的大小小于或等于这个阈值时,Spark会自动将其作为广播变量进行处理。这个参数的默认值为10MB,也就是说,当一个表的大小小于或等于10MB时,Spark会自动将其作为广播变量进行处理。但是,这个参数与逐渐缩小的集群规模不匹配,因此会导致广播变量处理的负面影响。需要注意的是,将表作为广播变量进行处理需要消耗一定的内存空间,内存提供来源是Executor进程的Off-Heap内存,而Off-heap值是在启动Executor时就确定的,所以如果广播变量配置过大,就会导致内存不足等问题。


什么是Executor Off-heap?


[2]


在Spark中,Executor Off-heap内存是指Spark Executor进程中用于存储非堆内存数据的区域。这个内存区域是由Spark自己管理的,不依赖于JVM的堆内存管理机制。与堆内存不同,Off-heap内存是由直接内存(Direct Memory)分配的,而且它不受垃圾回收的控制。这种内存的使用可以避免JVM内存回收带来的额外开销,并且可以更加灵活地管理内存。

Off-heap内存主要用于存储数据结构、缓存和序列化对象等,例如本文所述的广播变量,它可以有效地提高Spark应用程序的性能和稳定性。但是,需要注意的是,由于Off-heap内存不受JVM管理,Off-heap内存的使用也需要谨慎地管理和控制,否则可能会引发内存泄漏等问题。


间接原因:Spark Executor的核心和内存配置

当Spark Executor配置了较大的核心和内存,并设置了spark.executor.memoryOverhead参数时,它会迅速填满集群的资源,而分配的Off-Heap内存也会过大,导致资源浪费,可用于广播变量的缓存也会更少,从而导致了上述问题,我们的同事抓取了每天平台上跑的所有ETL jobs的spark configuration的snapshot,结果还是比较闹心的,很多处理数据量只有几个GB的job配置却申请了几十个Core对应几百G的Memory。因此,在配置Spark Executor时,需要特别注意Core和 M emory的分配,云平台是真贵呀!


优化建议

如果不幸或者有幸您遇到了我司类似的问题,推荐采取以下措施:

写个脚本批量检查所有Spark conf文件中spark.sql.autoBroadcastJoinThreshold参数的值,并进行适当的调整。可以联系owner适当减小该值或将其设为”-1”以禁用此功能。

请检查Spark Executor的核心数和内存配置。正如上文所述,“吃多少申请多少”,这也需要培养学习Spark优化的观念,这也是我们公司目前正在努力做的,任重而道远。

根据需要适当调整其他可能影响广播变量的参数,可以适当延长spark.sql.broadcastTimeout的值,减小每一次广播的block的大小(即修改spark.broadcast.blockSize),以及将spark.broadcast.compress设为true。

总之,这个问题虽然看起来非常棘手,但只要对应了解了根本原因和直接原因,就可以采取相应的措施来解决它。同时,也可以从其他方面入手,对广播变量的处理进行优化,以便更好地利用集群资源,提高数据处理和分析的效率。


欢迎点赞关注评论


参考



^


1

https://spark.apache.org/docs/latest/configuration.html



^


2

https://spark.apache.org/docs/latest/tuning.html#memory-management-overview-off-heap-memory



版权声明:本文为Mister___Wang原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。