Spark的join聚合操作

  • Post author:
  • Post category:其他



使用场景:

join必须作用于 RDD[K,V] 类型的数据,相当于SQL中的内关联join,只返回两个RDD根据K可以关联上的结果
在类型为(K,V)和(K,W)类型的数据集上调用时,返回一个相同key对应的所有元素对在一起的(K, (V, W))数据集,只返回根据key可以关联上的数据,关联不上的, 直接舍去了
示例:
[K,V]  join [K,W]   返回值类型[K,(V,W)]


Join                       [K,v]  [K,W]    [K,(V,W)]


LeftOuterJoin       [K,(V,Option[W])]


RightOuterJoin     [K,(Option[V],W)]


FullOuterJoin        [K,(Option[V],Option[W])]


聚合案例:

两个数据集:
  * 数据集A    id age name
  * 数据集B    id year month movie
  数据集都是使用空格来分割的
a.txt
u1 12 zs
u2 15 xx
u3 18 aaa
u4 20 xa1
u5 22 xa2

b.txt
u1 2016 9 m1
u2 2017 12 m2
u3 2017 1 m3
u3 2014 2 m4
u3 2012 3 m5
/**
  * Desc:  AB 两个数据集的聚合
  * A:
  * id age name
  * B:
  * id year month movie   按key 先分组
  * result:
  * id age name,year month movie    null null null
  */
object ABDataTest {
  def main(args: Array[String]): Unit = {
    val sc = MySparkUtil.apply(getClass.getSimpleName)

    //读取数据
    val aData: RDD[String] = sc.textFile("data/a.txt")
    val bData: RDD[String] = sc.textFile("data/b.txt")

    // A数据的切分组装
    val aDataTp: RDD[(String, String)] = aData.map(str => {
      val split: Array[String] = str.split(" ", 2)
      val id = split(0)
      val ageAndName = split(1)
      //组装数据
      (id, ageAndName)
    })

    //对B进行切分组装
    val bDataTp: RDD[(String, List[String])] = bData.map(str => {
      val split = str.split(" ", 2)
      // 为了后续更好的使用reduceByKey 这里对数据进行封装
      (split(0), List(split(1)))
    })

    // 先对数据集B 进行分组聚合
    val bDataPro: RDD[(String, List[String])] = bDataTp.reduceByKey((_ ++ _))

    // AB 聚合处理
    val joinData: RDD[(String, (String, Option[List[String]]))] = aDataTp.leftOuterJoin(bDataPro)

    val result: RDD[(String, String)] = joinData.mapValues { case (ageAndName, lst) =>
      val rightResult: String = lst match {
        case None => "null null null"
        // 对List 集合进行升序排序
        case Some(v) => v.sortBy(_.split(" ")(0)).mkString(" ")
      }

      //      ageAndName + "," + rightResult
      ageAndName.concat(",").concat(rightResult)

    }
    result.foreach(println)

    sc.stop()
  }

object MySparkUtil {
  //获取本地的SparkContext
  def apply(appName: String): SparkContext = {
    val conf = new SparkConf()
      .setAppName(appName)
      .setMaster("local[*]")
    new SparkContext(conf)
  }

}



版权声明:本文为longwenyanlan原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。