Java-线程池动态修改大小

  • Post author:
  • Post category:java




参数说明


corePoolSize

:核心线程数大小,不管它们创建以后是不是空闲的。线程池需要保持 corePoolSize 数量的线程,除非设置了 allowCoreThreadTimeOut;


maximumPoolSize

:最大线程数,线程池中最多允许创建 maximumPoolSize 个线程;


keepAliveTime

:存活时间,如果经过 keepAliveTime 时间后,超过核心线程数的线程还没有接受到新的任务,那就回收;


unit:

keepAliveTime 的时间单位;


workQueue

:存放待执行任务的队列:当提交的任务数超过核心线程数大小后,再提交的任务就存放在这里。它仅仅用来存放被 execute 方法提交的 Runnable 任务;阻塞队列成员表:


threadFactory

:线程工厂,用来创建线程工厂。比如这里面可以自定义线程名称,当进行虚拟机栈分析时,看着名字就知道这个线程是哪里来的,不会懵逼;


handler

:拒绝策略:当队列里面放满了任务、最大线程数的线程都在工作时,这时继续提交的任务线程池就处理不了,应该执行怎么样的拒绝策略;



Executors – 线程池的工厂



SingleThreadExecutor

new ThreadPoolExecutor(1, 1,
                        0L, TimeUnit.MILLISECONDS,
                        new LinkedBlockingQueue<Runnable>())

为什么还有一个线程的线程池?

  1. 任务队列
  2. 生命周期管理



CachePool

new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                              60L, TimeUnit.SECONDS,
                              new SynchronousQueue<Runnable>())

来一个任务执行一个任务,没有线程活着就新建一个线程

当任务数量忽高忽低时可以考虑



FixedThreadPool

new ThreadPoolExecutor(nThreads, nThreads,
                              0L, TimeUnit.MILLISECONDS,
                              new LinkedBlockingQueue<Runnable>());

线程数是固定的,线程不会被灭活

当任务量比较稳定,可以考虑



ScheduledThreadPool

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

定时任务线程池



ForkJoinPool



WorkStealingPool

new ForkJoinPool
    (Runtime.getRuntime().availableProcessors(),
     ForkJoinPool.defaultForkJoinWorkerThreadFactory,
     null, true);

每个线程有自己的一个任务队列,当自己任务队列完成了,可以从其他线程的队列拿一个任务出来执行



并发 vs并行

并发是指任务提交,并行指任务执行

并发是并行的子集

面试题:假如提供一个闹钟服务,订阅这个服务的人特别多,10亿人,怎么优化?



ThreadPoolExecutor源码解析



1、常用变量的解释

// 1. `ctl`,可以看做一个int类型的数字,高3位表示线程池状态,低29位表示worker数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 2. `COUNT_BITS`,`Integer.SIZE`为32,所以`COUNT_BITS`为29
private static final int COUNT_BITS = Integer.SIZE - 3;
// 3. `CAPACITY`,线程池允许的最大线程数。1左移29位,然后减1,即为 2^29 - 1
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
// 4. 线程池有5种状态,按大小排序如下:RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

// Packing and unpacking ctl
// 5. `runStateOf()`,获取线程池状态,通过按位与操作,低29位将全部变成0
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// 6. `workerCountOf()`,获取线程池worker数量,通过按位与操作,高3位将全部变成0
private static int workerCountOf(int c)  { return c & CAPACITY; }
// 7. `ctlOf()`,根据线程池状态和线程池worker数量,生成ctl值
private static int ctlOf(int rs, int wc) { return rs | wc; }

/*
 * Bit field accessors that don't require unpacking ctl.
 * These depend on the bit layout and on workerCount being never negative.
 */
// 8. `runStateLessThan()`,线程池状态小于xx
private static boolean runStateLessThan(int c, int s) {
    return c < s;
}
// 9. `runStateAtLeast()`,线程池状态大于等于xx
private static boolean runStateAtLeast(int c, int s) {
    return c >= s;
}



2、构造方法

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    // 基本类型参数校验
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    // 空指针校验
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    // 根据传入参数`unit`和`keepAliveTime`,将存活时间转换为纳秒存到变量`keepAliveTime `中
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}



3、提交执行task的过程

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    // worker数量比核心线程数小,直接创建worker执行任务
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // worker数量超过核心线程数,任务直接进入队列
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 线程池状态不是RUNNING状态,说明执行过shutdown命令,需要对新加入的任务执行reject()操作。
        // 这儿为什么需要recheck,是因为任务入队列前后,线程池的状态可能会发生变化。
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 这儿为什么需要判断0值,主要是在线程池构造方法中,核心线程数允许为0
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 如果线程池不是运行状态,或者任务进入队列失败,则尝试创建worker执行任务。
    // 这儿有3点需要注意:
    // 1. 线程池不是运行状态时,addWorker内部会判断线程池状态
    // 2. addWorker第2个参数表示是否创建核心线程
    // 3. addWorker返回false,则说明任务执行失败,需要执行reject操作
    else if (!addWorker(command, false))
        reject(command);
}



