1. 每个core配置2~3个任务(每个任务对应一个partition),比如 46 core 的机器,配置 hibench.default.shuffle.parallelism 92
hibench.default.map.parallelism 92 //这个配置值是否生效取决于文件个数及大小和创建RDD时的参数
2. 每个executor配置4~6个 core,executor数= 总core/每个executor core
hibench.yarn.executor.num 12
hibench.yarn.executor.cores 4
3. 每个executor的内存要看处理的数据量,比如 128MB的数据 x (2~5),因Java里的对象头,字节对齐,hashcode都额外占用空间。serialize的比例可以参考UI的
Shuffle Spill (Memory) / Shuffle Spill (Disk)
spark.executor.memory 5760m
4. 由于spark.memory.fraction 默认值是0.6,如果GC时间比较长的话,可以减少为0.1~0.3来减少 GC的时间
spark.memory.fraction
5. 可以适当增加driver的资源配置
spark.driver.cores 2
spark.driver.memory 6g
6. 默认是JavaSerilizer,使用kryo serializer来提高压缩数据的性能
spark.io.compression.lz4.blockSize 64k
spark.io.compression.snappy.blockSize 64k
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.kryoserializer.buffer.max 128m
spark.kryo.unsafe true
7. 通过改变bypassMergeThreshold值来进入unsafe_shuffle_handle或者sort_shuffle_handle来提高性能
spark.shuffle.sort.bypassMergeThreshold 1
8. 如果临时对象比较多,而老生代对象比较少,可以适当增加新生代空间比例来扩容
spark.executor.extraJavaOptions -XX:NewRatio=1
9. shuffle相关的配置可以尝试
spark.shuffle.io.serverThreads 8
spark.shuffle.io.clientThreads 8
#spark.shuffle.io.sendBuffer 2097152
#spark.shuffle.io.receiveBuffer 8388608
spark.reducer.maxSizeInFlight 96m
spark.shuffle.file.buffer 128k
spark.shuffle.service.index.cache.size 128m
spark.shuffle.spill.diskWriteBufferSize 16m
spark.shuffle.unsafe.file.output.buffer 64k
10. rpc 相关的配置可以尝试
Spark.rpc.message.maxSize 128
11. 堆外内存只有在unsafe shuffle handle时有效,且只有 unsafe shuffle writer使用,unsafe shuffle reader代码(spill)初始化时强制使用了onheap,为什么?我也想知道
spark.memory.offHeap.enabled true
spark.memory.offHeap.size 20g
12. 调度推测相关的配置可以尝试
#speculation
spark.speculation false
spark.speculation.interval 100ms
spark.speculation.multiplier 1.5
spark.speculation.quantile 0.75
13. 压缩相关的配置可以尝试
spark.io.compression.lz4.blockSize 128k
spark.rdd.compress true
14. 其它相关的配置可以尝试
spark.buffer.pageSize 4m
spark.executor.memoryOverhead 384
spark.driver.memoryOverhead 384
大开日志
spark.eventLog.enabled true
spark.eventLog.dir hdfs://localhost:9000/sparklogs
下面是结合IDEA remote断点调试用的
#spark.executor.extraJavaOptions -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005
#spark.driver.extraJavaOptions -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=28080