Spark广播变量与累加器

  • Post author:
  • Post category:其他




累加器

解决了在Driver端创建的变量在Task中修改但最终不会修改Driver端的变量(Task修改的只是副本,不会同步回Execute)


解决了共享变量写的问题

在这里插入图片描述

当需要一个累加变量时,再Driver 定义作为计数的变量,会复制到Executor中RDD执行时候通过代码对其进行累加,但是结果不会被收集回Driver中,使用累加器可以把Executor的变量值收集回Driver并进行累加

注意:累加器再Driver端定义初始化。1.6版本在Excutor不能使用.value 获取累加器的值



Demo

import org.apache.spark.sql.SparkSession

object AccumulatorTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("AccTest").master("local").getOrCreate()
    val sc = spark.sparkContext
    val accumulator = sc.longAccumulator    //创建累加器   还有double、collection 累加器
    val rdd1 = sc.textFile("./data/words")
    val rdd2 = rdd1.map(one => {
      accumulator.add(1)      //使用累加器
      one
    })
    rdd2.collect();
    println(s"accumulator=${accumulator.value}") //获取累加器的值
  }
}



自定义int类型累加器

object MyAccumulator {

  def main(args: Array[String]): Unit = {

    //1. 创建一个SparkContext
    val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("WorkCount")
    val sc = new SparkContext(conf)


    val list1: List[Int] = List(30, 20, 50, 70, 60, 10)
    val rdd1 = sc.parallelize(list1, 2)

    // 注册自定义累加器
    val acc = new MyIntAcc
    sc.register(acc, "MyIntAcc")

    val rdd2 = rdd1.map(x => {
      acc.add(1)
      x
    })

    rdd2.collect()
    print(acc.value)


    //5. 关闭SparkContext
    sc.stop()

  }

}

// 泛型1:对什么值累加 泛型2:累加器最最终的值
class MyIntAcc extends AccumulatorV2[Int, Int] {
  private var sum: Int = 0

  // 判断是否为0
  override def isZero: Boolean = sum == 0

  // 复制操作时会调用  例如Driver到Task  Task到Task
  override def copy(): AccumulatorV2[Int, Int] = {
    // 把当前累加器复制为一个新的累加器
    val acc = new MyIntAcc
    acc.sum = this.sum
    acc
  }

  // 重置累加器
  override def reset(): Unit = sum = 0

  // 累加方法(分区内累加)
  override def add(v: Int): Unit = sum += v

  // 分区间的累加(合并)  other中的sum 合并到this的sum中
  override def merge(other: AccumulatorV2[Int, Int]): Unit = {
    other match {
      case acc: MyIntAcc => this.sum += acc.sum
      // 此处可抛异常 参考LongAccumulator
      case _ => throw new UnsupportedOperationException(
        s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
    }
  }

  // 返回累加最后的值
  override def value: Int = sum
}



自定义复杂累加器

同时计算sum count avg

object MyAccumulator {

  def main(args: Array[String]): Unit = {

    //1. 创建一个SparkContext
    val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("WorkCount")
    val sc = new SparkContext(conf)


    val list1: List[Int] = List(30, 20, 50, 70, 60, 10)
    val rdd1 = sc.parallelize(list1, 2)

    // 注册自定义累加器
    val acc = new MapAcc
    sc.register(acc, "MyIntAcc")

    rdd1.foreach( acc.add(_))
    println(acc.value)


    //5. 关闭SparkContext
    sc.stop()

  }
}

class MapAcc extends AccumulatorV2[Double, Map[String, Any]] {

  private var map = Map[String, Any]()

  override def isZero: Boolean = map.isEmpty

  override def copy(): AccumulatorV2[Double, Map[String, Any]] = {
    val myacc = new MapAcc
    myacc.map = this.map
    myacc
  }

  override def reset(): Unit = map = Map[String, Any]()

  override def add(v: Double): Unit = {
    // 写入 sum 和count
    map += "sum" -> (map.getOrElse("sum", 0D).asInstanceOf[Double] + v)
    map += "count" -> (map.getOrElse("count", 0L).asInstanceOf[Long] + 1L)
  }

  override def merge(other: AccumulatorV2[Double, Map[String, Any]]): Unit = {
    // 合并两个map
    other match {
      case o: MapAcc =>
        map += "sum" -> (map.getOrElse("sum", 0D).asInstanceOf[Double] + o.map.getOrElse("sum", 0D).asInstanceOf[Double])
        map += "count" -> (map.getOrElse("count", 0L).asInstanceOf[Long] + o.map.getOrElse("count", 0L).asInstanceOf[Long])
      // 此处可抛异常 参考LongAccumulator
      case _ =>
        throw new UnsupportedOperationException(
          s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
    }
  }

  override def value: Map[String, Any] = {
    map += ("vag" -> (map.getOrElse("sum", 0D).asInstanceOf[Double] / map.getOrElse("count", 1D).asInstanceOf[Long]))
    map
  }

}



应用场景

  1. 对共享变量进行修改
  2. 一次遍历对多个指标进行运算


累加器一般在action算子中使用,若在transformations 算子使用若失败重试会造成重复累加的问题



广播变量

在这里插入图片描述

解决了 Driver端的变量在发送Task时候 会将变量发送到Executor中,每一个Task 都会携带一份变量副本,导致Executor内存暴增。

定义广播变量后 会传到每一个需要该变量的Executor的BlockManager 中,Task会先到BlockManager 查找变量,在BlockManager 中的广播变量是只读的

广播变量注意:

  1. 广播变量智能将RDD广播出去,可以将RDD的结果创博,rdd.clooetc()
  2. 广播变量只能在Driver端定义,在Executor中使用,不能再Executor中改变广播变量的值

Demo

import org.apache.spark.{SparkConf, SparkContext}

object BroadCastTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("testBroadCast")
    val sc = new SparkContext(conf)
    val list = List[String]("zhangsan","lisi")
    val nameList = sc.parallelize(List[String]("zhangsan","lisi","wangwu"))
    val bcList = sc.broadcast(list)  //放入广播变量
    val result = nameList.filter(name => {
      val innerList = bcList.value  //取出广播变量
      !innerList.contains(name)
    })

   result.foreach(println)
  }
}



版权声明:本文为dgqg1223原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。