官方介绍
Flink 中的 API
Flink 为流式 / 批式处理应用程序的开发提供了不同级别的抽象。
- Flink API 最底层的抽象为有状态实时流处理。其抽象实现是 Process Function,并且 Process Function 被 Flink 框架集成到了 DataStream API 中来为我们使用。它允许用户在应用程序中自由地处理来自单流或多流的事件(数据),并提供具有全局一致性和容错保障的状态。此外,用户可以在此层抽象中注册事件时间(event time)和处理时间(processing time)回调方法,从而允许程序可以实现复杂计算。
- Flink API 第二层抽象是 Core APIs。实际上,许多应用程序不需要使用到上述最底层抽象的 API,而是可以使用 Core APIs 进行编程:其中包含 DataStream API(应用于有界 / 无界数据流场景)和 DataSet API(应用于有界数据集场景)两部分。Core APIs 提供的流式 API(Fluent API)为数据处理提供了通用的模块组件,例如各种形式的用户自定义转换(transformations)、联接(joins)、聚合(aggregations)、窗口(windows)和状态(state)操作等。此层 API 中处理的数据类型在每种编程语言中都有其对应的类。
Process Function 这类底层抽象和 DataStream API 的相互集成使得用户可以选择使用更底层的抽象 API 来实现自己的需求。DataSet API 还额外提供了一些原语,比如循环 / 迭代(loop/iteration)操作。
- Flink API 第三层抽象是 Table API。Table API 是以表(Table)为中心的声明式编程(DSL)API,例如在流式数据场景下,它可以表示一张正在动态改变的表。Table API 遵循(扩展)关系模型:即表拥有 schema(类似于关系型数据库中的 schema),并且 Table API 也提供了类似于关系模型中的操作,比如 select、project、join、group-by 和 aggregate 等。Table API 程序是以声明的方式定义应执行的逻辑操作,而不是确切地指定程序应该执行的代码。尽管 Table API 使用起来很简洁并且可以由各种类型的用户自定义函数扩展功能,但还是比 Core API 的表达能力差。此外,Table API 程序在执行之前还会使用优化器中的优化规则对用户编写的表达式进行优化。
表和 DataStream/DataSet 可以进行无缝切换,Flink 允许用户在编写应用程序时将 Table API 与 DataStream/DataSet API 混合使用。
- Flink API 最顶层抽象是 SQL。这层抽象在语义和程序表达式上都类似于 Table API,但是其程序实现都是 SQL 查询表达式。SQL 抽象与 Table API 抽象之间的关联是非常紧密的,并且 SQL 查询语句可以在 Table API 中定义的表上执行。
DataStream/DateSet API
Flink 中的 DataStream 和 DataSet 程序是常规程序,可对数据流实施转换(例如,过滤,更新状态,定义窗口,聚合)。最初从各种来源(例如,消息队列,套接字流,文件)创建数据流。结果通过接收器返回,接收器可以例如将数据写入文件或标准输出(例如命令行终端)。Flink 程序可在各种上下文中运行,独立运行或嵌入其他程序中。执行可以在本地 JVM 或许多计算机的群集中进行。
预定义的 Source 和 Sink
一些比较基本的 Source 和 Sink 已经内置在 Flink 里。 预定义 data sources 支持从文件、目录、socket,以及 collections 和 iterators 中读取数据。 预定义 data sinks 支持把数据写入文件、标准输出(stdout)、标准错误输出(stderr)和 socket。
官方文档
https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/connectors/
DataStream/DateSet API 开发
从本篇开始,增加 DataStream/DateSet API 演示内容,在原有的工程基础上,扩展一个 connectors 模块;此模块会演示以下几个组件简单使用;
-
elasticsearch
-
file(text, csv)
-
kafka
-
jdbc (mysql)
-
rabbitmq
-
redis
新增 connectors 模块
在当前工程中,创建名称为 connectors 的 maven 工程模块
pom.xml
<artifactId>connectors</artifactId>
<dependencies>
<!-- Flink jdbc依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-jdbc_2.11</artifactId>
<version>1.10.1</version>
</dependency>
<!-- mysql驱动包 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
<!-- kafka依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- redis依赖 -->
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
<!-- rabbitMq依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-rabbitmq_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- elasticsearch6依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch6_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
刷新工程 maven,下载相关功能依赖组件包;
创建用户表(演示使用)
-- 数所据库 flink 下创建用户表
CREATE TABLE `t_user` (
`id` int(8) NOT NULL AUTO_INCREMENT,
`name` varchar(40) DEFAULT NULL,
`age` int(3) DEFAULT NULL,
`sex` int(2) DEFAULT NULL,
`address` varchar(40) DEFAULT NULL,
`createTime` timestamp NULL DEFAULT NULL,
`createTimeSeries` bigint(20) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
创建实体 Bean(演示使用)
TUser.java
package com.flink.examples;
/**
* @Description t_user表数据封装类
*/
public class TUser {
private Integer id;
private String name;
private Integer age;
private Integer sex;
private String address;
private Long createTimeSeries;
public TUser(){}
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Integer getAge() {
return age;
}
public void setAge(Integer age) {
this.age = age;
}
public Integer getSex() {
return sex;
}
public void setSex(Integer sex) {
this.sex = sex;
}
public String getAddress() {
return address;
}
public void setAddress(String address) {
this.address = address;
}
public Long getCreateTimeSeries() {
return createTimeSeries;
}
public void setCreateTimeSeries(Long createTimeSeries) {
this.createTimeSeries = createTimeSeries;
}
@Override
public String toString() {
return "TUser{" +
"id=" + id +
", name='" + name + '\'' +
", age=" + age +
", sex=" + sex +
", address='" + address + '\'' +
", createTimeSeries=" + createTimeSeries +
'}';
}
}
TCount.java
package com.flink.examples;
/**
* @Description 统计表
*/
public class TCount {
/**
* 性别
*/
private Integer sex;
/**
* 数量
*/
private Integer num;
public TCount(){}
public TCount(Integer sex, Integer num){
this.sex = sex;
this.num = num;
}
public Integer getSex() {
return sex;
}
public void setSex(Integer sex) {
this.sex = sex;
}
public Integer getNum() {
return num;
}
public void setNum(Integer num) {
this.num = num;
}
}
工程模块
后续关于 DataStream/DateSet API 演示示例均在此 connectors 模块下进行基础上开发;
源码下载
Gitee:
flink-examples: 基于flink.1.11.1版本的工程示例,此示例包含大部份算子、窗口、中间件连接器、tables&sql的用法,适合新人学习使用;