【白话Flink基础理论】Flink中的Table API和Flink SQL—动态表(Dynamic tables)&Table时间特性&窗口操作&自定义函数(二)

  • Post author:
  • Post category:其他




——wirte by 橙心橙意橙续缘,



前言


白话系列


————————————————————————————


也就是我在写作时完全不考虑写作方面的约束,完全把自己学到的东西、以及理由和所思考的东西等等都用大白话诉说出来,这样能够让信息最大化的从自己脑子里输出并且输入到有需要的同学的脑中。PS:较为专业的地方还是会用专业口语诉说,大家放心!


白话Flink系列


————————————————————————————


主要是记录本人(国内某985研究生)在Flink基础理论阶段学习的一些所学,更重要的是一些所思所想,所参考的视频资料或者博客以及文献资料均在文末放出.由于研究生期间的课题组和研究方向与Flink接轨较多,而且Flink的学习对于想进入大厂的同学们来说也是非常的赞,所以该系列文章会随着本人学习的深入来不断修改和完善,希望大家也可以多批评指正或者提出宝贵建议。


说在前面


————————————————————

在这里插入图片描述

上节我们已经提到过了Flink中的Table API和Flink sql的基本概念和示例,对于简单的增删改查操作也比较熟悉了;但是我们也能发现在流环境中的表与以前数据库中的表还是不同的。因为对于Flink来说,主要进行的是流处理,对于流处理来有一个最大的特点就是我们无法获得全部的数据,而且数据是不断流入,所以对应的stream所生成的table自然也是不断变化的,而且在table上进行查询或者聚合操作后的结果集也是不断变化的,所以我们就逐渐理解了题目中的

动态表

的含义了,也就是表中的数据均是在随着时间不断地改变。

说到时间,我们前面有讲过Flink的时间语义,这也是Flink区别于Spark最特别地地方,Flink’可以根据时间语义保证结果的快速处理和低延迟,那么需要用到时间的地方我们讲过有windows操作以及watermark调整乱序数据等。那么对于动态表来说当然也是要有时间涉入的,所以就又引出了标题中的Table时间特性。


Ps:由于一般在流环境中对TableAPI使用较多而且复杂,所以本节内容均针对DataStream API,DataSet上的Table操作建议去

Flink官方文档

查看



动态表 Dynamic tables

  • 动态表是 Flink 对流数据的 Table API 和 SQL 支持的核心概念
  • 与表示批处理数据的静态表不同,动态表是随时间变化的


持续查询(Continuous Query)

  • 动态表可以像静态的批处理表一样进行查询,

    查询一个动态表会产生持续查询

    (Continuous Query)
  • 连续查询

    永远不会终止

    ,并会生成另一个动态表
  • 查询会不断更新其动态结果表,以反映其动态输入表上的更改

好好理解这个永远不会终止的含义,就是说数据在不断地流入,Table在不断地改变,相应的查询也在不断发生着变化,就像下图这样,只要有数据流入,程序就一直维持着一条这样的执行管道。

在这里插入图片描述


上图表示的流式表查询的处理过程有以下3点:

  1. 流被转换为动态表

  2. 对动态表计算连续查询,生成新的动态表

  3. 生成的动态表被转换回流



将DataStream转化成动态表

  • 为了处理带有关系查询的流,必须先将其转换为表
  • 从概念上讲,流的每个数据记录,都被解释为对结果表的插入(Insert)修改操作

    在这里插入图片描述

Stream中来一条数据,动态表中就多一条数据。



持续查询

  • 持续查询会在动态表上做计算处理,并作为结果生成新的动态表

    在这里插入图片描述

上图中,Stream中流过来一条数据,那么动态表1中就多一条数据,对应的在动态表1中进行持续查询生成的动态表2中的信息就会发生改变,动态表2中的结果是针对动态表1中的全部数据进行的。



将动态表转化为DataStream

  • 与常规的数据库表一样,动态表可以通过插入(Insert)、更新(Update)和删 除(Delete)更改,进行

    持续的修改
  • 将动态表转换为流或将其写入外部系统时,需要对这些更改进行编码

动态表可转化为一下3中数据流:

  • 仅追加(Append-only)流

    • 仅通过插入(Insert)更改来修改的动态表,可以直接转换为仅追加流
  • 撤回(Retract)流

    • 撤回流是包含两类消息的流:添加(Add)消息和撤回(Retract)消息
  • Upsert(更新插入)流

    • Upsert 流也包含两种类型的消息:Upsert 消息和删除(Delete)消息。

