JDBC Connector
   
    
    
    jdbc sink
   
该连接器可以向 JDBC 数据库写入数据。
添加下面的依赖以便使用该连接器(同时添加 JDBC 驱动):
<!-- Flink JDBC connector (Scala 2.11 build) -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.11</artifactId>
    <version>1.14.4</version>
</dependency>
<!-- JDBC driver for the target database (MySQL here) -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.19</version>
</dependency>
已创建的 JDBC Sink 能够保证至少一次(at-least-once)的语义。借助 upsert 语句或幂等更新,可以达到等效于精确一次(exactly-once)的效果。
用法示例:
// Illustrative fragment: builds a stream, attaches a JDBC sink and runs the job.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env
        // "..." is a placeholder for the records to be written.
        .fromElements(...)
        .addSink(JdbcSink.sink(
                // Parameterized statement; the five ? placeholders are bound by the lambda below.
                "insert into books (id, title, author, price, qty) values (?,?,?,?,?)",
                // Statement builder: copies the fields of record t into prepared statement ps,
                // in the same order as the column list above.
                (ps, t) -> {
                    ps.setInt(1, t.id);
                    ps.setString(2, t.title);
                    ps.setString(3, t.author);
                    ps.setDouble(4, t.price);
                    ps.setInt(5, t.qty);
                },
                // Connection settings; getDbMetadata() is not defined in this fragment and
                // stands for wherever the JDBC URL and driver class name come from.
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl(getDbMetadata().getUrl())
                        .withDriverName(getDbMetadata().getDriverClass())
                        .build()));
// Submits the job for execution.
env.execute();
    
     实战
    
   
1、编辑 docker-compose.yml
# Demo stack: one Flink JobManager, one TaskManager and a MySQL 5.7 database.
# File format 2.2 is required because the taskmanager service uses `scale`,
# which is not a valid service option in format 2.1.
version: "2.2"
services:
  jobmanager:
    image: flink:1.14.4-scala_2.11
    ports:
      # Publishes the JobManager's port 8081 (Flink web UI / REST endpoint) on the host.
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
  taskmanager:
    image: flink:1.14.4-scala_2.11
    depends_on:
      - jobmanager
    command: taskmanager
    scale: 1
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 4
  mysql:
    image: mysql:5.7
    ports:
      - "3306:3306"
    volumes:
      # Database files are kept on the host so they survive container restarts.
      - ./data:/var/lib/mysql
      # SQL scripts placed here are executed by the image when the database is first initialized.
      - ./mysql-init:/docker-entrypoint-initdb.d
    command: [
          'mysqld',
          '--innodb-buffer-pool-size=80M',
          '--character-set-server=utf8mb4',
          '--collation-server=utf8mb4_unicode_ci',
          '--default-time-zone=+8:00',
          '--lower-case-table-names=1',
          '--skip-name-resolve'
        ]
    environment:
      # Application account and schema created on first start.
      MYSQL_USER: "sql-demo"
      MYSQL_PASSWORD: "demo-sql"
      MYSQL_DATABASE: "sql-demo"
      MYSQL_RANDOM_ROOT_PASSWORD: "yes"
创建文件夹 mysql-init,并在其中新建文件 create-table.sql,内容如下:
-- Target table for the JDBC sink example: one row per book, keyed by id.
CREATE TABLE book (
    id     INT         NOT NULL,
    title  VARCHAR(30),
    author VARCHAR(30),
    price  INT,
    PRIMARY KEY (id)
);
2、启动服务
$ docker-compose up -d
3、案例代码
book
package quick.jdbc;
/**
 * A book record as stored in the {@code book} table: id, title, author and price.
 *
 * <p>The class follows the JavaBean layout (public no-argument constructor plus a
 * getter/setter pair for every field). Flink requires that layout to handle a class
 * as a POJO type; without the no-argument constructor it falls back to generic
 * serialization.
 */
public class Book {
    private int id;
    private String title;
    private String author;
    private int price;

