Java 五种线程池,JDK1.8新增newWorkStealingPool

  • Post author:
  • Post category:java


原文地址:

https://blog.csdn.net/smile_Running/article/details/91409942?utm_source=app

在应用开发中,通常有这样的需求,就是并发下载文件操作,比如百度网盘下载文件、腾讯视频下载视频等,都可以同时下载好几个文件,这就是并发下载。并发下载处理肯定是多线程操作,而大量的创建线程,势必会影响程序的性能,导致卡顿等问题。所以呢,Java 中给我们提供了线程池来管理线程。

首先,我们来看看线程池是什么?顾名思义,好比一个存放线程的池子,我们可以联想水池。线程池意味着可以储存线程,并让池内的线程得以复用,如果池内的某一个线程执行完了,并不会直接摧毁,它有生命,可以存活一些时间,待到下一个任务来时,它会复用这个在等待中线程,避免了再去创建线程的额外开销。

百度对线程池的简介:

【线程池(英语:thread pool):一种

线程

使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。】

线程池的概念与作用就介绍完了,下面就是线程池的运用了,我们来看这样的一个例子,模拟网络下载的功能,开启多任务下载操作,其中每条下载都开辟新线程来执行。

效果图:

可以看到就是这样效果,这里的每次点击 下载 按钮,都会开启一个子线程来更新进度条操作。注意了:这里我们看到的 name 就是线程的名字。可以观察到,5个下载任务所用的线程都是不同的,所以它们的线程名都不一样。

也就是说,我们每个任务开辟的都是一个新的线程,假如我们下载任务量非常庞大时,那开辟的线程将不可控制,先不说性能问题,如果出现了线程安全问题或者是线程的调度,处理起来都是非常困难的。所以这种情况下,非常的有必要引入我们的线程池来管理这些线程,刚刚我们介绍了线程池的优点,现在让我们具体的实现一下,才能体会它到底有那些优势。

首先,我们的线程池类型一共有 4 种,分别是 newSingleThreadPool、newFixedThreadPool、newCachedThreadPool、newScheduledThreadPool 四种,这是在 JDK1.8 版本以前了,在 JDK1.8 版本又加入了一种:newWorkStealingPool,所以现在一共是 5 种。

1、线程池的创建过程

通过这几种线程池的命名,我们大致可以猜测出来它的用意,当然,还是必须要实践一下。对 线程池 的创建一般都是这样的步骤:

  1. //创建单核心的线程池
  2. ExecutorService executorService = Executors.newSingleThreadExecutor();
  3. //创建固定核心数的线程池,这里核心数 = 2
  4. ExecutorService executorService = Executors.newFixedThreadPool(2);
  5. //创建一个按照计划规定执行的线程池,这里核心数 = 2
  6. ExecutorService executorService = Executors.newScheduledThreadPool(2);
  7. //创建一个自动增长的线程池
  8. ExecutorService executorService = Executors.newCachedThreadPool();
  9. //创建一个具有抢占式操作的线程池
  10. ExecutorService executorService = Executors.newWorkStealingPool();

我们只需要这样调用就可成功的创建适用于我们的线程池,不过从上面看不出上面东西来,我们要进入线程池创建的构造器,代码如下:

  1. /**
  2. * Creates a new {@code ThreadPoolExecutor} with the given initial
  3. * parameters and default thread factory and rejected execution handler.
  4. * It may be more convenient to use one of the {@link Executors} factory
  5. * methods instead of this general purpose constructor.
  6. *
  7. * @param corePoolSize the number of threads to keep in the pool, even
  8. * if they are idle, unless {@code allowCoreThreadTimeOut} is set
  9. * @param maximumPoolSize the maximum number of threads to allow in the
  10. * pool
  11. * @param keepAliveTime when the number of threads is greater than
  12. * the core, this is the maximum time that excess idle threads
  13. * will wait for new tasks before terminating.
  14. * @param unit the time unit for the {@code keepAliveTime} argument
  15. * @param workQueue the queue to use for holding tasks before they are
  16. * executed. This queue will hold only the {@code Runnable}
  17. * tasks submitted by the {@code execute} method.
  18. * @throws IllegalArgumentException if one of the following holds:<br>
  19. * {@code corePoolSize < 0}<br>
  20. * {@code keepAliveTime < 0}<br>
  21. * {@code maximumPoolSize <= 0}<br>
  22. * {@code maximumPoolSize < corePoolSize}
  23. * @throws NullPointerException if {@code workQueue} is null
  24. */
  25. public ThreadPoolExecutor(int corePoolSize,
  26. int maximumPoolSize,
  27. long keepAliveTime,
  28. TimeUnit unit,
  29. BlockingQueue<Runnable> workQueue) {
  30. this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
  31. Executors.defaultThreadFactory(), defaultHandler);
  32. }