以上这3种流分别对应3种更新模式。

在这里插入图片描述

仅追加流是最基本的流,批环境中的表也支持仅追加模式。而其他2种流仅是在流环境中的动态表的更新模式才支持转化。

  • 对于撤回模式来说,每条更新操作对应2步操作——插入和删除,比较冗余。
  • 对于更新插入模式来说,主要是根据KEY来找到要更新的数据,然后进行Upsert操作,更改为新的值。



Table中的时间特性

  • 基于时间的操作(比如 Table API 和 SQL 中

    窗口

    操作),需要定义相关的

    时间语义



    时间数据来源

    的信息
  • Table 可以提供一个逻辑上的

    时间字段

    ,用于在表处理程序中,指示时间和访问相应的时间戳
  • 时间属性,可以是每个表

    schema的一部分

    。一旦定义了时间属性,它就可以 作为一个

    字段

    引用,并且可以在基于时间的操作中使用
  • 时间属性的行为类似于

    常规时间戳

    ,可以

    访问

    ,并且进行

    计算

Table中之所以要用到时间,是因为它和流处理一样,也需要进行窗口操作等。而且他这个时间比较特殊,就直接在表中添加一列来进行表示,类型为时间戳类型。

我们知道在Flink中有2种时间语义,所以接下来就分别来说明一下在Table中是怎么利用这2种时间语义的。



定义处理时间 Processing Time

  • 处理时间语义下,允许表处理程序根据机器的本地时间生成结果。它是时间 的最简单概念。它既不需要提取时间戳,也不需要生成 watermark

我们可以通过以下3种方式来引入。

  • 由 DataStream 转换成表时指定**
Table sensorTable = tableEnv.fromDataStream(dataStream, "id, temperature, timestamp, pt.proctime");

pt是处理时间的列名,可以随便定义,但是后面一定要追加

.proctme

,代表处理时间。

  • 在定义Schema期间,可以使用.proctime,指定字段名定义处理时间字段

    • 这个proctime属性只能通过附加逻辑字段,来扩展物理schema。因此,只能在schema定义的末尾定义它
.withSchema(new Schema() 
					.field("id", DataTypes.STRING()) 
					.field("timestamp", DataTypes.BIGINT()) 
					.field("temperature", DataTypes.DOUBLE()) 
					.field("pt", DataTypes.TIMESTAMP(3)) .proctime()
			)
  • 类似Schema,也可以通过定义DDL的方式来引入时间语义。
String sinkDDL = 
	"create table dataTable (" + 
	" id varchar(20) not null, " + 
	" ts bigint, " +
	" temperature double, " + 
	" pt AS PROCTIME() " + 
	") with (" + 
	" 'connector.type' = 'filesystem', " + 
	" 'connector.path' = '/sensor.txt', " + 
	" 'format.type' = 'csv')";

tableEnv.sqlUpdate(sinkDDL);



定义事件时间 Event Time

  • 事件时间语义,允许表处理程序根据每个记录中包含的时间生成结果。这样 即使在有乱序事件或者延迟事件时,也可以获得正确的结果。
  • 为了处理无序事件,并区分流中的准时和迟到事件;Flink 需要从事件数据中, 提取时间戳,并用来推进事件时间的进展
  • 定义事件时间,同样有三种方法:

    • 由 DataStream 转换成表时指定
    // 将DataStream转换为 Table,并指定时间字段 
    Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp.rowtime, temperature");
    // 或者,直接追加时间字段
    Table sensorTable = tableEnv.fromDataStream(dataStream," id, temperature, timestamp, rt.rowtime");
    

    由于EventTime是数据本身携带的,所以

    .fromDataStream()

    在指定的时候需要提前在datasStream中使用

    .assignTimestampsAndWatermarks()

    进行事件时间的注册和水印的设置。

    • 定义 Table Schema 时指定
    .withSchema(new Schema() 
    	.field("id", DataTypes.STRING()) 
    	.field("timestamp", DataTypes.BIGINT()) 
    	.rowtime( new Rowtime()
    		.timestampsFromField("timestamp")  //指定时间戳
    		.watermarksPeriodicBounded(1000) //周期性生成watermark 1s间隔
    		) 
    	.field("temperature", DataTypes.DOUBLE())
    	)
    
    • 在创建表的 DDL 中定义
    String sinkDDL= "create table dataTable (" +
    	" id varchar(20) not null, " +
    	" ts bigint, " +
    	" temperature double, " +
    	" rt AS TO_TIMESTAMP( FROM_UNIXTIME(ts) ), " +
    	" watermark for rt as rt - interval '1' second" +  //1s时间间隔生成watermark
    	") with (" +
    	" 'connector.type' = 'filesystem', " +
    	" 'connector.path' = '/sensor.txt', " +
    	" 'format.type' = 'csv')";
    tableEnv.sqlUpdate(sinkDDL);
    

