futureTask 继承了Runable 接口,所以本身就是一个线程,内部实现了run方法,
    
     当主线程使用ExecutorService.submit ()方法提交任务
    
    或者使用 futureTask.start() 启动子线程的时候,
    
     开始执行futuretask的run 方法
    
    ,run 方法内部 调用了 Callable 接口的call方法 ,然后这个线程慢慢执行的过程中, 主线程执行完其他操作,调用 futureTask.get()方法的时候,如果子线程没有执行完毕,则把主线程添加到当前futureTask的阻塞队列并等待,等子线程执行完毕之后,调用unpark 通知主线程,主线程继续执行,这就是异步的原理。
   
标红部分的原理是什么样的?
    
     public <T> Future<T>
     
      submit
     
     (Callable<T> task) {
     
     
     if (task == null) throw new NullPointerException();
     
     RunnableFuture<T> ftask =
     
      newTaskFor
     
     (task);
     
     
      execute
     
     (ftask);
     
     return ftask;
     
     }
    
    
     protected <T> RunnableFuture<T>
     
      newTaskFor
     
     (Callable<T> callable) {
     
     
     return new FutureTask<T>(callable);
     
     }
    
    ThreadPoolExecutor 的
    
     execute
    
    (Runnable r)方法内部实现
   
    
     public void execute(Runnable command) {
     
    
    
    if (command == null)
    
    throw new NullPointerException();
    
    /*
    
    * Proceed in 3 steps:
    
    *
    
    * 1. If fewer than corePoolSize threads are running, try to
    
    * start a new thread with the given command as its first
    
    * task.  The call to addWorker atomically checks runState and
    
    * workerCount, and so prevents false alarms that would add
    
    * threads when it shouldn’t, by returning false.
    
    *
    
    * 2. If a task can be successfully queued, then we still need
    
    * to double-check whether we should have added a thread
    
    * (because existing ones died since last checking) or that
    
    * the pool shut down since entry into this method. So we
    
    * recheck state and if necessary roll back the enqueuing if
    
    * stopped, or start a new thread if there are none.
    
    *
    
    * 3. If we cannot queue task, then we try to add a new
    
    * thread.  If it fails, we know we are shut down or saturated
    
    * and so reject the task.
    
    */
    
    int c = ctl.get();
    
    if (workerCountOf(c) < corePoolSize) {
    
    
    
     if (addWorker(command, true))
    
    
    return;
    
    c = ctl.get();
    
    }
    
    if (isRunning(c) && workQueue.offer(command)) {
    
    
    int recheck = ctl.get();
    
    if (! isRunning(recheck) && remove(command))
    
    reject(command);
    
    else if (workerCountOf(recheck) == 0)
    
    addWorker(null, false);
    
    }
    
    else if (!addWorker(command, false))
    
    reject(command);
    
    }
   
    
     
      addWorker
     
     实现
    
    
     private boolean addWorker(Runnable firstTask, boolean core) {
     
    
   
…
    
     w = new Worker(firstTask);
     
     final Thread t = w.thread;
    
    
    if (t != null) {
    
    
    final ReentrantLock mainLock = this.mainLock;
    
    mainLock.lock();
    
    try {
    
    
    // Recheck while holding lock.
    
    // Back out on ThreadFactory failure or if
    
    // shut down before lock acquired.
    
    int rs = runStateOf(ctl.get());
   
    if (rs < SHUTDOWN ||
    
    (rs == SHUTDOWN && firstTask == null)) {
    
    
    if (t.isAlive()) // precheck that t is startable
    
    throw new IllegalThreadStateException();
    
    
     workers.add(w);
    
    
    int s = workers.size();
    
    if (s > largestPoolSize)
    
    largestPoolSize = s;
    
    workerAdded = true;
    
    }
    
    } finally {
    
    
    mainLock.unlock();
    
    }
    
    if (workerAdded) {
    
    
    
     t.start();
    
    
    workerStarted = true;
    
    }
   
…
}
    worker 构造函数
    
    Worker(Runnable firstTask) {
    
    
    setState(-1); // inhibit interrupts until runWorker
    
    this.firstTask = firstTask;
    
    
     this.thread = getThreadFactory().newThread(this);
    
    
    }
   
t.start() 的t 是创建worker 的时候新建的线程,并且线程关联的runnable 就是worker 本身。
    所以调用t.start() 的时候其实就是 调用 worker 的run 方法
    
    public void run() {
    
    
    runWorker(this);
    
    }
   
    final void runWorker(Worker w) {
    
    
    Thread wt = Thread.currentThread();
    
    
     Runnable task = w.firstTask;
    
    
    w.firstTask = null;
    
    w.unlock(); // allow interrupts
    
    boolean completedAbruptly = true;
    
    try {
    
    
    while (task != null || (task = getTask()) != null) {
    
    
    w.lock();
    
    // If pool is stopping, ensure thread is interrupted;
    
    // if not, ensure thread is not interrupted.  This
    
    // requires a recheck in second case to deal with
    
    // shutdownNow race while clearing interrupt
    
    if ((runStateAtLeast(ctl.get(), STOP) ||
    
    (Thread.interrupted() &&
    
    runStateAtLeast(ctl.get(), STOP))) &&
    
    !wt.isInterrupted())
    
    wt.interrupt();
    
    try {
    
    
    beforeExecute(wt, task);
    
    Throwable thrown = null;
    
    try {
    
    
    
     task.run();
    
    
    } catch (RuntimeException x) {
    
    
    thrown = x; throw x;
    
    } catch (Error x) {
    
    
    thrown = x; throw x;
    
    } catch (Throwable x) {
    
    
    thrown = x; throw new Error(x);
    
    } finally {
    
    
    afterExecute(task, thrown);
    
    }
    
    } finally {
    
    
    task = null;
    
    w.completedTasks++;
    
    w.unlock();
    
    }
    
    }
    
    completedAbruptly = false;
    
    } finally {
    
    
    processWorkerExit(w, completedAbruptly);
    
    }
    
    }
   
可以看出整体逻辑就是 Executor 创建了一个新的线程,线程内部的方法调用 futuretask 的 run 方法。
futuretask 执行逻辑,请参考futuretask 原理。
 
