前言
   
还有两天就放假了,还在岗位上坚守“swimming”的小伙伴们顶住。博主给大家带来一篇线程池的基本使用解解闷。
    
    
    为什么需要使用线程池
   
    
     1、减少线程创建与切换的开销
    
- 在没有使用线程池的时候,来了一个任务,就创建一个线程,我们知道系统创建和销毁工作线程的开销很大,而且频繁的创建线程也就意味着需要进行频繁的线程切换,这都是一笔很大的开销。
    
     2、控制线程的数量
    
- 使用线程池我们可以有效地控制线程的数量,当系统中存在大量并发线程时,会导致系统性能剧烈下降。
    
    
    线程池做了什么
   
    
     重复利用有限的线程
    
- 线程池中会预先创建一些空闲的线程,他们不断的从工作队列中取出任务,然后执行,执行完之后,会继续执行工作队列中的下一个任务,减少了创建和销毁线程的次数,每个线程都可以一直被重用,变了创建和销毁的开销。
    
    
    线程池的使用
   
    其实常用Java线程池本质上都是由
    
     ThreadPoolExecutor
    
    或者
    
     ForkJoinPool
    
    生成的,只是其根据构造函数传入不同的实参来实例化相应线程池而已。
   
    
    
    Executors
   
    
     Executors
    
    是一个线程池工厂类,该工厂类包含如下集合静态工厂方法来创建线程池:
   
- 
     
 newFixedThreadPool()
 
 :创建一个可重用的、具有固定线程数的线程池
- 
     
 newSingleThreadExecutor()
 
 :创建只有一个线程的线程池
- 
     
 newCachedThreadPool()
 
 :创建一个具有缓存功能的线程池
- 
     
 newWorkStealingPool()
 
 :创建持有足够线程的线程池来支持给定的并行级别的线程池
- 
     
 newScheduledThreadPool()
 
 :创建具有指定线程数的线程池,它可以在指定延迟后执行任务线程
    
    
    ExecutorService接口
   
    对设计模式有了解过的同学都会知道,我们尽量面向接口编程,这样对程序的灵活性是非常友好的。Java线程池也采用了面向接口编程的思想,可以看到
    
     ThreadPoolExecutor
    
    和
    
     ForkJoinPool
    
    所有都是
    
     ExecutorService
    
    接口的实现类。在
    
     ExecutorService
    
    接口中定义了一些常用的方法,然后再各种线程池中都可以使用
    
     ExecutorService
    
    接口中定义的方法,常用的方法有如下几个:
   
- 
     
 向线程池提交线程
 - 
       
 Future<?> submit()
 
 :将一个Runnable对象交给指定的线程池,线程池将在有空闲线程时执行Runnable对象代表的任务,该方法既能接收Runnable对象也能接收Callable对象,这就意味着sumbit()方法可以有返回值。
- 
       
 void execute(Runnable command)
 
 :只能接收Runnable对象,意味着该方法没有返回值。
 
- 
       
- 
     
 关闭线程池
 - 
       
 void shutdown()
 
 :阻止新来的任务提交,对已经提交了的任务不会产生任何影响。(等待所有的线程执行完毕才关闭)
- 
       
 List<Runnable> shutdownNow()
 
 : 阻止新来的任务提交,同时会中断当前正在运行的线程,另外它还将workQueue中的任务给移除,并将这些任务添加到列表中进行返回。(立马关闭)
 
- 
       
- 
     
 检查线程池的状态
 - 
       
 boolean isShutdown()
 
 :调用shutdown()或shutdownNow()方法后返回为true。
- 
       
 boolean isTerminated()
 
 :当调用shutdown()方法后,并且所有提交的任务完成后返回为true;当调用shutdownNow()方法后,成功停止后返回为true。
 
- 
       
    
    
    常见线程池使用示例
   
    
    
    一、newFixedThreadPool
   
线程池中的线程数目是固定的,不管你来了多少的任务。
    
     示例代码
    
   
public class MyFixThreadPool {
    public static void main(String[] args) throws InterruptedException {
        // 创建一个线程数固定为5的线程池
        ExecutorService service = Executors.newFixedThreadPool(5);
        System.out.println("初始线程池状态:" + service);
        for (int i = 0; i < 6; i++) {
            service.execute(() -> {
                try {
                    TimeUnit.MILLISECONDS.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName());
            });
        }
        System.out.println("线程提交完毕之后线程池状态:" + service);
        service.shutdown();//会等待所有的线程执行完毕才关闭,shutdownNow:立马关闭
        System.out.println("是否全部线程已经执行完毕:" + service.isTerminated());//所有的任务执行完了,就会返回true
        System.out.println("是否已经执行shutdown()" + service.isShutdown());
        System.out.println("执行完shutdown()之后线程池的状态:" + service);
        TimeUnit.SECONDS.sleep(5);
        System.out.println("5秒钟过后,是否全部线程已经执行完毕:" + service.isTerminated());
        System.out.println("5秒钟过后,是否已经执行shutdown()" + service.isShutdown());
        System.out.println("5秒钟过后,线程池状态:" + service);
    }
}
    
     运行结果:
    
   
