使用场景:
join必须作用于 RDD[K,V] 类型的数据,相当于SQL中的内关联join,只返回两个RDD根据K可以关联上的结果
在类型为(K,V)和(K,W)类型的数据集上调用时,返回一个相同key对应的所有元素对在一起的(K, (V, W))数据集,只返回根据key可以关联上的数据,关联不上的, 直接舍去了
示例:
[K,V] join [K,W] 返回值类型[K,(V,W)]
Join [K,v] [K,W] [K,(V,W)]
LeftOuterJoin [K,(V,Option[W])]
RightOuterJoin [K,(Option[V],W)]
FullOuterJoin [K,(Option[V],Option[W])]
聚合案例:
两个数据集:
* 数据集A id age name
* 数据集B id year month movie
数据集都是使用空格来分割的
a.txt
u1 12 zs
u2 15 xx
u3 18 aaa
u4 20 xa1
u5 22 xa2
b.txt
u1 2016 9 m1
u2 2017 12 m2
u3 2017 1 m3
u3 2014 2 m4
u3 2012 3 m5
/**
* Desc: AB 两个数据集的聚合
* A:
* id age name
* B:
* id year month movie 按key 先分组
* result:
* id age name,year month movie null null null
*/
object ABDataTest {
def main(args: Array[String]): Unit = {
val sc = MySparkUtil.apply(getClass.getSimpleName)
//读取数据
val aData: RDD[String] = sc.textFile("data/a.txt")
val bData: RDD[String] = sc.textFile("data/b.txt")
// A数据的切分组装
val aDataTp: RDD[(String, String)] = aData.map(str => {
val split: Array[String] = str.split(" ", 2)
val id = split(0)
val ageAndName = split(1)
//组装数据
(id, ageAndName)
})
//对B进行切分组装
val bDataTp: RDD[(String, List[String])] = bData.map(str => {
val split = str.split(" ", 2)
// 为了后续更好的使用reduceByKey 这里对数据进行封装
(split(0), List(split(1)))
})
// 先对数据集B 进行分组聚合
val bDataPro: RDD[(String, List[String])] = bDataTp.reduceByKey((_ ++ _))
// AB 聚合处理
val joinData: RDD[(String, (String, Option[List[String]]))] = aDataTp.leftOuterJoin(bDataPro)
val result: RDD[(String, String)] = joinData.mapValues { case (ageAndName, lst) =>
val rightResult: String = lst match {
case None => "null null null"
// 对List 集合进行升序排序
case Some(v) => v.sortBy(_.split(" ")(0)).mkString(" ")
}
// ageAndName + "," + rightResult
ageAndName.concat(",").concat(rightResult)
}
result.foreach(println)
sc.stop()
}
object MySparkUtil {
//获取本地的SparkContext
def apply(appName: String): SparkContext = {
val conf = new SparkConf()
.setAppName(appName)
.setMaster("local[*]")
new SparkContext(conf)
}
}
版权声明:本文为longwenyanlan原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。