HIVE开窗函数

  • Post author:
  • Post category:其他




ETL,SQL面试高频考点——HIVE开窗函数(基础篇)



一,窗口函数介绍

  • 窗口函数,也叫OLAP函数(Online Anallytical Processing,联机分析处理),可以对数据库数据进行实时分析处理。
  • 窗口函数由开窗函数和分析函数构成,窗口函数就是既要显示聚集前的数据,又要显示聚集后的数据,简单讲,就是你查询的结果上,多出一列值(可以是聚合值或者排序号),所以分析函数可以分为两类:聚合分析函数和排序分析函数
  • 基本语法:
<窗口分析函数> over (partition by <用于分组的字段> order by<用于排序的列名>) 



二,开窗函数


1,概要


开窗函数就是over()函数,就是限定一个窗口,来显示分析函数的结果


2,开窗函数一般有两种(固定形式,不可更改)

-- 第一种
over(partiton by ... order by ...)
-- 第二种
over(distribute by ... order by ...)


3,区别

  • partiton by是一个一个reduce处理数据的,所以使用全局排序order by distribute
  • distribute by是多个reduce处理数据的,所以使用局部排序sort by



三,分析函数分类



1,排序分析函数:

  • RANK() OVER();
  • ROW_NUMBER() OVER();
  • DENSE_RANK() OVER();
  • NTILE(n) OVER();


实列解析
  • 创建一个学生成绩表
create table if not exists sc(name string,score int)
row format delimited fields terminated by '\t';
  • 导入数据到HIVE

    张三 100

    李四 90

    王五 80

    刘六 100

    田七 70
load data local inpath '数据文件本地路径' into table sc


1.使用 RANK() OVER()

  • 特点:并列跳跃排序
  • 不开窗sql实现rank()


    sql语句
SELECT b.name,b.Score,(
select count(a.score)
from sc a
WHERE a.score>b.score) +1 as Ranking
FROM sc b
ORDER BY b.score DESC
  • 结果

    在这里插入图片描述
  • 原理总结:

    第一部分实现表的降序排序


sql语句

SELECT b.name,b.Score
FROM sc b
ORDER BY b.score DESC

在这里插入图片描述

第二部分实现,假设给你一个分数,如何计算出他的Rank排名:

比如图中的王五是95分,大于他的分数有[100,100],count统计个数为2,加1就能得到他的rank排名3,其他如此类推,所以每个同学的rank排名为成绩在他之上的人数+1


sql语句

--计算大于分数的集合个数
(select count(a.score)
from sc a
WHERE a.score>b.score) +1 as Ranking
  • 使用开窗语句:
FROM sc
SELECT name,score,rank()  over(ORDER BY score desc) ranking
  • 结果:

    在这里插入图片描述


2.使用 DENSE_RANK() OVER()

  • 特点:并列连续排序
  • 不开窗sql实现dense_rank()


    sql语句
SELECT b.name,b.Score
,(select count(DISTINCT(a.score))
from sc a
WHERE a.score>=b.score) as Ranking
FROM sc b
ORDER BY b.score DESC
  • 结果

    在这里插入图片描述

  • 原理

    这个与上面的函数不同因为是连续的,所以我们需要对获得到的重复的分数进行去重,比如说李四的成绩为90,那大于等于90的集合为[100,100,95,90,90],去重之后集合为[100,95,90],集合的长度就是他对应的连续排名3


  • DENSE_RANK函数

FROM sc
SELECT name,score,dense_rank() OVER(ORDER BY score DESC) ranking


对比总结

通过上面两个函数,我们发现开窗函数能实现普通函数比较难实现或者无法实现的问题,因为聚合函数只能操作分组的字段,这也是聚合函数最大的特点,窗口函数能够操作所有的字段,不受分组的限制。


3.ROW_NUMBER() PERCENT_RANK() NTILE() 概述


  • ROW_NUMBER()


    特点:连续排序


  • PERCENT_RANK()


    特点:百分比排序


  • NTILE()


    特点:将有序集分桶(bucket)


sql语句

FROM sc
SELECT name,score,
ntile(3) OVER(ORDER BY score DESC) ntres,
percent_rank() OVER(ORDER BY score DESC) prank,
row_number() OVER(ORDER BY score DESC) rownum


结果


在这里插入图片描述



2.聚合分析函数

函数 作用
sum 求和
avg 求平均数
max 求最大值
min 求最小值
count 计数
first_value 返回分区中的第一个值
last_value 返回分区中到当前行的最后一个值
lag(col,n,default) col指定列,n用于统计窗口内往上第n个值 ,default不指定查找不到默认为NULL
lead(col,n,default) col指定列,n用于统计窗口内往下第n个值,default不指定查找不到默认为NULL
cume_dist 计算某个窗口或分区中某个值的累积分布,值为order by 子句中指定的列的当前行中的值。