可以发现如果Table是从dataStrea转化的话,那么他的水印设置是从流中获取的,其他2种方式的watermark均是在构建表的过程中自定义的。



Table API和Flink SQL中的窗口操作

时间语义,要配合窗口操作才能发挥作用.

在 Table API 和 SQL 中,主要有两种窗口.

  • Group Windows(分组窗口)

    • 根据

      时间



      行计数

      间隔,将行聚合到有限的组(Group)中,并对每个组的数据 执行一次聚合函数
  • Over Windows

    • 针对每个输入行,计算

      相邻行范围之前



      聚合

      (时间或行数范围)

分组窗口就是类似于我们学过的Stream中的窗口操作,就是先对Stream中数据进行KeyBy,然后开窗并进行聚合。而Over Windwos是Table中的窗口操作特有的。



Group Windows(分组窗口)

  • Group Windows 是使用 window(w:GroupWindow)子句定义的,并且必须由as子句指定一个

    别名

  • 为了按窗口对表进行分组,窗口的别名必须在 group by 子句中,像常规的 分组字段一样引用

    	Table table = input 
    		.window([w: GroupWindow] as "w") // 定义窗口,别名为 w 
    		.groupBy("a, w") // 按照字段 a和窗口 w分组 
    		.select("a, b.sum"); // 聚合
    
  • Table API 提供了一组具有特定语义的预定义 Window 类,这些类会被转换为底层 DataStream 或 DataSet 的窗口操作

这里似乎有悖于我们之前流中的windows操作—先KeyBy分组再进行开窗聚合;而这里确实先执行

.window()

后在执行

.groupby()

,但是仔细想会发现

.window()

操作只是在进行窗口的定义和别名的注册,然后下一句先根据字段a分组,再根据窗口w分组,这不就又和我们前面一致了嘛。

Tips:Table API中的

。groupBy()

方法在

.select()

方法之前。



滑动窗口 Sliding windows
  • 滑动窗口要用 Slide 类来定义
// Sliding Event-time Window  滑动事件时间窗口,窗口大小10m,滑动间隔5m,别名为w
.window(Slide.over("10.minutes").every("5.minutes").on("rowtime").as("w"))

// Sliding Processing-time window 
.window(Slide.over("10.minutes").every("5.minutes").on("proctime").as("w"))

// Sliding Row-count window 计数窗口时间默认处理时间
.window(Slide.over("10.rows").every("5.rows").on("proctime").as("w"))


滚动窗口(Tumbling windows)
  • 滚动窗口要用 Tumble 类来定义
// Tumbling Event-time Window 
.window(Tumble.over("10.minutes").on("rowtime").as("w"))

// Tumbling Processing-time Window 
.window(Tumble.over("10.minutes").on("proctime").as("w"))

// Tumbling Row-count Window
.window(Tumble.over("10.rows").on("proctime").as("w"))


会话窗口(Session windows)
  • 会话窗口要用 Session 类来定义
// Session Event-time Window 
.window(Session.withGap("10.minutes").on("rowtime").as("w"))
// Session Processing-time Window
.window(Session.withGap("10.minutes").on("proctime").as("w"))


SQL 中的 Group Windows

Group Windows 定义在 SQL 查询的 Group By 子句中,具体如下。

  • TUMBLE(time_attr, interval)

    • 定义一个滚动窗口,第一个参数是时间字段,第二个参数是窗口长度
  • HOP(time_attr, interval, interval)

    • 定义一个滑动窗口,第一个参数是时间字段,第二个参数是

      窗口滑动步长

      ,第三个是

      窗口长度
  • SESSION(time_attr, interval)

    • 定义一个会话窗口,第一个参数是时间字段,第二个参数是窗口间隔

这里要注意的滑动窗口的参数设置,不要写反了。