初始线程池状态:[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
线程提交完毕之后线程池状态:[Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
是否全部线程已经执行完毕:false
是否已经执行shutdown():true
执行完shutdown()之后线程池的状态:[Shutting down, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
pool-1-thread-2
pool-1-thread-1
pool-1-thread-4
pool-1-thread-5
pool-1-thread-3
pool-1-thread-2
5秒钟过后,是否全部线程已经执行完毕:true
5秒钟过后,是否已经执行shutdown():true
5秒钟过后,线程池状态:[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 6]
    
     程序分析
    
   
- 
     当我们创建好一个FixedThreadPool之后,该线程池就处于
 
 Running
 
 状态了,但是
 
 pool size
 
 (线程池线程的数量)、
 
 active threads
 
 (当前活跃线程)
 
 queued tasks
 
 (当前排队线程)、
 
 completed tasks
 
 (已完成的任务数)都是0
- 
     当我们把6个任务都提交给线程池之后,
- 
       
 pool size = 5
 
 :因为我们创建的是一个固定线程数为5的线程池(注意:如果这个时候我们只提交了3个任务,那么
 
 pool size = 3
 
 ,说明线程池也是通过懒加载的方式去创建线程)。
- 
       
 active threads = 5
 
 :虽然我们向线程池提交了6个任务,但是线程池的固定大小为5,所以活跃线程只有5个
- 
       
 queued tasks = 1
 
 :虽然我们向线程池提交了6个任务,但是线程池的固定大小为5,只能有5个活跃线程同时工作,所以有一个任务在等待
 
- 
       
- 
     我们第一次执行
 
 shutdown()
 
 的时候,由于任务还没有全部执行完毕,所以
 
 isTerminated()
 
 返回
 
 false
 
 ,
 
 shutdown()
 
 返回true,而线程池的状态会由
 
 Running
 
 变为
 
 Shutting down
 
- 
     从任务的运行结果我们可以看出,名为
 
 pool-1-thread-2
 
 执行了两次任务,证明线程池中的线程确实是重复利用的。
- 
     5秒钟后,
 
 isTerminated()
 
 返回
 
 true
 
 ,
 
 shutdown()
 
 返回
 
 true
 
 ,证明所有的任务都执行完了,线程池也关闭了,我们再次检查线程池的状态
 
 [Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 6]
 
 ,状态已经处于
 
 Terminated
 
 了,然后已完成的任务显示为6
    
    
    二、newSingleThreadExecutor
   
从头到尾整个线程池都只有一个线程在工作。
    
     实例代码
    
   
public class SingleThreadPool {
    public static void main(String[] args) {
        ExecutorService service = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 5; i++) {
            final int j = i;
            service.execute(() -> {
                System.out.println(j + " " + Thread.currentThread().getName());
            });
        }
    }
}
    
     运行结果
    
   
0 pool-1-thread-1
1 pool-1-thread-1
2 pool-1-thread-1
3 pool-1-thread-1
4 pool-1-thread-1
    
     程序分析
    
    
    可以看到只有
    
     pool-1-thread-1
    
    一个线程在工作。
   
    
    
    三、newCachedThreadPool
   
来多少任务,就创建多少线程(前提是没有空闲的线程在等待执行任务,否则还是会复用之前旧(缓存)的线程),直接你电脑能支撑的线程数的极限为止。
    
     实例代码
    
   
public class CachePool {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService service = Executors.newCachedThreadPool();
        System.out.println("初始线程池状态:" + service);
        for (int i = 0; i < 12; i++) {
            service.execute(() -> {
                try {
                    TimeUnit.MILLISECONDS.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName());
            });
        }
        System.out.println("线程提交完毕之后线程池状态:" + service);
        TimeUnit.SECONDS.sleep(50);
        System.out.println("50秒后线程池状态:" + service);
        TimeUnit.SECONDS.sleep(30);
        System.out.println("80秒后线程池状态:" + service);
    }
}
    
     运行结果
    
   
初始线程池状态:[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
线程提交完毕之后线程池状态:[Running, pool size = 12, active threads = 12, queued tasks = 0, completed tasks = 0]
pool-1-thread-3
pool-1-thread-4
pool-1-thread-1
pool-1-thread-2
pool-1-thread-5
pool-1-thread-8
pool-1-thread-9
pool-1-thread-12
pool-1-thread-7
pool-1-thread-6
pool-1-thread-11
pool-1-thread-10
50秒后线程池状态:[Running, pool size = 12, active threads = 0, queued tasks = 0, completed tasks = 12]
80秒后线程池状态:[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 12]
    
     程序分析
    
   
- 因为我们每个线程任务至少需要500毫秒的执行时间,所以当我们往线程池中提交12个任务的过程中,基本上没有空闲的线程供我们重复使用,所以线程池会创建12个线程。
- 缓存中的线程默认是60秒没有活跃就会被销毁掉,可以看到在50秒钟的时候回,所有的任务已经完成了,但是线程池线程的数量还是12。
- 80秒过后,可以看到线程池中的线程已经全部被销毁了。
    
    
    四、newScheduledThreadPool
   
可以在指定延迟后或周期性地执行线程任务的线程池。
    
     ScheduledThreadPoolExecutor
    
   
- 
     
 newScheduledThreadPool()
 
 方法返回的其实是一个
 
 ScheduledThreadPoolExecutor
 
 对象,
 
 ScheduledThreadPoolExecutor
 
 定义如下:
public class ScheduledThreadPoolExecutor
        extends ThreadPoolExecutor
        implements ScheduledExecutorService {
- 
     可以看到,它还是继承了
 
 ThreadPoolExecutor
 
 并实现了
 
 ScheduledExecutorService
 
 接口,而
 
 ScheduledExecutorService
 
 也是继承了
 
 ExecutorService
 
 接口,所以我们也可以像使用之前的线程池对象一样使用,只不过是该对象会额外多了一些方法用于控制延迟与周期:- 
       
 public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit)
 
 :指定callable任务将在delay延迟后执行
- 
       
 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)
 
 :指定的command任务将在delay延迟后执行,而且已设定频率重复执行。(一开始并不会执行)
- 
       
 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,ong initialDelay,long delay,TimeUnit unit)
 
 :创建并执行一个在给定初始延迟后首期启用的定期操作,随后在每一个执行终止和下一次执行开始之间都存在给定的延迟。
 
