之前已经在博客中介绍了spark的dataframe利用union 等一系列方法进行拼接,详情请见
Spark中对Dataframe的union 、unionAll和 unionByName方法说明
但是在那篇博客也提到,利用union的这些方法,必须保证两个dataframe必须列数一致(unionByName方法还需要所有列名必须一致)。
那么如果如果dfA和dfB的列长度不一致,应该怎么去上下拼接呢?
val data2 = Seq(
| ("1", null, "hlj", null),
| ("2", "tian", "jl", "1"),
| ("3", null, "hg", "1"),
| ("4", "tian", "bj", "1"),
| ("5", "ming", "tj", "1")
| ).toDF("useid", "name", "live", "nums")
data2.show()
+-----+----+----+----+
|useid|name|live|nums|
+-----+----+----+----+
| 1|null| hlj|null|
| 2|tian| jl| 1|
| 3|null| hg| 1|
| 4|tian| bj| 1|
| 5|ming| tj| 1|
+-----+----+----+----+
val data3 = Seq(
| ("1", null, "hlj", null, "1"),
| ("2", "tian", "jl", "1", "1"),
| ("3", null, "hg", "1", "1"),
| ("4", "tian", "bj", "1", "1"),
| ("5", "ming", "tj", "1", "1")
| ).toDF("useid", "name", "live", "nums", "key")
data3.show()
+-----+----+----+----+---+
|useid|name|live|nums|key|
+-----+----+----+----+---+
| 1|null| hlj|null| 1|
| 2|tian| jl| 1| 1|
| 3|null| hg| 1| 1|
| 4|tian| bj| 1| 1|
| 5|ming| tj| 1| 1|
+-----+----+----+----+---+
data2和data3对比可以发现data3比data2多了”key”这一列,所以想上线拼接时无法使用union等方法。
通过构建方法填充的方式进行拼接:
import org.apache.spark.sql.functions._
def concatDF(data1:DataFrame, data2:DataFrame):DataFrame={
val cols1 = data1.columns.toSet
val cols2 = data2.columns.toSet
val total = cols1 ++ cols2 // union
def expr(myCols: Set[String], allCols: Set[String]) = {
allCols.toList.map(x => x match {
case x if myCols.contains(x) => col(x)
case _ => lit(null).as(x)
})
}
data1.select(expr(cols1, total):_*).unionAll(data2.select(expr(cols2, total):_*))
}
concatDF(data2,data3).show()
+----+----+----+-----+----+
|name| key|live|useid|nums|
+----+----+----+-----+----+
|null|null| hlj| 1|null|
|tian|null| jl| 2| 1|
|null|null| hg| 3| 1|
|tian|null| bj| 4| 1|
|ming|null| tj| 5| 1|
|null| 1| hlj| 1|null|
|tian| 1| jl| 2| 1|
|null| 1| hg| 3| 1|
|tian| 1| bj| 4| 1|
|ming| 1| tj| 5| 1|
+----+----+----+-----+----+
解决了dataframe列数不同导致无法拼接的问题。
参考链接:
链接地址
版权声明:本文为bowenlaw原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。