Java并发——深入理解Executor框架FutureTask类

  • Post author:
  • Post category:java



一、什么是Future和FutureTask

一直以来都对FutureTask这个“Future”不理解,为什么叫做“未来的任务呢”?这个“Future”体现在哪里呢?现在终于明白,FutureTask的Future就源自于它的异步工作机制,如果我们在主线程中直接写一个函数来执行任务,这是同步的任务,也就是说必须要等这个函数返回以后我们才能继续做接下的事情,但是如果这个函数返回的结果对接下来的任务并没有意义,那么我们等在这里是很浪费时间的,而FutureTask就提供了这么一个异步的返回结果的机制,当执行一个FutureTask的时候,我们可以接着做别的任务,在将来的某个时间,FutureTask任务完成后会返回FutureTask对象来包装返回的结果,只要调用这个对象的get()方法即可获取返回值。

当然多线程中继承ThreadPoolExecutor和实现Runnable也可以实现异步工作机制,可是他们没有返回值。这时可以使用FutureTask包装Runnable或者Callable对象,再使用FutureTask来执行任务。

Future接口和其唯一的实现类FutureTask类一般用于表示异步计算的结果。Future接口下提供方法来检查计算是否完成,等待其完成,并检索计算结果。 结果只能在计算完成后使用方法get进行检索,如有必要,阻塞,直到准备就绪。 取消由cancel方法执行,isCancelled方法用于检测计算是否被取消,isDone方法用于检测计算是否完成。 提供其他方法来确定任务是否正常完成或被取消。


二、FutureTask的使用

根据FutureTask被执行的进度,FutureTask对象可以处于一下

3种状态

未启动:创建了一个FutureTask对象但没有执行futureTask.run();

已启动:futureTask.run()方法被执行的过程中;

已完成:futureTask.run()正常执行结束,或者futureTask被取消(futureTask.cancel()),或者执行futureTask.run()时抛出异常而异常结束;



1、FutureTask的启动:

FutureTask实现了Future接口和Runnable接口,因此FutureTask对象的执行有两种方式:

(1)

交给线程池的Execute或submit方法执行

import java.util.concurrent.*;
import static java.util.concurrent.TimeUnit.MILLISECONDS;


class test{
    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor tpe = new ThreadPoolExecutor(5, 10,100, MILLISECONDS, new ArrayBlockingQueue<Runnable>(5));
        //用FutureTask包装Runnable或者Callable对象
        FutureTask<String> future = new FutureTask<String>(new Callable<String>() {
            @Override
            public String call() {
                try{
                    String a = "return String";
                    return a;
                }
                catch(Exception e){
                    e.printStackTrace();
                    return "exception";
                }
            }
        });
        //交给线程池的Execute或submit方法执行
        tpe.submit(future);
        try{
            System.out.println(future.get());
        }
        catch(Exception e){
            e.printStackTrace();
        }
        finally{
            tpe.shutdown();
        }
    }
}

(2)

由调用线程直接执行:在调用线程中执行futureTask.run()方法

把上述代码的  tpe.submit(future);   替换为   future.run();



2、FutureTask执行完后结果的获取  :


futureTask.get( )


(1)在已启动的状态调用futureTask.get( )或导致调用线程阻塞,知道FutureTask执行完毕,然后得到返回的FutureTask对象,调用futureTask.get( )获得任务返回值;

(2)在已完成状态调用futureTask.get( ),将导致调用线程立即返回(正常完成,得到FutureTask对象)或者抛出异常(被取消或者因异常而结束);



3、futureTask被取消:


futureTask.cancel( )

(1)在未启动状态调用futureTask.cancel( )会导致该任务永远不会再执行;

(2)在已启动状态:

  • 调用futureTask.cancel(true )会以中断的方式尝试停止任务,如果该任务不响应中断则无法停止;
  • 调用futureTask.cancel(false)将不会对正在执行的线程产生影响,也就是已启动的线程会让他执行完毕;

(3)在已完成状态调用futureTask.cancel( )则会返回false;


三、FutureTask的实现

FutureTask是一个

基于AQS同步队列实现的一个自定义同步组件

,通过对同步状态state的竞争实现acquire或者release操作。

FutureTask的内部类Sync实现了AQS接口,通过对tryAcquire等抽象方法的重写和模板方法的调用来实现内部类Sync的tryAcquireShared等方法,然后聚合Sync的方法来实现FutureTask的get,cancel等方法;


FutureTask的get方法最终会调用AQS.acquireSharedInterruptibly方法,这个方法操作成功的条件是同步状态为RAN或者CANCELLED,也就是说如果这个FutureTask有线程E正在执行,那么这个FutureTask的状态是RUN,因此AQS.acquireSharedInterruptibly方法调用失败,此时调用get方法的线程被阻塞,添加到等待队列中(如下图线程D,其中A,B,C是已经被阻塞添加到等待队列中的线程)。当前面执行FutureTask的线程E执行完毕,那么以原子方式更新同步状态state的值为RAN,并执行AQS.release方法,然后唤醒等待队列中的第一个节点中的线程A,此时线程A出队列获得同步状态,并原子设置state为RUN,当线程A执行完毕,把state原子更新为RUN,然后唤醒线程B,以此类推,因此一颗看出对于一个FutureTask,同一时间只有一个线程执行这个任务。



四、FutureTask使用场景

当一个线程需要等待另一个线程把某个任务执行完以后它才能继续执行时;

有若干线程执行若干任务,每个任务最多只能被执行一次;

当多个线程师徒执行同一个任务,但只能允许一个线程执行此任务,其它线程需要等这个任务被执行完毕以后才能继续执行时;



版权声明:本文为tongdanping原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。