线程和进程
我理解的进程就是处于运行状态的应用程序,例如:qq是一个应用程序,一个电脑上可以同时登录多个qq账号,每个qq账号都会在任务管理器中呈现出一个独立的任务,也就是多个进程。线程是进程的一部分,是进程为了完成某个功能,划分出一定的资源去使用的任务过程。
总的来说就是,我认为的进程就是运行中的应用程序,为了完成某个功能,比如从文件读取内容,到数据库查询数据等等,会创建一个任务,并且还会分配一定的资源,i/o设备使用或者是数据库连接这样类似的资源给这个任务去执行的一个过程。
java线程和操作系统线程
java线程本质上是对操作系统线程的抽象,java线程的具体实现方式依赖于操作系统线程的实现方式,不同的操作系统,有不同的线程库和线程模型。线程库是创建和管理线程的一套API,线程模型是java线程和操作系统模型的映射方式,在windows中,使用的线程库是Win32 线程库,线程模型是1对1。
内核态和用户态
操作系统中分为两种程序,一个是应用程序,一个是系统程序,当cpu运行普通应用程序的时候,就是处于用户态;当cpu运行系统程序的时候就处于内核态。内核态和用户态的区别就是,处于内核态可以任意访问计算机中的资源不受限制,处于用户态则只可以访问应用程序分配的内存空间。用户程序不能直接访问内核空间,保证了操作系统不受应用程序的侵害。
缓存一致性问题
线程并发的出现是为了提高cpu的利用率,因为cpu从内存中取出数据进行计算的过程很快,而内存将需要计算的数据存放到内存中或从cpu中读取计算好的数据这一过程比cpu计算数据的过程慢多个量级,为了提升效率,会在cpu和内存之间设置多级缓存,cpu每次计算数据都从缓存中取,计算完就会把数据放到缓存中,避免了等待内存的读取。但是缓存也出现了一个新的问题:如果有多个处理器A,B;A将计算完的数据放入到缓存中,但是这个时候内存并没有去及时更新数据,B处理器从内存中取出的数据就会是脏数据。
java内存模型(JMM)
为了解决缓存一致性问题,就需要对内存中数据进行读取的时候遵循一些规则,java中的内存模型就是为了解决这个问题,也就是Java 内存模型(Java Memory Model,JMM)。
JMM规定共享变量都存储在主内存中,每个线程会有自己的工作内存,工作内存中都会存储一份共享变量的副本,当工作内存进行写入共享变量的时候,并不会直接对主内存中的共享变量直接进行操作,而是先更新工作内存中的副本,然后将副本同步到主内存中,实现更新共享变量的结果。
JMM的三大特性
原子性
对于主内存中的共享变量的操作要么全部成功要么全部失败,java中提供了锁和CAS算法来保证原子性。
可见性
对于一个共享变量的更新,会让其他线程感知到变化,java中提供了volatile和final关键字来保证可见性。
有序性
处理器和编译器会对代码块进行优化,在优化的过程中会调换某些行的顺序,但是都不会改变执行的结果,java中的volatile和synchronized 关键字可以保证有序性。
JMM的核心概念- Happens-before(先行发生)
Happens-before的定义:
- 如果操作A先行操作B发生,那么操作A的结果就一定对操作B可见
- 如果操作A先行操作B发生,哪怕调整了操作A的执行顺序,也不会影响操作A的结果对操作B可见,那么允许这种重排序的操作。
这里的先行发生,并不是指在时间上的先后,而是操作系统中分配cpu资源具体执行的先后。
创建线程的三种方式
直接继承Thread类
Thread类也实现了Runable接口
实现Runnable接口
当需要把这个实现类的实例作为新线程启动,需要将这个实现类作为参数传给Thread的构造函数,调用Thread的start方法才是真正的启动一个线程。这个构造函数会去调用Thread类的init方法(重载多次),最终会把实现类赋值给Thread类的target属性。
注释也表示出这个是实际运行的对象。
@Override
public void run() {
if (target != null) {
target.run();
}
}
这个是Thread类的run方法,如果是直接继承Thread类的方式启动创建线程,那就是运行重写方法;但是采用实现Runnable接口的方式创建线程,启动的时候在调用到Thread类的run方法的时候,就会判断,因为在构造函数中,会给target属性赋值,所以会实际运行实现类的run方法。
Callable接口
在Thread类中,是没有参数是Callable的构造函数的,所以Callable接口的实现类不能直接像Runnable接口的实现类那样直接创建出一个Thread实例。
这个时候需要用到FutureTask类,这个类实现了RunnableFuture接口,从接口名称就猜到RunnableFuture既实现了Runnable接口,也实现了Future接口。
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
在FutureTask类中,直接就有一个Callable作为参数的构造函数,这样就可以通过FutureTask来创建出一个Thread类,然后启动一个线程。
和Runnable方法不同的是,Callable接口唯一的方法是call方法;但是和上面一样,在调用Thread类的run方法的时候,会调用传入参数的run方法,所以会调用到FutureTask的run方法,FutureTask类中的run方法就会去调用Callable接口的call方法。
这种创建方式和直接实现Runnable接口不同的是,会有一个返回值,返回接口通过FutureTask对象的get方法获取。这个方法会一直阻塞,直到获取到结果。
为什么使用start方法启动线程,而不是直接调用run方法
因为直接调用run方法和调用普通方法没有什么区别,就等于在当前线程中调用了一个方法,不会去新建一个线程并启动。
在start方法中有这么一行代码: group.add(this);,这个group的注解就是当前线程对象的线程组,并且start方法中有个native方法。start0方法中应该会去根据group去实际创建线程。
java中的线程状态
public enum State {
NEW,
RUNNABLE,
BLOCKED,
WAITING,
TIMED_WAITING,
TERMINATED;
}
Thread中有一个枚举专门用来表示状态,并且源码的注释也表明了调用什么方法之后会处于什么状态。
NEW
创建了但是还没有启动的时候的状态,也就是已经new出来了,但还没有调用start方法。
RUNNABLE
在java虚拟机中执行任务的线程就处于这个状态,也可能正在等待操作系统分配资源的状态,这个等待和等待锁或者调用sleep之后的等待是不一样的,单指的是在调用了start方法之后等待操作系统分配资源的间隔。
BLOCKED
在源码中注解是:
Thread state for a thread blocked waiting for a monitor lock. A thread in the blocked state is waiting for a monitor lock to enter a synchronized block/method or reenter a synchronized block/method after calling Object.wait.
线程的线程状态,该线程被阻止等待监视器锁定。处于阻塞状态的线程正在等待监视器锁进入同步块/方法,或在调用Object.wait后重新进入同步块或方法。
可以看出是在获取synchronized同步锁失败的时候,会处于BLOCKED状态;如果是获取Lock接口锁失败了,并不会处于BLOCKED状态,而是处于WAITING状态,这点是不一样的。
后一句话是在调用wait方法之后,又被调用notify或者notifyAll方法唤醒之后,重新获取stnchronized锁失败了,才会进入到BLOCKED状态,而不是在调用notify或者notifyAll方法被唤醒之后直接进入BLOCKED状态,这个过程线程状态应该是 WAITING –> RUNNABLE –> BLOCKED
WAITING
注解写的很清楚:
Thread state for a waiting thread. A thread is in the waiting state due to calling one of the following methods:
Object.wait with no timeout
Thread.join with no timeout
LockSupport.park
并且特别之处是没有指定时间参数的方法。
TIMED_WAITING
直接看注解:
Thread state for a waiting thread with a specified waiting time. A thread is in the timed waiting state due to calling one of the following methods with a specified positive waiting time:
Thread.sleep
Object.wait with timeout
Thread.join with timeout
LockSupport.parkNanos
LockSupport.parkUntil
TERMINATED
线程执行完,也就是执行完run方法之后的状态。
操作系统中线程的状态
锁分类
按照线程是否对共享资源加上锁,可分为:
- 乐观锁
- 悲观锁
悲观锁: 认为在使用资源的时候其他线程会修改这个资源,所以加上锁,不让其他线程进入。
乐观锁: 认为在使用资源的时候不会有其他线程修改,只会在提交的时候判断共享资源有没有被修改,没有修改就直接提交;有修改就根据设置的策略采取不同的措施(报错或者重试)
按照获取共享资源失败后是否重试,可分为:
- 自旋锁
- 自适应自旋锁
在尝试获取共享资源失败之后,如果直接就进入到阻塞状态,可能很快或者立刻又被唤醒再次尝试获取共享资源,这个时候线程就需要经历两次线程状态变化,线程状态变化就需要切换用户态和内核态,切换状态的消耗时长可能比获取到共享锁的时间还要长。所以为了减少这种损失,在获取共享资源失败之后,线程会等待一定时间,在代码层面就是执行一个do-while循环,让cpu做一些没意义的事情,为的是不让出CPU资源,如果在一定时间内没有获取到就进入阻塞状态,这就是自旋锁。
自适应自旋锁则是在自旋锁的基础上,加上对上一次通过自旋方式获取到共享资源的成功率来判断,是否需要等待。
按照synchronized的使用场景:
- 无锁
- 偏向锁
- 轻量级锁
- 重量级锁
无锁就是没有使用synchronized和Lock接口下的锁,因为他们都是悲观锁,都会对共享资源加上锁;CAS算法是乐观锁的一种实现方式,也就是这里的无锁。
偏向锁是偏向于第一个获取到锁的线程,之后该线程获取到这个偏向锁并且执行完任务,不会去使用CAS算法判断是否被修改过;但是当该锁出现竞争条件的时候,偏向锁就会升级为轻量级锁;如果再出现两个以上的线程竞争该锁,轻量级锁就会升级为重量级锁。
按照获取锁是否需要排队,可以分为:
- 公平锁
- 非公平锁
公平锁就是先来先得,每个线程都按照排队顺序获取锁,但是整体效率比较低,有些可能优先级更高的线程需要排在优先级很低的线程后面;非公平锁则是后到的线程会先尝试直接获取锁,如果获取到了就不需要排队直接执行,这种插队的方式可能会导致有些线程一直获取不到锁。
按照锁是否可以重复进入,可以分为:
- 可重入锁
- 不可重入锁
可重入锁就是在线程获取到锁之后,如果执行代码过程中,又遇到一个使用同一个锁对象的代码块,就可以直接进入,不需要再次获取锁;在使用可重入锁的规则多次获取到锁的情况下,释放锁越需要对应的释放多次。
按照共享资源的拥有者同时是否可以有多个,可以分为:
- 排他锁
- 共享锁
经典的就是ReentrantReadWriteLock,分为读锁和写锁,读锁可以同时被多个线程拥有。在线程对共享资源加上共享锁后,其他的线程只能获取到共享锁锁住的资源,不能获取到排他锁锁住的临界区。
锁的获取和释放的内存意义
synchronized关键字和Lock接口下的实现类,都是锁的一种实现,是为了在并发情况下对共享资源进行操作的时候不会产生脏读或其他的数据问题。在线程获取到锁的时候,JMM会把线程中存储的共享变量的副本置为无效,在临界区代码中访问共享变量就需要访问主内存中的变量。(我有一个疑惑点,获取到锁,把线程本地内存置为无效,那更新主内存变量值又是通过线程的副本去刷心到主内存中的,都置为无效了还可以操作么?还是说不可以读?或者记录操作在释放锁的前一刻再做刷新?)
Synchronized关键字
底层实现
java中创建出来的每个对象都会与一个监视器(monitor)关联,在执行Synchronized修饰的代码块或者方法的时候,也就是在执行到字节码命令monitorenter指令的时候,如果这个锁对象关联的监视器的计数器为0(不考虑重入),没有拥有者,那么当前线程就会将锁对象的拥有者设为当前线程,并且锁对象关联的计数器加1。多次进入相同锁,计数器就累加;执行到monitorexit的时候,锁对象关联的监视器的计数器就会减一,减到0的同时也就不会是锁对象关联的监视器的拥有者了,其他线程就可以尝试获取。
锁升级/锁膨胀
根据synchronized锁不同的状态,由低到高可以将锁分为四种:
- 无锁
- 偏向锁
- 轻量级锁
- 重量级锁
java对象实例在内存中的存储结构如下:
其中对象头中就存储了和锁有关的东西,其中对象头中的存储结构如下:
轻量级锁加锁
在线程执行到同步块的时候,如果当前锁对象的锁状态是偏向锁,并且 【是否是偏向锁】是0,表示当前偏向锁没有被占用;如果锁对象的锁状态是偏向锁,但是【是否是偏向锁】是1,表示该偏向锁已经被占用。
轻量级锁加锁的过程大概是:
- 判断锁对象的状态是【偏向锁】,并且【是否是偏向锁】为0
- 在当前线程的帧栈中,划分出一个叫【锁记录】的空间,并且将锁对象的Mark Word复制到【锁记录】中,具体来说就是将锁对象的Mark Word 复制到【锁记录】中的displaced_header 中。官方将这个复制过来的Mark Word叫做 Displaced Mark Word
- 然后java虚拟机尝试使用CAS算法尝试将锁对象的Mark Word指向刚才复制过去的【锁记录】的指针,再将【锁记录】中的ower指针指向当前线程,也就是上图中的【指向栈中锁记录的指针】
- 如果成功,表示获取轻量级锁成功,将当前线程的【锁状态】的轻量级锁中的【锁标志位】置为0,表示当前线程被轻量级锁锁定。
- 如果失败,分为已经获取过锁,直接重入,另外一种情况就是存在线程竞争的情况,那么轻量级锁会直接升级为重量级锁,重量级会在进行自旋之后判断是否获取到锁。判断失败是哪种情况,就是用轻量级锁中的[【指向栈中锁记录的指针】找到【锁记录】,再根据【锁记录】中的displaced_header指针指向的是否是同步块中的锁对象的Mark Word
轻量级解锁就是使用CAS算法尝试将【指向栈中锁记录的指针】指向的【锁记录】的displace_header的值替换为当前线程的Mark Word,锁对象的Mark Word的指针指向也改变(改为锁对象的地址?这里不知道会怎么修改),因为刚才是指向了【锁记录】。如果更新失败,说明其他线程获取过锁,那么直接升级为重量锁尝试解锁。(但是synchronized不是排他锁么,其他线程可以获取到?)
总结来说,轻量级锁尝试使用CAS算法来实现对共享资源的获取和释放,避免陷入线程竞争,减少需要切换用户态和内核态(阻塞和唤醒一个线程需要操作系统切换到内核态来操作)中消耗的性能。
偏向锁
偏向锁就是更偏向于第一个获取到的锁的线程再次获取当前锁,偏向锁的获取过程大概是:
- 当前线程执行到同步代码块,查看锁对象的锁状态是否处于偏向锁,也就是【锁标志位】为01,并且【是否是偏向锁】为1,当前锁对象就是处于可偏向状态。
- 锁对象处于可偏向状态,当前线程会查看锁对象中的Mark Word中的线程ID(上图中有),是否和当前线程ID一致,一致表明获取过,直接重入,没有值直接将当前线程ID替换到锁对象中。
- 如果锁对象的Mark Word中的线程ID和当前线程不一致,说明当前偏向锁已经被获取,jvm会把锁升级为轻量级锁,进入到轻量级锁的竞争逻辑,也就是尝试复制锁对象的Mark Word到当前线程的帧栈中displaced_header中。。。
锁消除和锁粗化
在写代码的过程中,可能并没有显式的去加锁,但是可能某个方法内部调用的一个方式是synchronized修饰的方法,虽然加了锁,但是jvm会判断出当前这个方法只在方法中调用,不会出现线程逃逸,就会主动进行锁消除,减少竞争锁的消耗。
锁粗化就是在对同一个加锁的方法连续调用多次,那么jvm就会将锁的范围变大,在第一次调用同步方法的时候加锁,一直到最后一次调用再释放锁,避免了连续的加锁,解锁;加锁,解锁的重复步骤。
wait/notify/notifyAll必须在synchronized代码块中
如果直接在一个没有synchronized修饰的代码块中直接调用wait,notify,notifyAll方法,会直接抛出java.lang.IllegalMonitorStateException错误。
在Thread类中,wait方法有三个,但是最终还是调用的用native修饰的wait方法。在这个方法上又有很多注释。当线程调用了这个wait方法之后,就是将当前线程加入到锁对象的waitSet集合中,有四种方法会将线程从锁对象的waitSet中移除:
- 调用锁对象的notify方法,正好唤醒了这个线程
- 调用锁对象的notifyAll方法,唤醒全部waitSet中的线程
- 调用了线程的interrupt方法
- wait方法超时(如果timeout参数为0,就不会超时)
在注释中还有一句话:
This method should only be called by a thread that is the owner * of this object’s monitor.
意思就是只有拥有锁对象的监视器的线程才可以调用wait方法。
而synchronized方法就是让拥有锁对象的监视器的线程可以安全的使用共享资源,如果wait方法在synchronized之外也可以调用,那就是可能会存在一种情况:多个线程同时调用wait,notify方法,但是顺序变为了先notify,再wait。这就是无效唤醒,为了避免无效唤醒,java强制wait/notify/notifyAll必须在synchronized代码块中使用。
Volatile关键字
volatile关键字有两个特性,保证变量可见性和禁止执行重排序。
可见性
volatile变量保证对所有线程的可见性,其中一个线程修改了volatile变量的值,结果对其他线程都是可见的。
在线程改变volatile变量的时候,是将修改过后的本地线程内存的变量值刷新到主内存中,其他线程在尝试获取volatile变量的时候,jvm会将本地线程内存中的volatile变量副本置为无效,而去主内存中获取变量值。
禁止指令重排
对volatile变量,在编译成字节码的时候,会插入不同的内存屏障保证,保证不会发生改变结果的指令重排。
在volatile写操作之前,插入storestore屏障,在volatile写操作之后插入storeload屏障。
在volatile读操作之后,插入loadload和loadstore屏障。
store1;StoreStore;store2; StoreStore屏障保证了store2在写操作之前,会让store1写操作结束,也就是store1写操作的结果是对store2可见的。
store1;storeload;load1; storeload屏障保证了store1写操作的结果是对load1可见的。
同理loadload,loadstore屏障。
final关键字
final关键字也是禁止指令重排序的的。
只要一个不可变对象被正确地构建出来(即没有发生 this 引用逃逸的情况),那其外部的可见状态永远都不会改变,永远都不会看到它在多个线程之中处于不一致的状态。甚至我们可以下结论:不可变对象永远是线程安全的。
写入final
- JMM禁止编译器把对final修饰的变量的写操作重排序到构造函数之外
- 编译器会在写final变量之后插入storestore屏障(这个指令屏障就是为了防止把final写操作重排序到构造函数之外)
读取final
- 在一个线程中,当第一次访问一个带有final修饰变量属性的对象的时候,获取对象的引用和获取这个对象的final修饰的变量的引用,处理器禁止这两个操作重排序
- 编译器会在读取final域的字节码指令之前插入loadload屏障
逃逸
方法逃逸:
在一个方法内存构造的对象实例被其他访问访问到,比如作为参数传递到其他方法
线程逃逸
一个线程中的对象实例被其他线程能够访问到,this逃逸属于线程逃逸级别
CAS
CAS是一种算法,在java中使用了CAS算法来实现乐观锁,CAS算法可以分为三步:
- 获取
- 比较
- 成功更新或者失败(自旋重试)
在java中原子类就是应用CAS算法,在获取原子变量的时候不会去尝试获取锁或者加锁,而是在线程尝试更新原子变量的时候,先获取主内存中的变量值,然后和之前线程中这个变量的副本值进行比较,如果一样,表示没有线程修改过,直接更新;如果值不一样,表示之前被其他线程被修改过,尝试自旋重试。
CAS存在三个问题:
ABA问题
在原子类中,判断是否可以更新变量,是通过值比较的方式;但是可能主内存中的变量的值先由A,更新到B,再由其他线程由B更新到A,这个时候之后的线程在判断的时候是直接判断可以更新的。
解决方法:原子类中有个AtomicStampedReference 类,这个类就是用来保存了变量值的变更和版本的变更。
在AtomicStampedReference 类中,可以看set方法:
public void set(V newReference, int newStamp) {
Pair<V> current = pair;
if (newReference != current.reference || newStamp != current.stamp)
this.pair = Pair.of(newReference, newStamp);
}
是有一个 newStamp这么一个标志位来标志出变量的版本。
只能保证一个共享变量
在CAS中是通过对变量值的判断,所以只能保证一个变量的安全读取和存储。
解决方法: 原子类中有个AtomicReferenceFieldUpdater类,这个类是用来包装一个类的属性,这样可以把原有类的属性都替换成原子属性,这样在更新对象的时候就可以更新多个属性变量。
循环开销
在更新失败的时候,会进入自旋重试,自旋就是让cpu进行空转,多次空转会带来不小的执行开销。
解决方法就是: jvm支持底层命令PAUSE,在自旋失败的时候,让CPU睡眠一段时间再进行自旋重试。
AQS
AbstractQueuedSynchronizer类应用很广泛,在 ock的实现类中有很多继承了AbstractQueuedSynchronizer的内部类,来实现重入锁,公平锁和不公平锁等等。
AbstractQueuedSynchronizer类采用了模板设计模式,并且内部有一个内部类:Node,通过这个Node类实现了一个FIFO的双向队列(CLH队列,取自三个人);通过这个队列来存放尝试获取锁的线程。获取锁失败就会被成为新的尾节点;在首节点执行完毕并且释放锁之后,首页点之后的节点会尝试获取锁,获取成功就成为新的首节点,失败的话再次成为尾节点。
AbstractQueuedSynchronizer中有三个重要的属性,head节点就是首节点,在获取锁成功之后,就会把获取成功的线程对应的node实例设置为head节点(因为前一个首节点执行结束,不需要保留在队列中,所以是覆盖操作);获取失败则会将当前tail节点设置为获取失败节点的前一个节点,从而让获取锁失败的节点成为新的节点。
state是给当前首节点计数获取了几次锁,在继承了AbstractQueuedSynchronizer的FairSync (ReentrantLock的一个内部类)中,重写了AbstractQueuedSynchronizer的tryAcquire方法,在其中就对state进行了累加;对应的在这个内部类中重写的tryRelese方法中,则是释放一次,state减少一。
AQS采用了模板设计模式,其中开放给子类的方法都会在模板方法中被调用,子类可以在开放的方法中实现获取锁,释放锁的逻辑,从而实现公平锁,非公平锁;并且在AQS中有两种模式,一个是独占式,一个是共享式,任何AQS实现的锁都只能实现其中一种模式。在AQS中的模板方法也都基本上分为独占式和共享式。
独占式获取锁 acquire(int arg)
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
可以看到在这个方法中就调用了开放给子类重写的方法tryAcquire方法尝试获取锁,如果获取成功,那直接结束;如果获取失败,就会调用addWaiter和acquireQueued方法,先来看addWaiter方法。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
首先在addWaiter根据给的node对象(指定了是独占式),接着使用compareAndSetTail方法尝试将当前node节点设置为尾节点,(这里compareAndSetTail方法虽然传进去的是pred,但是实际更新的是tail节点,不是pred节点。)如果CAS尝试更新尾节点失败,那么进入enq方法;
在enq方法中有个for (;😉 ,这是一个死循环,除非把这个节点设置为尾节点成功,不然就会一直循环。
独占式释放锁release
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
独占式释放锁就是唤醒首页点的下一个节点。共享式获取和释放锁都和独占式的差不多。
ReentrantLock
在ReentrantLock类中,有一个抽象内部类Sync,并且继承了AQS,sync还有两个子类,从名称上就知道一个公平锁,一个是非公平锁;
ReentrantLock的构造函数默认是非公平锁的,也可以传入一个boolean值来构造一个公平锁。
在ReentrantLock类中的tryLock方法是调用的nonfairTryAcquire,所以想公平获取锁就需要调用FairSync的tryAcquire方法。
nonfairTryAcquire
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
c=0表示没有被获取,那么就使用CAS尝试更新,如果更新成功,那么将当前线程设置拥有者线程(看方法名称猜的),如果锁已经被获取那么再判断拥有者线程和当前线程是否是同一个,如果是的话就更新state的值。
tryRelease
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
释放锁的时候先判断当前线程和拥有者线程是否是同一个,不是的话直接报错;并且只有当c=0的时候,这个方法才会返回true,把当前拥有者线程置为null,才表示释放成功;因为可重入锁需要多次释放才行。
并发集合
CopyOnWriteArrayList
CopyOnWriteArrayList采用了Copy-On-Write思想,就是在对List集合进行写数据的时候,并不是直接对原始集合进行写入数据,而是对原始集合的一个copy副本进行写入,之后再刷新到原始集合中;如果多个线程在进行写操作,也有多个读操作,那么读操作读到的数据就是旧数据。Copy-On-Write思想就是放弃了数据的实时性,取提高并发度。
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
在add方法中,就是先复制原始数据,newElements[len] = e;这一步就是添加新元素的操作, setArray(newElements)就是给原始集合重新赋值,并且setArray(newElements)中实际赋值的array数组,是用了volatile关键字修饰的。同理remove方法也就是一样的思想,对副本进行操作。
ConcurrentHashMap
因为HashMap之后应该会单独再写一篇,这里就暂时不展开了。
ThreadLocal
根据打印结果可以看到虽然给ThreadLocal实例对象调用了set方法进行赋值了,但是在另一个线程尝试调用get方法的时候并没有获取到想象中的值,实现了线程隔离。
在Thread类中有一个 ThreadLocal.ThreadLocalMap的变量值threadLocals, ThreadLocalMap是ThreadLocal中的一个静态内部类,在ThreadLocalMap中也定义了一个静态内部类Entry
static class Entry extends WeakReference<ThreadLocal<?>> {
/** The value associated with this ThreadLocal. */
Object value;
Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}
Entry类是存储了ThreadLocal的一个弱引用,和实际存放进ThreadLocal的值value。并且在ThreadLocalMap中,还有一个Entry数组对象table,在调用ThreadLocal中的set和get方法,就是对这个table进行操作。
ThreadLocal的set方法
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
}
ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}
private void set(ThreadLocal<?> key, Object value) {
Entry[] tab = table;
int len = tab.length;
int i = key.threadLocalHashCode & (len-1);
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
ThreadLocal<?> k = e.get();
if (k == key) {
e.value = value;
return;
}
if (k == null) {
replaceStaleEntry(key, value, i);
return;
}
}
tab[i] = new Entry(key, value);
int sz = ++size;
if (!cleanSomeSlots(i, sz) && sz >= threshold)
rehash();
}
如果是ThreadLocal对象第一次set值,在走到for循环的时候,就会不符合 e != null;条件,直接执行 tab[i] = new Entry(key, value);这样当在同一个线程内再次set值的时候,就会进到for循环里面,走到 if (k == key)的判断里,进行值的更新。
ThreadLocal的get方法
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
return setInitialValue();
}
在public T get()方法中,如果是没有调用set方法之前,就调用了get方法,就会不符合 if (map != null)判断,直接setInitialValue()方法,这个方法又调用了一遍set方法。只不过实际存储的值为null,这就是为什么截图里打印出来的值是null。
实现了线程隔离的原理其实就是对同一个ThreadLocal对象,就算作为参数传到多个不同的线程中,在调用get方法的时候并没有去根据传入的ThreadLocal对象获取,而是找到线程各自本身的threadLocals变量,这个变量是ThreadLocal.ThreadLocalMap类型,接着再找到其中的Entry数组对象table,根据最上面调用get方法的ThreadLocal对象的hashCode进行计算得到下标,其次用这个计算出来的下标在table中找到一个Entry对象,用Entry的get方法得到存储的ThreadLocal的引用和最上面调用get方法的ThreadLocal对象进行比较,相等则直接返回。
如果在当前线程内没有set过值,那么就会和上面写的一样,就会不符合 if (map != null)判断,直接setInitialValue()方法,结果就是直接返回null。
总结来说就是,如果在一个线程内对一个ThradLocal对象没有进行set操作,那么get方法就会返回null;只有通过set方法才会将ThreadLocal对象的引用和实际存储值放到线程里面去。
线程池
使用线程池的方式来创建和管理线程的好处:
- 节省了资源;如果每次有新的任务到来就创建一个新的,执行完再丢弃掉,会花费不少资源用在创建和销毁上面,使用线程池也实现了复用线程的作用。
- 统一管理;使用线程池的方式来创建线程,可以对创建的线程统一管理,使用同一种阻塞队列和同一种拒绝策略,实现了统一管理。
使用ThreadPoolExecutor类的构造函数来手动创建线程池,直接看参数最多的构造方法。
public ThreadPoolExecutor(int corePoolSize, -- 核心线程池数量
int maximumPoolSize, -- 最大线程池数量
long keepAliveTime, -- 空闲线程等待新任务的最大时间
TimeUnit unit, -- 时间单位
BlockingQueue<Runnable> workQueue, -- 阻塞队列
ThreadFactory threadFactory, -- 创建线程使用到的工厂
RejectedExecutionHandler handler) -- 拒绝策略
新创建的线程进来之后,会先判断 核心线程池数量 是否已经满了,满了就再判断阻塞队列是否可以入队成功,失败则继续判断最大线程池数量是否满足,不满足就直接调用拒绝策略。
核心线程池数量
核心线程池中的线程在超过了keepAliveTime之后不会被销毁,除非设置了allowCoreThreadTimeOut参数。
(但是我在线程池中的线程在执行的时候,调用了传进去的ThreadPoolExecutor对象的getPoolSize()方法,我看allowCoreThreadTimeOut方法设置为true或者false没啥区别,暂时还不懂这块)
最大线程池数量
如果需要执行的线程数量大于 【最大线程池数量+阻塞队列初始化长度】,那么就会采取拒绝策略;也就是在需要执行线程的数量大于【最大线程池数量+阻塞队列初始化长度】,实际创建的线程还是【最大线程池数量+阻塞队列初始化长度】个。前提是基于有界队列(ArrayBlockingQueue和LinkedBlockingQueue)
keepAliveTime
在阻塞队列已满,新创建的线程等待进入任务队列中,超过keepAliveTime时间之后就会被销毁。
unit
时间单位,可以指定单位,时分秒等等。
workQueue
用来等待执行任务的队列,有多个可供选择。
ArrayBlockingQueue:
有界任务队列,基于数组结构的有界阻塞队列,可以设置队列上限值,FIFO原则排序,当需要执行的任务数量大于corePoolSize的时候,超过的任务就会被放到阻塞队列里,就是BlockingQueue。当阻塞队列已满并且线程数达到了maximunPoolSize,就会执行拒绝策略 也就是 当需要执行的线程数 > maximunPoolSiz + ArrayBlockingQueue的初始容量的话 就会执行拒绝策略 。
LinkedBlockingQueue:
基于链表结构的阻塞队列,FIFO原则排序。当前线程运行数没有达到corePoolSize时,有了新的提交任务会新建任务去执行,达到了corePoolSize的时候,申请的任务会放到阻塞队列里面,LinkedBlockingQueue没有最大值线定,所以构造函数种的maximunSize是不生效的,只要超过corePoolSize,就会一直放到阻塞队列里面去
SynchronousQueue:
它是一种不存储元素的队列,任务不会先放到队列中去等线程来取,而是直接移交给执行的线程,一般maximumSize设置为Integer.MAX_VALUE.要将一个元素放入SynchronousQueue,就需要有另一个线程在等待接收这个元素。若没有线程在等待,并且线程池的当前线程数小于最大值,则ThreadPoolExecutor就会新建一个线程;否则,根据饱和策略,拒绝任务。newCachedThreadPool默认使用的就是这种同步移交队列。吞吐量高于LinkedBlockingQueue。
PriorityBlockingQueue
优先级阻塞队列,根据优先级执行任务,优先级是通过自然排序或者是Comparator定义实现。如果执行的任务没有实现compareable接口重写方法会报错。执行任务顺序就是按照自定义规则来排序的。默认最大长度是11,但是会自动扩容。
threadFactory
Executors.defaultThreadFactory()返回的是DefaultThreadFactory对象,这个对象只实现了ThreadFactory接口,ThreadFactory接口中只有一个方法,这个接口上的注释有这么一句:
使用线程工厂消除了对新线程调用的硬连线,使应用程序能够使用特殊的线程子类、优先级等。
在DefaultThreadFactory类中重写的newThread 方法如下:
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
我理解就是设置优先级的作用。
handler
创建的线程超过可以处理的最大数之后,会采取的策略。
AbortPolicy :
直接抛出异常,不会执行溢出的任务。
DiscardPolicy :
丢弃请求的任务
DiscardOldestPolicy :
丢弃等待时间最长的任务,ArraysBlockingQueue和LinkedBlockingQueue是FIFO的,所以会丢弃下一个即将执行的任务,如果和优先级任务队列一起用,会导致任务队列中优先级最高的任务被丢弃。
CallerRunsPolicy :
不去新建线程,而是让当前线程执行,就相当在main方法中如果采取了这个策略,并且满足条件,就会让main线程去执行这个任务。
配置参数
线上 – 响应速度优先:需要快速响应用户的请求对于一个线上应用,如果一个页面半天都刷不出来,用户大概率就放弃查看这个页面了,所以这个时候快速响应用户的请求是最重要的。此时应该不设置阻塞队列去缓冲并发任务,并且调高 corePoolSize 和 maxPoolSize 去创造尽可能多的线程快速执行任务
线下 – 吞吐量优先:需要尽可能快地批量处理任务对于需要执行大量线下任务的场景,我们当然希望任务执行的越快越好。这种情况下,由于是线下场景,不涉及用户,所以并不需要太追求速度,而是关注如何使用有限的资源,尽可能地在单位时间内去处理更多的任务,也就是吞吐量优先的问题。所以这里应该设置阻塞队列去缓冲并发任务,调整合适的 corePoolSize 去设置处理任务的核心线程数
maximumPoolSize 设置偏小,workQueue 大小设置偏小,导致拒绝策略频繁被调用
maximumPoolSize 设置偏小,workQueue 大小设置过大,导致很多的任务都被堆积起来,相应的,接口的响应时间就会变长
maximumPoolSize 设置过大,导致线程上下文切换频繁发生,处理速度反而下降
这篇文章的主要内容是根据 《
大话并发
》一边读一边记录的。