MySQL如何流式读取千万级大数据(数据分析)

  • Post author:
  • Post category:mysql

随着系统的运作,业务数据的流入,我们系统中的信息也越来越多,在数据量达到百万甚至千万级以上时,我们如何从数据库中拿去到这些数据,进行数据分析,这是一个比较麻烦的难题。有人会说我不停的去分页查询,是否也可以拿到我想要的数据呢?是可以的,但是这样的效率和资源的消耗也是很大,我们想达到的效果其实是以流式读取的方式获取到资源数据,那什么是流式读取呢,下面我为大家结合常用的查询形式与流式的区别,更好的理解流式读取的形式到底是什么样子的。
本章主要以MySQL数据库为例。

我们先介绍下JDBC

JDBC可以理解成一种规范

正如上面说的,不同数据库厂商提供不同驱动包。
我们程序是可以操作不同的数据库的,而数据库的种类又不一样,所以提供了一种规范(JDBC),由不同的数据库厂商去对其进行实现,然后提供出不同数据库的数据库驱动。
在这里插入图片描述

JDBC的开发流程

  1. 注册驱动 Class.forName
  2. 链接数据库connection
  3. SQL书写
  4. 创建statement
  5. 执行SQL
  6. 获取结果集ResultSet
  7. 关闭链接-先开后关原则

在说流式读取之前,我们先思考下,平时我们在做在执行一个sql查询时,应用程序与数据库之间到底发生了什么?

在这里插入图片描述

一、我们我们要执行数据库查询,首先我们程序的应用层会去加载JDBC(Java Data Base Connection)驱动,不同数据库厂商提供不同驱动包,就比如我们要链接的是MySQL数据库,那么加载的就是MySQL驱动包,并调用JDBC一系列接口。
JDBC驱动能做什么呢?
1)JDBC驱动会去向MySQL服务钱建立TCP链接,拿到connection链接。
2)创建statement对象(用于执行静态 SQL 语句并返回它产生的结果的对象)用于执行SQL语句。
3)执行SQL(DML语句或DQL语句)
4)获取结果集ResultSet

二、MySQL驱动与MySQL服务创建连接后,发起条件查询,MySQL服务会根据条件匹配数据,将匹配数据放入发送缓存(send buffer),在发送缓存满后进行数据发送。

三、MySQL驱动会将接收到的匹配数据拿出,放入驱动的本地缓存内,但是会等MySQL服务查询出的所有的匹配数据都拿到后,才会返回给应用层数据 (这里存在阻塞)

四、应用层获取到ResultSet结果集

所以这个流程就会出现,驱动中获取到的匹配数据过多,而导致OOM。

流式查询的流程

在这里插入图片描述

一、我们我们要执行数据库查询,首先我们程序的应用层会去加载JDBC(Java Data Base Connection)驱动,不同数据库厂商提供不同驱动包,就比如我们要链接的是MySQL数据库,那么加载的就是MySQL驱动包,并调用JDBC一系列接口。

二、MySQL驱动与MySQL服务创建连接后,发起条件查询,MySQL服务会根据条件匹配数据,将匹配数据放入发送缓存(send buffer),在发送缓存满后进行数据发送。

三、MySQL驱动会将接收到的匹配数据拿出,放入驱动的本地缓存内,直接返回给应用层数据,持续从链接与buffer中获取MySQL服务所返回的匹配数据,不会等匹配数据都返回后再返回给应用程序。(不阻塞)

从上面两幅图就能很明显看出,我们平时普通的查询流程与流式读取的读取流程的一些区别点,可以有效的避免一些OOM的发生,那么我们如果在项目中如何使用流式读取?接下来直接上代码。

代码实现

首先起一个程序,对其JVM环境配置的小一点。

  • jvm环境
  • -Xms18m -Xmx18m

普通的查询方法代码

/**
 * @author duanxt
 * jvm环境
 * -Xms18m -Xmx18m
 */
