白话说Java虚拟机原理系列【第十章】:线程/线程池/容器队列详解【完结】

  • Post author:
  • Post category:java

前导说明:
本文基于《深入理解Java虚拟机》第二版和个人理解完成,
以大白话的形式期望能用大家都看懂的描述讲清楚虚拟机内幕,
后续会增加基于《深入理解Java虚拟机》第三版内容,并进行二个版本的对比

线程

Thread类:表示一个线程,调用start()方法启动,线程的生命周期(各个状态)前边讲过了。但在Java中实现线程的手段并不是单纯创建Thread对象这一种方式:虽然最终都是通过Thread类实现的,但也可以通过不同的写法实现。

  • 1.继承Thread类,实现run()方法。
  • 2.实现Runable接口,实现run()方法。
  • 3.实现Callable接口,实现call()方法。
    • 1.此方式可以有返回值和抛异常。
    • 2.其实Callable的实现类是需要传递给FutureTask的构造方法,而FutureTask实现了Runable。
    • 3.依然使用Thread启动线程。
    • 4.new Thread(new FutureTask(new Callable())).start();
      将会创建一个线程,然后执行FutureTask类的run()方法,而此run方法将会调用传递给他的Callable.call()方法并得到返回值。
    • 5.返回值会保存在FutureTask的属性中,因为是FutureTask的run()方法调用的Callable.call()方法,所以返回值结果在FutureTask中也很好理解,所以可以通过FutureTask.get()方法得到返回结果。

线程池

使用线程池的目的:限制系统中执行线程的数量(每个线程大约占用1M内存空间)。同时也为了减少频繁创建、释放线程带来的性能损耗,以达到让环境的运行性能最佳。

我们先来看一下线程池的继承关系图:
在这里插入图片描述
这里我们主要讲三种线程池:ThreadPoolExecutorScheduledThreadPoolExecutorForkJoinPool

ThreadPoolExecutor

从继承关系中,我们知道:

  • 1.Executor接口,定了一个线程执行方法execute(),并没有线程池的功能,可以认为是一个线程执行器接口。
  • 2.ExecutorService接口,定义了线程池的功能,所以此类开始才算是一个线程池接口。
  • 3.AbstractExecutorService则是一个适配器类,把ExecutorService的方法都做了简单实现,避免后续继承中一定要实现所有方法,这样后续继承中只需实现各自需要的特定方法即可。
  • 4.ThreadPoolExecutor类就是本节对应的java提供的线程池类。

构造函数及参数

构造函数:从源码我们可知知道如下的构造函数

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue);
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory);
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler);

public ThreadPoolExecutor(
	int corePoolSize, //1
	int maximumPoolSize, //2 
	long keepAliveTime, //3
	TimeUnit unit, //4
	BlockingQueue<Runnable> workQueue, //5
	RejectedExecutionHandler handler, //6
	ThreadFactory threadFactory //7
);

