Environment
This test reads data from Kafka and prints the result directly to the console.
Component | Version |
---|---|
scala | 2.12 |
kafka | * |
flink | 1.13.3 |
Walkthrough
Create the environment
val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
val tEnv = TableEnvironment.create(settings)
Create the table
tEnv.executeSql(
  """
    |create table student (
    |  id int,
    |  name string,
    |  sex string
    |) with (
    |  'connector' = 'kafka',
    |  'topic' = 'test-topic',
    |  'properties.bootstrap.servers' = 'server120:9092',
    |  'properties.group.id' = 'testGroup',
    |  'scan.startup.mode' = 'latest-offset',
    |  'format' = 'csv'
    |)
    |""".stripMargin)
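With 'format' = 'csv', each Kafka message value is expected to be one comma-separated line whose fields follow the column order of the table: id, name, sex. The sketch below is plain Scala and is not Flink's CSV decoder; the `Student` case class, the `parseLine` helper and the sample values are hypothetical and only illustrate how a record maps onto the schema.

```scala
import scala.util.Try

// Illustrative only: mirrors the column order of the `student` table (id, name, sex).
// This is NOT Flink's CSV deserializer; all names and sample data are assumptions.
final case class Student(id: Int, name: String, sex: String)

object CsvRecordSketch {
  /** Parses one comma-separated line; returns None when it does not fit the schema. */
  def parseLine(line: String): Option[Student] =
    line.split(",", -1).map(_.trim) match {
      // Exactly three fields, and the first one must be a valid int
      case Array(id, name, sex) => Try(id.toInt).toOption.map(Student(_, name, sex))
      case _                    => None
    }

  def main(args: Array[String]): Unit =
    Seq("1,tom,male", "2,lucy,female", "broken-line").foreach { line =>
      println(s"$line -> ${parseLine(line)}")
    }
}
```

Because 'scan.startup.mode' is 'latest-offset', only records produced to test-topic after the job has started are read, so send the test lines once the job is running.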
Run the group by and print the result
// Count the non-null name and sex values within each sex group
tEnv.from("student")
  .groupBy('sex)
  .select(
    'sex,
    'name.count as "name_cnt",
    'sex.count as "sex_cnt")
  .execute()
  .print()
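Because the job runs in streaming mode, the grouped counts are updated continuously: `print()` shows a changelog in which each row carries an `op` marker such as `+I` (insert), `-U` (retract the previous value) or `+U` (the updated value). The plain-Scala sketch below imitates that behaviour for this query without using Flink. `ChangelogSketch`, `Rec`, `Change` and the sample records are hypothetical, and the real output layout and ordering may differ.

```scala
// Illustrative simulation of an updating GROUP BY result; not Flink code.
object ChangelogSketch {
  // One input record; name is optional because count(name) skips nulls.
  final case class Rec(name: Option[String], sex: String)
  // One changelog row: op marker plus the selected columns.
  final case class Change(op: String, sex: String, nameCnt: Long, sexCnt: Long)

  def changelog(records: Seq[Rec]): Vector[Change] = {
    // State: per-group (name_cnt, sex_cnt); output: emitted changelog rows so far
    val start = (Map.empty[String, (Long, Long)], Vector.empty[Change])
    val (_, out) = records.foldLeft(start) { case ((state, acc), rec) =>
      val previous = state.get(rec.sex)
      val (nameCnt, sexCnt) = previous.getOrElse((0L, 0L))
      val updated = (nameCnt + (if (rec.name.isDefined) 1L else 0L), sexCnt + 1L)
      val emitted = previous match {
        // First record of a group: a plain insert
        case None => Vector(Change("+I", rec.sex, updated._1, updated._2))
        // Later records: retract the old counts, then emit the new ones
        case Some((oldName, oldSex)) =>
          Vector(
            Change("-U", rec.sex, oldName, oldSex),
            Change("+U", rec.sex, updated._1, updated._2))
      }
      (state.updated(rec.sex, updated), acc ++ emitted)
    }
    out
  }

  def main(args: Array[String]): Unit = {
    val sample = Seq(Rec(Some("tom"), "male"), Rec(Some("lucy"), "female"), Rec(None, "male"))
    changelog(sample).foreach(println)
  }
}
```

In this sample the third record has no name, so the male group's name count stays at 1 while its sex count moves to 2, which is the only case where the two counts diverge.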
Complete code
package com.z.demo

import org.apache.flink.table.api._

/**
 * @Author wenz.ma
 * @Date 2021/10/27 13:47
 * @Desc Using groupBy in the Table API
 */
object TableApiGroupBy01 {
  def main(args: Array[String]): Unit = {
    // Table environment in streaming mode
    val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
    val tEnv = TableEnvironment.create(settings)

    // Register a Kafka-backed source table that decodes CSV records
    tEnv.executeSql(
      """
        |create table student (
        |  id int,
        |  name string,
        |  sex string
        |) with (
        |  'connector' = 'kafka',
        |  'topic' = 'test-topic',
        |  'properties.bootstrap.servers' = 'server120:9092',
        |  'properties.group.id' = 'testGroup',
        |  'scan.startup.mode' = 'latest-offset',
        |  'format' = 'csv'
        |)
        |""".stripMargin)

    // Group by sex, count name and sex per group, and print to the console
    tEnv.from("student")
      .groupBy('sex)
      .select(
        'sex,
        'name.count as "name_cnt",
        'sex.count as "sex_cnt")
      .execute()
      .print()
  }
}
Dependencies
Flink Table dependencies
<dependencies>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_${scala.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_${scala.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-csv</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>
Flink and Scala dependencies
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_${scala.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>
</dependencies>
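The dependency entries reference `${scala.version}` and `${flink.version}` without showing where they are defined. A minimal sketch of the matching Maven properties, assuming the values from the version table at the top of this post, could look like the following. Note that `scala.version` is used here as the artifact-ID suffix, so it holds the Scala binary version (2.12) rather than a full patch version.

```xml
<!-- Assumed property definitions; values taken from the version table in this post -->
<properties>
  <scala.version>2.12</scala.version>
  <flink.version>1.13.3</flink.version>
</properties>
```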
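Copyright notice: this is an original article by sinat_25528181, licensed under CC 4.0 BY-SA. When reposting, please include a link to the original source and this notice.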