    /** Creates an empty book; fields keep their default values until set. */
    public Book() {
    }

    /**
     * Creates a fully populated book.
     *
     * @param id     primary key of the book
     * @param title  book title
     * @param author author name
     * @param price  price as a whole number
     */
    public Book(int id, String title, String author, int price) {
        this.id = id;
        this.title = title;
        this.author = author;
        this.price = price;
    }

    /** Returns a readable representation listing all four fields. */
    @Override
    public String toString() {
        return "Book{" +
                "id=" + id +
                ", title='" + title + '\'' +
                ", author='" + author + '\'' +
                ", price=" + price +
                '}';
    }

    /** @return the book id */
    public int getId() {
        return id;
    }

    /** @param id the book id */
    public void setId(int id) {
        this.id = id;
    }

    /** @return the title */
    public String getTitle() {
        return title;
    }

    /** @param title the title */
    public void setTitle(String title) {
        this.title = title;
    }

    /** @return the author name */
    public String getAuthor() {
        return author;
    }

    /** @param author the author name */
    public void setAuthor(String author) {
        this.author = author;
    }

    /** @return the price */
    public int getPrice() {
        return price;
    }

    /** @param price the price */
    public void setPrice(int price) {
        this.price = price;
    }
}
job
package quick.jdbc;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
import java.util.List;
/**
 * Flink streaming job that writes ten generated {@link Book} records into the
 * {@code book} table through the JDBC sink.
 *
 * <p>Connection settings may be overridden with the program arguments {@code --url},
 * {@code --driver}, {@code --username} and {@code --password}; the defaults target the
 * {@code sql-demo} database on host {@code mysql}.
 */
public class JdbcSinkExample {
    /**
     * Builds and runs the job.
     *
     * @param args optional {@code --key value} overrides for the connection settings
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);
        String url = params.get("url","jdbc:mysql://mysql:3306/sql-demo?autoReconnect=true&useSSL=true");
        String driver = params.get("driver","com.mysql.cj.jdbc.Driver");
        String username = params.get("username","sql-demo");
        String password = params.get("password","demo-sql");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Sample data: ids 0..9 with prices 50..59.
        List<Book> list = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            list.add(new Book(i, "title" + i, "author" + i, i + 50));
        }

        // fromCollection emits one record per list element, so no flatMap is needed
        // to unpack a single List<Book> element.
        DataStream<Book> dataStream = env.fromCollection(list);

        dataStream.addSink(JdbcSink.sink(
                // Upsert instead of a plain insert: the sink is at-least-once, so a record may
                // be written again after a restart or when the job is submitted a second time;
                // a plain insert would then fail on the primary key.
                "insert into book (id, title, author, price) values (?,?,?,?) "
                        + "on duplicate key update "
                        + "title = values(title), author = values(author), price = values(price)",
                (ps, t) -> {
                    ps.setInt(1, t.getId());
                    ps.setString(2, t.getTitle());
                    ps.setString(3, t.getAuthor());
                    ps.setInt(4, t.getPrice());
                },
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl(url)
                        .withDriverName(driver)
                        .withUsername(username)
                        .withPassword(password)
                        .build()));
        env.execute("jdbcsink job");
    }
}
    
     4、运行
    
   
    然后,将应用程序打包,并通过 Flink 的
    
     Web UI
    
    提交作业,同时监控集群的状态和正在运行的作业。
   
在 docker-compose.yml 所在的文件夹中启动 docker-compose 服务(如果已在第 2 步启动,可跳过)。
$ docker-compose up -d
    
     您可以通过 Flink 控制台(http://localhost:8081)
    
    查看有关正在运行的作业的信息。
   
从 MySQL 内部探索结果。
$ docker-compose exec mysql mysql -Dsql-demo -usql-demo -pdemo-sql
mysql> use sql-demo;
Database changed
mysql> select count(*) from book;
+----------+
| count(*) |
+----------+
|       10 |
+----------+
 
版权声明:本文为qq_15604349原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