综合示例
Table dataTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature as temp, rt.rowtime");

tableEnv.createTemporaryView("sensor", dataTable);
// 5. 窗口操作
// 5.1 Group Window
// table API
Table resultTable = dataTable.window(Tumble.over("10.seconds").on("rt").as("tw"))
					         .groupBy("id, tw")
					         .select("id, id.count, temp.avg, tw.end");

// SQL
Table resultSqlTable = tableEnv.sqlQuery("select id, count(id) as cnt, avg(temp) as avgTemp,tumble_end(rt, interval '10' second) " + "from sensor group by id, tumble(rt, interval '10' second)");
        
tableEnv.toAppendStream(resultTable, Row.class).print("result");
tableEnv.toRetractStream(resultSqlTable, Row.class).print("sql");


tumble_end()

代表滚动窗口的结束时间,那么对应的开始时间就是

_start



Over Windows

  • Over window 聚合是标准 SQL 中已有的(over 子句),可以在查询的 SELECT 子句中定义
  • Over window 聚合,会针对每个输入行,计算相邻行范围内的聚合
  • Over windows 使用 window(w:overwindows*)子句定义,并在 select ()方法中通过别名来引用
Table table = input .window([w: OverWindow] as "w") .select("a, b.sum over w, c.min over w");
  • Table API 提供了 Over 类,来配置 Over 窗口的属性。

Over 字句在mysql 8中已经有了



无界 Over Windows
  • 可以在事件时间或处理时间,以及指定为时间间隔、或行计数的范围内,定义 Over windows

  • 无界的 over window 是使用常量指定的

// 无界的事件时间over window 
.window(Over.partitionBy("a").orderBy("rowtime").preceding(UNBOUNDED_RANGE).as("w")) 

//无界的处理时间over window 
.window(Over.partitionBy("a").orderBy("proctime").preceding(UNBOUNDED_RANGE).as("w")) 

// 无界的事件时间Row-count over window 
.window(Over.partitionBy("a").orderBy("rowtime").preceding(UNBOUNDED_ROW).as("w")) 

//无界的处理时间Row-count over window
.window(Over.partitionBy("a").orderBy("proctime").preceding(UNBOUNDED_ROW).as("w"))


有界 Over Windows
  • 有界的 over window 是用间隔的大小指定的
// 有界的事件时间over window 
.window(Over.partitionBy("a").orderBy("rowtime").preceding("1.minutes").as("w"))

// 有界的处理时间over window 
.window(Over.partitionBy("a").orderBy("proctime").preceding("1.minutes").as("w"))

// 有界的事件时间Row-count over window 
.window(Over.partitionBy("a").orderBy("rowtime").preceding("10.rows").as("w"))

// 有界的处理时间Row-count over window
.window(Over.partitionBy("a").orderBy("procime").preceding("10.rows").as("w"))


SQL 中的 Over Windows
  • 用 Over 做窗口聚合时,所有聚合必须在同一窗口上定义,也就是说必须是相同的分区、排序和范围
  • 目前仅支持在当前行范围之前的窗口
  • ORDER BY 必须在单一的时间属性上指定
// 有界计数Over窗口
SELECT COUNT(amount) OVER ( 
	PARTITION BY user
	ORDER BY proctime 
	ROWS BETWEEN 2  PRECEDING AND CURRENT ROW)
FROM Orders

// 无界计数Over窗口
SELECT COUNT(amount) OVER ( 
	PARTITION BY user
	ORDER BY proctime 
	ROWS BETWEEN UNBOUNDED  PRECEDING AND CURRENT ROW)
FROM Orders

// 有界时间Over窗口
SELECT COUNT(amount) OVER ( 
	PARTITION BY user
	ORDER BY proctime 
	RANGE BETWEEN interval '10' second PRECEDING AND CURRENT ROW)
FROM Orders

// 无界时间Over窗口
SELECT COUNT(amount) OVER ( 
	PARTITION BY user
	ORDER BY proctime 
	RANGE BETWEEN UNBOUNDED  PRECEDING AND CURRENT ROW)
FROM Orders


综合示例
Table dataTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature as temp, rt.rowtime");

tableEnv.createTemporaryView("sensor", dataTable);
// 5.2 Over Window
// table API
Table overResult = dataTable.window(Over.partitionBy("id").orderBy("rt").preceding("2.rows").as("ow"))
        .select("id, rt, id.count over ow, temp.avg over ow");