参数详解:

  • 1.int corePoolSize:
    线程池中核心任务线程数,正常创建线程池后,池中不会事先创建线程,即线程数为0,只有提交执行任务线程后才会创建线程去执行,当向线程池中提交执行的任务线程数超过核心任务线程数目后,会将超出的任务线程添加到等待队列。当创建线程池后,如果调用prestartAllCoreThreads()或者prestartCoreThread()方法时,线程池就会进行对池中的线程初始化了,初始化成coreSize个线程或者初始化一个线程。
    注意:加入到队列的任务线程什么时候会被取出执行呢?
    答案是,当有线程空闲的时候就会去执行,分以下两种情况。
    其实是,既然加入到队列了,说明corePollSize已经满了,那么当corePoolSize个数个线程中有空闲的了,就会用来从队列中获取然后去执行,当有新任务线程提交时,则先执行提交的任务线程,即队列的优先级较低。
    另一种情况是,当corePoolSize满后,且队列也满了,此时还未达到最大线程,比如又提交了任务线程然后创建了一个线程去执行它,当这个线程执行完后,并不会空闲下来去等待keepAliveTime个时间单位然后去销毁,而是会先加入执行队列中的任务线程,当任务队列的线程都执行完了,那么这个多出corePoolSize的线程才会去等待keepAliveTime时间,然后去销毁。其实用一句话概括就是,只要线程池中的线程是执行完的线程,即空闲的,就会去获取队列中的任务线程然后执行。就是说不会在线程池中专门为执行队列中的任务线程而创建新线程,只会使用目前手头已经存在的线程,且是空闲的时候才会去执行队列中的任务线程。
    重要细节:其实线程池中创建的线程执行任务线程的时候会有一个循环判断,如果当前任务线程!=null,则调用任务线程.start()方法,当任务线程=null(此时即任务线程已经执行完毕),此时调用getTask方法获取任务线程(即从队列中获取线程),不等于null,并且如果符合>corePoolSize的话还会判断keepAliveTime超时终止规则,如果超时了则终止关闭此线程,如果不超时,即不用关闭时,则会运行从队列获取的任务线程(可从源码得知)。这是一个巧妙的设计,有人可能会设计成,如果线程空闲了,就去执行其他的新添加的任务线程,但是这样就需要设计重新分配新任务的功能,代码上实现比较复杂,所以此处是采用让线程直接执行完任务线程后,就去队列中获取任务线程去执行,不过当线程池中的线程数<=corePoolSize时,如果线程空闲下来,他内部依然是不终止了的,只不过同时也会创建一个空线程,这样就不会涉及到分配问题了,又保障了<=corePoolSize时不终止线程的规则,这个注意一哈,可以看源代码,主要是Worker的run()方法。

  • 2.int maximumPoolSize
    线程池中最大任务线程数,当向线程池中添加任务线程时,发现等待队列已满,此时会判断目前线程池中的任务线程是否达到最大任务线程数,如果没有,则会直接创建新的线程来运行新添加的任务线程。

  • 3.long keepAliveTime
    表示线程池中的线程在没有任务线程执行的时候最多空闲多久时间会终止。默认情况该参数只对当线程池中的线程数大于corePoolSize时起作用,即如果线程数<=corePoolSize时,即便线程是空闲的也不会等待keepAliveTime后终止。但是如果创建线程池后调用了allowCoreThreadTimeOut(boolean)方法,那么keepAliveTime将对线程池中的所有线程起作用,只要有线程空闲,就会等待keepAliveTime时间后终止,直到线程池中的线程数为零。

  • 4.TimeUnit timeUnit
    keepAliveTime的单位,通过TimeUnit的静态值表示,共7种取值。

    • 1.TimeUnit.DAYS; //天
    • 2.TimeUnit.HOURS; //小时
    • 3.TimeUnit.MINUTES; //分钟
    • 4.TimeUnit.SECONDS; //秒
    • 5.TimeUnit.MILLISECONDS; //毫秒
    • 6.TimeUnit.MICROSECONDS; //微妙
    • 7.TimeUnit.NANOSECONDS; //纳秒
  • 5.ThreadFactory threadFactory
    线程池创建线程的线程工厂类。

  • 6.RejectedExecutionHandler handler
    表示当拒绝处理任务线程时的策略,有以下四种取值。

    • 1.ThreadPoolExecutor.AbortPolicy:丢弃任务线程并抛出RejectedExecutionException异常。
    • 2.ThreadPoolExecutor.DiscardPolicy:丢弃任务线程,但是不抛出异常。
    • 3.ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列中最前面的任务线程,然后重新尝试执行任务(重复此过程)。
    • 4.ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务 。就是用向线程池添加任务线程的这个线程执行这个线程池不接受的任务线程。

    原理: 接上文,当向线程池中添加任务线程时,如果当前线程池线程数目已经达到了maximumPoolSize那么此时线程池将不再接受新任务线程了,即会运行此处的拒绝策略,进行拒绝处理。
    7.BlockingQueue<Runnable> workQueue
    此即为线程池中的队列,又叫缓存队列或等待队列。该队列为阻塞队列的实现。

    • 1.ArrayBlockingQueue
      基于数组的先进先出队列,此队列创建时必须指定大小;
    • 2.LinkedBlockingQueue
      基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE;
    • 3.PriorityBlockingQueue
      此队列是一个根据设定的元素的优先级进行排列的阻塞队列,大小是没有上限的。且要求加入此队列的元素具有比较能力。
    • 4.SynchronousQueue
      这个队列比较特殊,它不会保存提交的任务,即该队列是一个没有容量的队列,即放入数据时就会阻塞,而只有消费这个数据时放入时的阻塞才会解开,同样,如果没有放入数据就获取消费数据,消费也会阻塞,除非放入数据,消费的阻塞才会解开,即类似于管道,我放入就要等你接受,你不接受我就一直等,这就是SynchronousQueue阻塞队列。所以使用这种队列的线程池一般要求maximumPoolSize的值是无限的,要不然要执行向同步队列放入数据时,没有消费此数据的线程,那么再来一个任务线程要往里放时就会因为没有队列空间使用而报错,因为SynchronouseQueue没有容量,所以在超过corePoolSize后提交一个任务线程就会使用队列,而队列没有空间,所以他会执行向队列中put的请求,但是此时就要求消费方(即线程池中的线程)去take消费他,也即需要创建新线程,所以一般使用这个队列时,需要将maximumPoolSize设置成无限大,至少是Integer.MAX_VALUE。

