Java中线程池的实现

  • Post author:
  • Post category:java


Java中线程池的实现

当我们要使用线程时,就要去创建线程,这样看上去似乎很正常,但是频繁地创建线程、销毁线程会对系统资源造成不小地负担,大大降低了系统地效率。

Java中有一个很好地工具,可以事先创建好一些线程,当我们需要使用地时候就直接拿来使用就可以,这样就不会频繁地创建和销毁线程了,这个神奇地工具就是线程池。

线程池是通过Executor执行器来创建。

我们来看一下在java.util.concurrent包下面的Executor类都有哪些东西。

Executor提供的几种创建线程池的方法:


1.newFixedThreadPool

     * Creates a thread pool that reuses a fixed number of threads
     * operating off a shared unbounded queue.  At any point, at most
     * {@code nThreads} threads will be active processing tasks.
     * If additional tasks are submitted when all threads are active,
     * they will wait in the queue until a thread is available.
     * If any thread terminates due to a failure during execution
     * prior to shutdown, a new one will take its place if needed to
     * execute subsequent tasks.  The threads in the pool will exist
     * until it is explicitly {@link ExecutorService#shutdown shutdown}.
     *
     * @param nThreads the number of threads in the pool
     * @return the newly created thread pool
     * @throws IllegalArgumentException if {@code nThreads <= 0}
     */

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

简单翻译一下就是说这个类创建了些个固定数目的共享线程,即实现规定好创建的线程的数量,后面不能更改数量。

在任何一个时间点,活跃工作的线程数量都不能够超过这个最大的数量。

当所有的线程都在工作时,如果这时候有一个新的线程需要被创建,那么这个需要创建的线程需要就会被放进一个等待队列BlockingQueue,直到有空余的线程可以使用才执行。

如果有一个线程在执行期间由于发生了错误而终止运行,一个新的线程会接替这个终止的线程完成就下来的任务。

线程池中的线程们会一直存在直到被要求关闭。

初始化时指定nThreads线程池线程数量上限。


2.newWorkStealingPool:

/**
     * Creates a thread pool that maintains enough threads to support
     * the given parallelism level, and may use multiple queues to
     * reduce contention. The parallelism level corresponds to the
     * maximum number of threads actively engaged in, or available to
     * engage in, task processing. The actual number of threads may
     * grow and shrink dynamically. A work-stealing pool makes no
     * guarantees about the order in which submitted tasks are
     * executed.
     *
     * @param parallelism the targeted parallelism level
     * @return the newly created thread pool
     * @throws IllegalArgumentException if {@code parallelism <= 0}
     * @since 1.8
     */
    public static ExecutorService newWorkStealingPool(int parallelism) {
        return new ForkJoinPool
            (parallelism,
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

parallelism为并行度,newWorkStealingPool也有一个不带参数的构造,默认并行度为cup的个数。

这个参数规定了同一时间同时执行任务的线程数量上限。平时线程的数量是会动态增加或减少的,并不像newFixedThreadPool那样是固定的线程数量。

同时也不保证按照任务提交的顺序来完成任务,它为了减少竞争,采取了多种的queue。


3.newCachedThreadPool

/**
     * Creates a thread pool that creates new threads as needed, but
     * will reuse previously constructed threads when they are
     * available.  These pools will typically improve the performance
     * of programs that execute many short-lived asynchronous tasks.
     * Calls to {@code execute} will reuse previously constructed
     * threads if available. If no existing thread is available, a new
     * thread will be created and added to the pool. Threads that have
     * not been used for sixty seconds are terminated and removed from
     * the cache. Thus, a pool that remains idle for long enough will
     * not consume any resources. Note that pools with similar
     * properties but different details (for example, timeout parameters)
     * may be created using {@link ThreadPoolExecutor} constructors.
     *
     * @return the newly created thread pool
     */
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

创建一个可以动态增长的线程池,当我们有新的任务提交时,但是当前线程池并没有空余的线程可以供我们使用,这时候就会新建一个线程,如果有以前创建的线程空闲下来,则不会创建新的线程而是使用以前已经创建的。

当一个线程空闲下来超过60秒,这个线程就会被从缓存线程池中移除。

这种缓存线程池在应对那些执行时间很短的异步任务时表现很好。


4.newScheduledThreadPool

/**
     * Creates a thread pool that can schedule commands to run after a
     * given delay, or to execute periodically.
     * @param corePoolSize the number of threads to keep in the pool,
     * even if they are idle
     * @param threadFactory the factory to use when the executor
     * creates a new thread
     * @return a newly created scheduled thread pool
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     * @throws NullPointerException if threadFactory is null
     */
    public static ScheduledExecutorService newScheduledThreadPool(
            int corePoolSize, ThreadFactory threadFactory) {
        return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
    }

定时周期性地执行任务,线程数量固定。

public class Main {
    public static void main(String[] args) throws Exception {
        // 指定大小为4  
        ScheduledExecutorService m = Executors.newScheduledThreadPool(4);
        m.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                Date now = new Date();
                System.out.println("线程" + Thread.currentThread() + "报时:" + now);
            }

        }, 1, 1, TimeUnit.SECONDS); // 延迟1s秒执行,每隔1s执行一次
    }
}  


5.newSingleThreadPool:

/**
     * Creates an Executor that uses a single worker thread operating
     * off an unbounded queue. (Note however that if this single
     * thread terminates due to a failure during execution prior to
     * shutdown, a new one will take its place if needed to execute
     * subsequent tasks.)  Tasks are guaranteed to execute
     * sequentially, and no more than one task will be active at any
     * given time. Unlike the otherwise equivalent
     * {@code newFixedThreadPool(1)} the returned executor is
     * guaranteed not to be reconfigurable to use additional threads.
     *
     * @return the newly created single-threaded Executor
     */
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

顾名思义,这个线程池里只有一个线程,任何一个时刻,最多只有一个线程在工作。

每个被提交地任务按照先提交先执行的原则依次执行。

当原来的线程出现问题被终止了,会有一个新的线程被创建去执行后面的任务。

这个线程的阻塞队列采用了无边界的LinkedBlockingQueue。

还有一些扩展的比如

newSingleThreadScheduledExecutor

就是在上面一种的情况下加入了计时周期性执行。

这些就是Executor类提供给我们的创建线程池的方法,我们可以根据业务需要选择合适的方法。

这些线程池为了效率和应对特殊的情况,采用了不同的BlockingQueue,下面一篇文章我会介绍一下各种

BlockingQueue的实现类。



版权声明:本文为Dazhu233原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。