- 
       
    
     示例代码
    
   
下面代码每500毫秒打印一次当前线程名称以及一个随机数字。
public class MyScheduledPool {
    public static void main(String[] args) {
        ScheduledExecutorService service = Executors.newScheduledThreadPool(4);
        service.scheduleAtFixedRate(() -> {
            System.out.println(Thread.currentThread().getName() + new Random().nextInt(1000));
        }, 0, 500, TimeUnit.MILLISECONDS);
    }
}
    
    
    五、newWorkStealingPool
   
每个线程维护着自己的队列,执行完自己的任务之后,会去主动执行其他线程队列中的任务。
    
     示例代码
    
   
public class MyWorkStealingPool {
    public static void main(String[] args) throws IOException {
        ExecutorService service = Executors.newWorkStealingPool(4);
        System.out.println("cpu核心:" + Runtime.getRuntime().availableProcessors());
        service.execute(new R(1000));
        service.execute(new R(2000));
        service.execute(new R(2000));
        service.execute(new R(2000));
        service.execute(new R(2000));
        //由于产生的是精灵线程(守护线程、后台线程),主线程不阻塞的话,看不到输出
        System.in.read();
    }
    static class R implements Runnable {
        int time;
        R(int time) {
            this.time = time;
        }
        @Override
        public void run() {
            try {
                TimeUnit.MILLISECONDS.sleep(time);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(time + " " + Thread.currentThread().getName());
        }
    }
}
    
     运行结果
    
   
cpu核心:4
1000 ForkJoinPool-1-worker-1
2000 ForkJoinPool-1-worker-0
2000 ForkJoinPool-1-worker-3
2000 ForkJoinPool-1-worker-2
2000 ForkJoinPool-1-worker-1
    
     程序分析
    
    
    
     ForkJoinPool-1-worker-1
    
    任务的执行时间是1秒,它会最先执行完毕,然后它会去主动执行其他线程队列中的任务。
   
    
    
    六、ForkJoinPool
   
- 
 ForkJoinPool
 
 可以将一个任务拆分成多个“小任务”并行计算,再把多个“小任务”的结果合并成总的计算结果。
 
 ForkJoinPool
 
 提供了如下几个方法用于创建
 
 ForkJoinPool
 
 实例对象:- 
       
 ForkJoinPool(int parallelism)
 
 :创建一个包含parallelism个并行线程的
 
 ForkJoinPool
 
 ,parallelism的默认值为
 
 Runtime.getRuntime().availableProcessors()
 
 方法的返回值
- 
       
 ForkJoinPool commonPool()
 
