浅析ThreadPoolExecutor

  • Post author:
  • Post category:其他


ThreadPoolExecutor是jdk自带的线程池实现,其中他有四种常用的线程池模式通过ExecutorService获取:newSingleThreadExecutor,newSingleThreadExecutor,newCachedThreadPool,newFixedThreadPool,这四中是jdk自己经过对ThreadPoolExecutor的封装实现不同的线程池类型,今天我们来大概理解下ThreadPoolExecutor。

线程池的构建与接收任务

这块就大概阐述下,好多博客都在说这个。

线程初始化,通过int corePoolSize,

int maximumPoolSize,

long keepAliveTime,

TimeUnit unit,

BlockingQueue workQueue,

ThreadFactory threadFactory,

RejectedExecutionHandler handler

这些参数,其中



corePoolSize

是线程池的核心线程数。



maxPoolSize

是最大线程数



keepAliveTime

线程池的空闲线程保持活跃的时间,前提是线程池的线程数大于corePoolSize,设置这个的目的是为了让jvm不来回的创建线程,已创建线程的任务已经提交归还给了线程池,那么这个线程不会里面走到terminal,而是会等待keepAliveTime的时间,看看是不是有新的任务进来,如果有新的任务,就直接拿取任务去执行,如果没有,则过KeepAliveTime的时间就自动消亡。



unit

标识keepAliveTime的时间单位



workQueue

当线程池的线程数已经大于coolPoolSize但是小于maxPoolSize的时候,就会先把线程先offer进阻塞队列,等到有空闲现成的时候就去阻塞队列中取。



threadFactory

线程工厂,可不传,设定现成的创建方式 ,有默认的线程工厂



handler

线程池的拒绝策略或者也叫抛弃策略,当线程数达到最大线程数或者线程池已经执行了shutDown或者shutDownNow方法到达shutdown,stop或者terminated状态的时候,就会执行这个策略,有四种:AbortPolicy 直接抛RejectedExecutionException;DiscardPolicy 直接丢弃该任务;DiscardOldestPolicy 丢弃等待队列中最老的任务;CallerRunsPolicy 创建一个新的线程执行该任务(当线程池没有执行shutdown()或者shutdownDown()方法的时候,也就是说处于运行中的状态)。

接受线程:

当一个任务到来时,首先会去判断当前线程池的poolSize是不是大于corePoolSize,如果小于,则直接创建新的线程执行;当poolSize达到corePoolSize,就会直接往workQueue中放,当workQueue存放满了,就会判断poolSize是不是小于maxPoolSize,如果小于,则创建新的线程,如果大于,则执行抛弃策略。

具体看ThreadPoolExecutor的execute方法jdk1.8

“`java

public void execute(Runnable command) {

if (command == null)

throw new NullPointerException();

//先去判断poolSize和corePoolSize的大小,|| 是短路运算符,首次添加肯定会走后面的addIfUnderCorePoolSize方法

//这个方法就是党poolSize小于corePoolSize的时候直接创建线程执行任务

//当poolSize >=corePoolSize的时候就直接执行workQueue.offer(command) 这个方法,如果存放失败,则直接返回

//false,则执行addIfUnderMaximumPoolSize方法,这个会判断是不是poolSize是不是大于maxPoolSize,如果大于则

//返回false,不大于则创建线程执行任务返回true.

if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {

if (runState == RUNNING && workQueue.offer(command)) {

if (runState != RUNNING || poolSize == 0)

//保证线程池执行了shutDown或者shutDownNow,不接受新的任务。如果是shutDownNow则直接执行抛弃策略,如果是shutDown则让保证至少有一个线程,把队列中待执行的任务执行完。

ensureQueuedTaskHandled(command);

}

else if (!addIfUnderMaximumPoolSize(command))

reject(command); // is shutdown or saturated

}

}

这里就不帖 具体的方法代码了,可以自己看下。

execute方法和submit方法

execute方法是ThreadPoolExecutor实现接口Executor的方法,而submit则是实现接口ExecutorService的方法,ExecutorService描述了线程池的生命周期 ExecutorService继承了Executor。有点绕。。 自己看源码

在ThreadPoolExecutor中是找不到submit方法的,发现是在其父抽象类AbstractExecutorService中实现的,见下面代码:

“`java

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Object> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
}

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

可以看见submit是支持Callable和Runnable参数的,并且有返回值。这个返回值刚好是一个RunnableFutrue,RunnableFutrue具有Runnable和Future的特性,也就是说能作为线程入参并且可以带返回值,返回值通过Futrue的get方法得到,代码中使用newTaskFor方法实现Callable,Runable到RunnableFutrue的转换,可以理解为newTaskFor方法就是一个适配器,将Runnable和Callable参数适配成为RunnableFutrue对象,转换完成后然后调用ThreadPoolExecutor的execute方法,则为上面execute方法,可见 execute没有返回值,并且接受的参数是Runnable对象。

