CompletableFuture是一个异步编程工具类
简单使用
// Create an incomplete future; a separate thread completes it by hand.
CompletableFuture<String> completableFuture = new CompletableFuture<>();
new Thread(() -> {
    try {
        // Simulate one second of work before the value is available.
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        // Restore the interrupt flag so the interruption is not silently lost.
        Thread.currentThread().interrupt();
        e.printStackTrace();
    }
    // Store the result; this releases any caller blocked in get().
    completableFuture.complete("hello world");
}).start();
// Block until the future has been completed, then read the value.
String result = completableFuture.get();
System.out.println("输出结果:" + result);
CompletableFuture实现了Future接口,所以调用get()方法时会阻塞当前线程,直到结果返回。
runAsync方法
// runAsync takes a Runnable and returns a CompletableFuture<Void> (the task produces no value).
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> {
    try {
        Thread.sleep(2000);
        System.out.println("线程执行完毕");
    } catch (InterruptedException e) {
        // Restore the interrupt flag so the interruption is not silently lost.
        Thread.currentThread().interrupt();
        e.printStackTrace();
    }
});
// Wait for the task to finish. get() on a CompletableFuture<Void> returns null,
// so this line prints "null".
System.out.println(voidCompletableFuture.get());
System.out.println("程序运行结束");
runAsync()方法的实现原理
// Public entry point: runs the given Runnable asynchronously by delegating to
// asyncRunStage with the default executor asyncPool, and returns the resulting future.
public static CompletableFuture<Void> runAsync(Runnable runnable) {
return asyncRunStage(asyncPool, runnable);
}
// Default executor for async stages: ForkJoinPool.commonPool() when useCommonPool is true,
// otherwise a ThreadPerTaskExecutor (per the article, one new thread per submitted task).
// The walkthrough that follows assumes the ForkJoinPool branch is taken.
private static final Executor asyncPool = useCommonPool ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
// Returns the shared `common` pool instance.
// Per the article, `common` is initialized in ForkJoinPool's static initializer (not shown here).
public static ForkJoinPool commonPool() {
// assert common != null : "static init error";
return common;
}
// Creates the result future `d`, wraps (d, f) in an AsyncRun, submits it to executor `e`,
// and returns `d` immediately without waiting for the task to run.
// In this walkthrough `e` is the ForkJoinPool passed in by runAsync.
// Throws NullPointerException when `f` is null.
static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) {
if (f == null) throw new NullPointerException();
CompletableFuture<Void> d = new CompletableFuture<Void>();
// Wrap the task as an AsyncRun. Per the article, AsyncRun extends ForkJoinTask and implements
// Runnable, so the ForkJoinPool's execute() can accept it directly.
// Per the article, supplyAsync wraps the task as an AsyncSupply at this point instead.
// Per the article, thenRun / thenAccept / thenApply wrap their action as UniRun / UniAccept /
// UniApply and push it onto the stack of the preceding stage's CompletableFuture
// (unipush(new UniRun<T>...)), i.e. a later stage is stored inside the earlier stage's future.
e.execute(new AsyncRun(d, f));
return d;
}
// Constructor: stores the future to complete (dep) and the user's Runnable (fn).
AsyncRun(CompletableFuture<Void> dep, Runnable fn) {
this.dep = dep; this.fn = fn;
}
// Submits a Runnable to this pool. A task that already is a ForkJoinTask is used as-is;
// any other Runnable is wrapped in a RunnableExecuteAction. The resulting job is handed to
// externalPush. Throws NullPointerException when `task` is null.
public void execute(Runnable task) {
if (task == null)
throw new NullPointerException();
ForkJoinTask<?> job;
// In this walkthrough the argument is an AsyncRun, which per the article extends ForkJoinTask,
// so the first branch is taken and no wrapper is created.
if (task instanceof ForkJoinTask<?>) // avoid re-wrap
job = (ForkJoinTask<?>) task;
else
job = new ForkJoinTask.RunnableExecuteAction(task);
// `job` is now a ForkJoinTask; enqueue it.
externalPush(job);
}
// Fast path for submitting a task from outside the pool: if a queue already exists in the
// slot selected by the caller's probe and its lock can be taken, push the task there directly;
// in every other case fall back to externalSubmit.
final void externalPush(ForkJoinTask<?> task) {
WorkQueue[] ws; WorkQueue q; int m;
// Read the calling thread's probe value; it selects the queue slot below.
int r = ThreadLocalRandom.getProbe();
int rs = runState;
// The fast path requires: workQueues allocated and non-empty, a queue present in the selected
// slot, a non-zero probe, runState > 0, and a successful CAS of the queue lock from 0 to 1.
// When any of these fails (e.g. workQueues is still null) the `if` body is skipped.
if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
(q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
U.compareAndSwapInt(q, QLOCK, 0, 1)) {
ForkJoinTask<?>[] a; int am, n, s;
// Push only if the array exists and has room (am > number of queued tasks n).
if ((a = q.array) != null &&
(am = a.length - 1) > (n = (s = q.top) - q.base)) {
int j = ((am & s) << ASHIFT) + ABASE;
// Store the task at slot `top`, advance top, then release the queue lock.
U.putOrderedObject(a, j, task);
U.putOrderedInt(q, QTOP, s + 1);
U.putIntVolatile(q, QLOCK, 0);
// Signal only when the queue held at most one task before this push.
if (n <= 1)
signalWork(ws, q);
return;
}
// No room (or no array): release the lock and fall through to the slow path.
U.compareAndSwapInt(q, QLOCK, 1, 0);
}
// Slow path: full submission (see externalSubmit), which enqueues the task and signals work.
externalSubmit(task);
}
// Full version of external submission. Loops until the task has been stored in a queue
// (then signals work and returns) or the pool is found to be shutting down (then throws
// RejectedExecutionException). Along the way it initializes the workQueues array and creates
// the queue for the caller's slot when they do not exist yet.
private void externalSubmit(ForkJoinTask<?> task) {
int r; // initialize caller's probe
// If the calling thread's probe has not been initialized (== 0), initialize it now.
if ((r = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit();
r = ThreadLocalRandom.getProbe();
}
for (;;) {
WorkQueue[] ws; WorkQueue q; int rs, m, k;
boolean move = false;
// Check the pool state first: a negative runState leads to rejection.
if ((rs = runState) < 0) {
tryTerminate(false, false); // help terminate
throw new RejectedExecutionException();
}
else if ((rs & STARTED) == 0 || // initialize
((ws = workQueues) == null || (m = ws.length - 1) < 0)) {
int ns = 0;
rs = lockRunState();
try {
// Initialize workQueues under the runState lock; STARTED is re-checked after locking.
if ((rs & STARTED) == 0) {
U.compareAndSwapObject(this, STEALCOUNTER, null,
new AtomicLong());
// create workQueues array with size a power of two
int p = config & SMASK; // ensure at least 2 slots
int n = (p > 1) ? p - 1 : 1;
n |= n >>> 1; n |= n >>> 2; n |= n >>> 4;
n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
workQueues = new WorkQueue[n];
ns = STARTED;
}
} finally {
// Release the lock and, if initialization happened here, publish the STARTED bit.
unlockRunState(rs, (rs & ~RSLOCK) | ns);
}
}
else if ((q = ws[k = r & m & SQMASK]) != null) {
// A queue exists in slot k: try to take its lock (CAS qlock 0 -> 1).
if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
// Read the queue's task array and current top index.
ForkJoinTask<?>[] a = q.array;
int s = q.top;
boolean submitted = false; // initial submission or resizing
try { // locked version of push
if ((a != null && a.length > s + 1 - q.base) ||
(a = q.growArray()) != null) {
int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
// Store the task into the array slot (an ordered write, not a CAS; the queue lock is held).
U.putOrderedObject(a, j, task);
// Advance top so the next pushed task goes into the following slot.
U.putOrderedInt(q, QTOP, s + 1);
submitted = true;
}
} finally {
// Release the queue lock.
U.compareAndSwapInt(q, QLOCK, 1, 0);
}
if (submitted) {
// Task is queued: signal the pool so that a worker picks it up.
signalWork(ws, q);
return;
}
}
move = true; // move on failure
}
else if (((rs = runState) & RSLOCK) == 0) { // create new queue
// Slot k is empty and the runState lock is free: build a shared queue for this slot.
q = new WorkQueue(this, null);
q.hint = r;
q.config = k | SHARED_QUEUE;
q.scanState = INACTIVE;
rs = lockRunState(); // publish index
// Install the queue only if the slot is still empty; the next loop iteration pushes into it.
if (rs > 0 && (ws = workQueues) != null &&
k < ws.length && ws[k] == null)
ws[k] = q; // else terminated
unlockRunState(rs, rs & ~RSLOCK);
}
else
move = true; // move if busy
// On contention or failure, advance the probe so the next iteration tries another slot.
if (move)
r = ThreadLocalRandom.advanceProbe(r);
}
}
// Called after a task has been added to a work queue. While ctl is negative ("too few active"),
// either requests a new worker (when there is no idle worker and the ADD_WORKER bit is set)
// or reactivates and unparks the idle worker recorded in the low bits of ctl.
final void signalWork(WorkQueue[] ws, WorkQueue q) {
long c; int sp, i; WorkQueue v; Thread p;
while ((c = ctl) < 0L) { // too few active
if ((sp = (int)c) == 0) { // no idle workers
if ((c & ADD_WORKER) != 0L) // too few workers
// Add a worker thread.
tryAddWorker(c);
break;
}
if (ws == null) // unstarted/terminated
break;
if (ws.length <= (i = sp & SMASK)) // terminated
break;
if ((v = ws[i]) == null) // terminating
break;
int vs = (sp + SS_SEQ) & ~INACTIVE; // next scanState
int d = sp - v.scanState; // screen CAS
long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);
// If v's scanState still matches sp and the CAS on ctl succeeds, mark v active and wake its thread.
if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) {
v.scanState = vs; // activate v
if ((p = v.parker) != null)
U.unpark(p);
break;
}
if (q != null && q.base == q.top) // no more work
break;
}
}
// Tries to account for one more worker in ctl and, on success, create it.
// `c` is the ctl value observed by the caller. The loop retries while ctl still has the
// ADD_WORKER bit set and its low 32 bits are zero.
private void tryAddWorker(long c) {
boolean add = false;
do {
// Candidate ctl: c with AC_UNIT and TC_UNIT added, each kept within its own mask.
long nc = ((AC_MASK & (c + AC_UNIT)) |
(TC_MASK & (c + TC_UNIT)));
if (ctl == c) {
int rs, stop; // check if terminating
// Under the runState lock: attempt the CAS on ctl only when the STOP bit is not set.
if ((stop = (rs = lockRunState()) & STOP) == 0)
add = U.compareAndSwapLong(this, CTL, c, nc);
unlockRunState(rs, rs & ~RSLOCK);
if (stop != 0)
break;
if (add) {
// ctl was updated: create the worker.
createWorker();
break;
}
}
} while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
}
// Creates a worker thread through the pool's factory and starts it.
// Returns true when a thread was created and started. Otherwise (factory is null, it returned
// null, or an exception was thrown) calls deregisterWorker(wt, ex) and returns false.
private boolean createWorker() {
// Per the article, `factory` is initialized in a static initializer (not shown here).
ForkJoinWorkerThreadFactory fac = factory;
Throwable ex = null;
ForkJoinWorkerThread wt = null;
try {
// wt is a ForkJoinWorkerThread, which per the article extends Thread.
if (fac != null && (wt = fac.newThread(this)) != null) {
// Start the thread; its run() method then executes on the new thread.
wt.start();
return true;
}
} catch (Throwable rex) {
ex = rex;
}
deregisterWorker(wt, ex);
return false;
}
// Body of the worker thread (per the article, ForkJoinWorkerThread.run). Runs the worker loop
// only when workQueue.array is still null. Any Throwable from onStart()/runWorker() is captured
// and passed to onTermination(); the worker is always deregistered from the pool at the end.
public void run() {
if (workQueue.array == null) { // only run once
Throwable exception = null;
try {
onStart();
// Enter the pool's worker loop for this thread's queue; tasks are executed from there.
pool.runWorker(workQueue);
} catch (Throwable ex) {
exception = ex;
} finally {
try {
onTermination(exception);
} catch (Throwable ex) {
// Keep the first failure; record this one only if there was none before.
if (exception == null)
exception = ex;
} finally {
pool.deregisterWorker(this, exception);
}
}
}
}
// Top-level loop of a worker: allocate the queue array, then repeatedly scan for a task and
// run it; when scan finds nothing, call awaitWork and exit the loop if it returns false.
final void runWorker(WorkQueue w) {
w.growArray(); // allocate queue
int seed = w.hint; // initially holds randomization hint
int r = (seed == 0) ? 1 : seed; // avoid 0 for xorShift
for (ForkJoinTask<?> t;;) {
// Try to obtain a task.
if ((t = scan(w, r)) != null)
// Run the task that was found.
w.runTask(t);
else if (!awaitWork(w, r))
break;
r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
}
}
// Executes the given task (no-op when it is null), then runs execLocalTasks(), updates the
// steal count, and restores the SCANNING bit that was cleared while the task ran.
final void runTask(ForkJoinTask<?> task) {
if (task != null) {
scanState &= ~SCANNING; // mark as busy
// Record the task as currentSteal and execute it.
(currentSteal = task).doExec();
U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC
execLocalTasks();
ForkJoinWorkerThread thread = owner;
if (++nsteals < 0) // collect on overflow
transferStealCount(pool);
scanState |= SCANNING;
if (thread != null)
thread.afterTopLevelExec();
}
}
// Runs exec() when status is still non-negative and returns the resulting status.
// A Throwable from exec() is routed to setExceptionalCompletion; a true return value from
// exec() marks the task as NORMAL-ly completed.
final int doExec() {
int s; boolean completed;
if ((s = status) >= 0) {
try {
// The task submitted at the start of this walkthrough was an AsyncRun (per the article a
// ForkJoinTask subclass), so this call dispatches to AsyncRun's exec().
completed = exec();
} catch (Throwable rex) {
return setExceptionalCompletion(rex);
}
if (completed)
s = setCompletion(NORMAL);
}
return s;
}
// exec() of the async task (AsyncRun in this walkthrough): delegates to run() and always returns true.
public final boolean exec() {
run();
return true;
}
// Task body of AsyncRun: runs the user's Runnable once and completes the dependent future.
// dep/fn are nulled out first, so a second call finds them null and does nothing.
public void run() {
CompletableFuture<Void> d; Runnable f;
if ((d = dep) != null && (f = fn) != null) {
dep = null; fn = null;
// Skip the work if the future already has a result.
if (d.result == null) {
try {
// Run the Runnable that the user originally passed in.
f.run();
d.completeNull();
} catch (Throwable ex) {
// A failure in the Runnable completes the future via completeThrowable.
d.completeThrowable(ex);
}
}
// Per the article: if follow-up stages (thenRun / thenAccept / thenApply) were registered,
// postComplete pops them off the stack and runs them.
d.postComplete();
}
}
supplyAsync方法
// supplyAsync takes a Supplier, i.e. the asynchronous task produces a return value.
CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
    @Override
    public String get() {
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the interruption is not silently lost.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        return "返回结果";
    }
});
// Block until the supplier has finished, then read its return value.
String result = future.get();
System.out.println("任务执行结果:" + result);
supplyAsync的原理和runAsync类似,只是在调用ForkJoinPool的execute方法时传入的是AsyncSupply,
所以后面执行的是AsyncSupply的exec方法
// exec() of AsyncSupply (per the surrounding article text): delegates to run() and always returns true.
public final boolean exec() { run(); return true; }
// Task body of AsyncSupply: calls the user's Supplier once and completes the dependent future
// with the supplied value. dep/fn are nulled out first, so a second call does nothing.
public void run() {
CompletableFuture<T> d; Supplier<T> f;
// Fetch the CompletableFuture that was created when the task was submitted.
if ((d = dep) != null && (f = fn) != null) {
dep = null; fn = null;
// Skip the work if the future already has a result.
if (d.result == null) {
try {
// Store the Supplier's return value in the CompletableFuture; callers read it via get().
d.completeValue(f.get());
} catch (Throwable ex) {
// A failure in the Supplier completes the future via completeThrowable.
d.completeThrowable(ex);
}
}
d.postComplete();
}
}
thenRun
// supplyAsync starts the first stage; thenRun registers a Runnable that runs after it completes.
// thenRun ignores the first stage's value and produces none itself, hence CompletableFuture<Void>.
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.supplyAsync(() -> {
    try {
        System.out.println("supplyAsync run");
        Thread.sleep(2000);
        System.out.println("supplyAsync run end");
    } catch (InterruptedException e) {
        // Restore the interrupt flag so the interruption is not silently lost.
        Thread.currentThread().interrupt();
        e.printStackTrace();
    }
    return "返回结果";
}).thenRun(() -> {
    try {
        System.out.println("thenRun run");
        Thread.sleep(3000);
        System.out.println("thenRun run end");
    } catch (InterruptedException e) {
        // Restore the interrupt flag so the interruption is not silently lost.
        Thread.currentThread().interrupt();
        e.printStackTrace();
    }
    System.out.println("任务执行结束之后执行的语句");
});
// Block until both stages have finished.
System.out.println("阻塞");
voidCompletableFuture.get();
System.out.println("任务执行结束");
thenAccept(Consumer)
// thenAccept consumes the value produced by the preceding stage and returns no value,
// so the resulting future is a CompletableFuture<Void>.
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        // Restore the interrupt flag so the interruption is not silently lost.
        Thread.currentThread().interrupt();
        e.printStackTrace();
    }
    System.out.println("supplyAsync执行完毕");
    return "返回的结果";
}).thenAccept(new Consumer<String>() {
    // `param` is the result of the preceding supplyAsync stage.
    @Override
    public void accept(String param) {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the interruption is not silently lost.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        System.out.println("输出结果:" + param);
    }
});
// Wait for both stages to finish.
future.get();
System.out.println("任务执行完毕");
thenApply(Function)
// thenApply transforms the value produced by the preceding stage into a new value.
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        // Restore the interrupt flag so the interruption is not silently lost.
        Thread.currentThread().interrupt();
        e.printStackTrace();
    }
    // Value produced by the supplyAsync stage.
    return "返回supplyAsync的结果";
}).thenApply(new Function<String, String>() {
    // `middle` is the result of the preceding supplyAsync stage.
    @Override
    public String apply(String middle) {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the interruption is not silently lost.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        // Append to the supplyAsync result and return the combined string.
        return middle + "再执行thenApply后返回";
    }
});
// Block until the thenApply stage has produced the final value.
String str = future.get();
System.out.println("最终的结果为:" + str);
thenCompose
// The first stage supplies a string. thenCompose passes it to a function that starts a second
// asynchronous stage, and the overall future completes with that second stage's result.
CompletableFuture<String> future = CompletableFuture
        .supplyAsync(() -> "hello world")
        .thenCompose(s -> CompletableFuture.supplyAsync(
                () -> s + "执行thenCompose中的supplyAsync然后返回"));