扩展:上边我们单独讲了阻塞队列,或者另一种叫法为阻塞容器,都是容器的一种,我们看看常用的容器。老祖宗级别顶级接口:Collection接口

  • 1.List接口:忽略不讲。
  • 2.Set接口:忽略不讲。
  • 3.Queue接口:队列的顶级接口,下边我们讲这个。
    • 1.阻塞队列
      其实阻塞队列之所以会阻塞,就是因为他们内部都是通过ReentrantLock锁的newCondition()、await()和signal()来做了实现的,就是类似生产者、消费者的例子。当然因为内部使用了ReentrantLock,所以也是线程安全的。同样既然都是ReentrantLock锁实现的,那么ReentrantLock锁的优点同样这里也适用,如公平锁等。阻塞队列之所以阻塞是因为他有阻塞的方法,当然这些阻塞方法都是通过Lock来实现的了,同样也有非阻塞方法,非阻塞的方法一般都是继承自Queue接口,而阻塞的方法则是他们自己提供的,阻塞队列根据不同的队列实现方式分为如下:其实上边已经讲过了。
      阻塞队列内部是通过Lock实现,而Lock就是阻塞同步锁的代表,所以通过Lock实现的队列也顺理成章的叫阻塞队列了
      在这里插入图片描述

      • 1.几个重要的类的继承关系(全都是阻塞队列,即BlockingQueue的子类)
      • 2.ArrayBlockingQueue
        构造是必须指定容量大小,因为它是以数组的形式实现的队列功能,即内部保存有数组的属性。此队列按 FIFO(先进先出)原则对元素进行排序
      • 3.LinkedBlockingQueue
        基于链表实现的队列,即内部有链表的属性,以此实现的队列功能。默认最大值为Integer.MAX_VALUE,也可手动指定大小。此队列按 FIFO(先进先出)排序元素。
      • 4.PriorityBlockingQueue
        一个带优先级的队列,而不是先进先出队列。该队列没有上限,所以put方法永远不会阻塞,只会让系统内存耗尽。该队列中的元素需要具有比较能力。
      • 5.SynchronousQueue:参见上边讲过的。
      • 6.DelayQueue
        是一个没有上线的队列。是通过PriorityQueue队列实现的此队列功能,就是内部有一个PriorityQueue的属性对象,注意PriorityQueue是一个非阻塞队列哦,内部没有同步,即线程不安全。但是DelayQueue内是有锁的哦。是一个将Dalayed接口实现类作为元素的阻塞队列,这个元素可以设置延迟时间,延迟时间不到的话,通过poll获取时将会返回空。队列中不允许存入null值。当一个元素的getDelay(TimeUnit.NANOSECONDS)方法返回一个小于或等于零的值时,则出现期满,即延期时间到,poll就可获取并移除这个元素了。

      API 方法列举

      • add:增加一个元索,如果队列已满,则抛出一个IIIegaISlabEepeplian异常。
      • remove:移除并返回队列头部的元素,如果队列为空,则抛出一个NoSuchElementException异常。
      • element:返回队列头部的元素,如果队列为空,则抛出一个NoSuchElementException异常。
      • offer:添加一个元素并返回true,如果队列已满,则返回false。
      • poll:移除并返问队列头部的元素,如果队列为空,则返回null。
      • peek:返回队列头部的元素,如果队列为空,则返回null。
      • put:添加一个元素,如果队列满,则阻塞(是通过ReentrantLock的await()、signal()实现的)。【注意:阻塞当然那是让调用此队列put方法的线程阻塞喽】
      • take:移除并返回队列头部的元素,如果队列为空,则阻塞(是通过ReentrantLock的await()、signal()实现的)。

      总结:阻塞队列主要作为多线程的共享变量时比较好用,就是生产者、消费者,其实一个线程池不就是一个生产者、消费者的使用模型吗?一个线程池,多个线程共享一个阻塞队列,只不过实现的比较复杂罢了。

    • 2.非阻塞队列
      和阻塞队列类似,就是采用非阻塞同步锁或无锁实现的队列?
      没错,此处线程安全的非阻塞同步采用了CAS,而线程不安全的当然就是普通的队列了。

      • ArrayDeque:此处不做重点讲解
      • PriorityQueue:此处不做重点讲解
      • ConcurrentLinkedQueue:这个后边讲并发容器会详细讲,因为他可以作为并发的共享变量,且此变量没有锁,所以不会阻塞。

