Flink基础系列23-Sink之JDBC

  • Post author:
  • Post category:其他




一.Sink之JDBC概述

Flink的Sink支持的数据库:

image.png

Bahir中支持的数据库:

image.png

从上两图可以看到,Flink的Sink并支持类似MySQL的这种关系型数据库,那么如果我需要通过Flink连接MySQL,该如何操作呢?

这个时候我们可以使用Flink Sink的JDBC连接。



二.pom文件配置

此处,我本地MySQL版本是 8.0.19

<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.19</version>
</dependency>



三.MySQL配置

新建数据库及表

CREATE DATABASE `flink_test` DEFAULT CHARACTER SET utf8 COLLATE utf8_general_ci;

CREATE TABLE `sensor_temp` (
  `id` varchar(32) NOT NULL,
  `temp` double NOT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;



四.编写Java代码

package org.flink.sink;

import org.flink.beans.SensorReading;
import org.example.SourceTest4_UDF;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

/**
 * @author 只是甲
 * @date   2021-09-15
 * @remark Sink之JDBC
 */

public class SinkTest4_Jdbc {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 从文件读取数据
//        DataStream<String> inputStream = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt");
//
//        // 转换成SensorReading类型
//        DataStream<SensorReading> dataStream = inputStream.map(line -> {
//            String[] fields = line.split(",");
//            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
//        });

        DataStream<SensorReading> dataStream = env.addSource(new SourceTest4_UDF.MySensorSource());

        dataStream.addSink(new MyJdbcSink());

        env.execute();
    }

    // 实现自定义的SinkFunction
    public static class MyJdbcSink extends RichSinkFunction<SensorReading> {
        // 声明连接和预编译语句
        Connection connection = null;
        PreparedStatement insertStmt = null;
        PreparedStatement updateStmt = null;

        @Override
        public void open(Configuration parameters) throws Exception {
            connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/flink_test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai", "root", "123456");
            insertStmt = connection.prepareStatement("insert into sensor_temp (id, temp) values (?, ?)");
            updateStmt = connection.prepareStatement("update sensor_temp set temp = ? where id = ?");
        }

        // 每来一条数据,调用连接,执行sql
        @Override
        public void invoke(SensorReading value, Context context) throws Exception {
            // 直接执行更新语句,如果没有更新那么就插入
            updateStmt.setDouble(1, value.getTemperature());
            updateStmt.setString(2, value.getId());
            updateStmt.execute();
            if( updateStmt.getUpdateCount() == 0 ){
                insertStmt.setString(1, value.getId());
                insertStmt.setDouble(2, value.getTemperature());
                insertStmt.execute();
            }
        }

        @Override
        public void close() throws Exception {
            insertStmt.close();
            updateStmt.close();
            connection.close();
        }
    }
}



五.运行Flink程序查看数据

mysql> select * from sensor_temp;
+-----------+-------------------+
| id        | temp              |
+-----------+-------------------+
| sensor_3  | 65.31089123002162 |
| sensor_10 | 20.23454807781744 |
| sensor_4  | 79.87349739590283 |
| sensor_1  | 68.79742249825429 |
| sensor_2  |  44.1766638371653 |
| sensor_7  | 99.47000620947128 |
| sensor_8  |  68.7360059804266 |
| sensor_5  |  69.9135258264366 |
| sensor_6  | 38.85722751176939 |
| sensor_9  | 69.97758295030204 |
+-----------+-------------------+
10 rows in set (0.00 sec)

mysql>



参考:

  1. https://www.bilibili.com/video/BV1qy4y1q728
  2. https://ashiamd.github.io/docsify-notes/#/study/BigData/Flink/%E5%B0%9A%E7%A1%85%E8%B0%B7Flink%E5%85%A5%E9%97%A8%E5%88%B0%E5%AE%9E%E6%88%98-%E5%AD%A6%E4%B9%A0%E7%AC%94%E8%AE%B0?id=_521-%e4%bb%8e%e9%9b%86%e5%90%88%e8%af%bb%e5%8f%96%e6%95%b0%e6%8d%ae



版权声明:本文为u010520724原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。