// Block until the composed future completes, then print its value.
String str = future.get();
System.out.println(str);
thenCombine
// Two independent asynchronous suppliers.
CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> "hello");
// NOTE(review): "word" looks like a typo for "world" — confirm before changing the output.
CompletableFuture<String> second = CompletableFuture.supplyAsync(() -> "word");
// thenCombine's second argument is a BiFunction that receives both results once both
// futures have completed and merges them into a single value.
CompletableFuture<String> future = first.thenCombine(second, (s1, s2) -> s1 + " " + s2);
String result = future.get();
System.out.println(result);
allOf 和anyOf
thenCompose和thenCombine方法只能组合2个CompletableFuture,allOf 和anyOf 可以组合多个CompletableFuture
Random RANDOM = new Random();
// Wildcard element type instead of a raw type; allOf/anyOf accept CompletableFuture<?>... .
CompletableFuture<?>[] futures = new CompletableFuture<?>[10];
int[] ints = new int[10];
for (int i = 0; i < 10; i++) {
    final int finalI = i;
    // Each task sleeps for a random duration and then records that duration in its own slot.
    futures[i] = CompletableFuture.runAsync(() -> {
        try {
            int intNext = RANDOM.nextInt(5000);
            Thread.sleep(intNext);
            ints[finalI] = intNext;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the interruption is not silently lost;
            // the slot keeps its default value 0 in that case.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    });
}
// allOf completes when every future has completed; only then is the array printed.
// Alternative: CompletableFuture.anyOf(futures) completes as soon as any one future completes.
CompletableFuture.allOf(futures)
        .thenRun(() -> System.out.println(Arrays.toString(ints)))
        .get();
版权声明:本文为weixin_41029286原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。