面试题

面试题:LinkedList和ArrayList的区别:
主要是ArrayList基于数组实现,数组的特点就是查询效率高,写入时因为要移动数据所以效率低;LinkedList是双向链表,写入时效率高,读取时因为要移动指针所以效率低;

时间复杂度比较:
首先一点关键的是,ArrayList的内部实现是基于基础的对象数组的,因此,它使用get方法访问列表中的任意一个元素时(random access),它的速度要比LinkedList快(O1)。LinkedList中的get方法是按照顺序从列表的一端开始检查,直到另外一端(On)。对LinkedList而言,访问列表中的某个指定元素没有更快的方法了
但在某些情况下LinkedList的表现要优于ArrayList,有些算法在LinkedList中实现时效率更高。比方说,利用Collections.reverse方法对列表进行反转时,其性能就要好些。当要对list进行大量的插入和删除操作时,LinkedList也是一个较好的选择。

总结: ArrayList和LinkedList在性能上各有优缺点,都有各自所适用的地方,总的说来可以描述如下:

  • 1.对ArrayList和LinkedList而言,在列表末尾增加一个元素所花的开销都是固定的。对ArrayList而言,主要是在内部数组中增加一项,指向所添加的元素,偶尔可能会导致对数组重新进行分配;而对LinkedList而言,这个开销是统一的,分配一个内部Entry对象。
  • 2.在ArrayList的中间插入或删除一个元素意味着这个列表中剩余的元素都会被移动;而在LinkedList的中间插入或删除一个元素的开销是固定的。
  • 3.LinkedList不支持高效的随机元素访问。
  • 4.ArrayList的空间浪费主要体现在在list列表的结尾预留一定的容量空间,而LinkedList的空间花费则体现在它的每一个元素都需要消耗相当的空间
    可以这样说:当操作是在一列数据的后面添加数据而不是在前面或中间,并且需要随机地访问其中的元素时,使用ArrayList会提供比较好的性能;当你的操作是在一列数据的前面或中间添加或删除数据,并且按照顺序访问其中的元素时,就应该使用LinkedList了

由此可以看出哈,队列不论是阻塞的还是非阻塞的,也就是个和List、Set同级别的容器罢了,都有线程安全和不安全的,之所以感觉阻塞队列跟阻塞沾边就比较高深是因为它的实现是通过用锁的await和signal实现的罢了,而这正是生产者、消费者模式的底层实现,所以他顺理成章的作为了线程池的内部队列,也即其实线程池就是很好的生产者和消费者的实现案例。

线程池ThreadPoolExecutor的几个重要API方法

线程池ThreadPoolExecutor的几个重要API方法

  • 1.execute()
    将任务线程提交到线程池中执行,没有返回值。
  • 2.submit()
    将任务线程提交到线程池中执行(内部仍然执行的execute()方法),返回值Future对象,可以得到返回值。
  • 3.shutdown()
    进行关闭线程池,他会让线程池进入SHUTDOWN状态,具体表现就是他不会再接受新的任务线程,但他会等待所有已经提交了的任务线程执行完毕,然后最终关闭线程池。优雅关闭。
  • 4.shutdownNow()
    也是关闭线程池,他会让线程池进入STOP状态,也是不能接受新任务线程,但他会尝试终止正在执行任务线程的线程。暴力关闭。

线程池的状态