当然,上面的注释都对参数进行了介绍,我们用自己的语言进行归纳一下:

  1. corePoolSize : 表示线程池核心线程数,当初始化线程池时,会创建核心线程进入等待状态,即使它是空闲的,核心线程也不会被摧毁,从而降低了任务一来时要创建新线程的时间和性能开销。
  2. maximumPoolSize : 表示最大线程数,意味着核心线程数都被用完了,那只能重新创建新的线程来执行任务,但是前提是不能超过最大线程数量,否则该任务只能进入阻塞队列进行排队等候,直到有线程空闲了,才能继续执行任务。
  3. keepAliveTime : 表示线程存活时间,除了核心线程外,那些被新创建出来的线程可以存活多久。意味着,这些新的线程一但完成任务,而后面都是空闲状态时,就会在一定时间后被摧毁。
  4. unit : 存活时间单位,没什么好解释的,一看就懂。
  5. workQueue : 表示任务的阻塞队列,由于任务可能会有很多,而线程就那么几个,所以那么还未被执行的任务就进入队列中排队,队列我们知道是 FIFO 的,等到线程空闲了,就以这种方式取出任务。这个一般不需要我们去实现。

还有一个注意点就是它这里的规定,可能会抛出这样的异常情况。这下面写的很明白了,就不要再介绍了:

  1. * @throws IllegalArgumentException if one of the following holds:<br>
  2. * {@code corePoolSize < 0 }
  3. * {@code keepAliveTime < 0 }
  4. * {@code maximumPoolSize <= 0 }
  5. * {@code maximumPoolSize < corePoolSize }
  6. * @throws NullPointerException if {@code workQueue} is null

好了,以上重点几个参数内容我们介绍完了,现在来看看几种线程池的比较和表现吧!

2、线程池的比较

(1)newSingleThreadPool,为单核心线程池,最大线程也只有一个,这里的时间为 0 意味着无限的生命,就不会被摧毁了。它的创建方式源码如下:

  1. public static ExecutorService newSingleThreadExecutor() {
  2. return new FinalizableDelegatedExecutorService
  3. (new ThreadPoolExecutor(1, 1,
  4. 0L, TimeUnit.MILLISECONDS,
  5. new LinkedBlockingQueue<Runnable>()));
  6. }

最形象的就是拿我们下载那个例子,为了便于测试,我当然添加了一个 全部下载的功能, newSingleThreadPool 测试结果如下:

由于我们的线程池中使用的从始至终都是单个线程,所以这里的线程名字都是相同的,而且下载任务都是一个一个的来,直到有空闲线程时,才会继续执行任务,否则都是等待状态。

(2)newFixedThreadPool,我们需要传入一个固定的核心线程数,并且核心线程数等于最大线程数,而且它们的线程数存活时间都是无限的,看它的创建方式:

  1. public static ExecutorService newFixedThreadPool(int nThreads) {
  2. return new ThreadPoolExecutor(nThreads, nThreads,
  3. 0L, TimeUnit.MILLISECONDS,
  4. new LinkedBlockingQueue<Runnable>());
  5. }

对比 newSingleThreadPool,其实改变的也就是可以根据我们来自定义线程数的操作,比较相似。我们通过newFixedThreadPool(2)给它传入了 2 个核心线程数,看看下载效果如何:

显然,它就可以做到并发的下载,我们两个下载任务可以同时进行,并且所用的线程始终都只有两个,因为它的最大线程数等于核心线程数,不会再去创建新的线程了,所以这个方式也可以,但最好还是运用下面一种线程池。

(3)newCachedThreadPool,可以进行缓存的线程池,意味着它的线程数是最大的,无限的。但是核心线程数为 0,这没关系。这里要考虑线程的摧毁,因为不能够无限的创建新的线程,所以在一定时间内要摧毁空闲的线程。看看创建的源码:

  1. public static ExecutorService newCachedThreadPool() {
  2. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
  3. 60L, TimeUnit.SECONDS,
  4. new SynchronousQueue<Runnable>());
  5. }

