flink从数据库读数据

  • Post author:
  • Post category:其他


package com.example.flink;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;

/**
 * @Author ex-liujiwei
 * @Date 2022/3/11 23:20
 */
public class Demo01 {
    public static void main(String[] args) throws Exception {
        //sql查询结果列类型
        TypeInformation[] fieldTypes = new TypeInformation[] {
                BasicTypeInfo.STRING_TYPE_INFO,
                BasicTypeInfo.STRING_TYPE_INFO,
                BasicTypeInfo.STRING_TYPE_INFO,
                BasicTypeInfo.STRING_TYPE_INFO
        };

        RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);
        JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
                //数据库连接信息
                .setDrivername("com.mysql.jdbc.Driver")
                .setDBUrl("jdbc:mysql://10.20.16.15:5102/bsmp?useSSL=false&useUnicode=true&characterEncoding=UTF-8")
                .setUsername("bsmpopr")
                .setPassword("Jcfgdasdf4#")
                .setQuery("SELECT user_id,name,phone_num,email FROM sys_user_info")
                .setRowTypeInfo(rowTypeInfo)
                .finish();

        //搭建flink
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //获取数据源
        //datasource
        DataSource s = env.createInput(jdbcInputFormat);
        s.print();
        env.execute();
    }
}

pom

 <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.10.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.10.0</version>
            <!-- <scope>provided</scope> -->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-jdbc_2.11</artifactId>
            <version>1.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
            <version>1.9.0</version>
        </dependency>



版权声明:本文为weixin_46054799原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。