【Flink1.14实战】Docker环境 DataStream jdbc sink

  • Post author:
  • Post category:其他




JDBC Connector



jdbc sink

该连接器可以向 JDBC 数据库写入数据。

添加下面的依赖以便使用该连接器(同时添加 JDBC 驱动):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.11</artifactId>
    <version>1.14.4</version>
</dependency>

<dependency>
	<groupId>mysql</groupId>
	<artifactId>mysql-connector-java</artifactId>
	<version>8.0.19</version>
</dependency>

已创建的 JDBC Sink 能够保证至少一次的语义。 更有效的精确执行一次可以通过 upsert 语句或幂等更新实现。

用法示例:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env
        .fromElements(...)
        .addSink(JdbcSink.sink(
                "insert into books (id, title, author, price, qty) values (?,?,?,?,?)",
                (ps, t) -> {
                    ps.setInt(1, t.id);
                    ps.setString(2, t.title);
                    ps.setString(3, t.author);
                    ps.setDouble(4, t.price);
                    ps.setInt(5, t.qty);
                },
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl(getDbMetadata().getUrl())
                        .withDriverName(getDbMetadata().getDriverClass())
                        .build()));
env.execute();


实战

1、编辑 docker-compose.yml

version: "2.1"
services:
  jobmanager:
    image: flink:1.14.4-scala_2.11
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager        

  taskmanager:
    image: flink:1.14.4-scala_2.11
    depends_on:
      - jobmanager
    command: taskmanager
    scale: 1
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 4   
  mysql:
    image: mysql:5.7
    ports:
      - "3306:3306"
    volumes:
      - ./data:/var/lib/mysql
      - ./mysql-init:/docker-entrypoint-initdb.d
    command: [
          'mysqld',
          '--innodb-buffer-pool-size=80M',
          '--character-set-server=utf8mb4',
          '--collation-server=utf8mb4_unicode_ci',
          '--default-time-zone=+8:00',
          '--lower-case-table-names=1',
          '--skip-name-resolve'
        ]
    environment:
      MYSQL_USER: "sql-demo"
      MYSQL_PASSWORD: "demo-sql"
      MYSQL_DATABASE: "sql-demo"
      MYSQL_RANDOM_ROOT_PASSWORD: "yes"

创建文件夹mysql-init, create-table.sql

CREATE TABLE book (
	id INT NOT NULL,
	title varchar(30),
	author varchar(30),
	price	   INT,
	PRIMARY KEY (id)
);

2、启动服务

$ docker-compose up -d

3、案例代码

book

package quick.jdbc;

public class Book {

    private  int id;
    private String title;
    private String author;
    private int price;

    public Book(int id, String title, String author, int price) {
        this.id = id;
        this.title = title;
        this.author = author;
        this.price = price;
    }

    @Override
    public String toString() {
        return "Book{" +
                "id=" + id +
                ", title='" + title + '\'' +
                ", author='" + author + '\'' +
                ", price=" + price +
                '}';
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getTitle() {
        return title;
    }

    public void setTitle(String title) {
        this.title = title;
    }

    public String getAuthor() {
        return author;
    }

    public void setAuthor(String author) {
        this.author = author;
    }

    public int getPrice() {
        return price;
    }

    public void setPrice(int price) {
        this.price = price;
    }
}

job

package quick.jdbc;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

public class JdbcSinkExample {

    public static void main(String[] args) throws Exception {

        final ParameterTool params = ParameterTool.fromArgs(args);
        String url = params.get("url","jdbc:mysql://mysql:3306/sql-demo?autoReconnect=true&useSSL=true");
        String driver = params.get("driver","com.mysql.cj.jdbc.Driver");
        String username = params.get("username","sql-demo");
        String password = params.get("password","demo-sql");


        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        List<Book> list=new ArrayList<>();
        for(int i=0;i<10;i++){
            Book b=new Book(i,"title"+i,"author"+i,i+50);
            list.add(b);
        }

        DataStream<Book> dataStream= env
                .fromElements(list)
                .flatMap(new FlatMapFunction< List<Book>, Book>() {
                    @Override
                    public void flatMap(List<Book> value, Collector<Book> out)
                            throws Exception {
                        for(Book book: value){
                            out.collect(book);
                        }
                    }
                });


        dataStream.addSink(JdbcSink.sink(
                "insert into book (id, title, author, price) values (?,?,?,?)",
                (ps, t) -> {
                    ps.setInt(1, t.getId());
                    ps.setString(2, t.getTitle());
                    ps.setString(3, t.getAuthor());
                    ps.setInt(4, t.getPrice());
                },
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl(url)
                        .withDriverName(driver)
                        .withUsername(username)
                        .withPassword(password)
                        .build()));

        env.execute("jdbcsink job");
    }


}


4、运行

然后,将打包应用程序提交,Flink 的

Web UI

来提交作业监控集群的状态和正在运行的作业。

从文件夹中启动 docker-compose 脚本。

$ docker-compose up -d


您可以通过Flink 控制台

查看有关正在运行的作业的信息。

从 MySQL 内部探索结果。

$ docker-compose exec mysql mysql -Dsql-demo -usql-demo -pdemo-sql

mysql> use sql-demo;
Database changed

mysql> select count(*) from book;
+----------+
| count(*) |
+----------+
|      10 |
+----------+



版权声明:本文为qq_15604349原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。