没有核心线程数,但是我们的最大线程数没有限制,所以一点全部开始下载,就会创建出 5 条新的线程同时执行任务,从上图的例子看出,每天线程都不一样。看不出这个线程池的效果,下面我们通过修改这个逻辑。

首先,我们点开始下载,只会下载前面三个,为了证明线程的复用效果,我这里又添加了一个按钮,在这个按钮中继续添加后面两个下载任务。

那么,当线程下载完毕时,空闲线程就会复用,结果显示如下,复用线程池的空闲线程:

另一种情况,当线程池中没有空闲线程时,这时又加了新的任务,它就会创建出新的线程来执行任务,结果如下:

这下算是搞清楚这种线程池的作用了吧,但是由于这种线程池创建时初始化的都是无界的值,一个是最大线程数,一个是任务的阻塞队列,都没有设置它的界限,这可能会出现问题。

这里可以参考我的一篇文章:

AsyncTask 源码

分析,或者这个

单利模式

解读的文章,里面有提到如何创建自定义的线程池,参考的是

AsyncTask

的源码线程池创建代码。

(4)newScheduledThreadPool,这个表示的是有计划性的线程池,就是在给定的延迟之后运行,或周期性地执行。很好理解,大家应该用过 Timer 定时器类吧,这两个差不多的意思。它的构造函数如下:

  1. public ScheduledThreadPoolExecutor(int corePoolSize) {
  2. super(corePoolSize, Integer.MAX_VALUE,
  3. DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
  4. new DelayedWorkQueue());
  5. }

内部有一个延时的阻塞队列来维护任务的进行,延时也就是在这里进行的。我们把创建 newScheduledThreadPool 的代码放出来,这样对比效果图的话,显得更加直观。

  1. //参数2:延时的时长
  2. scheduledExecutorService.schedule(th_all_1, 3000, TimeUnit.MILLISECONDS);
  3. scheduledExecutorService.schedule(th_all_2, 2000, TimeUnit.MILLISECONDS);
  4. scheduledExecutorService.schedule(th_all_3, 1000, TimeUnit.MILLISECONDS);
  5. scheduledExecutorService.schedule(th_all_4, 1500, TimeUnit.MILLISECONDS);
  6. scheduledExecutorService.schedule(th_all_5, 500, TimeUnit.MILLISECONDS);

这个线程池好像不是很常用,做个了解就好了。

(5)newWorkStealingPool,这个是 JDK1.8 版本加入的一种线程池,stealing 翻译为抢断、窃取的意思,它实现的一个线程池和上面4种都不一样,用的是 ForkJoinPool 类,构造函数代码如下:

  1. /**
  2. * Creates a thread pool that maintains enough threads to support
  3. * the given parallelism level, and may use multiple queues to
  4. * reduce contention. The parallelism level corresponds to the
  5. * maximum number of threads actively engaged in, or available to
  6. * engage in, task processing. The actual number of threads may
  7. * grow and shrink dynamically. A work-stealing pool makes no
  8. * guarantees about the order in which submitted tasks are
  9. * executed.
  10. *
  11. * @param parallelism the targeted parallelism level
  12. * @return the newly created thread pool
  13. * @throws IllegalArgumentException if {@code parallelism <= 0}
  14. * @since 1.8
  15. */
  16. public static ExecutorService newWorkStealingPool(int parallelism) {
  17. return new ForkJoinPool
  18. (parallelism,
  19. ForkJoinPool.defaultForkJoinWorkerThreadFactory,
  20. null, true);
  21. }

从上面代码的介绍,最明显的用意就是它是一个并行的线程池,参数中传入的是一个线程并发的数量,这里和之前就有很明显的区别,前面4种线程池都有核心线程数、最大线程数等等,而这就使用了一个并发线程数解决问题。从介绍中,还说明这个线程池不会保证任务的顺序执行,也就是 WorkStealing 的意思,抢占式的工作。

如下图,任务的执行是无序的,哪个线程抢到任务,就由它执行:

对比了以上 5 种线程池,我们看到每个线程池都有自己的特点,这也是为我们封装好的一些比较常用的线程池。当然,我建议你在使用(3)可缓存的线程池时,尽量的不要用默认的那个来创建,因为默认值都是无界的,可能会出现一些问题,这时我们可以参考源码中的线程池初始化参数的设置,可以尽可能的避免错误发生。

通过这个案例,我们把线程池学习了一遍,总结一下线程池在哪些地方用到,比如网络请求、下载、I/O操作等多线程场景,我们可以引入线程池,一个对性能有提升,另一个就是可以让管理线程变得更简单。