ETL,SQL面试高频考点——HIVE开窗函数(基础篇)
目录标题
一,窗口函数介绍
- 窗口函数,也叫OLAP函数(Online Anallytical Processing,联机分析处理),可以对数据库数据进行实时分析处理。
- 窗口函数由开窗函数和分析函数构成,窗口函数就是既要显示聚集前的数据,又要显示聚集后的数据,简单讲,就是你查询的结果上,多出一列值(可以是聚合值或者排序号),所以分析函数可以分为两类:聚合分析函数和排序分析函数
- 基本语法:
<窗口分析函数> over (partition by <用于分组的字段> order by<用于排序的列名>)
二,开窗函数
1,概要
开窗函数就是over()函数,就是限定一个窗口,来显示分析函数的结果
2,开窗函数一般有两种(固定形式,不可更改)
-- 第一种
over(partiton by ... order by ...)
-- 第二种
over(distribute by ... order by ...)
3,区别
- partiton by是一个一个reduce处理数据的,所以使用全局排序order by distribute
- distribute by是多个reduce处理数据的,所以使用局部排序sort by
三,分析函数分类
1,排序分析函数:
- RANK() OVER();
- ROW_NUMBER() OVER();
- DENSE_RANK() OVER();
- NTILE(n) OVER();
实列解析
- 创建一个学生成绩表
create table if not exists sc(name string,score int)
row format delimited fields terminated by '\t';
-
导入数据到HIVE
张三 100
李四 90
王五 80
刘六 100
田七 70
load data local inpath '数据文件本地路径' into table sc
1.使用 RANK() OVER()
- 特点:并列跳跃排序
-
不开窗sql实现rank()
sql语句
SELECT b.name,b.Score,(
select count(a.score)
from sc a
WHERE a.score>b.score) +1 as Ranking
FROM sc b
ORDER BY b.score DESC
-
结果
-
原理总结:
第一部分实现表的降序排序
sql语句
SELECT b.name,b.Score
FROM sc b
ORDER BY b.score DESC
第二部分实现,假设给你一个分数,如何计算出他的Rank排名:
比如图中的王五是95分,大于他的分数有[100,100],count统计个数为2,加1就能得到他的rank排名3,其他如此类推,所以每个同学的rank排名为成绩在他之上的人数+1
sql语句
--计算大于分数的集合个数
(select count(a.score)
from sc a
WHERE a.score>b.score) +1 as Ranking
- 使用开窗语句:
FROM sc
SELECT name,score,rank() over(ORDER BY score desc) ranking
-
结果:
2.使用 DENSE_RANK() OVER()
- 特点:并列连续排序
-
不开窗sql实现dense_rank()
sql语句
SELECT b.name,b.Score
,(select count(DISTINCT(a.score))
from sc a
WHERE a.score>=b.score) as Ranking
FROM sc b
ORDER BY b.score DESC
-
结果
-
原理
这个与上面的函数不同因为是连续的,所以我们需要对获得到的重复的分数进行去重,比如说李四的成绩为90,那大于等于90的集合为[100,100,95,90,90],去重之后集合为[100,95,90],集合的长度就是他对应的连续排名3 -
DENSE_RANK函数
FROM sc
SELECT name,score,dense_rank() OVER(ORDER BY score DESC) ranking
对比总结
通过上面两个函数,我们发现开窗函数能实现普通函数比较难实现或者无法实现的问题,因为聚合函数只能操作分组的字段,这也是聚合函数最大的特点,窗口函数能够操作所有的字段,不受分组的限制。
3.ROW_NUMBER() PERCENT_RANK() NTILE() 概述
-
ROW_NUMBER()
特点:连续排序 -
PERCENT_RANK()
特点:百分比排序 -
NTILE()
特点:将有序集分桶(bucket)
sql语句
FROM sc
SELECT name,score,
ntile(3) OVER(ORDER BY score DESC) ntres,
percent_rank() OVER(ORDER BY score DESC) prank,
row_number() OVER(ORDER BY score DESC) rownum
结果
2.聚合分析函数
函数 | 作用 |
---|---|
sum | 求和 |
avg | 求平均数 |
max | 求最大值 |
min | 求最小值 |
count | 计数 |
first_value | 返回分区中的第一个值 |
last_value | 返回分区中到当前行的最后一个值 |
lag(col,n,default) | col指定列,n用于统计窗口内往上第n个值 ,default不指定查找不到默认为NULL |
lead(col,n,default) | col指定列,n用于统计窗口内往下第n个值,default不指定查找不到默认为NULL |
cume_dist | 计算某个窗口或分区中某个值的累积分布,值为order by 子句中指定的列的当前行中的值。 |
例:sum avg max min count
FROM sc
SELECT name,score,
sum(score) OVER() sumres,
count(score) OVER() cres ,
min(score) OVER() minres,
max(score) OVER() maxres,
avg(score) OVER() avgres
结果
- 下面的函数的使用是根据over()中的参数变化而变化的
first_value() 函数
- 用法是根据partiton by 的字段进行分区,如果忽略partition by,会根据order by排序后的结果返回第一条数据
- 示例 在每行数据后开窗显示总第一名和每个班的第一名
FROM sc2
SELECT *,first_value(score) OVER(ORDER BY score DESC) as totalFirst,
first_value(score) OVER(PARTITION BY class ORDER BY score DESC) as classFirst
last_value() 函数
- 作用是返回到当前行的最后一条数据
- 示例
FROM SC2
-- 求分区内到当前行的最后一个值
SELECT name,score,class,last_value(score) OVER(PARTITION BY class order by score) lastvalue
lag() 函数
- 用法是用于统计分组内的往上前n个值
- 示例1 排名并显示每个同学和上一位同学的分差
from sc2
select *,
score-lag(score,1,0) over(order by score desc) as gap
- 示例2 排名并显示同一个班上每个同学和上一位同学的分差
from sc2
select *,
score-lag(score,1,score) over(partition by class order by score desc) as gapByClass
lead() 函数
- 用法是用于统计分组内的往下后n个值
- 示例 排名并显示每个同学和后一名一位同学的分差
from sc2
select *,
score-lead(score,1,0) over(order by score desc) as gap
- 示例2 排名并显示同一个班上每个同学和下一位同学的分差
from sc2
select *,
score-lead(score,1,score) over(partition by class order by score desc) as gapByClass
cume_dist() 函数
- 用法是如果按升序排列,则统计:小于等于当前值的行数/总行数(number of rows ≤ current row)/(total number of rows)。如果是降序排列,则统计:大于等于当前值的行数/总行数。
- 示例1 统计小于等于当前分数的人数占比
from sc2
select *,
cume_dist() over(order by score) as cume_dist
- 示例2 统计每个班小于等于当前分数的人数占比
from sc2
select *,
cume_dist() over(partition by class order by score) as cume_dist
3.用spark 自定义HIVE用户自定义函数
- hive中的表,可以通过如下语句缓存表,提高分析速度
cache table 表名
-
自定义UDF函数,相对比较简单我用的spark-shell
例:addName()函数
-
自定义UDAF函数
需要写一个类继承UserDefinedAggregateFunction.实现抽象方法,注册后使用spark.sql调用
例: 构造一个自定义的MyAvg()函数
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, DoubleType, LongType, StructField, StructType}
object UdadDemo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("UdAd")
val spark: SparkSession = new SparkSession.Builder().config(conf).enableHiveSupport().getOrCreate()
val avg = new MyAvg
spark.udf.register("myAvg",avg)
spark.sql("select MyAvg(score) from sc").show()
}
}
class MyAvg extends UserDefinedAggregateFunction(){
//输入结构
override def inputSchema: StructType = StructType(Seq(StructField("inputColumn",LongType)))
//结构缓存
override def bufferSchema: StructType ={
StructType{
Seq(StructField("sum",LongType),StructField("count",LongType))
}
}
//返回值类型
override def dataType: DataType = DoubleType
//是否是稳定的
override def deterministic: Boolean = true
//初始化缓存设置0值的位置
override def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0)=0L
buffer(1)=0L
}
//更新缓存
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
buffer(0)=buffer.getLong(0)+input.getLong(0)
buffer(1)=buffer.getLong(1)+1
}
//合并缓存
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit ={
buffer1(0)=buffer1.getLong(0)+buffer2.getLong(0)
buffer1(1)=buffer1.getLong(1)+buffer2.getLong(1)
}
//计算输出结果
override def evaluate(buffer: Row): Any = {
buffer.getLong(0).toDouble/buffer.getLong(1)
}
}
-
结果:
±—————————————–+
|myavg(CAST(score AS BIGINT))|
±——————————————+
| 88.0|
±——————————————+
后续更新中~