【深度挖掘Java底层源码】「底层技术原理体系」带你零基础认识和分析学习相关的异步任务提交机制FutureTask的底层原理

  • Post author:
  • Post category:java




FutureTask的基本介绍

FutureTask是Java中的一个类,它实现了Future接口和Runnable接口,并且被用作线程执行的任务。FutureTask可以在多线程环境下异步执行一个任务并获取其结果。



FutureTask的特点用法

  1. 异步执行:通过将耗时的任务交给FutureTask,在一个单独的线程中执行,当前线程可以继续执行其他任务,不会被阻塞,从而提高程序的并发性。

  2. 获取结果:FutureTask提供了get()方法,用于在任务完成后获取其执行结果。如果任务尚未完成,get()方法将阻塞当前线程,直到任务完成并返回结果。

  3. 取消任务:可以使用cancel()方法取消任务的执行。如果任务尚未开始执行,则会取消任务的执行;如果任务已经开始执行,则根据传入的参数决定是否中断正在执行的任务。

  4. 线程安全:FutureTask是线程安全的,可以在多个线程中共享使用。多个线程可以同时调用FutureTask的get()方法等待任务结果。

FutureTask常见的应用场景是在并发编程中,将计算密集型的任务交给FutureTask后台执行,并在需要时获取结果。这样可以充分利用系统资源,提高程序的性能和响应能力。



FutureTask的执行流程

在这里插入图片描述



任务提交

在之前的线程池分析中,我们介绍了AbstractExecutorService的实现。AbstractExecutorService是Java中的一个抽象类,它实现了ExecutorService接口,并提供了一些通用的逻辑和方法来简化线程池的实现。



AbstractExecutorService的主要特点和用法

  1. 线程池管理:AbstractExecutorService提供了默认的线程池管理逻辑。我们可以通过继承AbstractExecutorService并实现其中的抽象方法来定制自己的线程池。

  2. 任务执行:通过调用submit()或invokeAll()等方法,可以将任务提交给AbstractExecutorService来执行。它会在适当的时候自动创建线程并执行任务。

  3. 异常处理:AbstractExecutorService提供了一些默认的异常处理机制,比如可以捕获任务执行过程中抛出的异常,并进行相应的处理操作。我们也可以在继承AbstractExecutorService时自定义异常处理逻辑。

  4. 生命周期管理:AbstractExecutorService定义了线程池的生命周期方法,如shutdown()和isShutdown()等,用于管理线程池的启动和关闭。



对象模型转换传递流程

在这里插入图片描述



submit方法提交任务

AbstractExecutorService主要用于实现ExecutorService接口中的submit()和invokeAll()等方法。它提供了一些默认的实现,使得我们可以更方便地创建和管理线程池,并执行任务。

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
}

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}



RunnableFuture任务模型

通过使用RunnableFuture,我们可以更加灵活地管理线程池,处理任务的提交和执行,并在需要时进行异常处理和线程池的关闭。最终进行构建一个FutureTask对象。

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
}

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}



FutureTask

对于通过submit方法提交的任务,无论是Runnable还是Callable接口实现,都会被统一封装为FutureTask对象,并传递给execute方法。

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // 确保 callable 的可见性
}

public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;      // 确保可调用的可见性
}


注意,AbstractExecutorService是一个抽象类,不能直接实例化。我们可以继承AbstractExecutorService类并实现其中的抽象方法,或者使用Java提供的具体实现类如ThreadPoolExecutor来创建并使用线程池



RunnableAdapter适配器模型

RunnableAdapter的作用是将一个Runnable对象包装成一个Callable对象。在适配器的call()方法中,会调用任务的run()方法来执行任务,并返回预先指定的结果。

 public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
 }

通过使用RunnableAdapter,我们可以将Runnable任务在提交给ExecutorService时,以Callable的方式进行处理和执行。

static final class RunnableAdapter<T> implements Callable<T> {
    final Runnable task;
    final T result;
    
    RunnableAdapter(Runnable task, T result) {
        this.task = task;
        this.result = result;
    }
    
    public T call() {
        task.run();
        return result;
    }
}

该适配器的设计使得我们可以更加方便地将Runnable任务与Callable任务一同处理,充分利用ExecutorService的统一接口,并且无需对Runnable任务进行额外的修改。



任务状态

FutureTask的状态定义,说明了FutureTask对象可能处于的不同状态。初始状态为NEW,后续可能会演化为COMPLETING、NORMAL、EXCEPTIONAL、CANCELLED、INTERRUPTING、INTERRUPTED等状态。

private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;

在这里插入图片描述

这些状态描述了FutureTask在执行过程中的不同情况,包括正常完成、遇到异常、被取消、被中断等。了解FutureTask的状态有助于我们理解和处理FutureTask对象在多线程环境下的行为。



任务执行

首先,方法会检查FutureTask的状态是否为NEW,并尝试使用compareAndSwapObject方法将执行线程设置为当前线程。然后,方法将执行并处理相关任务,并根据任务是否成功完成设置相应的结果。最后,清除执行线程(runner)并重新获取FutureTask的状态,根据状态处理可能的取消操作。

public void run() {
    if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) {
        return;
    }
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran) {
                set(result);
            }
        }
    } finally {
        // 让runner非空直到state被稳定设置,以防止并发调用run()
        runner = null;
        // 在将runner设置为空之后必须重新读取state以防止泄漏中断
        int s = state;
        if (s >= INTERRUPTING) {
            handlePossibleCancellationInterrupt(s);
        }
    }
}