@Component
@Slf4j
public class Reader {

    @Autowired
    DataSource dataSource;

    /**
     * 普通默认的读取形式
     *
     * @throws SQLException
     */
    public void read() throws SQLException {

        String sql = "SELECT * from user  LIMIT 10000 ";
        //从数据源拿到链接
        Connection connection = dataSource.getConnection();
        //根据链接创建statement 准备执行sql
        PreparedStatement statement = connection.prepareStatement(sql);
        //在这个地方 会在sql执行后等待所有的结果,并缓存在驱动内存======
        ResultSet resultSet = null;
        try {
            //====================statement执行sql======================
            resultSet = statement.executeQuery();//期间阻塞 不停从mysql中获取
            //=================================================
            while (resultSet.next()) {
                String id = resultSet.getString("id");
                System.out.println(id);
            }
        } catch (Exception e) {
            log.info("读取形式读取报错~");
        } finally {
            //先开后关原则
            if(resultSet!=null){
                resultSet.close();
            }
            statement.close();
            connection.close();
        }


    }

流式查询方法代码

/**
 * @author duanxt
 * jvm环境
 * -Xms18m -Xmx18m
 */
@Component
public class Reader1 {

    @Autowired
    DataSource dataSource;

    /**
     * mysql  流式读取资源
     *
     * @throws SQLException
     */
    public void streamRead() throws SQLException {

        String sql = "SELECT * from user  LIMIT 10000 ";

        //从数据源拿到链接
        Connection connection = dataSource.getConnection();

        //根据链接创建statement 进行 sql提交   做一些statement 配置
        PreparedStatement statement = connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
        /**
         * createStreamingResultSet
         * We only stream result sets when they are forward-only, read-only, and the fetch size has been set to Integer.MIN_VALUE
         * 源码中:我们仅在结果集为 forward-only、read-only且提取大小已设置为 Integer.MIN_VALUE 时才对其进行流式处理
         *
         * protected boolean createStreamingResultSet() {
         *         return ((this.query.getResultType() == Type.FORWARD_ONLY) && (this.resultSetConcurrency == java.sql.ResultSet.CONCUR_READ_ONLY)
         *                 && (this.query.getResultFetchSize() == Integer.MIN_VALUE));
         *     }
         */
        statement.setFetchSize(Integer.MIN_VALUE);
        //====================statement执行sql======================
        ResultSet resultSet = statement.executeQuery();//期间不会阻塞 直接返回结果行,过多的缓存在驱动内存中
        //================================================
        while (resultSet.next()) {
            String id = resultSet.getString("id");
        }

        System.out.println("流式读取资源结束");
        //先开后关原则
        resultSet.close();
        statement.close();
        connection.close();
    }


}

执行的测试结果与分析

我们分别去启动服务,和执行这两个方法