 :该方法返回一个通用池,通用池的运行状态不会受
 
 shutdown()
 
 或
 
 shutdownNow()
 
 方法的影响。
 
- 
       
- 
创建了 
 
 ForkJoinPool
 
 示例之后,就可以调用
 
 ForkJoinPool
 
 的
 
 submit(ForkJoinTask task)
 
 或
 
 invoke(ForkJoinTask task)
 
 方法来执行指定任务了。其中
 
 ForkJoinTask
 
 (实现了Future接口)代表一个可以并行、合并的任务。
 
 ForkJoinTask
 
 是一个抽象类,他还有两个抽象子类:
 
 RecursiveAction
 
 和
 
 RecursiveTask
 
 。其中
 
 RecursiveTask
 
 代表有返回值的任务,而
 
 RecursiveAction
 
 代表没有返回值的任务。
    
     示例代码
    
   
    下面代码演示了使用
    
     ForkJoinPool
    
    对1000000个随机整数进行求和。
   
public class MyForkJoinPool {
    static int[] nums = new int[1000000];
    static final int MAX_NUM = 50000;
    static Random random = new Random();
    static {
        for (int i = 0; i < nums.length; i++) {
            nums[i] = random.nextInt(1000);
        }
        System.out.println(Arrays.stream(nums).sum());
    }
//    static class AddTask extends RecursiveAction {
//
//        int start, end;
//
//        AddTask(int start, int end) {
//            this.start = start;
//            this.end = end;
//        }
//
//        @Override
//        protected void compute() {
//            if (end - start <= MAX_NUM) {
//                long sum = 0L;
//                for (int i = 0; i < end; i++) sum += nums[i];
//                System.out.println("from:" + start + " to:" + end + " = " + sum);
//            } else {
//                int middle = start + (end - start) / 2;
//
//                AddTask subTask1 = new AddTask(start, middle);
//                AddTask subTask2 = new AddTask(middle, end);
//                subTask1.fork();
//                subTask2.fork();
//            }
//        }
//    }
    static class AddTask extends RecursiveTask<Long> {
        int start, end;
        AddTask(int start, int end) {
            this.start = start;
            this.end = end;
        }
        @Override
        protected Long compute() {
            // 当end与start之间的差大于MAX_NUM,将大任务分解成两个“小任务”
            if (end - start <= MAX_NUM) {
                long sum = 0L;
                for (int i = start; i < end; i++) sum += nums[i];
                return sum;
            } else {
                int middle = start + (end - start) / 2;
                AddTask subTask1 = new AddTask(start, middle);
                AddTask subTask2 = new AddTask(middle, end);
                // 并行执行两个“小任务”
                subTask1.fork();
                subTask2.fork();
                // 把两个“小任务”累加的结果合并起来
                return subTask1.join() + subTask2.join();
            }
        }
    }
    public static void main(String[] args) throws IOException {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        AddTask task = new AddTask(0, nums.length);
        forkJoinPool.execute(task);
        long result = task.join();
        System.out.println(result);
        forkJoinPool.shutdown();
    }
}
    
    
    额外补充
   
上面我们说到过:其实常用Java线程池都是由
ThreadPoolExecutor
或者
ForkJoinPool
两个类生成的,只是其根据构造函数传入不同的实参来生成相应线程池而已。那我们现在一起来看看Executors中几个创建线程池对象的静态方法相关的源码:
    
     ThreadPoolExecutor构造函数原型
    
   
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
    
     参数说明
    
   
- 
     
 corePoolSize
 
 :核心运行的poolSize,也就是当超过这个范围的时候,就需要将新的Runnable放入到等待队列workQueue中了
- 
     
 maximumPoolSize
 
 :线程池维护线程的最大数量,当大于了这个值就会将任务由一个丢弃处理机制来处理(当然也存在永远不丢弃任务的线程池,具体得看策略)
- 
     
 keepAliveTime
 
 :线程空闲时的存活时间(当线程数大于corePoolSize时该参数才有效)
- 
     
 unit
 
 :keepAliveTime的单位
- 
     
 workQueue
 
 :用来保存等待被执行的任务的阻塞队列,且任务必须实现Runable接口
    
     newFixedThreadPool
    
   
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    
     newSingleThreadExecutor
    
   
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    
     newCachedThreadPool
    
   
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    
     newScheduledThreadPool
    
   
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
    
     newWorkStealingPool
    
   
    public static ExecutorService newWorkStealingPool(int parallelism) {
        return new ForkJoinPool
            (parallelism,
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }
    
    
    拉票环节
   
觉得文章写得不错的朋友可以点赞、转发、加关注呀!你们的支持就是我最大的动力,笔芯!
 