在这里插入图片描述



设置线程任务数据结果

对于Callable对象,FutureTask会直接执行它的call方法。如果call方法执行成功,FutureTask会调用set方法来设置执行结果;如果遇到异常,FutureTask会调用setException方法来设置异常。

protected void set(V v) {
    // 首先使用CAS(比较并交换)将状态设置为中间状态COMPLETING
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        // 将状态设置为正常状态
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 最终状态
        finishCompletion();
    }
}

protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t;
        // 将状态设置为异常状态
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // 最终状态
        finishCompletion();
    }
}

下面是对于Callable直接执行其call方法的处理过程。

在这里插入图片描述

  1. 通过CAS操作将FutureTask的状态设置为中间状态COMPLETING。
  2. 根据执行的结果(成功或异常),将结果或异常存储在outcome变量中,并将状态设置为相应的最终状态(正常状态或异常状态)。
  3. 完成执行并处理相关的完成操作。



获取线程任务数据结果

下面的这两个方法都是用来对全局变量outcome进行赋值的。而当我们通过get方法在另一个线程中获取结果时,将会执行以下过程:

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}

如果任务尚未完成,则会等待任务完成:

private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    // 使用for循环阻塞当前线程
    for (;;) {
        // 响应中断
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }
        int s = state;
        // 任务已经完成或者已经抛出异常,直接返回
        if (s > COMPLETING) {
            // WaitNode已经创建,当前可以设置为null了
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING) // 不能超时
            Thread.yield();
        else if (q == null)
            q = new WaitNode();
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                q.next = waiters, q);
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        else
            LockSupport.park(this);
    }
}

下面是我们通过get方法在另一个线程中获取结果的过程。

在这里插入图片描述

  1. 首先,通过调用get方法获取FutureTask的状态值。
  2. 如果状态值小于或等于COMPLETING,则调用awaitDone方法来等待任务的完成。
  3. 在awaitDone方法中,根据是否设置了超时时间计算出等待的截止时间,并使用for循环来阻塞当前线程。
  4. 在循环中,对中断请求进行响应,并检查任务的状态。
  5. 如果设置了超时时间,则根据剩余的等待时间使用LockSupport.parkNanos进行阻塞。
  6. 如果没有设置超时时间,则使用LockSupport.park进行阻塞。
  7. 循环会一直执行,直到任务完成或超时,才会返回任务的状态值。
  8. 最后,通过调用report方法来返回相应的结果。

    总之,该过程描述了在另一个线程中通过get方法等待任务完成的过程。它通过阻塞当前线程并监测任务状态来实现等待,直到任务完成后才返回结果。



上报线程任务数据结果

如果任务已完成或者等待任务直到完成后,将会调用report方法来返回结果, report 方法,发现状态为异常的话将包装成 ExecutionException((Throwable)x); 这个异常就是我们在使用 get 的时候需要捕获的异常。

private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V) x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable) x);
}

如果状态s等于NORMAL,表示任务正常完成,将会返回实际的结果。如果状态s大于等于CANCELLED,则抛出CancellationException异常。否则,抛出ExecutionException异常。通过这样的设计,无论是任务正常完成还是抛出异常,都能够在线程池中执行的任务中被感知到。



任务取消

通过调用cancel方法可以取消FutureTask的执行,并在任务完成后通过finishCompletion方法来唤醒等待的线程。被唤醒的线程在awaitDone方法中继续循环,检查任务的状态以获取结果。

public boolean cancel(boolean mayInterruptIfRunning) {
    // 如果任务不是刚创建或者是刚创建但是更改为指定状态失败则返回 false
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {    // in case call to interrupt throws exception
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt();
            } finally { // final state
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        finishCompletion();
    }
    return true;
}
  1. FutureTask类中有一个cancel方法,接受一个布尔类型参数mayInterruptIfRunning,用于表示在执行中的任务是否可以中断。
  2. cancel方法首先检查任务的状态,如果状态不是NEW或者尝试将状态更改为INTERRUPTING或CANCELLED失败,则返回false。
  3. 如果允许中断正在运行的任务(mayInterruptIfRunning为true),则调用Thread的interrupt方法来中断任务的线程,并将任务的状态设置为INTERRUPTED。
  4. 然后,调用finishCompletion方法来完成任务的完成操作。
  5. 在finishCompletion方法中,首先尝试将waiters设置为null,以便其他线程无法再加入等待队列。然后循环遍历等待队列,唤醒等待的线程,并将等待队列中节点的thread属性设置为null。
  6. 在唤醒线程后,被唤醒的线程会在awaitDone方法中继续循环,检查任务的状态。
  7. 如果任务的状态大于COMPLETING,则表示任务已完成或抛出异常,直接返回状态。



总结说明

  • FutureTask是一个用于异步执行任务并获取结果的Java类。它提供了获取结果、取消任务等功能,帮助我们更好地处理多线程编程中的任务执行和结果获取。

  • AbstractExecutorService是Java中用于简化线程池实现的一个抽象类。通过继承并实现其中的方法,我们可以更方便地管理线程池的创建、任务的提交和执行。它提供了默认的异常处理和生命周期管理,并可以根据需要进行定制和扩展。