线程执行 之 ExecutorService.submit()方法执行内部逻辑解析 ThreadPoolExecutor

  • Post author:
  • Post category:其他


futureTask 继承了Runable 接口,所以本身就是一个线程,内部实现了run方法,

当主线程使用ExecutorService.submit ()方法提交任务

或者使用 futureTask.start() 启动子线程的时候,

开始执行futuretask的run 方法

,run 方法内部 调用了 Callable 接口的call方法 ,然后这个线程慢慢执行的过程中, 主线程执行完其他操作,调用 futureTask.get()方法的时候,如果子线程没有执行完毕,则把主线程添加到当前futureTask的阻塞队列并等待,等子线程执行完毕之后,调用unpark 通知主线程,主线程继续执行,这就是异步的原理。

标红部分的原理是什么样的?


public <T> Future<T>

submit

(Callable<T> task) {


if (task == null) throw new NullPointerException();

RunnableFuture<T> ftask =

newTaskFor

(task);


execute

(ftask);

return ftask;

}


protected <T> RunnableFuture<T>

newTaskFor

(Callable<T> callable) {


return new FutureTask<T>(callable);

}

ThreadPoolExecutor 的

execute

(Runnable r)方法内部实现


public void execute(Runnable command) {



if (command == null)

throw new NullPointerException();

/*

* Proceed in 3 steps:

*

* 1. If fewer than corePoolSize threads are running, try to

* start a new thread with the given command as its first

* task.  The call to addWorker atomically checks runState and

* workerCount, and so prevents false alarms that would add

* threads when it shouldn’t, by returning false.

*

* 2. If a task can be successfully queued, then we still need

* to double-check whether we should have added a thread

* (because existing ones died since last checking) or that

* the pool shut down since entry into this method. So we

* recheck state and if necessary roll back the enqueuing if

* stopped, or start a new thread if there are none.

*

* 3. If we cannot queue task, then we try to add a new

* thread.  If it fails, we know we are shut down or saturated

* and so reject the task.

*/

int c = ctl.get();

if (workerCountOf(c) < corePoolSize) {



if (addWorker(command, true))


return;

c = ctl.get();

}

if (isRunning(c) && workQueue.offer(command)) {


int recheck = ctl.get();

if (! isRunning(recheck) && remove(command))

reject(command);

else if (workerCountOf(recheck) == 0)

addWorker(null, false);

}

else if (!addWorker(command, false))

reject(command);

}



addWorker

实现


private boolean addWorker(Runnable firstTask, boolean core) {


w = new Worker(firstTask);

final Thread t = w.thread;


if (t != null) {


final ReentrantLock mainLock = this.mainLock;

mainLock.lock();

try {


// Recheck while holding lock.

// Back out on ThreadFactory failure or if

// shut down before lock acquired.

int rs = runStateOf(ctl.get());

if (rs < SHUTDOWN ||

(rs == SHUTDOWN && firstTask == null)) {


if (t.isAlive()) // precheck that t is startable

throw new IllegalThreadStateException();


workers.add(w);


int s = workers.size();

if (s > largestPoolSize)

largestPoolSize = s;

workerAdded = true;

}

} finally {


mainLock.unlock();

}

if (workerAdded) {



t.start();


workerStarted = true;

}

}

worker 构造函数

Worker(Runnable firstTask) {


setState(-1); // inhibit interrupts until runWorker

this.firstTask = firstTask;


this.thread = getThreadFactory().newThread(this);


}

t.start() 的t 是创建worker 的时候新建的线程,并且线程关联的runnable 就是worker 本身。

所以调用t.start() 的时候其实就是 调用 worker 的run 方法

public void run() {


runWorker(this);

}

final void runWorker(Worker w) {


Thread wt = Thread.currentThread();


Runnable task = w.firstTask;


w.firstTask = null;

w.unlock(); // allow interrupts

boolean completedAbruptly = true;

try {


while (task != null || (task = getTask()) != null) {


w.lock();

// If pool is stopping, ensure thread is interrupted;

// if not, ensure thread is not interrupted.  This

// requires a recheck in second case to deal with

// shutdownNow race while clearing interrupt

if ((runStateAtLeast(ctl.get(), STOP) ||

(Thread.interrupted() &&

runStateAtLeast(ctl.get(), STOP))) &&

!wt.isInterrupted())

wt.interrupt();

try {


beforeExecute(wt, task);

Throwable thrown = null;

try {



task.run();


} catch (RuntimeException x) {


thrown = x; throw x;

} catch (Error x) {


thrown = x; throw x;

} catch (Throwable x) {


thrown = x; throw new Error(x);

} finally {


afterExecute(task, thrown);

}

} finally {


task = null;

w.completedTasks++;

w.unlock();

}

}

completedAbruptly = false;

} finally {


processWorkerExit(w, completedAbruptly);

}

}

可以看出整体逻辑就是 Executor 创建了一个新的线程,线程内部的方法调用 futuretask 的 run 方法。

futuretask 执行逻辑,请参考futuretask 原理。



版权声明:本文为xiaoliuliu2050原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。