   public static void main(String[] args) throws SQLException {
        SpringApplication.run(DwApplication.class, args);

        Reader reader = SpringContext.context.getBean(Reader.class);
        reader.read();
        System.out.println("=================================================");
        Reader1 streamReader = SpringContext.context.getBean(Reader1.class);
        streamReader.streamRead();
    }

结果

2021-10-15 17:05:22.046 ERROR 9088 --- [           main] druid.sql.Statement                      : {conn-10006, pstmt-20000} execute error. SELECT * from user  LIMIT 10000 

java.sql.SQLException: Java heap space
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:129) ~[mysql-connector-java-8.0.26.jar:8.0.26]
	at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122) ~[mysql-connector-java-8.0.26.jar:8.0.26]
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:953) ~[mysql-connector-java-8.0.26.jar:8.0.26]
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeQuery(ClientPreparedStatement.java:1003) ~[mysql-connector-java-8.0.26.jar:8.0.26]
	at com.alibaba.druid.filter.FilterChainImpl.preparedStatement_executeQuery(FilterChainImpl.java:3240) [druid-1.2.5.jar:1.2.5]
	at com.alibaba.druid.filter.FilterEventAdapter.preparedStatement_executeQuery(FilterEventAdapter.java:465) [druid-1.2.5.jar:1.2.5]
	at com.alibaba.druid.filter.FilterChainImpl.preparedStatement_executeQuery(FilterChainImpl.java:3237) [druid-1.2.5.jar:1.2.5]
	at com.alibaba.druid.wall.WallFilter.preparedStatement_executeQuery(WallFilter.java:681) [druid-1.2.5.jar:1.2.5]
	at com.alibaba.druid.filter.FilterChainImpl.preparedStatement_executeQuery(FilterChainImpl.java:3237) [druid-1.2.5.jar:1.2.5]
	at com.alibaba.druid.filter.FilterEventAdapter.preparedStatement_executeQuery(FilterEventAdapter.java:465) [druid-1.2.5.jar:1.2.5]
	at com.alibaba.druid.filter.FilterChainImpl.preparedStatement_executeQuery(FilterChainImpl.java:3237) [druid-1.2.5.jar:1.2.5]
	at com.alibaba.druid.proxy.jdbc.PreparedStatementProxyImpl.executeQuery(PreparedStatementProxyImpl.java:181) [druid-1.2.5.jar:1.2.5]
	at com.alibaba.druid.pool.DruidPooledPreparedStatement.executeQuery(DruidPooledPreparedStatement.java:227) [druid-1.2.5.jar:1.2.5]
	at com.yunxi.dw.io.Read.Reader.read(Reader.java:42) [classes/:na]
	at com.yunxi.dw.DwApplication.main(DwApplication.java:22) [classes/:na]