shutDown和shutDownNow方法

ThreadPoolExecutor有四种状态:



static final int RUNNING = 0;




static final int SHUTDOWN = 1;




static final int STOP = 2;




static final int TERMINATED = 3;


转化关系:

* RUNNING -> SHUTDOWN

* On invocation of shutdown(), perhaps implicitly in finalize()

* (RUNNING or SHUTDOWN) -> STOP

* On invocation of shutdownNow()

* SHUTDOWN -> TERMINATED

* When both queue and pool are empty

* STOP -> TERMINATED

* When pool is empty

当线程池调用了shutDown方法,线程池不会里面停止,只是将线程状态转化为SHUTDOWN状态,并且尝试中断正在执行中的线程,线程中断调用的是Thread的interrupt方法(该方法只有在中断监听方法 wait,park,sleep等才能中断线程,其他时候不会中断线程,只是修改了现成的中断标识为true,而不会停止线程的执行) 然后会等到workQueue中的任务都被执行完才会到达TERMINATED状态。

具体代码:

“`java

public void shutdown() {

/*

* Conceptually, shutdown is just a matter of changing the

* runState to SHUTDOWN, and then interrupting any worker

* threads that might be blocked in getTask() to wake them up

* so they can exit. Then, if there happen not to be any

* threads or tasks, we can directly terminate pool via

* tryTerminate. Else, the last worker to leave the building

* turns off the lights (in workerDone).

*

* But this is made more delicate because we must cooperate

* with the security manager (if present), which may implement

* policies that make more sense for operations on Threads

* than they do for ThreadPools. This requires 3 steps:

*

* 1. Making sure caller has permission to shut down threads

* in general (see shutdownPerm).

*

* 2. If (1) passes, making sure the caller is allowed to

* modify each of our threads. This might not be true even if

* first check passed, if the SecurityManager treats some

* threads specially. If this check passes, then we can try

* to set runState.

*

* 3. If both (1) and (2) pass, dealing with inconsistent

* security managers that allow checkAccess but then throw a

* SecurityException when interrupt() is invoked. In this

* third case, because we have already set runState, we can

* only try to back out from the shutdown as cleanly as

* possible. Some workers may have been killed but we remain

* in non-shutdown state (which may entail tryTerminate from

* workerDone starting a new worker to maintain liveness.)

*/

SecurityManager security = System.getSecurityManager();
if (security != null)
        security.checkPermission(shutdownPerm);

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (security != null) { // Check if caller can modify our threads
            for (Worker w : workers)
                security.checkAccess(w.thread);
        }

        int state = runState;
        if (state < SHUTDOWN)
        //修改状态为SHUTDOWN
            runState = SHUTDOWN;

        try {
            for (Worker w : workers) {
            //尝试中断线程
                w.interruptIfIdle();
            }
        } catch (SecurityException se) { // Try to back out
            runState = state;
            // tryTerminate() here would be a no-op
            throw se;
        }

//如果poolSize为0,但是workQueue中还有任务,则修改状态为RUNNING,开启一个线程执行workQueue中的任务,如果workQueue为空,则直接修改线程池状态为TERMINATED

tryTerminate(); // Terminate now if pool and queue empty

} finally {

mainLock.unlock();

}

}

可以看出shutDown没有返回值,并且是优雅停线程池

shutDownNow的方法:

“`java

public List shutdownNow() {

/*

* shutdownNow differs from shutdown only in that

* 1. runState is set to STOP,

* 2. all worker threads are interrupted, not just the idle ones, and

* 3. the queue is drained and returned.

*/

SecurityManager security = System.getSecurityManager();

if (security != null)

security.checkPermission(shutdownPerm);

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (security != null) { // Check if caller can modify our threads
            for (Worker w : workers)
                security.checkAccess(w.thread);
        }

        int state = runState;
        if (state < STOP)
        //修改状态为STOP
            runState = STOP;

        try {
            for (Worker w : workers) {
            //尝试中断
                w.interruptNow();
            }
        } catch (SecurityException se) { // Try to back out
            runState = state;
            // tryTerminate() here would be a no-op
            throw se;
        }
        //得到workQueue的所有任务
        List<Runnable> tasks = drainQueue();
        tryTerminate(); // Terminate now if pool and queue empty
        //但会任务队列
        return tasks;
    } finally {
        mainLock.unlock();
    }
}

则是直接修改线程状态为STOP,尝试中指所有线程,和shutDown一样

得到workQueue的所有任务,并且返回,如果状态大于RUNNING (SHUTDOWN,STOP,TERMIANTED)则线程池不在接收新的任务。显然shutDownNow不够优雅,谨慎选择。

tryTerminal方法:

