累加器
解决了在Driver端创建的变量在Task中修改但最终不会修改Driver端的变量(Task修改的只是副本,不会同步回Execute)
解决了共享变量写的问题
当需要一个累加变量时,再Driver 定义作为计数的变量,会复制到Executor中RDD执行时候通过代码对其进行累加,但是结果不会被收集回Driver中,使用累加器可以把Executor的变量值收集回Driver并进行累加
注意:累加器再Driver端定义初始化。1.6版本在Excutor不能使用.value 获取累加器的值
Demo
import org.apache.spark.sql.SparkSession
object AccumulatorTest {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("AccTest").master("local").getOrCreate()
val sc = spark.sparkContext
val accumulator = sc.longAccumulator //创建累加器 还有double、collection 累加器
val rdd1 = sc.textFile("./data/words")
val rdd2 = rdd1.map(one => {
accumulator.add(1) //使用累加器
one
})
rdd2.collect();
println(s"accumulator=${accumulator.value}") //获取累加器的值
}
}
自定义int类型累加器
object MyAccumulator {
def main(args: Array[String]): Unit = {
//1. 创建一个SparkContext
val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("WorkCount")
val sc = new SparkContext(conf)
val list1: List[Int] = List(30, 20, 50, 70, 60, 10)
val rdd1 = sc.parallelize(list1, 2)
// 注册自定义累加器
val acc = new MyIntAcc
sc.register(acc, "MyIntAcc")
val rdd2 = rdd1.map(x => {
acc.add(1)
x
})
rdd2.collect()
print(acc.value)
//5. 关闭SparkContext
sc.stop()
}
}
// 泛型1:对什么值累加 泛型2:累加器最最终的值
class MyIntAcc extends AccumulatorV2[Int, Int] {
private var sum: Int = 0
// 判断是否为0
override def isZero: Boolean = sum == 0
// 复制操作时会调用 例如Driver到Task Task到Task
override def copy(): AccumulatorV2[Int, Int] = {
// 把当前累加器复制为一个新的累加器
val acc = new MyIntAcc
acc.sum = this.sum
acc
}
// 重置累加器
override def reset(): Unit = sum = 0
// 累加方法(分区内累加)
override def add(v: Int): Unit = sum += v
// 分区间的累加(合并) other中的sum 合并到this的sum中
override def merge(other: AccumulatorV2[Int, Int]): Unit = {
other match {
case acc: MyIntAcc => this.sum += acc.sum
// 此处可抛异常 参考LongAccumulator
case _ => throw new UnsupportedOperationException(
s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
}
}
// 返回累加最后的值
override def value: Int = sum
}
自定义复杂累加器
同时计算sum count avg
object MyAccumulator {
def main(args: Array[String]): Unit = {
//1. 创建一个SparkContext
val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("WorkCount")
val sc = new SparkContext(conf)
val list1: List[Int] = List(30, 20, 50, 70, 60, 10)
val rdd1 = sc.parallelize(list1, 2)
// 注册自定义累加器
val acc = new MapAcc
sc.register(acc, "MyIntAcc")
rdd1.foreach( acc.add(_))
println(acc.value)
//5. 关闭SparkContext
sc.stop()
}
}
class MapAcc extends AccumulatorV2[Double, Map[String, Any]] {
private var map = Map[String, Any]()
override def isZero: Boolean = map.isEmpty
override def copy(): AccumulatorV2[Double, Map[String, Any]] = {
val myacc = new MapAcc
myacc.map = this.map
myacc
}
override def reset(): Unit = map = Map[String, Any]()
override def add(v: Double): Unit = {
// 写入 sum 和count
map += "sum" -> (map.getOrElse("sum", 0D).asInstanceOf[Double] + v)
map += "count" -> (map.getOrElse("count", 0L).asInstanceOf[Long] + 1L)
}
override def merge(other: AccumulatorV2[Double, Map[String, Any]]): Unit = {
// 合并两个map
other match {
case o: MapAcc =>
map += "sum" -> (map.getOrElse("sum", 0D).asInstanceOf[Double] + o.map.getOrElse("sum", 0D).asInstanceOf[Double])
map += "count" -> (map.getOrElse("count", 0L).asInstanceOf[Long] + o.map.getOrElse("count", 0L).asInstanceOf[Long])
// 此处可抛异常 参考LongAccumulator
case _ =>
throw new UnsupportedOperationException(
s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
}
}
override def value: Map[String, Any] = {
map += ("vag" -> (map.getOrElse("sum", 0D).asInstanceOf[Double] / map.getOrElse("count", 1D).asInstanceOf[Long]))
map
}
}
应用场景
- 对共享变量进行修改
- 一次遍历对多个指标进行运算
累加器一般在action算子中使用,若在transformations 算子使用若失败重试会造成重复累加的问题
广播变量
解决了 Driver端的变量在发送Task时候 会将变量发送到Executor中,每一个Task 都会携带一份变量副本,导致Executor内存暴增。
定义广播变量后 会传到每一个需要该变量的Executor的BlockManager 中,Task会先到BlockManager 查找变量,在BlockManager 中的广播变量是只读的
广播变量注意:
- 广播变量智能将RDD广播出去,可以将RDD的结果创博,rdd.clooetc()
- 广播变量只能在Driver端定义,在Executor中使用,不能再Executor中改变广播变量的值
Demo
import org.apache.spark.{SparkConf, SparkContext}
object BroadCastTest {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("testBroadCast")
val sc = new SparkContext(conf)
val list = List[String]("zhangsan","lisi")
val nameList = sc.parallelize(List[String]("zhangsan","lisi","wangwu"))
val bcList = sc.broadcast(list) //放入广播变量
val result = nameList.filter(name => {
val innerList = bcList.value //取出广播变量
!innerList.contains(name)
})
result.foreach(println)
}
}
版权声明:本文为dgqg1223原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。