Using groupBy in the Flink Table API

Environment

The test reads data from Kafka and prints the result directly to the console.

Component  Version
scala      2.12
kafka      *
flink      1.13.3



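Walkthrough

Create the environment
// Brings EnvironmentSettings, TableEnvironment and the Scala expression implicits into scope
import org.apache.flink.table.api._

// Pure Table API environment in streaming mode
val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
val tEnv = TableEnvironment.create(settings)
Create the table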
tEnv.executeSql(
  """
    |create table student (
    |  id int,
    |  name string,
    |  sex string
    |) with (
    |  'connector' = 'kafka',
    |  'topic' = 'test-topic',
    |  'properties.bootstrap.servers' = 'server120:9092',
    |  'properties.group.id' = 'testGroup',
    |  'scan.startup.mode' = 'latest-offset',
    |  'format' = 'csv'
    |)
    |""".stripMargin)
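
With 'format' = 'csv', each Kafka message value is expected to be one CSV line whose fields follow the column order of the table: id, name, sex. The plain-Scala sketch below only illustrates that positional mapping, so that it is clear what a test message such as 1,Tom,male should look like. StudentCsvSketch, Student and parse are hypothetical names, and the naive comma split is an assumption that does not reproduce the quoting or null handling of Flink's csv format.

```scala
import scala.util.Try

// Hypothetical helper, not part of Flink: shows how one CSV line
// lines up with the columns (id int, name string, sex string).
object StudentCsvSketch {
  final case class Student(id: Int, name: String, sex: String)

  // Naive split on commas; the -1 limit keeps trailing empty fields.
  // Returns None when the field count is wrong or id is not an int.
  def parse(line: String): Option[Student] =
    line.split(",", -1) match {
      case Array(id, name, sex) =>
        Try(id.toInt).toOption.map(Student(_, name, sex))
      case _ => None
    }

  def main(args: Array[String]): Unit = {
    // Sample lines one might send to test-topic
    Seq("1,Tom,male", "2,Lucy,female", "not-a-row").foreach { line =>
      println(s"$line -> ${parse(line)}")
    }
  }
}
```

Because the table is declared with 'scan.startup.mode' = 'latest-offset', only messages produced after the job has started are read.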
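Run the group by and print the result
// 'sex, 'name are Scala symbol expressions; they rely on import org.apache.flink.table.api._
tEnv.from("student")
  .groupBy('sex)
  .select(
    'sex,
    'name.count as "name_cnt", // number of non-null names in the group
    'sex.count as "sex_cnt")   // number of rows in the group with a non-null sex
  .execute()
  .print()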
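
Because the source is unbounded, the grouped counts are never final: the printed result is a changelog. The first row seen for a group is emitted as an insert (+I), and every later row for that group retracts the previous counts (-U) and emits the updated ones (+U). The sketch below imitates this behaviour in plain Scala without any Flink dependency. GroupByChangelogSketch, Student, Change and changelog are hypothetical names, and the model assumes that sex is never null and that each input row triggers an update immediately.

```scala
import scala.collection.mutable

// Plain-Scala imitation of the changelog produced by a continuous
// groupBy count. Illustration only, not Flink's implementation.
object GroupByChangelogSketch {
  // One input record, mirroring the columns of the student table.
  final case class Student(id: Int, name: Option[String], sex: String)

  // One changelog entry; op is "+I", "-U" or "+U".
  final case class Change(op: String, sex: String, nameCnt: Long, sexCnt: Long)

  def changelog(rows: Seq[Student]): Vector[Change] = {
    // Running (name_cnt, sex_cnt) per group key.
    val state = mutable.Map.empty[String, (Long, Long)]
    val out = Vector.newBuilder[Change]
    for (row <- rows) {
      // count(field) ignores nulls, so a missing name adds nothing to name_cnt.
      val nameInc = if (row.name.isDefined) 1L else 0L
      state.get(row.sex) match {
        case None =>
          // First row of this group: a single insert.
          val next = (nameInc, 1L)
          state(row.sex) = next
          out += Change("+I", row.sex, next._1, next._2)
        case Some(prev) =>
          // Later rows: retract the old counts, then emit the new ones.
          val next = (prev._1 + nameInc, prev._2 + 1L)
          state(row.sex) = next
          out += Change("-U", row.sex, prev._1, prev._2)
          out += Change("+U", row.sex, next._1, next._2)
      }
    }
    out.result()
  }

  def main(args: Array[String]): Unit = {
    val input = Seq(
      Student(1, Some("Tom"), "male"),
      Student(2, Some("Lucy"), "female"),
      Student(3, Some("Jack"), "male"))
    changelog(input).foreach(println)
  }
}
```

In the console output of the Flink job, the same information appears as an op column in front of sex, name_cnt and sex_cnt.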



Complete code

package com.z.demo

import org.apache.flink.table.api._

/**
 * @Author wenz.ma
 * @Date 2021/10/27 13:47
 * @Desc Using groupBy in the Table API
 */
object TableApiGroupBy01 {
  def main(args: Array[String]): Unit = {
    val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
    val tEnv = TableEnvironment.create(settings)

    tEnv.executeSql(
      """
        |create table student (
        |id int,
        |name string,
        |sex string
        |)with(
        | 'connector' = 'kafka',
        | 'topic' = 'test-topic',
        | 'properties.bootstrap.servers' = 'server120:9092',
        | 'properties.group.id' = 'testGroup',
        | 'scan.startup.mode' = 'latest-offset',
        | 'format' = 'csv'
        |)
        |""".stripMargin)

    tEnv.from("student")
      .groupBy('sex)
      .select(
        'sex,
        'name.count as "name_cnt",
        'sex.count as "sex_cnt")
      .execute()
      .print()
  }
}



Dependencies


Flink Table dependencies

<dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>


Flink and Scala dependencies

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
</dependencies>
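
The dependency entries rely on two Maven properties, ${scala.version} and ${flink.version}, that the post does not define. A minimal sketch of a matching properties block follows, assuming the versions from the environment table above. Note that scala.version is used here as the artifact suffix, so it holds the Scala binary version (2.12) rather than a full release number.

```xml
<properties>
    <!-- Assumed values, taken from the environment table of this post -->
    <scala.version>2.12</scala.version>
    <flink.version>1.13.3</flink.version>
</properties>
```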



Source code download


https://download.csdn.net/download/sinat_25528181/44038825


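Copyright notice: This is an original article by sinat_25528181, released under the CC 4.0 BY-SA license. When reposting, include a link to the original source and this notice.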