“`java

private void tryTerminate() {

if (poolSize == 0) {

int state = runState;

//shutDown走这里会先去判断workQueue是否为空

if (state < STOP && !workQueue.isEmpty()) {

state = RUNNING; // disable termination check below

Thread t = addThread(null);

if (t != null)

t.start();

}

//shutDownNow直接走这里,直接TERMINATED

if (state == STOP || state == SHUTDOWN) {

runState = TERMINATED;

termination.signalAll();

terminated();

}

}

}

ThreadPoolExecutor的Worker

Worker类可以看作是ThreadPoolExecuter线程池中受理任务的活跃线程。

“`java

private final class Worker implements Runnable {

/**

* The runLock is acquired and released surrounding each task

* execution. It mainly protects against interrupts that are

* intended to cancel the worker thread from instead

* interrupting the task being run.

*/

private final ReentrantLock runLock = new ReentrantLock();

    /**
     * Initial task to run before entering run loop. Possibly null.
     */
    private Runnable firstTask;

    /**
     * Per thread completed task counter; accumulated
     * into completedTaskCount upon termination.
     */
    volatile long completedTasks;

    /**
     * Thread this worker is running in.  Acts as a final field,
     * but cannot be set until thread is created.
     */
    Thread thread;

    Worker(Runnable firstTask) {
        this.firstTask = firstTask;
    }

    boolean isActive() {
        return runLock.isLocked();
    }

    /**
     * Interrupts thread if not running a task.
     */
    void interruptIfIdle() {
        final ReentrantLock runLock = this.runLock;
        if (runLock.tryLock()) {
            try {
        if (thread != Thread.currentThread())
        thread.interrupt();
            } finally {
                runLock.unlock();
            }
        }
    }

    /**
     * Interrupts thread even if running a task.
     */
    void interruptNow() {
        thread.interrupt();
    }

    /**
     * Runs a single task between before/after methods.
     */
    private void runTask(Runnable task) {
        final ReentrantLock runLock = this.runLock;
        runLock.lock();
        try {
            /*
             * Ensure that unless pool is stopping, this thread
             * does not have its interrupt set. This requires a
             * double-check of state in case the interrupt was
             * cleared concurrently with a shutdownNow -- if so,
             * the interrupt is re-enabled.
             */
            if (runState < STOP &&
                Thread.interrupted() &&
                runState >= STOP)
                thread.interrupt();
            /*
             * Track execution state to ensure that afterExecute
             * is called only if task completed or threw
             * exception. Otherwise, the caught runtime exception
             * will have been thrown by afterExecute itself, in
             * which case we don't want to call it again.
             */
            boolean ran = false;
            //待实现方法,子类实现,
            beforeExecute(thread, task);
            try {
                task.run();
                ran = true;
                //待实现方法,子类实现,
                afterExecute(task, null);
                ++completedTasks;
            } catch (RuntimeException ex) {
                if (!ran)
                    afterExecute(task, ex);
                throw ex;
            }
        } finally {
            runLock.unlock();
        }
    }

    /**
     * Main run loop
     */
    public void run() {
        try {
            Runnable task = firstTask;
            firstTask = null;
            //firstTask保存当前该线程受理的任务,如果没有任务则会执行getTask方法。
            //具体参看getTask方法,这块用的while循环,说明如果firstTask为空则一直会执行getTask方法,
            //直到getTask方法返回null
            while (task != null || (task = getTask()) != null) {
                runTask(task);
                task = null;
            }
        } finally {
        //pollSize--
            workerDone(this);
        }
    }

“`java

Runnable getTask() {

for (;;) {

try {

int state = runState;

//返回null

if (state > SHUTDOWN)

return null;

Runnable r;

//执行了shutdown方法,则需要吧workQueue中的任务执行完

if (state == SHUTDOWN) // Help drain queue

r = workQueue.poll();

//设置了keepAliveTime或者workQueue中有可能有待处理的

//任务(poolSize > corePoolSize 表示如果有任务出来,会往workQueue中放,或者直接创建线程 –maxPoolSize>poolSize>corePoolSize)

else if (poolSize > corePoolSize || allowCoreThreadTimeOut)

//如果keepAliaveTime的时间没拿到任务,r就为null,则该Worker就销毁了执行workerCanExit方法

r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);

else

//直接阻塞等到workQueue的任务

r = workQueue.take();

if (r != null)

return r;

if (workerCanExit()) {

if (runState >= SHUTDOWN) // Wake up others

interruptIdleWorkers();

return null;

}

// Else retry

} catch (InterruptedException ie) {

// On interruption, re-check runState

}

}

}

worker的代码相对比较好理解。排版很乱,大概写一下!

线程池经常用,大概写一下,好多代码没贴出来,可以翻看jdk源码



版权声明:本文为zhouren1314原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。