// SQL
Table overSqlResult = tableEnv.sqlQuery("select id, rt, count(id) over ow, avg(temp) over ow " +
        " from sensor " +
        " window ow as (partition by id order by rt rows between 2 preceding and current row)");

tableEnv.toAppendStream(overResult, Row.class).print("result");
tableEnv.toRetractStream(overSqlResult, Row.class).print("sql");

Over WIndows 后面的

.partitionBy("id")

是可选的,但是

.orderBy("rt")

必须有,因为只有根据时间进行排序后,才可以进行Over WIndwos



Table API中的函数



内置函数Functions

  • Flink Table API 和 SQL 为用户提供了一组用于数据转换的内置函数

  • SQL 中支持的很多函数,Table API 和 SQL 都已经做了实现

  • 比较函数

    • SQL

      • value1 = value2
      • value1 > value2
    • Table API

      • ANY1 === ANY2
      • ANY1 > ANY2
  • 逻辑函数

    • SQL

      • boolean1 OR boolean2
      • boolean IS FALSE
      • NOT boolean
    • Table API

      • BOOLEAN1 || BOOLEAN2
      • BOOLEAN.isFalse
      • !BOOLEAN

    boolean IS FALSE和NOT boolean 都是取反,但是对于null的处理不同,NOT null还是null,但是null is FALSE 是 false。

  • 算数函数

    • SQL

      • numeric1 + numeric2
      • POWER(numeric1, numeric2)
    • Table API

      • NUMERIC1 + NUMERIC2
      • NUMERIC1.power(NUMERIC2)

    举一反三:对于函数的这种算数操作的话,在Table API中是以一个对象方法的调用实现的,类似

    .count

    这种操作,下面很多也是这种形式,看得多了就熟悉了。

  • 字符串函数

    • SQL

      • string1 || string2
      • UPPER(string)
      • CHAR_LENGTH(string)
    • Table API

      • STRING1 + STRING2
      • STRING.upperCase()
      • STRING.charLength()
  • 时间函数

    • SQL

      • DATE string
      • TIMESTAMP string
      • CURRENT_TIME
      • INTERVAL string range
    • Table API

      • STRING.toDate
      • STRING.toTimestamp
      • currentTime()
      • NUMERIC.days
      • NUMERIC.minutes
  • 聚合函数

    • SQL

      • COUNT(*)
      • SUM(expression)
      • RANK()
      • ROW_NUMBER()
    • Table API

      • FIELD.count
      • FIELD.sum()

很多操作在SQL中有实现,但是在Table API中还没有实现,所以可以使用SQL,但是Blink中的Table API已经比较完善了,后续可能会添加上。



用户自定义函数(UDF)

  • 用户定义函数(User-defined Functions,UDF)是一个重要的特性,它们显著地扩展了查询的表达能力
  • 在大多数情况下,用户定义的函数必须先

    注册

    ,然后才能在查询中使用
  • 函数通过调用

    registerFunction()

    方法在 TableEnvironment 中注册。当 用户定义的函数被注册时,它被插入到TableEnvironment 的函数目录中,这样Table API 或 SQL 解析器就可以识别并正确地解释它


标量函数(Scalar Functions)
  • 用户定义的标量函数,可以将0、1或多个标量值,映射到新的标量值
  • 为了定义标量函数,必须在 org.apache.flink.table.functions 中扩展基类 Scalar Function,并实现(一个或多个)求值(eval)方法
  • 标量函数的行为由求值方法决定,求值方法必须公开声明并命名为eval

类似map函数,一对一或者多对一

public static class HashCode extends ScalarFunction { 
	private int factor = 13; 
	public HashCode(int factor) { 
		this.factor = factor;
	} 
	// 必须写一个这样的eval函数
	public int eval(String s) { 
		return s.hashCode() * factor;
	}
}


示例

// 3. 将流转换成表
Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature as temp");

// 4. 自定义标量函数,实现求id的hash值
// 4.1 table API
HashCode hashCode = new HashCode(23);  // 自定义标量函数
// 需要在环境中注册UDF
tableEnv.registerFunction("hashCode", hashCode);
Table resultTable = sensorTable.select("id, ts, hashCode(id)");

// 4.2 SQL
tableEnv.createTemporaryView("sensor", sensorTable);
Table resultSqlTable = tableEnv.sqlQuery("select id, ts, hashCode(id) from sensor");