4、addworker源码解析

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    // 外层自旋
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 这个条件写得比较难懂,我对其进行了调整,和下面的条件等价
        // (rs > SHUTDOWN) || 
        // (rs == SHUTDOWN && firstTask != null) || 
        // (rs == SHUTDOWN && workQueue.isEmpty())
        // 1. 线程池状态大于SHUTDOWN时,直接返回false
        // 2. 线程池状态等于SHUTDOWN,且firstTask不为null,直接返回false
        // 3. 线程池状态等于SHUTDOWN,且队列为空,直接返回false
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        // 内层自旋
        for (;;) {
            int wc = workerCountOf(c);
            // worker数量超过容量,直接返回false
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 使用CAS的方式增加worker数量。
            // 若增加成功,则直接跳出外层循环进入到第二部分
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            // 线程池状态发生变化,对外层循环进行自旋
            if (runStateOf(c) != rs)
                continue retry;
            // 其他情况,直接内层循环进行自旋即可
            // else CAS failed due to workerCount change; retry inner loop
        } 
    }
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            // worker的添加必须是串行的,因此需要加锁
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                // 这儿需要重新检查线程池状态
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    // worker已经调用过了start()方法,则不再创建worker
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // worker创建并添加到workers成功
                    workers.add(w);
                    // 更新`largestPoolSize`变量
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // 启动worker线程
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // worker线程启动失败,说明线程池状态发生了变化(关闭操作被执行),需要进行shutdown相关操作
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}



5、线程池worker任务单元

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     */
    private static final long serialVersionUID = 6138294804551838833L;

    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        // 这儿是Worker的关键所在,使用了线程工厂创建了一个线程。传入的参数为当前worker
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker  */
    public void run() {
        runWorker(this);
    }

    // 省略代码...
}



6、核心线程执行逻辑-runworker

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    // 调用unlock()是为了让外部可以中断
    w.unlock(); // allow interrupts
    // 这个变量用于判断是否进入过自旋(while循环)
    boolean completedAbruptly = true;
    try {
        // 这儿是自旋
        // 1. 如果firstTask不为null,则执行firstTask;
        // 2. 如果firstTask为null,则调用getTask()从队列获取任务。
        // 3. 阻塞队列的特性就是:当队列为空时,当前线程会被阻塞等待
        while (task != null || (task = getTask()) != null) {
            // 这儿对worker进行加锁,是为了达到下面的目的
            // 1. 降低锁范围,提升性能
            // 2. 保证每个worker执行的任务是串行的
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            // 如果线程池正在停止,则对当前线程进行中断操作
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            // 执行任务,且在执行前后通过`beforeExecute()`和`afterExecute()`来扩展其功能。
            // 这两个方法在当前类里面为空实现。
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                // 帮助gc
                task = null;
                // 已完成任务数加一 
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // 自旋操作被退出,说明线程池正在结束
        processWorkerExit(w, completedAbruptly);
    }
}




线程池参数动态化



现有的解决方案的痛点。

现在市面上大多数的答案都是先区分线程池中的任务是 IO 密集型还是 CPU 密集型。

  1. 如果是 CPU 密集型的,可以把核心线程数设置为核心数+1;

  2. 如果是包含 IO 操作的任务

但是往往一台服务器是部署了多个应用,一个应用也会有多个线程池,所以很难配置一个完美的参数



动态更新的工作原理是什么?

看ThreadPoolExecutor 的 setCorePoolSize 方法:

/**
 * Sets the core number of threads.  This overrides any value set
 * in the constructor.  If the new value is smaller than the
 * current value, excess existing threads will be terminated when
 * they next become idle.  If larger, new threads will, if needed,
 * be started to execute any queued tasks.
 *
 * @param corePoolSize the new core size
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 * @see #getCorePoolSize
 */
public void setCorePoolSize(int corePoolSize) {
    if (corePoolSize < 0)
        throw new IllegalArgumentException();
    int delta = corePoolSize - this.corePoolSize;
    this.corePoolSize = corePoolSize;
    if (workerCountOf(ctl.get()) > corePoolSize)
        interruptIdleWorkers();
    else if (delta > 0) {
        // We don't really know how many new threads are "needed".
        // As a heuristic, prestart enough new workers (up to new
        // core size) to handle the current number of tasks in
        // queue, but stop if queue becomes empty while doing so.
        int k = Math.min(delta, workQueue.size());
        while (k-- > 0 && addWorker(null, true)) {
            if (workQueue.isEmpty())
                break;
        }
    }
}

