开发环境
-
JDK 17
-
Idea 2022
熟悉JDK 8版本的同学,大概率都使用过
java.util.concurrent.CompletableFuture
这个类,有时候在业务服务中你可能需要并行的去执行某些骚操作,那就少不了它的存在。
我在业务中就有大量的需求场景,比如:我需要拉渠道方的订单和订单明细,落地到我司系统中。此时用到了两个接口:订单列表查询、订单明细查询,查询列表很简单,一页一页的翻页查询,因为它也只返回了订单号,我举个例子吧!
这里是订单列表查询接口返回的数据结构(Mock数据),一堆的订单号
{
"data":[
"125345345345",
"235894563423",
"345345345343"
]
}
接下来我肯定要根据订单号去查询订单明细的,千万不要下意识的进行for循环一个一个查,那要查到什么时候啊!
这个时候
CompletableFuture
就派上用场了。
CompletableFuture应用
前提条件,我为了方便使用,我是直接在业务的Base包中,封装了一个通用实现,当然只是适用于我们业务的,需要的同学自取。
注意:下面这段代码没有处理真实的异常,想直接用,再往下看!!!
/**
* 偷个懒,线程池直接这样写了先,真实业务中不是这样搞的哈!
*/
private final static ExecutorService executorService = Executors.newFixedThreadPool(4);
/**
* 创建并行任务并执行
*
* @param list 数据源
* @param api API调用逻辑
* @param exceptionHandle 异常处理逻辑
* @param <S> 数据源类型
* @param <T> 程序返回类型
* @return 处理结果列表
*/
public <S, T> List<T> parallelFutureJoin(Collection<S> list, Function<S, T> api, BiFunction<Throwable, S, T> exceptionHandle) {
//规整所有任务
List<CompletableFuture<T>> collectFuture = list.stream()
.map(s -> this.createFuture(() -> api.apply(s), e -> exceptionHandle.apply(e, s)))
.toList();
//汇总所有任务,并执行join,全部执行完成后,统一返回
return CompletableFuture.allOf(collectFuture.toArray(new CompletableFuture<?>[]{}))
.thenApply(f -> collectFuture.stream()
.map(CompletableFuture::join)
.filter(Objects::nonNull)
.collect(Collectors.toList()))
.join();
}
/**
* 创建单个CompletableFuture任务
*
* @param logic 任务逻辑
* @param exceptionHandle 异常处理
* @param <T> 类型
* @return 任务
*/
public <T> CompletableFuture<T> createFuture(Supplier<T> logic, Function<Throwable, T> exceptionHandle) {
return CompletableFuture.supplyAsync(logic, executorService).exceptionally(exceptionHandle);
}
利用上面这块封装的代码,完全适用于我在公司的大部分并行业务场景,也确实提升了我Pod节点的CPU利用率。
不过之后有个问题就坑了,你调用外部API的时候,偶尔会失败,那么失败就要重试,这个时候我就需要正确的判断异常并进行重试操作。
先定义个业务异常类
public static class BizApiException extends RuntimeException {
public BizApiException() {
}
public BizApiException(String message) {
super(message);
}
}
示例代码
下面的代码只是模拟我在业务端的场景,大家乐呵乐呵就行。
public static void main(String[] args) {
CompletableFutureDemo f = new CompletableFutureDemo();
List<Integer> numList = f.parallelFutureJoin(Arrays.asList(1, 2, 3), num -> {
//模拟API调用
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
//...
}
if (num > 2) {
throw new BizApiException("心别太大");
}
return num;
}, (e, num) -> {
//异常向你打开了大门
if (e instanceof BizApiException) {
System.out.println("业务异常,我在处理数字:" + num + ",异常原因:" + e);
return -1;
}
System.out.println("我异常了,老六,我刚才在处理数字:" + num + ",异常原因:" + e);
return -1;
});
System.out.println(numList);
}
注意:我本来想通过返回的Exception判断是不是BizApiException业务异常的,可惜的是这里的异常类型永远都不会是BizApiException,我输出下内容到控制台,就懂了!
执行代码后,我得到的控制台内容
这个时候其实可以看到拿到的真实异常类型了,一个名为
java.util.concurrent.CompletionException
的老六出现了。我看看它是个啥!
我异常了,老六,我刚才在处理数字:3,异常原因:java.util.concurrent.CompletionException: com.java.basic.CompletableFutureDemo$BizApiException: 心别太大
[1, 2, -1]
CompletionException
这个类的注释说明了它的用途,简单理解是:它在完成结果或任务的过程中遇到错误或其他异常时会触发。那我懂了!任务异常之后它就会出现,然后还把我们自己的异常给包起来了!
JDK源代码
/**
* Exception thrown when an error or other exception is encountered
* in the course of completing a result or task.
*
* @since 1.8
* @author Doug Lea
*/
public class CompletionException extends RuntimeException {}
其实还有个异常类,我们也需要关注,它告诉了我们,真实的异常在哪里了!
ExecutionException
这个类的注释就说的很清楚了,自己的异常都在getCause()中了,那就好办了.
/**
* Exception thrown when attempting to retrieve the result of a task
* that aborted by throwing an exception. This exception can be
* inspected using the {@link #getCause()} method.
*
* @see Future
* @since 1.5
* @author Doug Lea
*/
public class ExecutionException extends Exception {}
改造我的并发工具类(完整版)
方法extractRealException就是我要获取的真实异常,同时parallelFutureJoin方法中引用一下,这个工具类就解决了需求。
/**
* 创建并行任务并执行
*
* @param list 数据源
* @param api API调用逻辑
* @param exceptionHandle 异常处理逻辑
* @param <S> 数据源类型
* @param <T> 程序返回类型
* @return 处理结果列表
*/
public <S, T> List<T> parallelFutureJoin(Collection<S> list, Function<S, T> api, BiFunction<Throwable, S, T> exceptionHandle) {
//规整所有任务
List<CompletableFuture<T>> collectFuture = list.stream()
.map(s -> this.createFuture(() -> api.apply(s), e -> exceptionHandle.apply(
this.extractRealException(e), s)))
.toList();
//汇总所有任务,并执行join,全部执行完成后,统一返回
return CompletableFuture.allOf(collectFuture.toArray(new CompletableFuture<?>[]{}))
.thenApply(f -> collectFuture.stream()
.map(CompletableFuture::join)
.filter(Objects::nonNull)
.collect(Collectors.toList()))
.join();
}
/**
* 创建CompletableFuture任务
*
* @param logic 任务逻辑
* @param exceptionHandle 异常处理
* @param <T> 类型
* @return 任务
*/
public <T> CompletableFuture<T> createFuture(Supplier<T> logic, Function<Throwable, T> exceptionHandle) {
return CompletableFuture.supplyAsync(logic, executorService).exceptionally(exceptionHandle);
}
/**
* 提取真正的异常
* <p>
* CompletableFuture抛出的往往不是真正的异常
*
* @param throwable 异常
* @return 真正的异常
*/
public Throwable extractRealException(Throwable throwable) {
if (throwable instanceof CompletionException || throwable instanceof ExecutionException) {
if (throwable.getCause() != null) {
return throwable.getCause();
}
}
return throwable;
}
当然,这还只是个简易版的并行任务工具类,还有更多的可能,大家需要自己去探索了!
参考文献