// 打印输出
tableEnv.toAppendStream(resultTable, Row.class).print("result");
tableEnv.toAppendStream(resultSqlTable, Row.class).print("sql");
// 实现自定义的ScalarFunction
public static class HashCode extends ScalarFunction{  //继承ScalarFunction
    private int factor = 13;

    public HashCode(int factor) {
        this.factor = factor;
    }

    public int eval(String str){
        return str.hashCode() * factor;
    }
}


表函数(Table Functions)
  • 用户定义的表函数,也可以将0、1或多个标量值作为输入参数;与标量函数不同 的是,它可以返回

    任意数量的行

    作为输出,而不是单个值
  • 为了定义一个表函数,必须扩展 org.apache.flink.table.functions 中的基类 TableFunction 并实现(一个或多个)求值方法
  • 表函数的行为由其求值方法决定,求值方法必须是 public 的,并命名为 eval

类似Flatmap函数,可以一对多,或者多对多。

public static class Split extends TableFunction<Tuple2<String, Integer>> { 
private String separator = ","; 
public Split(String separator) { 
	this.separator = separator;
} 
public void eval(String str) { 
	for (String s : str.split(separator)) { 
		collect(new Tuple2<String, Integer>(s, s.length()));
	} }
}


示例

 // 3. 将流转换成表
Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature as temp");

// 4. 自定义表函数,实现将id拆分,并输出(word, length)
// 4.1 table API
Split split = new Split("_");

// 需要在环境中注册UDF
tableEnv.registerFunction("split", split);
Table resultTable = sensorTable
        .joinLateral("split(id) as (word, length)")
        .select("id, ts, word, length");

// 4.2 SQL
tableEnv.createTemporaryView("sensor", sensorTable);
Table resultSqlTable = tableEnv.sqlQuery("select id, ts, word, length " +
        " from sensor, lateral table(split(id)) as splitid(word, length)"); 
        //as splitid(word, length) 是在给生成的表和其中的列定别名

// 打印输出
tableEnv.toAppendStream(resultTable, Row.class).print("result");
tableEnv.toAppendStream(resultSqlTable, Row.class).print("sql");
// 实现自定义TableFunction
public static class Split extends TableFunction<Tuple2<String, Integer>>{
    // 定义属性,分隔符
    private String separator = ",";

    public Split(String separator) {
        this.separator = separator;
    }

    // 必须实现一个eval方法,没有返回值
    public void eval( String str ){
        for( String s: str.split(separator) ){
            collect(new Tuple2<>(s, s.length()));
        }
    }
}

注意到这里的TableFunction内部需要泛型,也就是输出的类型,而且eval函数没有返回值,使用Collecter的collect方法将返回值收集。


.joinLateral()

JOIN的右边不是一个实际的物理表,而是一个VIEW或者Table-valued Funciton,可以理解为自定义表函数生成了一张新的表,然后原表与新表进行join后,才能被select查询出来。



聚合函数(Aggregate Functions)
  • 用户自定义聚合函数(User-Defined Aggregate Functions,UDAGGs) 可以把一个表中的数据,聚合成一个标量值
  • 用户定义的聚合函数,是通过继承

    AggregateFunction

    抽象类实现的

在这里插入图片描述

图中为求最大单价的例子,每个分组聚合后是一个值

  • AggregationFunction要求必须实现的方法:

    • createAccumulator()
    • accumulate()
    • getValue()
  • AggregateFunction 的工作原理如下:

    • 首先,它需要一个累加器(Accumulator),用来保存聚合中间结果的数据结构; 可以通过调用 createAccumulator() 方法创建空累加器
    • 随后,对每个输入行调用函数的 accumulate() 方法来更新累加器
    • 处理完所有行后,将调用函数的 getValue() 方法来计算并返回最终结果


示例

// 3. 将流转换成表
Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature as temp");

// 4. 自定义聚合函数,求当前传感器的平均温度值
// 4.1 table API
AvgTemp avgTemp = new AvgTemp();

// 需要在环境中注册UDF
tableEnv.registerFunction("avgTemp", avgTemp);
Table resultTable = sensorTable
        .groupBy("id")
        .aggregate( "avgTemp(temp) as avgtemp" )
        .select("id, avgtemp");

// 4.2 SQL
tableEnv.createTemporaryView("sensor", sensorTable);
Table resultSqlTable = tableEnv.sqlQuery("select id, avgTemp(temp) " +
        " from sensor group by id");

