The DSL (Domain-Specific Language) style API expresses SQL semantics through the programming API rather than SQL strings.
DSL: domain-specific language.
One characteristic of the Dataset Table API is that the result of an operation always comes back as a DataFrame:
after a select, the concrete type of the result cannot be determined, so each record can only be wrapped in the generic Row type.
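A minimal sketch of that behaviour (the case class Stu, the object name RowDemo and the file path are illustrative, borrowed from the data prepared below): even when you start from a typed Dataset, a select comes back as Dataset[Row], i.e. a DataFrame, and each record is a generic Row.
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

// Illustrative only: the case class Stu simply mirrors the stu.csv columns listed below
case class Stu(id: Int, name: String, age: Int, city: String, score: Double)

object RowDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("RowDemo").master("local[*]").getOrCreate()
    import spark.implicits._

    val df: DataFrame = spark.read.option("header", true).option("inferSchema", true)
      .csv("SQLData/csv/stu.csv")

    val ds: Dataset[Stu] = df.as[Stu]                 // a typed Dataset[Stu]
    val untyped: DataFrame = ds.select("id", "name")  // select falls back to Dataset[Row], i.e. a DataFrame
    val first: Row = untyped.first()                  // each record is wrapped in a generic Row
    println(first.getInt(0) + " " + first.getString(1))
  }
}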
Basic Table API operations
Data preparation
id,name,age,city,score
1,张三,21,BJ,80.0
2,李四,23,BJ,82.0
3,王五,20,SH,88.6
4,赵六,26,SH,83.0
5,田七,30,SZ,90.0
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, SparkSession}

object TableAPI01 {
  Logger.getLogger("org").setLevel(Level.ERROR)

  def main(args: Array[String]): Unit = {
    // Create the session
    val spark: SparkSession = SparkSession.builder()
      .appName("TableAPI")
      .master("local[*]")
      .getOrCreate()
    // Read the data and create a DataFrame
    val df: DataFrame = spark.read.option("header", true).option("inferSchema", true).csv("SQLData/csv/stu.csv")
    // Import Spark's implicits and built-in functions
    import spark.implicits._
    import org.apache.spark.sql.functions._

    println("-----------------------select and expressions----------------------------")
    // Express columns as plain strings
    df.select("id", "name").show()      // string columns cannot be transformed further
    //df.select("id + 1","name").show   // "id + 1" would be treated as a single column name and fail
    // SQL expressions written as strings must go through selectExpr
    df.selectExpr("id+1", "upper(name)").show()
    // Build Column objects with the $ interpolator
    df.select($"id", upper($"name"), $"age" + 10).show()
    // Build Column objects with a single quote (Symbol)
    df.select('id, upper('name), 'age + 10).show()
    // Build Column objects with the col function
    df.select(col("id"), upper(col("name")), col("age") + 10).show()

    println("-----------------------aliases----------------------------")
    // Plain strings (df.select("id","name")) cannot be aliased or transformed further
    // SQL expressions written as strings go through selectExpr
    df.selectExpr("id+1 as new_id", "upper(name) as new_name").show()
    // Column objects built with $
    df.select($"id" as "new_id", upper($"name") as "new_name", $"age" + 10 as "new_age").show()
    // Column objects built with a single quote (Symbol)
    df.select('id as "new_id", upper('name) as "new_name", 'age + 10 as "new_age").show()
    // Column objects built with the col function
    df.select(col("id") as "new_id",
      upper(col("name")) as "new_name",
      col("age") + 10 as "new_age").show()

    println("-----------------------filtering----------------------------")
    df.where("id > 1 and city='BJ'").show()
    df.where('id > 1 and 'score > 80).show()

    println("-----------------------order by----------------------------")
    df.orderBy($"id".desc).show()   // id descending
    df.orderBy("age").show()        // age ascending

    println("-----------------------group by with aggregate functions----------------------------")
    df.groupBy("city").count().show()        // number of people per city
    df.groupBy("city").avg("score").show()   // average score per city
    // Common pattern: agg(sum() as "alias", ...)
    df.groupBy("city").agg(
      avg("score") as "avg_score",
      sum("score") as "sum_score",
      count(lit(1)) as "cnt",                // lit(value) turns a literal into a constant column
      collect_list("name") as "names"
    ).show()

    println("-----------------------subquery---------------------------")
    /**
     * Equivalent to:
     * select
     *   *
     * from (
     *   select city, sum(score) as sum_score
     *   from stu
     *   group by city) t
     * where sum_score > 150
     */
    df.groupBy("city").agg(sum("score") as "sum_score")
      .where($"sum_score" > 150).show()
  }
}
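As a cross-check, the same final aggregation can be written as a SQL string by registering the DataFrame as a temporary view. A minimal sketch, assuming the spark and df values from TableAPI01 above (the view name stu is arbitrary):
// Minimal sketch, reusing the spark and df from TableAPI01; "stu" is an arbitrary view name
df.createOrReplaceTempView("stu")
spark.sql(
  """
    |select city, sum(score) as sum_score
    |from stu
    |group by city
    |having sum(score) > 150
    |""".stripMargin).show()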
Window function example
Data preparation
shop1,2022-01-01,500
shop1,2022-01-02,500
shop1,2022-02-01,500
shop1,2022-04-01,500
shop1,2022-03-01,500
shop1,2022-06-01,500
shop2,2022-01-01,500
shop2,2022-05-01,500
shop2,2022-02-01,500
shop2,2022-03-01,500
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}

object TableAPI02 {
  Logger.getLogger("org").setLevel(Level.ERROR)

  def main(args: Array[String]): Unit = {
    // Create the session
    val spark: SparkSession = SparkSession.builder()
      .appName("TableAPI")
      .master("local[*]")
      .getOrCreate()
    // Read the data and create a DataFrame with a user-defined schema
    val schema: StructType = StructType(
      Seq(
        StructField("name", DataTypes.StringType),
        StructField("date", DataTypes.StringType),
        StructField("amount", DataTypes.IntegerType)
      )
    )
    val df: DataFrame = spark.read.schema(schema).csv("SQLData/shop/shop.csv")
    // Import Spark's implicits and built-in functions
    import spark.implicits._
    import org.apache.spark.sql.functions._

    // Goal: the monthly total per shop, plus a running cumulative total
    // 1. Monthly total per shop
    val df2: DataFrame = df.groupBy($"name", substring($"date", 0, 7) as "month")
      .agg(sum("amount") as "m_sum_amount")
    // 2. Running cumulative total
    df2.select('name, 'month, 'm_sum_amount,
      sum("m_sum_amount")
        .over(Window.partitionBy("name").orderBy("month")) as "total_money"
    ).show()
    df2.select('name, 'month, 'm_sum_amount,
      sum("m_sum_amount")
        .over(Window.partitionBy("name").orderBy("month")
          .rowsBetween(Window.unboundedPreceding, Window.currentRow)) as "total_money" // explicit row frame
    ).show()
  }
}
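The frame passed to rowsBetween can also be bounded on both sides. A minimal sketch of a moving sum over the current month and the two preceding ones, assuming the df2 value and imports from TableAPI02 above (the alias moving_3m is illustrative):
// Minimal sketch, reusing df2, Window and the function imports from TableAPI02
df2.select('name, 'month, 'm_sum_amount,
  sum("m_sum_amount")
    .over(Window.partitionBy("name").orderBy("month")
      .rowsBetween(-2, Window.currentRow)) as "moving_3m"  // current row plus the two preceding rows
).show()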
Join queries and union
Data preparation: user
uid,name,age,gender,city
1,zss,18,M,BJ
2,ls,20,F,BJ
3,wx,30,M,SH
Data preparation: order
oid,money,uid,id
1001,100,1,1
1002,100,2,2
1003,100,3,3
1004,100,1,1
1005,100,2,2
1006,100,3,3
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, SparkSession}

object TableAPI03 {
  Logger.getLogger("org").setLevel(Level.ERROR)

  def main(args: Array[String]): Unit = {
    // Create the session
    val spark: SparkSession = SparkSession.builder()
      .appName("TableAPI")
      .master("local[*]")
      .getOrCreate()
    // Read the data and create the DataFrames
    val userDF: DataFrame = spark.read.option("header", true).option("inferSchema", true).csv("sql_data/csv/user.csv")
    val orderDF: DataFrame = spark.read.option("header", true).option("inferSchema", true).csv("sql_data/csv/order.csv")
    // Import Spark's implicits and built-in functions
    import org.apache.spark.sql.functions._
    import spark.implicits._

    println("-----------------------join queries---------------------------")
    userDF.crossJoin(orderDF).show()              // Cartesian product
    userDF.join(orderDF).show()                   // no join condition: also a Cartesian product
    userDF.join(orderDF, "uid").show()
    userDF.join(orderDF, Seq("uid")).show()
    userDF.join(orderDF, userDF("uid") === orderDF("id")).show() // use this form when the join key names differ between the two tables
    // Outer joins
    userDF.join(orderDF, Seq("uid"), "left").show()
    userDF.join(orderDF, Seq("uid"), "right").show()
    //userDF.join(orderDF, "uid", "left")   // error: this overload is not available (in older Spark versions)
    userDF.join(orderDF, userDF("uid") === orderDF("id"), "left").show()
println("-----------------------union---------------------------")
userDF.union(orderDF).show() //去重
userDF.unionAll(orderDF).show() //不去重
}
}
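The join type argument is not limited to "left" and "right". A minimal sketch of a few other join types, assuming the userDF and orderDF from TableAPI03 above:
// Minimal sketch, reusing userDF and orderDF from TableAPI03
userDF.join(orderDF, Seq("uid"), "full").show()       // full outer join
userDF.join(orderDF, Seq("uid"), "left_semi").show()  // only user rows that have at least one matching order
userDF.join(orderDF, Seq("uid"), "left_anti").show()  // only user rows that have no matching order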