线程池的状态

  • 1.RUNNING
    当创建线程池后,初始时,线程池处于RUNNING状态;
  • 2.SHUTDOWN
    执行shutdown()方法后,见上边解释。
  • 3.STOP
    执行shutdownNow()方法后,见上边解释。
  • 4.TERMINATED
    当线程池处于SHUTDOWN或STOP状态,并且所有工作线程已经销毁,任务缓存队列已经清空或执行结束后,线程池被设置为TERMINATED状态。

线程池的运行原理

线程池的运行原理: 在讲解构造函数的参数时其实已经可以知道原理了
在这里插入图片描述
首先创建线程池后,池中的线程数默认为0,如果提交新的任务线程进来,则会创建线程去执行任务线程,当提交的任务线程超过corePoolSize个数时,新的任务线程将会加入到阻塞队列中去等待,如果阻塞队列已满后仍然有新的任务线程提交进来,那么此时判断是否池中线程数(注意是线程数,不含队列中的哦,队列中的可不是线程数,而是等待执行的任务线程)超过maximumPoolSize,如果没超过,则创建新线程去执行任务线程;如果超过了,此时线程池将不接受新提交进来的这个任务线程了,而是将执行RejectedExecutionHandler设置的拒绝策略。

线程池大小动态调整

线程池大小动态调整

  • 1.setCorePoolSize()
  • 2.setMaximumPoolSize()
  • 3.当上述参数从小变大时,ThreadPoolExecutor可能立即创建新的线程来执行任务,如果没有任务就会执行队列中的任务线程,否则则会空闲或终止。如果参数设置的是从大变小(意思就是比如原来core=10,max=15,现在改成core=5,max=10),那么会从执行完任务线程的空闲线程中进行终止。

ScheduledThreadPoolExecutor

能够延迟、周期执行的ThreadPoolExecutor线程池,就是类似定时任务的线程池。这里不重点讲解,大家可以自行对照ThreadPoolExecutor使用。

ForkJoinPool

在这里插入图片描述

ForkJoinPool是JDK7引入的线程池,核心思想是将大的任务拆分成多个小任务(即fork),然后在将多个小任务处理汇总到一个结果上(即join),非常像MapReduce处理原理。同时,它提供基本的线程池功能,支持设置最大并发线程数,支持任务排队,支持线程池停止,支持线程池使用情况监控,也是AbstractExecutorService的子类,主要还引入了“工作窃取”机制,在多CPU计算机上处理性能更佳。

work-stealing(工作窃取算法)

1.ForkJoinPool提供了一个更有效的利用线程的机制,当ThreadPoolExecutor还在用单个队列存放任务时,ForkJoinPool已经分配了与线程数相等的队列,当有任务加入线程池时,会被平均分配到对应的队列上,各线程进行正常工作,当有线程提前完成时,会从其他线程的队列的末端“窃取”未执行完的任务。当任务量特别大时,CPU多的计算机会表现出更好的性能。

2.也即ForkJoinPool线程池中每一个线程都会有一个自己的双端队列(头尾都可以操作),比如初始创建了5个线程,那么每一个都有自己的队列,当任务线程(ForkJoinTask)提交进来后,会被均分到某一个线程的队列中,当有线程空闲时,它会随机到其他一个有任务线程待执行的线程的队列中从尾部获取(窃取)出来直接运行,这就是窃取算法了。
在这里插入图片描述
示意图:ForkJoinPool中维护着多个线程(一般为CPU核数)在不断地执行Task,每个线程除了执行自己职务内的Task之外,还会根据自己工作线程的闲置情况去获取其他繁忙的工作线程的Task,如此一来就能能够减少线程阻塞或是闲置的时间,提高CPU利用率。