例:sum avg max min count

FROM sc
SELECT name,score,
sum(score) OVER() sumres,
count(score) OVER() cres ,
min(score) OVER() minres,
max(score) OVER() maxres,
avg(score) OVER() avgres


结果


在这里插入图片描述

  • 下面的函数的使用是根据over()中的参数变化而变化的


first_value() 函数

  • 用法是根据partiton by 的字段进行分区,如果忽略partition by,会根据order by排序后的结果返回第一条数据
  • 示例 在每行数据后开窗显示总第一名和每个班的第一名
FROM sc2 
SELECT *,first_value(score) OVER(ORDER BY score DESC) as totalFirst,
first_value(score) OVER(PARTITION BY class ORDER BY score DESC) as classFirst

在这里插入图片描述


last_value() 函数

  • 作用是返回到当前行的最后一条数据
  • 示例
FROM SC2
-- 求分区内到当前行的最后一个值
SELECT name,score,class,last_value(score) OVER(PARTITION BY class order by score) lastvalue

在这里插入图片描述


lag() 函数

  • 用法是用于统计分组内的往上前n个值
  • 示例1 排名并显示每个同学和上一位同学的分差
from sc2
select *,
score-lag(score,1,0) over(order by score desc) as gap

在这里插入图片描述

  • 示例2 排名并显示同一个班上每个同学和上一位同学的分差
from sc2
select *,
score-lag(score,1,score) over(partition by class order by score desc) as gapByClass

在这里插入图片描述


lead() 函数

  • 用法是用于统计分组内的往下后n个值
  • 示例 排名并显示每个同学和后一名一位同学的分差
from sc2
select *,
score-lead(score,1,0) over(order by score desc) as gap

在这里插入图片描述

  • 示例2 排名并显示同一个班上每个同学和下一位同学的分差
from sc2
select *,
score-lead(score,1,score) over(partition by class order by score desc) as gapByClass

在这里插入图片描述


cume_dist() 函数

  • 用法是如果按升序排列,则统计:小于等于当前值的行数/总行数(number of rows ≤ current row)/(total number of rows)。如果是降序排列,则统计:大于等于当前值的行数/总行数。
  • 示例1 统计小于等于当前分数的人数占比
from sc2
select *,
cume_dist() over(order by score) as cume_dist

在这里插入图片描述

  • 示例2 统计每个班小于等于当前分数的人数占比
from sc2
select *,
cume_dist() over(partition by class order by score) as cume_dist

在这里插入图片描述



3.用spark 自定义HIVE用户自定义函数

  • hive中的表,可以通过如下语句缓存表,提高分析速度
cache table 表名
  • 自定义UDF函数,相对比较简单我用的spark-shell

    例:addName()函数

    在这里插入图片描述
  • 自定义UDAF函数

    需要写一个类继承UserDefinedAggregateFunction.实现抽象方法,注册后使用spark.sql调用


    例: 构造一个自定义的MyAvg()函数
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, DoubleType, LongType, StructField, StructType}

object UdadDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("UdAd")
    val spark: SparkSession = new SparkSession.Builder().config(conf).enableHiveSupport().getOrCreate()
    val avg = new MyAvg
    spark.udf.register("myAvg",avg)
    spark.sql("select MyAvg(score) from sc").show()
  }
}
class MyAvg extends UserDefinedAggregateFunction(){
  //输入结构
  override def inputSchema: StructType = StructType(Seq(StructField("inputColumn",LongType)))
  //结构缓存
  override def bufferSchema: StructType ={
    StructType{
      Seq(StructField("sum",LongType),StructField("count",LongType))
    }
  }
  //返回值类型
  override def dataType: DataType = DoubleType
  //是否是稳定的
  override def deterministic: Boolean = true
  //初始化缓存设置0值的位置
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0)=0L
    buffer(1)=0L
  }
  //更新缓存
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0)=buffer.getLong(0)+input.getLong(0)
    buffer(1)=buffer.getLong(1)+1
  }
  //合并缓存
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit ={
    buffer1(0)=buffer1.getLong(0)+buffer2.getLong(0)
    buffer1(1)=buffer1.getLong(1)+buffer2.getLong(1)
  }
  //计算输出结果
  override def evaluate(buffer: Row): Any = {
    buffer.getLong(0).toDouble/buffer.getLong(1)
  }
}
  • 结果:

    ±—————————————–+

    |myavg(CAST(score AS BIGINT))|

    ±——————————————+

    | 88.0|

    ±——————————————+



后续更新中~



版权声明:本文为weixin_46602525原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。