在Spring 的 ThreadPoolTaskExecutor类 (就是对JDK ThreadPoolExecutor 的一层包装,可以理解为装饰者模式)的 setCorePoolSize 方法:

/**
 * Set the ThreadPoolExecutor's core pool size.
 * Default is 1.
 * <p><b>This setting can be modified at runtime, for example through JMX.</b>
 */
public void setCorePoolSize(int corePoolSize) {
   synchronized (this.poolSizeMonitor) {
      this.corePoolSize = corePoolSize;
      if (this.threadPoolExecutor != null) {
         this.threadPoolExecutor.setCorePoolSize(corePoolSize);
      }
   }
}

看ThreadPoolExecutor的setMaximumPoolSize 源码:

/**
 * Sets the maximum allowed number of threads. This overrides any
 * value set in the constructor. If the new value is smaller than
 * the current value, excess existing threads will be
 * terminated when they next become idle.
 *
 * @param maximumPoolSize the new maximum
 * @throws IllegalArgumentException if the new maximum is
 *         less than or equal to zero, or
 *         less than the {@linkplain #getCorePoolSize core pool size}
 * @see #getMaximumPoolSize
 */
public void setMaximumPoolSize(int maximumPoolSize) {
    if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
        throw new IllegalArgumentException();
    this.maximumPoolSize = maximumPoolSize;
    if (workerCountOf(ctl.get()) > maximumPoolSize)
        interruptIdleWorkers();
}

经过前面两个方法的分析,我们知道了最大线程数和核心线程数可以动态调整。



动态设置的注意点有哪些?

当只调整核心线程数,不调整最大线程数是,调整的时候可能会出现核心线程数调整之后无效的情况;


原因看源码:

/**
 * Performs blocking or timed wait for a task, depending on
 * current configuration settings, or returns null if this worker
 * must exit because of any of:
 * 1. There are more than maximumPoolSize workers (due to
 *    a call to setMaximumPoolSize).
 * 2. The pool is stopped.
 * 3. The pool is shutdown and the queue is empty.
 * 4. This worker timed out waiting for a task, and timed-out
 *    workers are subject to termination (that is,
 *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
 *    both before and after the timed wait, and if the queue is
 *    non-empty, this worker is not the last thread in the pool.
 *
 * @return task, or null if the worker must exit, in which case
 *         workerCount is decremented
 */
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 如果工作线程数大于最大线程数,则对工作线程数量进行减一操作,然后返回 null。
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
          //对工作线程数量进行减一操作
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

所以,这个地方的实际流程应该是:

  1. 创建新的工作线程 worker,然后工作线程数进行加一操作。

  2. 运行创建的工作线程 worker,开始获取任务 task。

  3. 工作线程数量大于最大线程数,对工作线程数进行减一操作。

  4. 返回 null,即没有获取到 task。

  5. 清理该任务,流程结束。

    这样一加一减,所以真正在执行任务的工作线程数的数量一直没有发生变化,也就是最大线程数。

**解决方法:**设置核心线程数的时候,同时设置最大线程数即可。其实可以把二者设置为相同的值,然后设置allowCoreThreadTimeOut 参数设置为 true ,核心线程在空闲了 keepAliveTime 的时间后也会被回收的,相当于线程池自动给你动态修改。



如何动态指定队列长度?

/** The capacity bound, or Integer.MAX_VALUE if none */
private final int capacity;

因为LinkedBlockingQueue的capacity是被final修饰的,所以是不允许动态修改的;

所以要想动态修改只能自己实现一个BlockingQueue,然后capacity可以动态修改即可;复制一个LinkedBlockingQueue源码,将capacity的final修饰去掉,添加set方法,保存为ResizableCapacityLinkedBlockingQueue.java即可,然后使用ResizableCapacityLinkedBlockingQueue作为任务队列;



这个过程中涉及到的面试题有哪些?

问题一:线程池被创建后里面有线程吗?如果没有的话,你知道有什么方法对线程池进行预热吗?

答:线程池被创建后如果没有任务过来,里面是不会有线程的。如果需要预热的话可以调用下面的两个方法:

prestartCoreThread()和prestartAllCoreThreads()

问题二:核心线程数会被回收吗?需要什么设置?

答:核心线程数默认是不会被回收的,如果需要回收核心线程数,需要调用下面的方法:allowCoreThreadTimeOut();



版权声明:本文为qq_21239913原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。