3.不过这个看上去没有比ThreadPoolExecutor高级到哪里啊?ThreadPoolExecutor也是创建多个线程,然后消耗一个队列。ForkJoin也是创建多个线程,只不过队列变成了多个,且每个线程都可以去消耗任意一个线程的私有队列。==>看上去就是队列多了几个吧?

  • 1.当然了,它的重点还是在将大任务切分为小任务,即小任务提交到线程的队列中,然后还是通过窃取算法执行,这样就是ThreadPoolExecutor没有的特性了。
    例子如右边,其实就是把一个计算两个数之间的数连起来的加和,换成了2个数一算的分任务,最后整合到一起,这种拆分需要自己写逻辑哦。
    public class CountTaskTmp extends RecursiveTask<Integer> {
        private static final int THRESHOLD = 2;
        private int start;
        private int end;
        public CountTaskTmp(int start, int end) {
            this.start = start;
            this.end = end;
        }
        //实现compute方法来实现任务切分和计算
        protected Integer compute() {
            int sum = 0;
            boolean canCompute = (end - start) <= THRESHOLD;
            if (canCompute) {
                for (int i = start; i <= end; i++) sum += i;
            } else {
                //如果任务大于阀值,就分裂成两个子任务计算
                int mid = (start + end) / 2;
                CountTaskTmp leftTask = new CountTaskTmp(start, mid);
                CountTaskTmp rightTask = new CountTaskTmp(mid + 1, end);
                //执行子任务
                leftTask.fork();
                rightTask.fork();
                //等待子任务执行完,并得到结果
                int leftResult = (int) leftTask.join();
                int rightResult = (int) rightTask.join();
                sum = leftResult + rightResult;
            }
            return sum;
        }
    
        public static void main(String[] args) {
            //使用ForkJoinPool来执行任务
            ForkJoinPool forkJoinPool = new ForkJoinPool();
            //生成一个计算资格,负责计算1+2+3+4
            CountTaskTmp task = new CountTaskTmp(1, 4);
            Integer r = forkJoinPool.invoke(task);
            System.out.println(r);
            //  或者可以这样写
            Future<Integer> result = forkJoinPool.submit(task);
            try {
                System.out.println(result.get());
            } catch (Exception e) {
            }
        }
    }
    
  • 2.从案例上看就好像是多线程版本的递归算法。其实就是这样,有递归的算法特性,所以才能把大任务拆成小任务,然后让各个线程执行小任务最后汇总。

4.建议:在fork/join框架中,若某个子问题由于等待另一个子问题的完成而无法继续执行。那么处理该子问题的线程会主动寻找其他尚未运行完成的子问题来执行。这种方式减少了线程的等待时间,提高了性能。子问题中应该避免使用synchronized关键词或其他方式的同步。也不应该是一阻塞IO或过多的访问共享变量。在理想情况下,每个子问题的实现中都应该只进行CPU相关的计算,并且只使用每个问题的内部对象。唯一的同步应该只发生在子问题和创建它的父问题之间。

5.注意执行流程

  • 1.若自己线程的队列非空,则代表自己线程的任务线程还没执行完毕,取出任务线程并执行。【操作自己线程的队列,只是用poll()和peek()方法,即从头部取数据。】
  • 2.若自己线程的队列为空,则随机选取一个其他的工作线程的队列中的任务线程并执行。【操作其他线程的队列时,只能使用take()方法,从队尾获取数据,避免冲突。】

ForkJoinTask任务线程的实现接口

  • 1.RecursiveAction:没有返回结果的任务,可以继承这个类。注意他没有实现Runnable。
  • 2.RecursiveTask:有返回结果的任务线程,可以继承这个类。注意他没有实现Runnable。
  • 3.继承关系
    在这里插入图片描述

ForkJoinPool的任务线程的实现

  • 1.Runnable
  • 2.Callable
  • 3.Future
  • 4.通过源码得知,这三种接口的实现类都可以作为ForkJoinPool线程池的任务线程,只不过重点我们放在ForkJoinTask的实现类上,即Future的实现类。

提交任务的方式

  • 1.将任务提交给线程池执行
    • 1.invoke()
    • 2.submit()
    • 3.execute()
    • 4.此三种方法提交的任务线程就跟上边描述的一样,直接作为ForkJoinPool线程池的任务线程提交,然后由各个线程负责执行,不够执行的则进入各个线程对应的队列,然后会使用窃取算法,跟ThreadPoolExecutor运行方式差别不大。
      具体是不是拆分小任务要看下一步。
  • 2.将任务分解、聚合通过子任务执行
    • 1.fork()
    • 2.join()
    • 3.这两个方法是在任务线程中调用的,即运行任务时是否需要分解运行,然后再聚合结果,如果拆分,那么拆分的会使用子任务来完成执行,虽然还是每个任务都要用一个线程来运行,但是运行单元小了(大小要得当,单元太小就会线程切换频繁,太大就会导致阻塞或耗时较长),这样就能更有效的利用资源提升性能。
      如《窃取算法》章节的案例代码。

