JDBC Connector
jdbc sink
该连接器可以向 JDBC 数据库写入数据。
添加下面的依赖以便使用该连接器(同时添加 JDBC 驱动):
<!-- Flink JDBC connector (Scala 2.11 build) -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.11</artifactId>
    <version>1.14.4</version>
</dependency>
<!-- JDBC driver for the target database (MySQL in this walkthrough) -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.19</version>
</dependency>
已创建的 JDBC Sink 能够保证至少一次(at-least-once)的语义。通过 upsert 语句或幂等更新,可以达到实际效果上的精确一次(effectively exactly-once)。
用法示例:
// Illustrative snippet: the element list and getDbMetadata() are placeholders
// that are not defined here, so this fragment does not compile as-is.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env
.fromElements(...)
.addSink(JdbcSink.sink(
// Parameterized statement; one "?" per column listed in the INSERT.
"insert into books (id, title, author, price, qty) values (?,?,?,?,?)",
// Statement builder: binds the fields of record t to the positional parameters of ps.
(ps, t) -> {
ps.setInt(1, t.id);
ps.setString(2, t.title);
ps.setString(3, t.author);
ps.setDouble(4, t.price);
ps.setInt(5, t.qty);
},
// Connection settings: JDBC URL and driver class name.
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl(getDbMetadata().getUrl())
.withDriverName(getDbMetadata().getDriverClass())
.build()));
// Trigger execution of the pipeline defined above.
env.execute();
实战
1、编辑 docker-compose.yml
# Flink JobManager + TaskManager and a MySQL instance for the JDBC sink walkthrough.
# File format 2.2 is required because the taskmanager service uses the `scale` key,
# which the 2.1 schema does not define.
version: "2.2"
services:
  jobmanager:
    image: flink:1.14.4-scala_2.11
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      # Multi-line value: every line after "FLINK_PROPERTIES=" is one Flink config entry.
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
  taskmanager:
    image: flink:1.14.4-scala_2.11
    depends_on:
      - jobmanager
    command: taskmanager
    scale: 1
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 4
  mysql:
    image: mysql:5.7
    ports:
      - "3306:3306"
    volumes:
      # Database files are kept on the host under ./data.
      - ./data:/var/lib/mysql
      # ./mysql-init holds the SQL init script(s), e.g. create-table.sql.
      - ./mysql-init:/docker-entrypoint-initdb.d
    command: [
      'mysqld',
      '--innodb-buffer-pool-size=80M',
      '--character-set-server=utf8mb4',
      '--collation-server=utf8mb4_unicode_ci',
      '--default-time-zone=+8:00',
      '--lower-case-table-names=1',
      '--skip-name-resolve'
      ]
    environment:
      MYSQL_USER: "sql-demo"
      MYSQL_PASSWORD: "demo-sql"
      MYSQL_DATABASE: "sql-demo"
      MYSQL_RANDOM_ROOT_PASSWORD: "yes"
创建文件夹 mysql-init,并在其中新建 create-table.sql,内容如下:
-- Target table for the JDBC sink example; rows are keyed by id.
CREATE TABLE book (
    id     INT NOT NULL PRIMARY KEY,
    title  VARCHAR(30),
    author VARCHAR(30),
    price  INT
);
2、启动服务
$ docker-compose up -d
3、案例代码
book
package quick.jdbc;
/**
 * Mutable bean describing one book record (id, title, author, price).
 *
 * <p>The class exposes a public no-argument constructor together with a
 * getter/setter pair for every field so that Flink can treat it as a POJO
 * type rather than falling back to generic serialization.
 */
public class Book {
    private int id;
    private String title;
    private String author;
    private int price;

    /**
     * Creates an empty book: numeric fields are {@code 0} and text fields are
     * {@code null} until populated through the setters.
     */
    public Book() {
    }

    /**
     * Creates a fully populated book.
     *
     * @param id     identifier of the book
     * @param title  title of the book
     * @param author author of the book
     * @param price  price of the book
     */
    public Book(int id, String title, String author, int price) {
        this.id = id;
        this.title = title;
        this.author = author;
        this.price = price;
    }

    /** Returns a {@code Book{id=..., title='...', author='...', price=...}} rendering. */
    @Override
    public String toString() {
        return "Book{" +
                "id=" + id +
                ", title='" + title + '\'' +
                ", author='" + author + '\'' +
                ", price=" + price +
                '}';
    }

    /** @return the book identifier */
    public int getId() {
        return id;
    }

    /** @param id the book identifier */
    public void setId(int id) {
        this.id = id;
    }

    /** @return the title, possibly {@code null} */
    public String getTitle() {
        return title;
    }

    /** @param title the title */
    public void setTitle(String title) {
        this.title = title;
    }

    /** @return the author, possibly {@code null} */
    public String getAuthor() {
        return author;
    }

    /** @param author the author */
    public void setAuthor(String author) {
        this.author = author;
    }

    /** @return the price */
    public int getPrice() {
        return price;
    }

    /** @param price the price */
    public void setPrice(int price) {
        this.price = price;
    }
}
job
package quick.jdbc;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
import java.util.List;
/**
 * Example Flink job that generates ten {@link Book} records and writes them
 * to the {@code book} table through {@link JdbcSink}.
 */
public class JdbcSinkExample {

    /**
     * Builds and runs the job.
     *
     * <p>Recognized command-line parameters (each has a default matching the
     * tutorial's docker-compose setup): {@code --url}, {@code --driver},
     * {@code --username}, {@code --password}.
     *
     * @param args command-line arguments parsed by {@link ParameterTool}
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);
        String url = params.get("url", "jdbc:mysql://mysql:3306/sql-demo?autoReconnect=true&useSSL=true");
        String driver = params.get("driver", "com.mysql.cj.jdbc.Driver");
        String username = params.get("username", "sql-demo");
        String password = params.get("password", "demo-sql");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Sample data: ids 0..9 with derived title/author and price = id + 50.
        List<Book> books = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            books.add(new Book(i, "title" + i, "author" + i, i + 50));
        }

        // fromCollection emits every list entry as its own stream record, so no
        // extra flatMap is needed to unpack the list.
        DataStream<Book> dataStream = env.fromCollection(books);

        // Plain INSERT: the ids are fixed, so running the job again against a
        // table that already contains them will hit the primary key on id.
        dataStream.addSink(JdbcSink.sink(
                "insert into book (id, title, author, price) values (?,?,?,?)",
                // Bind the fields of each Book to the statement's positional parameters.
                (ps, t) -> {
                    ps.setInt(1, t.getId());
                    ps.setString(2, t.getTitle());
                    ps.setString(3, t.getAuthor());
                    ps.setInt(4, t.getPrice());
                },
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl(url)
                        .withDriverName(driver)
                        .withUsername(username)
                        .withPassword(password)
                        .build()));

        env.execute("jdbcsink job");
    }
}
4、运行
然后,将应用程序打包,并通过 Flink 的
Web UI
提交作业;在该界面中还可以监控集群的状态和正在运行的作业。
如果服务尚未启动,请在 docker-compose.yml 所在的文件夹中启动:
$ docker-compose up -d
您可以通过Flink 控制台
查看有关正在运行的作业的信息。
进入 MySQL 容器,查询写入的结果。
$ docker-compose exec mysql mysql -Dsql-demo -usql-demo -pdemo-sql
mysql> use sql-demo;
Database changed
mysql> select count(*) from book;
+----------+
| count(*) |
+----------+
| 10 |
+----------+
版权声明:本文为qq_15604349原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。