// 打印输出
tableEnv.toRetractStream(resultTable, Row.class).print("result");
tableEnv.toRetractStream(resultSqlTable, Row.class).print("sql");
// 实现自定义的AggregateFunction
public static class AvgTemp extends AggregateFunction<Double, Tuple2<Double, Integer>>{
    @Override
    public Double getValue(Tuple2<Double, Integer> accumulator) {
        return accumulator.f0 / accumulator.f1;
    }

    @Override
    public Tuple2<Double, Integer> createAccumulator() {
        return new Tuple2<>(0.0, 0);
    }

    // 必须实现一个accumulate方法,来数据之后更新状态
    public void accumulate( Tuple2<Double, Integer> accumulator, Double temp ){
        accumulator.f0 += temp;
        accumulator.f1 += 1;
    }
}

既然是聚合函数就需要在

Groupby()

以后才能进行操作。它的使用与SQL中或者Table API中提供的原生聚合函数使用方法相同,只是需要提前注册。但是具体的3个函数如何定义还需要多练习。



表聚合函数
  • 用户定义的表聚合函数(User-Defined Table Aggregate Functions, UDTAGGs),可以把一个表中数据,聚合为具有多行和多列的结果表
  • 用户定义表聚合函数,是通过继承

    TableAggregateFunction

    抽象类来实现的

聚合后生成的是多行数据,也就是聚合的结果还是一张表,多个值。

在这里插入图片描述

上图为按照当前的价格高低排序,并输出前两高的价格。

  • TableAggregateFunction要求必须实现的方法:

    • createAccumulator()
    • accumulate()
    • emitValue() //与上述表函数的区别
  • TableAggregateFunction 的工作原理如下:

    • 首先,它同样需要一个累加器(Accumulator),它是保存聚合中间结果的数据结构。通过调用 createAccumulator() 方法可以创建空累加器。
    • 随后,对每个输入行调用函数的 accumulate() 方法来更新累加器。
    • 处理完所有行后,将调用函数的 emitValue() 方法来计算并返回最终结果。


示例


StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// Stream转化成table
Table table = tEnv.fromDataStream(dataStream,"id, timestamp as ts, temperature as temp");
//注册表聚合函数
tEnv.registerFunction("top2",new Top2());
// 分组并聚合
Table resultTable = table
         .groupBy("id")
         .flatAggregate("top2(temp) as (v,rank)")
         .select("id,v,rank");
 // 结果集转化为撤回流
 tEnv.toRetractStream(resultTable, Row.class).print("table api");
public static class Top2Accum {
        public Double first;
        public Double second;
    }

    /**
     * The top2 user-defined table aggregate function.
     */
    public static class Top2 extends TableAggregateFunction<Tuple2<Double, Integer>, Top2Accum> {

        @Override
        public Top2Accum createAccumulator() {
            Top2Accum acc = new Top2Accum();
            acc.first = Double.MIN_VALUE;
            acc.second = Double.MIN_VALUE;
            return acc;
        }


        public void accumulate(Top2Accum acc, Double v) {
            if (v > acc.first) {
                acc.second = acc.first;
                acc.first = v;
            } else if (v > acc.second) {
                acc.second = v;
            }
        }

        public void merge(Top2Accum acc, Iterable<Top2Accum> iterable) {
            for (Top2Accum otherAcc : iterable) {
                accumulate(acc, otherAcc.first);
                accumulate(acc, otherAcc.second);
            }
        }

        public void emitValue(Top2Accum acc, Collector<Tuple2<Double, Integer>> out) {
            // emit the value and rank
            if (acc.first != Integer.MIN_VALUE) {
                out.collect(Tuple2.of(acc.first, 1));
            }
            if (acc.second != Integer.MIN_VALUE) {
                out.collect(Tuple2.of(acc.second, 2));
            }
        }
    }



总结

这部分的操作可以结合Flink文档里来查看,就像对于UDF的注册,可以通过实例化一个对象来注册,同样也可直接用UDF类来进行注册。而且对于不同的Flink版本来说,所支持的api会发生不少变化,我们重在理解Flink中Table API和Flink SQL,新版本也是对以前旧版本的完善和维护,所以看相关文档中的示例我们也可以很快进行代码迁移。



参考资料


Flink1.12 Table API文档(内含示例)



版权声明:本文为qq_18506419原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。