Flink Async I/O Study Notes



As usual, let's start with the introduction from the official documentation.

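Suppose every record processed by Flink triggers a database access. Database reads are bound by disk I/O, so each access is slow. A synchronous operator waits for each response before it processes the next record, so these lookups become a performance bottleneck for the stream job.

Async I/O turns these synchronous requests into asynchronous ones. The waiting times of many requests overlap, so the total wait is amortized across them.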
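To see why overlapping requests helps, here is a minimal plain-Java sketch with no Flink involved. It assumes a database lookup can be simulated by a fixed 50 ms `Thread.sleep`. `lookup`, `runSync` and `runAsync` are hypothetical names for illustration only. Ten sequential lookups pay the latency ten times. Ten lookups issued concurrently through `CompletableFuture.supplyAsync` wait roughly once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SyncVsAsyncDemo {

    // Hypothetical stand-in for a database lookup: blocks for a fixed latency
    static String lookup(int key, long latencyMs) {
        try {
            Thread.sleep(latencyMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "value-" + key;
    }

    // Synchronous: each request waits for the previous one, so latencies add up
    static long runSync(int requests, long latencyMs) {
        long start = System.nanoTime();
        for (int i = 0; i < requests; i++) {
            lookup(i, latencyMs);
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    // Asynchronous: all requests are in flight at once, so their waits overlap
    static long runAsync(int requests, long latencyMs) {
        ExecutorService pool = Executors.newFixedThreadPool(requests);
        try {
            long start = System.nanoTime();
            List<CompletableFuture<String>> futures = new ArrayList<>();
            for (int i = 0; i < requests; i++) {
                final int key = i;
                futures.add(CompletableFuture.supplyAsync(() -> lookup(key, latencyMs), pool));
            }
            // Wait until every response has arrived
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
            return (System.nanoTime() - start) / 1_000_000;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("sync:  " + runSync(10, 50) + " ms");
        System.out.println("async: " + runAsync(10, 50) + " ms");
    }
}
```

Exact timings depend on the machine. The synchronous run needs at least about 10 × 50 ms. The concurrent run is bounded mainly by the slowest single request.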

Asynchronous interaction with the database means that a single parallel function instance can handle many requests concurrently and receive the responses concurrently.

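Here "asynchronous" means that a single function instance, even with a parallelism of 1, can issue many requests concurrently and receive their responses concurrently.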

You might ask: why not simply increase the function's parallelism?

Improving throughput by just scaling the MapFunction to a very high parallelism is in some cases possible as well, but usually comes at a very high resource cost: Having many more parallel MapFunction instances means more tasks, threads, Flink-internal network connections, network connections to the database, buffers, and general internal bookkeeping overhead.

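The reason is that a higher parallelism means more tasks, more threads, more Flink-internal network connections, more connections to the database and more buffers. All of these add internal overhead.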
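To make the resource argument concrete, here is a back-of-the-envelope estimate based on Little's law. Little's law is a general queueing result and does not come from the Flink docs. The estimate assumes a constant request latency of W = 10 ms and a target throughput of λ = 1000 requests/s. It also assumes the database itself is not the bottleneck. A synchronous instance has at most one request in flight. An async instance can have up to c requests in flight, and c = 20 is the capacity value used in the code later in this post. The numbers are illustrative only.

```latex
\begin{aligned}
L &= \lambda W && \text{(Little's law: requests in flight} = \text{throughput} \times \text{latency)}\\
p_{\text{sync}} &\ge \lambda W = 1000\,\mathrm{s^{-1}} \times 0.01\,\mathrm{s} = 10 && \text{(one request in flight per synchronous instance)}\\
\lambda_{\max} &= \frac{c}{W} = \frac{20}{0.01\,\mathrm{s}} = 2000\,\mathrm{s^{-1}} && \text{(async, capacity } c = 20\text{, parallelism } 1)
\end{aligned}
```

Under these assumptions, a synchronous job needs about 10 parallel instances to reach 1000 requests/s. A single async instance could in principle reach twice that.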


Talk is cheap, show me the code

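To issue async I/O requests against a database, you need to implement three parts:

  • An implementation of AsyncFunction that dispatches the requests
  • A callback that takes the result of each request and hands it to the ResultFuture
  • Applying the async I/O operation as a transformation on the DataStream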
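The three parts can be mimicked outside Flink to show how they fit together. The plain-Java sketch below makes two assumptions. `ResultHandler` is a hypothetical stand-in for Flink's `ResultFuture`, not the real interface. `queryAsync` is a hypothetical non-blocking client that returns a `CompletableFuture`. `asyncInvoke` only dispatches the request. The `whenComplete` callback hands the result or the error to the handler. `run` plays the role of the operator that feeds every record in.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ThreePartSketch {

    // Hypothetical stand-in for Flink's ResultFuture (not the real interface)
    interface ResultHandler<OUT> {
        void complete(Collection<OUT> results);

        void completeExceptionally(Throwable error);
    }

    // Threads of a hypothetical non-blocking database client
    static final ExecutorService CLIENT_POOL = Executors.newFixedThreadPool(4);

    // Hypothetical async query: returns a future immediately instead of blocking
    static CompletableFuture<String> queryAsync(int key) {
        return CompletableFuture.supplyAsync(() -> "key-" + (key % 10), CLIENT_POOL);
    }

    // Part 1: dispatch the request and return without waiting for the answer.
    // Part 2: the whenComplete callback passes the result (or error) to the handler.
    static void asyncInvoke(int input, ResultHandler<String> handler) {
        queryAsync(input).whenComplete((value, error) -> {
            if (error != null) {
                handler.completeExceptionally(error);
            } else {
                handler.complete(Collections.singletonList(value));
            }
        });
    }

    // Part 3 (simplified): feed every input element through asyncInvoke,
    // similar to what the async operator does for each record of the stream.
    static List<String> run(List<Integer> inputs) throws InterruptedException {
        List<String> out = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(inputs.size());
        for (int input : inputs) {
            asyncInvoke(input, new ResultHandler<String>() {
                @Override
                public void complete(Collection<String> results) {
                    out.addAll(results);
                    done.countDown();
                }

                @Override
                public void completeExceptionally(Throwable error) {
                    done.countDown(); // a real pipeline would surface the error here
                }
            });
        }
        done.await(5, TimeUnit.SECONDS); // wait (bounded) for all callbacks
        return out;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(List.of(1, 2, 13))); // element order may vary
        CLIENT_POOL.shutdown();
    }
}
```

The caller of `asyncInvoke` never blocks on the query. Only the final `await` waits, and only so the demo can print the collected results.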

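// Source stream (env, SimpleSource, maxCount, sleepFactor, failRatio, shutdownWaitTS,
// ORDERED, mode, timeout and taskNum are assumed to be defined earlier in main)
DataStream<Integer> inputStream = env.addSource(new SimpleSource(maxCount));

// Create the async function
AsyncFunction<Integer, String> function =
        new SampleAsyncFunction(sleepFactor, failRatio, shutdownWaitTS);

// Apply async I/O as an operator.
// The last argument (20) is the capacity: the maximum number of
// in-flight async requests per parallel instance.
DataStream<String> result;
if (ORDERED.equals(mode)) {
    // orderedWait: results are emitted in the same order as the input records
    result = AsyncDataStream.orderedWait(
            inputStream,
            function,
            timeout,
            TimeUnit.MILLISECONDS,
            20
    ).setParallelism(taskNum);
} else {
    // unorderedWait: each result is emitted as soon as its request completes
    result = AsyncDataStream.unorderedWait(
            inputStream,
            function,
            timeout,
            TimeUnit.MILLISECONDS,
            20
    ).setParallelism(taskNum);
}

// Count how often each key occurs
result.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {

    @Override
    public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
        collector.collect(new Tuple2<>(s, 1));
    }
})
        .keyBy(value -> value.f0)
        .sum(1)
        .print();

env.execute();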
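The difference between `orderedWait` and `unorderedWait`, and the role of the capacity argument, can be approximated in plain Java. This is only an approximation of the behavior, not Flink's implementation. A `Semaphore` caps the number of in-flight requests at `capacity`, and the caller blocks while it is full, which resembles backpressure. The simulated latencies are chosen so that later inputs finish first. The ordered variant collects results in input order, while the unordered variant records them in completion order. `slowLookup` and `run` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class OrderedVsUnordered {

    // Simulated request: later inputs sleep less, so they tend to finish first
    static String slowLookup(int input, int total) {
        try {
            Thread.sleep(20L * (total - input));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "r" + input;
    }

    static List<String> run(int total, int capacity, boolean ordered) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(capacity);
        Semaphore inFlight = new Semaphore(capacity); // at most `capacity` pending requests
        List<CompletableFuture<String>> pending = new ArrayList<>();
        ConcurrentLinkedQueue<String> completionOrder = new ConcurrentLinkedQueue<>();
        try {
            for (int i = 0; i < total; i++) {
                inFlight.acquire(); // blocks the caller while the buffer is full
                final int input = i;
                pending.add(CompletableFuture
                        .supplyAsync(() -> slowLookup(input, total), pool)
                        .whenComplete((res, error) -> {
                            if (error == null) {
                                completionOrder.add(res); // record completion order
                            }
                            inFlight.release(); // free a slot for the next request
                        }));
            }
            // Ordered: take results strictly in input order, waiting on slower ones
            List<String> inInputOrder = new ArrayList<>();
            for (CompletableFuture<String> f : pending) {
                inInputOrder.add(f.join());
            }
            // Unordered: results in the order the requests actually finished
            return ordered ? inInputOrder : new ArrayList<>(completionOrder);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("ordered:   " + run(5, 5, true));
        System.out.println("unordered: " + run(5, 5, false));
    }
}
```

The ordered variant always follows the input order. The unordered variant typically lets the fast (later) requests out first. Ordering costs extra buffering and latency, because a fast result has to wait for every slower result ahead of it.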

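Next, let's look at how the callback is set up.

The key is to override asyncInvoke so that it hands the result of the I/O request back to the ResultFuture. asyncInvoke itself should not block: it only submits the request, and the submitted task completes the ResultFuture later.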

private static class SampleAsyncFunction extends RichAsyncFunction<Integer, String> {

    private transient ExecutorService executorService;

    // Upper bound (ms) of the random sleep that simulates a slow async operation
    private final long sleepFactor;

    // Probability that a simulated I/O request fails
    private final float failRatio;

    // How long close() waits for pending tasks before forcing the shutdown
    private final long shutdownWaitTS;

    public SampleAsyncFunction(long sleepFactor, float failRatio, long shutdownWaitTS) {
        this.sleepFactor = sleepFactor;
        this.failRatio = failRatio;
        this.shutdownWaitTS = shutdownWaitTS;
    }

    // Create the thread pool that simulates an async client issuing requests
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        executorService = Executors.newFixedThreadPool(30);
    }

    @Override
    public void close() throws Exception {
        super.close();
        ExecutorUtils.gracefulShutdown(shutdownWaitTS, TimeUnit.MILLISECONDS, executorService);
    }

    // Called once per input record: submit the request and return immediately.
    // The submitted task acts as the callback that completes the ResultFuture.
    @Override
    public void asyncInvoke(Integer integer, ResultFuture<String> resultFuture) throws Exception {
        executorService.submit(() -> {
            long sleep = (long) (ThreadLocalRandom.current().nextFloat() * sleepFactor);
            try {
                Thread.sleep(sleep);
                if (ThreadLocalRandom.current().nextFloat() < failRatio) {
                    resultFuture.completeExceptionally(new Exception("failed"));
                } else {
                    resultFuture.complete(Collections.singletonList("key-" + (integer % 10)));
                }
            } catch (InterruptedException e) {
                // Interrupted (e.g. during shutdown): emit an empty result
                resultFuture.complete(new ArrayList<>(0));
            }
        });
    }
}
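`close()` relies on Flink's `ExecutorUtils.gracefulShutdown`. The plain-JDK sketch below approximates the same idea, and that approximation is an assumption about its behavior, not its source. The sketch calls `shutdown()`, waits up to a timeout with `awaitTermination`, and then calls `shutdownNow()`, which interrupts tasks that are still sleeping. That interrupt is exactly what the `catch (InterruptedException e)` branch in `asyncInvoke` handles. `shutdownGracefully` and `runDemo` are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class GracefulShutdownSketch {

    // Stop accepting new tasks, wait up to `timeout`, then interrupt the rest.
    // Returns true if every task finished within the timeout.
    static boolean shutdownGracefully(ExecutorService pool, long timeout, TimeUnit unit) {
        pool.shutdown();
        try {
            if (pool.awaitTermination(timeout, unit)) {
                return true;
            }
            pool.shutdownNow(); // interrupts tasks still blocked in Thread.sleep
            return false;
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Runs one simulated request lasting `taskMs`, shuts the pool down with
    // `timeoutMs`, and returns how many tasks observed an interrupt (0 or 1).
    static int runDemo(long taskMs, long timeoutMs) throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        AtomicInteger interrupted = new AtomicInteger();
        CountDownLatch started = new CountDownLatch(1);
        pool.submit(() -> {
            started.countDown();
            try {
                Thread.sleep(taskMs);
            } catch (InterruptedException e) {
                // The Flink example completes the ResultFuture with an empty list here
                interrupted.incrementAndGet();
            }
        });
        started.await(); // make sure the task is running before shutting down
        shutdownGracefully(pool, timeoutMs, TimeUnit.MILLISECONDS);
        pool.awaitTermination(1, TimeUnit.SECONDS);
        return interrupted.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("slow task interrupted: " + runDemo(10_000, 100));
        System.out.println("fast task interrupted: " + runDemo(10, 1_000));
    }
}
```

A request that outlives the shutdown timeout is interrupted. A request that finishes in time is left alone.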



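Copyright notice: This is an original article by weixin_38499215, licensed under CC 4.0 BY-SA. When reposting, please include a link to the original source and this notice.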