As is customary, let's start with what the official documentation says.
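If Flink has to query a database for every record it processes, and each read ultimately goes through the database's disk I/O, every lookup is slow, so this step becomes a performance bottleneck for the stream job.
Async I/O turns those synchronous requests into asynchronous ones: many requests are in flight at the same time, so the waiting time is amortized across them instead of being paid once per record.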
"Asynchronous interaction with the database means that a single parallel function instance can handle many requests concurrently and receive the responses concurrently."
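In other words, "asynchronous" here means that a single function instance (parallelism 1) can issue many requests concurrently and receive many responses concurrently.
You might then ask: why not simply raise the parallelism of the function instead?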
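To make the amortization concrete, here is a minimal plain-Java sketch with no Flink involved. The `lookup` method is a hypothetical stand-in for a database call that takes about 100 ms, and a thread pool is used only to simulate an asynchronous client; a real async client would not need one thread per request.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class SyncVsAsyncLookup {

    // Hypothetical stand-in for a database lookup that takes about 100 ms.
    static String lookup(int key) {
        try {
            TimeUnit.MILLISECONDS.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "value-" + key;
    }

    // Synchronous: each request waits for the previous one to finish.
    static List<String> lookupSync(int n) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            out.add(lookup(i));
        }
        return out;
    }

    // Asynchronous: all requests are submitted first, so their waits overlap.
    static List<String> lookupAsync(int n, ExecutorService pool) {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            final int key = i;
            futures.add(CompletableFuture.supplyAsync(() -> lookup(key), pool));
        }
        List<String> out = new ArrayList<>();
        for (CompletableFuture<String> f : futures) {
            out.add(f.join()); // collect the results in request order
        }
        return out;
    }

    public static void main(String[] args) {
        int n = 10;
        ExecutorService pool = Executors.newFixedThreadPool(n);
        try {
            long t0 = System.nanoTime();
            lookupSync(n);
            long t1 = System.nanoTime();
            lookupAsync(n, pool);
            long t2 = System.nanoTime();
            System.out.println("sync  ms: " + (t1 - t0) / 1_000_000);
            System.out.println("async ms: " + (t2 - t1) / 1_000_000);
        } finally {
            pool.shutdown();
        }
    }
}
```

On a typical run the synchronous loop should take roughly n times the latency of one lookup, while the asynchronous version should stay close to the latency of a single lookup.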
"Improving throughput by just scaling the MapFunction to a very high parallelism is in some cases possible as well, but usually comes at a very high resource cost: Having many more parallel MapFunction instances means more tasks, threads, Flink-internal network connections, network connections to the database, buffers, and general internal bookkeeping overhead."
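So the reason is that a higher parallelism means more tasks, more threads, and more internal network connections and database connections, all of which add to Flink's internal overhead.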
Talk is cheap, show me the code
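To issue asynchronous I/O requests against a database, you need to implement three parts:
- An implementation of AsyncFunction that dispatches the requests
- A callback that takes the result of the request and passes it to the ResultFuture
- Applying the async I/O operation on a DataStream as a transformation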
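// Excerpt from the body of main(); env, maxCount, sleepFactor, failRatio,
// shutdownWaitTS, mode, timeout and taskNum are defined earlier in the method.

// Source stream
DataStream<Integer> inputStream = env.addSource(new SimpleSource(maxCount));

// Create the async function
AsyncFunction<Integer, String> function =
        new SampleAsyncFunction(sleepFactor, failRatio, shutdownWaitTS);

// Apply the async I/O operation as an operator on the stream
DataStream<String> result;
if (ORDERED.equals(mode)) {
    result = AsyncDataStream.orderedWait(
            inputStream,
            function,
            timeout,
            TimeUnit.MILLISECONDS,
            20 // capacity: maximum number of in-flight requests
    ).setParallelism(taskNum);
} else {
    result = AsyncDataStream.unorderedWait(
            inputStream,
            function,
            timeout,
            TimeUnit.MILLISECONDS,
            20 // capacity: maximum number of in-flight requests
    ).setParallelism(taskNum);
}

// Count how often each key appears and print the running totals
result.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                collector.collect(new Tuple2<>(s, 1));
            }
        })
        .keyBy(0)
        .sum(1)
        .print();

env.execute();
} // end of main()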
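The code above picks between orderedWait and unorderedWait. As a rough illustration of the difference in emission order only, here is a plain-Java sketch. It is not how Flink implements these operators, and the `request` helper, the names, and the delays are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class OrderedVsUnordered {

    // Hypothetical async request: a larger delay means a later response.
    static CompletableFuture<String> request(String name, long delayMs, ExecutorService pool) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(delayMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return name;
        }, pool);
    }

    // Ordered: results are emitted in input order, even if later requests finish first.
    static List<String> ordered(String[] names, long[] delays, ExecutorService pool) {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (int i = 0; i < names.length; i++) {
            futures.add(request(names[i], delays[i], pool));
        }
        List<String> out = new ArrayList<>();
        for (CompletableFuture<String> f : futures) {
            out.add(f.join());
        }
        return out;
    }

    // Unordered: each result is emitted as soon as its own request completes.
    static List<String> unordered(String[] names, long[] delays, ExecutorService pool) {
        List<String> out = new CopyOnWriteArrayList<>();
        List<CompletableFuture<Void>> done = new ArrayList<>();
        for (int i = 0; i < names.length; i++) {
            done.add(request(names[i], delays[i], pool).thenAccept(out::add));
        }
        CompletableFuture.allOf(done.toArray(new CompletableFuture[0])).join();
        return new ArrayList<>(out);
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        String[] names = {"a", "b", "c"};
        long[] delays = {450, 150, 300};
        try {
            System.out.println("ordered:   " + ordered(names, delays, pool));
            System.out.println("unordered: " + unordered(names, delays, pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

With these delays the ordered list keeps the input order a, b, c, while the unordered list should follow completion order, b, c, a. Ordered mode holds back a finished result until everything before it has been emitted, which can add latency.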
Next, let's look at how the callback is set up.
The key is to override asyncInvoke and pass the result of the I/O request back to the ResultFuture.
private static class SampleAsyncFunction extends RichAsyncFunction<Integer, String> {
    private transient ExecutorService executorService;

    // Upper bound (ms) on the random sleep in a worker thread; simulates a slow async operation
    private long sleepFactor;

    // Probability that a request fails; simulates I/O errors
    private float failRatio;

    // How long (ms) to wait for the thread pool to shut down gracefully
    private long shutdownWaitTS;

    public SampleAsyncFunction(long sleepFactor, float failRatio, long shutdownWaitTS) {
        this.sleepFactor = sleepFactor;
        this.failRatio = failRatio;
        this.shutdownWaitTS = shutdownWaitTS;
    }

    // Create the thread pool that simulates the client issuing requests
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        executorService = Executors.newFixedThreadPool(30);
    }

    @Override
    public void close() throws Exception {
        super.close();
        ExecutorUtils.gracefulShutdown(shutdownWaitTS, TimeUnit.MILLISECONDS, executorService);
    }

    // The callback: run the simulated request on the pool and complete the ResultFuture
    @Override
    public void asyncInvoke(Integer integer, ResultFuture<String> resultFuture) throws Exception {
        executorService.submit(() -> {
            long sleep = (long) (ThreadLocalRandom.current().nextFloat() * sleepFactor);
            try {
                Thread.sleep(sleep);
                if (ThreadLocalRandom.current().nextFloat() < failRatio) {
                    // Simulated failure
                    resultFuture.completeExceptionally(new Exception("failed"));
                } else {
                    // Success: emit a single result for this input element
                    resultFuture.complete(Collections.singletonList("key-" + (integer % 10)));
                }
            } catch (InterruptedException e) {
                // Interrupted: emit no result for this element
                resultFuture.complete(new ArrayList<>(0));
            }
        });
    }
}
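The same callback pattern can be sketched without Flink on the classpath. In the sketch below, `MiniResultFuture` is a hypothetical stand-in that mirrors only the two ResultFuture methods used above, and `query` is a made-up lookup. Unlike Flink's asyncInvoke, which returns void, this version returns a future purely so that the caller in the sketch can wait for completion.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CallbackSketch {

    // Hypothetical stand-in for the two ResultFuture methods used in the article's code.
    interface MiniResultFuture<T> {
        void complete(Collection<T> result);
        void completeExceptionally(Throwable error);
    }

    // Hypothetical lookup; fails for negative keys to exercise the error path.
    static String query(int key) {
        if (key < 0) {
            throw new IllegalArgumentException("negative key: " + key);
        }
        return "key-" + (key % 10);
    }

    // Run the lookup on the pool, then hand the outcome to the callback object.
    // The returned future exists only so this sketch can wait for completion.
    static CompletableFuture<Void> asyncInvoke(
            int input, MiniResultFuture<String> resultFuture, ExecutorService pool) {
        return CompletableFuture
                .supplyAsync(() -> query(input), pool)
                .handle((value, error) -> {
                    if (error != null) {
                        resultFuture.completeExceptionally(error);
                    } else {
                        resultFuture.complete(Collections.singletonList(value));
                    }
                    return null;
                });
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        MiniResultFuture<String> printer = new MiniResultFuture<String>() {
            @Override
            public void complete(Collection<String> result) {
                System.out.println("result: " + result);
            }

            @Override
            public void completeExceptionally(Throwable error) {
                System.out.println("error: " + error);
            }
        };
        try {
            asyncInvoke(42, printer, pool).join();
            asyncInvoke(-1, printer, pool).join();
        } finally {
            pool.shutdown();
        }
    }
}
```

Whichever path is taken, the callback object is completed exactly once per input element, which is the same contract the SampleAsyncFunction above follows.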
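Copyright notice: This is an original article by weixin_38499215, licensed under CC 4.0 BY-SA. When reposting, please include a link to the original source and this notice.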