Spark join()和cogroup()区别

  • Post author:
  • Post category:其他


官网对join和cogroup解释

在这里插入图片描述

示例代码:

 /**
    * join(otherDataSet,[numTasks])
    * 加入一个RDD,在一个(k,v)和(k,w)类型的dataSet上调用,返回一个(k,(v,w))的pair dataSet。
    */
  def join(): Unit ={
    val list1RDD = sc.parallelize(List((1, "华山派"), (2, "武当派"), (3, "明教"), (3, "崆峒派")))
    val list2RDD = sc.parallelize(List((1, 66), (2, 77), (3, 88)))
    list1RDD.join(list2RDD)
      .foreach(println)
  }

结果:

(1,(华山派,66))
(3,(明教,88))
(3,(崆峒派,88))
(2,(武当派,77))
 /**
    * cogroup(otherDataSet,[numTasks])
    * 对两个RDD中的KV元素,每个RDD中相同key中的元素分别聚合成一个集合。与reduceByKey不同的是针对
    * 两个RDD中相同的key的元素进行合并。
    *
    * 合并两个RDD,生成一个新的RDD。实例中包含两个Iterable值,第一个表示RDD1中相同值,第二个表示RDD2
    * 中相同值(key值),这个操作需要通过partitioner进行重新分区,因此需要执行一次shuffle操作。(
    * 若两个RDD在此之前进行过shuffle,则不需要)
    */
def cogroup(): Unit ={
    val list1RDD = sc.parallelize(List((1, "cat"), (2, "dog")))
    val list2RDD = sc.parallelize(List((1, "tiger"), (1, "elephant"), (3, "panda"), (3, "chicken")))
    val list3RDD = sc.parallelize(List((1, "duck"), (1, "lion"), (3, "bird"), (3, "fish"), (4, "flowers")))

    list1RDD.cogroup(list2RDD,list3RDD)
      .foreach(println)
  }
(4,(CompactBuffer(),CompactBuffer(),CompactBuffer(flowers)))
(1,(CompactBuffer(cat),CompactBuffer(tiger, elephant),CompactBuffer(duck, lion)))
(3,(CompactBuffer(),CompactBuffer(panda, chicken),CompactBuffer(bird, fish)))
(2,(CompactBuffer(dog),CompactBuffer(),CompactBuffer()))

从以上实例可以看出来:join就是把两个集合根据key,进行内容聚合,而cogroup在聚合时会先对RDD中相同的key进行合并。



版权声明:本文为qq_41982570原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。