ForkJoinPool的成员变量

  • 1.ForkJoinWorkerThread[] workers;
    这个就是线程池,上边的submit或invoke或execute提交后的任务线程都会被随机分配到线程的队列(右侧的属性)属性中,当然如果有线程可以用直接运行了,没有空闲的才会进入队列。还有调用fork()产生的子任务也是被提交到这个队列。
    ForkJoinTask<?>[] queue; 就是接收任务线程的队列
  • 2.ForkJoinTask<?>[] submissionQueue;
    源码的解释是初始化设置的服务于提交的队列。

ForkJoinPool构造方法

在这里插入图片描述

  • ForkJoinPool(int parallelism):创建一个包含parallelism个并行线程的ForkJoinPool
  • ForkJoinPool():以Runtime.availableProcessors()方法的返回值作为parallelism参数来创建ForkJoinPool。

线程安全的容器

同步容器

也可叫做同步阻塞容器,下边我们列举几个常用的容器:

  • 1.Vector

  • 2.HashTable

  • 3.Stack:继承自Vector,很多方法都实现了synchronized。

  • 4.Collections.synchronizedXxx()得到的容器
    在这里插入图片描述

  • 5.以上这些都是通过阻塞同步即synchronized实现的,所以叫同步容器。

并发容器:

也可叫做并发非阻塞容器,即没有通过使用同步synchronized实现的容器。

  • 1.ConcurrentHashMap

    • 1.已经知道HashMap是通过数组+链表实现的,默认是16个数组元素,每个元素都有一个链表维护叶子节点,当需要扩容时,一般都是成倍数扩容,也意味着全部数据进行rehash,性能比较低。且不是线程安全的,即便HashTable是线程安全的,但是读写时,他是把整个表(对象)都锁住了,性能也比较低下。注意只要扩容,就是新建数组,将元数据rehash到新数组,然后删掉原数组。
    • 2.ConcurrentHashMap能做到读取的时候不加锁,因为读取时采用的是Unsafe类的原子操作,所以不存在线程安全问题,就好比CAS一样,所以不需要加锁。
    • 3.他虽然叫并发容器,但是他并不是完全的并发一点不加锁,只是为了提升效率,把锁加的力度更细了。
    • 4.ConcurrentHashMap在写数据的时候是分段加锁的,它提出了一个Segment段的概念,其实可以认为每一个段都是一个HashTable,即又加了一层数组结构,而对每个Segment段进行加锁,这样在写的时候只对数据路由到的某个段进行加锁,其他的段就可以继续提供服务,而默认是由16个段,所以一定程度上是HashTable的16倍并发能力。
      在这里插入图片描述
    • 5.ConcurrentHashMap初始化有三个参数
      • initialCapacity:初始容量,表示每个Segment段即内部数组的容量。默认16。
      • loadFactor:负载因子,各个Segment段扩容的变量因子。
      • ConcurrentLevel:Segment段的数量,此值一旦指定不可修改,如果后续需要扩容,ConcurrentHashMap扩容的是各个Segment段内部的数组,这样避免全盘rehash,而只需rehash某个段即可。默认16。
    • 6.size()方法:因为size是算全部数据,因为每个Segment段都会有个modCount值,表示该段下所有修改操作的次数,它会检查两次每个段的这个值,如果两次都一样就返回这个段的大小,最后相加就是size,如果有变化就在尝试做一次检查,即共检查2次,如果再不对,那么就采取全ConcurrentHashMap加锁进行遍历,然后得到size。
    • 7.缺点:由结构可以看出,ConcurrentHashMap的hash操作需要进行两次,第一次是得到数据对应的Segment段,然后再hash得到对应的Segment段内部的(即HashTable的数组下标)头部,然后存到具体的链表中,这样虽然hash的性能不如普通的HashTable,但是他的并发量能同时达到Segment段的个数,大大提高了并发能力。
    • 8.默认容量:默认ConcurrentHashMap中有16个Segment段,每个段中有个16个元素的数组。而它的扩容策略是对各个段内的数组进行扩容,默认是成倍扩容,然后重新rehash段内的数组的所有数据,类似于HashMap的扩容。
  • 2.ConcurrentLinkedQueue

    由于篇幅问题,该容器原理此处不细讲。

    • 1.是一个基于链接节点的无界线程安全队列。
    • 2.此队列按照 FIFO(先进先出)原则对元素进行排序。
    • 3.此队列采用入队、出队分离的方式,即读写分离,分别的对出队使用CAS、入队也使用CAS操作,并且对队头、队尾标记的更新采用延迟策略,同样这个更新也是CAS操作,所以此队列中允许出现队列与头、尾标记不一致的情况,但是这并不影响使用。
    • 4.入队,是添加到队列的尾部;出队,是从队列的头部获取一个元素并删除;
      删除方式是将头部的这个元素的next指向自己,即断开在队列中的连续性,然后向后移动头部标记。
  • 3.CopyOnWriteArrayList

    • 1.使用读写分离的思想,写的时候创建copy一个新的集合,进行写操作,原集合依然提供服务,新集合写完后,将原集合引用指向新集合。这样读的时候就可以不加锁,而写的时候就需要加锁了,虽然copy的是新集合,但是不加锁,那么并发的时候就copy出N份来了,就有问题了。
    • 2.应用场景:比如网站的白名单、黑名单,当然我们现在一般使用redis实现。
    • 3.缺点
      • 1.内存开销:写操作时,会临时双倍占用内存。
      • 2.最终一致性:因为写数据的时候是读取不到的,只有写完后才能访问,即非实时。
  • 4.CopyOnWriteArraySet

      原理同上。
    