Caused by: java.lang.OutOfMemoryError: Java heap space
	at com.sun.crypto.provider.CipherCore.doFinal(CipherCore.java:937) ~[sunjce_provider.jar:1.8.0_261]
	at com.sun.crypto.provider.AESCipher.engineDoFinal(AESCipher.java:491) ~[sunjce_provider.jar:1.8.0_261]
	at javax.crypto.CipherSpi.bufferCrypt(CipherSpi.java:779) ~[na:1.8.0_271]
	at javax.crypto.CipherSpi.engineDoFinal(CipherSpi.java:730) ~[na:1.8.0_271]
	at javax.crypto.Cipher.doFinal(Cipher.java:2463) ~[na:1.8.0_271]
	at sun.security.ssl.SSLCipher$T12GcmReadCipherGenerator$GcmReadCipher.decrypt(SSLCipher.java:1606) ~[na:1.8.0_261]
	at sun.security.ssl.SSLSocketInputRecord.decodeInputRecord(SSLSocketInputRecord.java:259) ~[na:1.8.0_261]
	at sun.security.ssl.SSLSocketInputRecord.decode(SSLSocketInputRecord.java:180) ~[na:1.8.0_261]
	at sun.security.ssl.SSLTransport.decode(SSLTransport.java:110) ~[na:1.8.0_261]
	at sun.security.ssl.SSLSocketImpl.decode(SSLSocketImpl.java:1201) ~[na:1.8.0_261]
	at sun.security.ssl.SSLSocketImpl.readApplicationRecord(SSLSocketImpl.java:1168) ~[na:1.8.0_261]
	at sun.security.ssl.SSLSocketImpl.access$300(SSLSocketImpl.java:74) ~[na:1.8.0_261]
	at sun.security.ssl.SSLSocketImpl$AppInputStream.read(SSLSocketImpl.java:834) ~[na:1.8.0_261]
	at java.io.FilterInputStream.read(FilterInputStream.java:133) ~[na:1.8.0_261]
	at com.mysql.cj.protocol.FullReadInputStream.readFully(FullReadInputStream.java:64) ~[mysql-connector-java-8.0.26.jar:8.0.26]
	at com.mysql.cj.protocol.a.SimplePacketReader.readMessage(SimplePacketReader.java:108) ~[mysql-connector-java-8.0.26.jar:8.0.26]
	at com.mysql.cj.protocol.a.SimplePacketReader.readMessage(SimplePacketReader.java:45) ~[mysql-connector-java-8.0.26.jar:8.0.26]
	at com.mysql.cj.protocol.a.TimeTrackingPacketReader.readMessage(TimeTrackingPacketReader.java:57) ~[mysql-connector-java-8.0.26.jar:8.0.26]
	at com.mysql.cj.protocol.a.TimeTrackingPacketReader.readMessage(TimeTrackingPacketReader.java:41) ~[mysql-connector-java-8.0.26.jar:8.0.26]
	at com.mysql.cj.protocol.a.MultiPacketReader.readMessage(MultiPacketReader.java:61) ~[mysql-connector-java-8.0.26.jar:8.0.26]
	at com.mysql.cj.protocol.a.MultiPacketReader.readMessage(MultiPacketReader.java:44) ~[mysql-connector-java-8.0.26.jar:8.0.26]
	at com.mysql.cj.protocol.a.ResultsetRowReader.read(ResultsetRowReader.java:75) ~[mysql-connector-java-8.0.26.jar:8.0.26]
	at com.mysql.cj.protocol.a.ResultsetRowReader.read(ResultsetRowReader.java:42) ~[mysql-connector-java-8.0.26.jar:8.0.26]
	at com.mysql.cj.protocol.a.NativeProtocol.read(NativeProtocol.java:1570) ~[mysql-connector-java-8.0.26.jar:8.0.26]
	at com.mysql.cj.protocol.a.TextResultsetReader.read(TextResultsetReader.java:87) ~[mysql-connector-java-8.0.26.jar:8.0.26]
	at com.mysql.cj.protocol.a.TextResultsetReader.read(TextResultsetReader.java:48) ~[mysql-connector-java-8.0.26.jar:8.0.26]
	at com.mysql.cj.protocol.a.NativeProtocol.read(NativeProtocol.java:1583) ~[mysql-connector-java-8.0.26.jar:8.0.26]
	at com.mysql.cj.protocol.a.NativeProtocol.readAllResults(NativeProtocol.java:1637) ~[mysql-connector-java-8.0.26.jar:8.0.26]
	at com.mysql.cj.protocol.a.NativeProtocol.sendQueryPacket(NativeProtocol.java:983) ~[mysql-connector-java-8.0.26.jar:8.0.26]
	at com.mysql.cj.NativeSession.execSQL(NativeSession.java:662) ~[mysql-connector-java-8.0.26.jar:8.0.26]
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:930) ~[mysql-connector-java-8.0.26.jar:8.0.26]
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeQuery(ClientPreparedStatement.java:1003) ~[mysql-connector-java-8.0.26.jar:8.0.26]

2021-10-15 17:05:22.048  INFO 9088 --- [           main] com.yunxi.dw.io.Read.Reader              : 读取形式读取报错~
=================================================
流式读取资源结束

很明显的看到,如果我们内存资源紧张的情况下,去读取大量的数据资源的时候,默认的读取形式会导致我们的OOM,而流式的读取节省大量的内存资源,并且可以使我们合理的去处理这种数据。

流式读取优势

1.解决内存资源紧张的情况的下,导致的OOM
2.对数据可以平滑的处理,匹配数据资源获取非阻塞
3.内存资源的使用上,避免了内存消耗突然提高的风险
4.避免了在需要大数据读取场景下的,手动分页去多次请求读取,一次查询,流式读取。

应用场景

我们可以做一些大量的数据读取后,做数据分析。
以及我们需要做大量的数据下载到本地的时候,都可以以流式读取后输出到本地。

在这里插入图片描述

我是 祥天 ,期望在提高自己的同时,输出较高质量的分享,感谢各位读者的:点赞收藏评论 ,我们一起加油~


版权声明:本文为weixin_38899094原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。