辅助类

1.CountDownLatch

就是一个计数器,比如计数器为4,那么调用await()方法,就会阻塞,直到计数器为0时才继续执行,比如4个线程并发执行,现在我要等4个都执行完后才继续,就可以让每个线程都完成后在finally中做一下计数器减1,这样就可以调用await()阻塞等待了。await(long timeout, TimeUnit unit)此方法是可以设置等待时间,时间到了就不等待了。countDown()方法就会减1。

2.CyclicBarrier

字面意思循环屏障,循环说的是他可以被复用,屏障说的就是,如果多个线程调用了它的await()方法,那么只有所有线程都执行到这个代码才能继续往下走,否则先执行到的将会等待,并且可以设置都到达之后,从多个线程中选一个线程执行另一个指定的线程任务;也有一个方法是等待一段时间后,执行所有已经到达的程序继续,没到达的将会抛异常。

CyclicBarrier barrier  = new CyclicBarrier(4);  
// 用4个这个屏障对象,分别用于四个线程中调用,如下:
... //其他处理逻辑
barrier.await(); //加入屏障,即只有4个线程都达到这里后,才会继续往下执行各自的代码,

意思就是4个线程中都会调用await()方法,调用完后自动等待,只有4个await方法都调用完后4个阻塞的线程才会继续执行,并且会首先从4个线程中随机选一个去调用一下构造CyclicBarrier对象时传递的Runnable实现类的run方法,然后4个线程再继续执行下去。

CyclicBarrier barrier  = new CyclicBarrier(4,new Runnable() {
	public void run() {
		System.out.println("当前线程"+Thread.currentThread().getName());   
	}
});

如果4个线程都到达屏障后,将会从4个线程中选择一个线程执行这个new Runnable线程的代码。

与CountDownLatch的区别:
CountDownLatch 是一次性的,CyclicBarrier 是可循环利用的
CountDownLatch 参与的线程的职责是不一样的,有的在倒计时,有的在等待倒计时结束。
CyclicBarrier 参与的线程职责是一样的。

3.Semaphore

字面意思是信号量,其实这个辅助类类似于锁,只不过他可以提供多把锁,比如一共5个锁,如果有8个线程同时要执行代码,这里不涉及共享变量哈,只是8个线程你想让5个能运行,那就可以用这5个锁去限制。方法acquire()是获取锁,release()是释放锁,都是阻塞的,可以使用tryXxx来执行非阻塞。举例如右边。

public static void main(String[] args) {          
	Semaphore semaphore = new Semaphore(5); //锁的个数
	for(int i=0;i<8;i++)
		new Worker(i,semaphore).start();
}
static class Worker extends Thread{
	private int num;
	private Semaphore semaphore;
	public Worker(int num,Semaphore semaphore){
		this.num = num;
		this.semaphore = semaphore;
	}

	public void run() {
		try {
			semaphore.acquire();
			System.out.println("工人"+this.num+"占用一个机器在生产...");
			Thread.sleep(2000);
			System.out.println("工人"+this.num+"释放出机器");
			semaphore.release();           
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}

版权声明:本文为lijie1051原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。