java并发入门(一)共享模型—Synchronized、Wait/Notify、pack/unpack

  • Post author:
  • Post category:java




一、共享模型—管程



1、共享存在的问题



1.1 共享变量案例

package com.yyds.juc.monitor;

import lombok.extern.slf4j.Slf4j;

@Slf4j(topic = "c.MTest1")
public class MTest1 {

    static int counter = 0;

    public static void main(String[] args) throws InterruptedException {

        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 5000; i++) {
                counter++;
            }
        }, "t1");
        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 5000; i++) {
                counter--;
            }
        }, "t2");
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        log.debug("{}",counter);
    }
}

以上的结果可能是

正数、负数、零

。这是因为 Java 中对静态变量的自增,自减并不是原子操作。

例如对于 i++ 而言(i 为静态变量),实际会产生如下的 JVM 字节码指令:

getstatic i   // 获取静态变量i的值
iconst_1      // 准备常量1
iadd          // 自增
putstatic i   // 将修改后的值存入静态变量i

i–会产生如下的 JVM 字节码指令:

getstatic i      // 获取静态变量i的值
iconst_1         // 准备常量1
isub             // 自减
putstatic i      // 将修改后的值存入静态变量i

在Java中,完成静态变量的自增,自减需要在主存和工作内存中进行数据交换 。

在这里插入图片描述


如果是单线程以上 8 行代码是顺序执行(不会交错)没有问题:

在这里插入图片描述


但多线程下这 8 行代码可能交错运行,比如出现负数情况如下:

在这里插入图片描述



1.2 临界区

一个程序运行多个线程本身是没有问题的,问题出在多个线程访问**共享资源


多个线程读

共享资源

其实也没有问题

在多个线程对

共享资源

读写操作时发生指令交错,就会出现问题。一段代码块内如果存在对

共享资源

的多线程读写操作,称这段代码块为

临界区**

static int counter = 0;

static void increment()
// 临界区
{
	counter++;
}

static void decrement()
// 临界区
{
	counter--;
}

多个线程在临界区内执行,由于代码的

执行序列不同

而导致结果无法预测,称之为发生了

竞态条件



2、利用synchronized 解决共享存在的问题

为了避免临界区的竞态条件发生,有多种手段可以达到目的。

  • 阻塞式的解决方案:synchronized,Lock

  • 非阻塞式的解决方案:原子变量

synchronized,即【对象锁】,它采用互斥的方式让同一时刻至多只有一个线程能持有【对象锁】,其它线程再想获取这个【对象锁】时就会阻塞住。这样就能保证拥有锁的线程可以安全的执行临界区内的代码,不用担心线程上下文切换。

// 语法如下
synchronized(对象) // 线程1, 线程2(blocked)
{
	临界区
}

上诉代码,可以进行改造:

package com.yyds.juc.monitor;

import lombok.extern.slf4j.Slf4j;

@Slf4j(topic = "c.MTest2")
public class MTest2 {

    static int counter = 0;
    static final Object room = new Object();

    public static void main(String[] args) throws InterruptedException {

        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 5000; i++) {
                synchronized (room){
                    counter++;
                }
            }
        }, "t1");
        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 5000; i++) {
                synchronized (room){
                    counter--;
                }
            }
        }, "t2");
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        log.debug("{}",counter);
    }
}

  • synchronized(对象)

    中的对象,可以想象为一个房间(room),有唯一入口(门)房间只能一次进入一人进行计算,线程 t1,t2 想象成两个人

  • 当线程 t1 执行到

    synchronized(room)

    时就好比 t1 进入了这个房间,并锁住了门拿走了钥匙,在门内执行

    count++

    代码

  • 这时候如果 t2 也运行到了

    synchronized(room)

    时,它发现门被锁住了,只能在门外等待,发生了上下文切换,阻塞住了

  • 这中间即使 t1 的 cpu 时间片不幸用完,被踢出了门外(不要错误理解为锁住了对象就能一直执行下去哦),这时门还是锁住的,t1 仍拿着钥匙,t2 线程还在阻塞状态进不来,只有下次轮到 t1 自己再次获得时间片时才能开门进入

  • 当 t1 执行完

    synchronized{} 块

    内的代码,这时候才会从 obj 房间出来并解开门上的锁,唤醒 t2 线程把钥匙给他。t2 线程这时才可以进入 obj 房间,锁住了门拿上钥匙,执行它的

    count--

    代码

如果把 synchronized(obj) 放在 for 循环的外面,如何理解?-- 原子性
如果 t1 synchronized(obj1) 而 t2 synchronized(obj2) 会怎样运作?-- 锁对象
如果 t1 synchronized(obj) 而 t2 没有加会怎么样?如何理解?-- 锁对象

利用面向对象的思想,将共享变量放入到一个类中,将上述代码改造如下:

package com.yyds.juc.monitor;

import lombok.extern.slf4j.Slf4j;

@Slf4j(topic = "c.MTest3")
public class MTest3 {


    public static void main(String[] args) throws InterruptedException {

        RoomCounter counter = new RoomCounter();


        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 5000; i++) {
                counter.increment();
            }
        }, "t1");
        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 5000; i++) {
                counter.decrement();
            }
        }, "t2");
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        log.debug("{}",counter);
    }
}


class RoomCounter{

    int value = 0;
    public void increment() {
        synchronized (this) {
            value++;
        }
    }
    public void decrement() {
        synchronized (this) {
            value--;
        }
    }
    public int get() {
        synchronized (this) {
            return value;
        }
    }

}



3、方法上的synchronized

// 普通方法
class Test{
    public synchronized void test() {
        
    }
}

// 等价于  此时锁住的是调用者
class Test{
    public void test() {
        synchronized(this) {
            
        }
    }
}
// 静态方法
class Test{
    public synchronized static void test() {
        
    }
}
// 等价于  此时锁住的是Test类
class Test{
    public static void test() {
        synchronized(Test.class) {
            
        }
    }
}



4、变量安全性分析



4.1 基本概念


成员变量和静态变量是否线程安全?

  • 如果它们没有共享,则线程安全

  • 如果它们被共享了,根据它们的状态是否能够改变,又分两种情况

    • 如果只有读操作,则线程安全

    • 如果有读写操作,则这段代码是临界区,需要考虑线程安全

package com.yyds.juc.monitor;

import java.util.ArrayList;

public class MTest4ThreadUnsafe {

    ArrayList<String> list = new ArrayList<>();

    static final int THREAD_NUMBER = 2;
    static final int LOOP_NUMBER = 200;
    public static void main(String[] args) {
        // 如果线程2 还未 add,线程1 remove 就会报错
        MTest4ThreadUnsafe test = new MTest4ThreadUnsafe();
        for (int i = 0; i < THREAD_NUMBER; i++) {
            new Thread(() -> {
                test.method1(LOOP_NUMBER);
            }, "Thread" + i).start();
        }
    }
    
    public void method1(int loopNumber){
        for (int i = 0; i < loopNumber; i++) {
            // 临界区,会产生竞态条件
            method2();
            method3();
        }
    }

    private void method2() {
        list.add("1");
    }
    private void method3() {
        list.remove(0);
    }

}

将成员变量改为局部变量,不会报错,代码如下

package com.yyds.juc.monitor;

import java.util.ArrayList;

public class MTest5ThreadSafe {

    static final int THREAD_NUMBER = 2;
    static final int LOOP_NUMBER = 200;
    public static void main(String[] args) {
        // list 是局部变量,每个线程调用时会创建其不同实例,没有共享
        // 而 method2 的参数是从 method1 中传递过来的,与 method1 中引用同一个对象
        // method3 的参数分析与 method2 相同
        MTest4ThreadUnsafe test = new MTest4ThreadUnsafe();
        for (int i = 0; i < THREAD_NUMBER; i++) {
            new Thread(() -> {
                test.method1(LOOP_NUMBER);
            }, "Thread" + i).start();
        }
    }

    public void method1(int loopNumber){
        ArrayList<String> list = new ArrayList<>();
        for (int i = 0; i < loopNumber; i++) {
            // 临界区,会产生竞态条件
            method2(list);
            method3(list);
        }
    }

    private void method2(ArrayList<String> list) {
        list.add("1");
    }
    private void method3(ArrayList<String> list) {
        list.remove(0);
    }
}


局部变量是否线程安全?

  • 局部变量是线程安全的

  • 但局部变量引用的对象则未必

    • 如果该对象没有逃离方法的作用访问,它是线程安全的

    • 如果该对象逃离方法的作用范围,需要考虑线程安全

// 局部变量是线程安全的
public static void test1() {
    int i = 10;
    i++;
}
// 如下图,每个线程调用 test1() 方法时局部变量 i,会在每个线程的栈帧内存中被创建多份,因此不存在共享

在这里插入图片描述

在上面例子中,将成员变量改为局部变量,不报错了,但是如果出现下面

对象逃离方法的作用范围情况

,依旧报错

package com.yyds.juc.monitor;

import java.util.ArrayList;

public class MTest5ThreadSafe {

    static final int THREAD_NUMBER = 2;
    static final int LOOP_NUMBER = 200;
    public static void main(String[] args) {
        // list 是局部变量,每个线程调用时会创建其不同实例,没有共享
        // 而 method2 的参数是从 method1 中传递过来的,与 method1 中引用同一个对象
        // method3 的参数分析与 method2 相同
        MTest4ThreadUnsafe test = new MTest4ThreadUnsafe();
        for (int i = 0; i < THREAD_NUMBER; i++) {
            new Thread(() -> {
                test.method1(LOOP_NUMBER);
            }, "Thread" + i).start();
        }
    }

    public void method1(int loopNumber){
        ArrayList<String> list = new ArrayList<>();
        for (int i = 0; i < loopNumber; i++) {
            // 临界区,会产生竞态条件
            method2(list);
            method3(list);
        }
    }

    private void method2(ArrayList<String> list) {
        list.add("1");
    }
    private void method3(ArrayList<String> list) {
        list.remove(0);
    }
}


class ThreadSafeSubClass extends MTest5ThreadSafe{

    public void method3(ArrayList<String> list) {
        new Thread(() -> {
            list.remove(0);
        }).start();
    }
}


常见线程安全类


String

Integer

StringBuffer

Random

Vector

Hashtable

java.util.concurrent 包下的类

这里说它们是线程安全的是指,多个线程调用它们同一个实例的某个方法时,是线程安全的。

它们的每个方法是原子的,但

注意

它们多个方法的组合不是原子的 。

// 如下,多个方法的组合不是原子

Hashtable table = new Hashtable();
// 线程1,线程2
if( table.get("key") == null) {
	table.put("key", value);
}

在这里插入图片描述



4.2 案例详解

package com.yyds.juc.monitor;

import lombok.extern.slf4j.Slf4j;

import java.util.Random;

@Slf4j(topic = "c.MTest7ExerciseTransfer")
public class MTest7ExerciseTransfer {
    public static void main(String[] args) throws InterruptedException {
        Account a = new Account(1000);
        Account b = new Account(1000);
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                a.transfer(b, randomAmount());
            }
        }, "t1");
        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                b.transfer(a, randomAmount());
            }
        }, "t2");
        t1.start();
        t2.start();
        t1.join();
        t2.join();

        // 查看转账2000次后的总金额
        log.debug("total:{}",(a.getMoney() + b.getMoney()));
    }
    // Random 为线程安全
    static Random random = new Random();
    // 随机 1~100
    public static int randomAmount() {
        return random.nextInt(100) +1;
    }
}



class Account {
    private int money;
    public Account(int money) {
        this.money = money;
    }
    public int getMoney() {
        return money;
    }
    public void setMoney(int money) {
        this.money = money;
    }
    public  void transfer(Account target, int amount) {
        if (this.money > amount) {
            this.setMoney(this.getMoney() - amount);
            target.setMoney(target.getMoney() + amount);
        }
    }
}


// 运行结果
10:08:56.480 c.MTest7ExerciseTransfer [main] - total:4267
    
    
//使用synchronized进行改进
 class Account {
    private int money;
    public Account(int money) {
        this.money = money;
    }
    public int getMoney() {
        return money;
    }
    public void setMoney(int money) {
        this.money = money;
    }
    public  void transfer(Account target, int amount) {
        synchronized (Account.class){ // 锁是Account类
            if (this.money > amount) {
                this.setMoney(this.getMoney() - amount);
                target.setMoney(target.getMoney() + amount);
            }
        }
    }
}
// 运行结果
10:10:38.624 c.MTest7ExerciseTransfer [main] - total:2000



5、Monitor



5.1 对象头

# 普通对象
|--------------------------------------------------------------|
|                    Object Header (64 bits)                   |
|------------------------------------|-------------------------|
|         Mark Word (32 bits)        |    Klass Word (32 bits) |
|------------------------------------|-------------------------|


# 其中 Mark Word 结构为(32位虚拟机)
|-------------------------------------------------------|--------------------|
|                      Mark Word (32 bits)              |       State        |
|-------------------------------------------------------|--------------------|
|     hashcode:25     | age:4 | biased_lock:0 | 01      |       Normal       |  # 正常状态
|-------------------------------------------------------|--------------------|
| thread:23 | epoch:2 | age:4 | biased_lock:1 | 01      |       Biased       |  # 偏向锁
|-------------------------------------------------------|--------------------| 
|              ptr_to_lock_record:30          | 00      | Lightweight Locked |  # 轻量级锁
|-------------------------------------------------------|--------------------| 
|           ptr_to_heavyweight_monitor:30     | 10      | Heavyweight Locked |  # 重锁
|-------------------------------------------------------|--------------------|
|                                             | 11      |   Marked for GC    |
|-------------------------------------------------------|--------------------|


# 64位虚拟机
|--------------------------------------------------------------------|--------------------|
|                     Mark Word (64 bits)                            |      State         |
|--------------------------------------------------------------------|--------------------|
| unused:25 | hashcode:31 | unused:1 | age:4 | biased_lock:0 | 01    |      Normal        |
|--------------------------------------------------------------------|--------------------|
| thread:54 | epoch:2     | unused:1 | age:4 | biased_lock:1 | 01    |      Biased        |
|--------------------------------------------------------------------|--------------------|
|                      ptr_to_lock_record:62                 | 00    | Lightweight Locked |
|--------------------------------------------------------------------|--------------------|
|                  ptr_to_heavyweight_monitor:62             | 10    | Heavyweight Locked |
|--------------------------------------------------------------------|--------------------|
|                                                             | 11   |    Marked for GC   |
|--------------------------------------------------------------------|--------------------|



5.2 Monitor(锁)

​ 每个 Java 对象都可以关联一个 Monitor 对象,如果使用 synchronized 给对象上锁(重量级)之后,该对象头的Mark Word 中就被设置指向 Monitor 对象的指针 。

在这里插入图片描述

  • 刚开始 Monitor 中 Owner 为 null

  • 当 Thread-2 执行 synchronized(obj) 就会将 Monitor 的所有者 Owner 置为 Thread-2,Monitor中只能有一个 Owner

  • 在 Thread-2 上锁的过程中,如果 Thread-3,Thread-4,Thread-5 也来执行 synchronized(obj),就会进入EntryList BLOCKED

  • Thread-2 执行完同步代码块的内容,然后唤醒 EntryList 中等待的线程来竞争锁,竞争的时是非公平的

  • 图中 WaitSet 中的 Thread-0,Thread-1 是之前获得过锁,但条件不满足进入 WAITING 状态的线程



5.3 Synchronized原理

static final Object lock = new Object();
static int counter = 0;

public static void main(String[] args) {
    synchronized (lock) {
    	counter++;
    }
}
Code:
	stack=2, locals=3, args_size=1
        0: getstatic #2     // <- lock引用 (synchronized开始)
        3: dup
        4: astore_1         // lock引用 -> slot 1
        5: monitorenter     // 将 lock对象 MarkWord 置为 Monitor 指针
        6: getstatic #3     // <- i
        9: iconst_1         // 准备常数 1
        10: iadd            // +1
        11: putstatic #3    // -> i
        14: aload_1         // <- lock引用
        15: monitorexit     // 将 lock对象 MarkWord 重置, 唤醒 EntryList
        16: goto 24
        19: astore_2        // e -> slot 2
        20: aload_1         // <- lock引用
        21: monitorexit     // 将 lock对象 MarkWord 重置, 唤醒 EntryList
        22: aload_2         // <- slot 2 (e)
        23: athrow          // throw e
        24: return
	Exception table:
        from to target type
        6  16 19 any
        19 22 19 any



5.4 Synchronized进阶



5.4.1 轻量级锁

轻量级锁的使用场景:如果一个对象虽然有多线程要加锁,但

加锁的时间是错开的(也就是没有竞争)

,那么可以使用轻量级锁来优化。

轻量级锁对使用者是透明的,即语法仍然是 synchronized 。

static final Object obj = new Object();

public static void method1() {
    synchronized( obj ) {
    	// 同步块 A
    	method2();
    }
}
public static void method2() {
    synchronized( obj ) {
    	// 同步块 B
    }
}

在这里插入图片描述

1、创建锁记录(Lock Record)对象,每个线程都的栈帧都会包含一个锁记录的结构,内部可以存储锁定对象的Mark Word

2、让锁记录中 Object reference 指向锁对象,并尝试用 cas 替换 Object 的 Mark Word,将 Mark Word 的值存入锁记录

​ 如果 cas 替换成功,对象头中存储了

锁记录地址和状态 00

,表示由该线程给对象加锁 (如下图)

在这里插入图片描述

​ 如果 cas 失败,有两种情况

​ 如果是其它线程已经持有了该 Object 的轻量级锁,这时表明有竞争,进入

锁膨胀过程

​ 如果是自己执行了

synchronized 锁重入

,那么

再添加一条 Lock Record 作为重入的计数

(如下图)

在这里插入图片描述

3、当退出 synchronized 代码块(解锁时)如果有取值为 null 的锁记录,表示有重入,这时重置锁记录,表示重入计数减一

4、 当退出 synchronized 代码块(解锁时)锁记录的值不为 null,这时使用 cas 将

Mark Word 的值恢复给对象头


成功,则解锁成功

失败,说明轻量级锁进行了

锁膨胀或已经升级为重量级锁

,进入重量级锁解锁流程 。



5.4.2 锁膨胀

如果在尝试加轻量级锁的过程中,CAS 操作无法成功,这时

一种情况就是有其它线程为此对象加上了轻量级锁(有竞争)

,这时需要进行锁膨胀,将轻量级锁变为重量级锁。

1、当 Thread-1 进行轻量级加锁时,Thread-0 已经对该对象加了轻量级锁

在这里插入图片描述

2、这时 Thread-1 加轻量级锁失败,进入锁膨胀流程

即为 Object 对象申请 Monitor 锁,

让 Object 指向重量级锁地址



然后自己进入 Monitor 的 EntryList BLOCKED

在这里插入图片描述

3、当 Thread-0 退出同步块解锁时,使用 cas 将 Mark Word 的值恢复给对象头,失败。这时会进入重量级解锁流程,

即按照 Monitor 地址找到 Monitor 对象,设置 Owner 为 null,唤醒 EntryList 中 BLOCKED 线程 。



5.4.3 自旋优化

重量级锁竞争的时候,还可以使用自旋来进行优化,如果当前线程自旋成功(即这时候持锁线程已经退出了同步块,释放了锁),这时当前线程就可以避免阻塞。

注意:

  • 自旋会占用 CPU 时间,单核 CPU 自旋就是浪费,多核 CPU 自旋才能发挥优势。

  • 在 Java 6 之后自旋锁是自适应的,比如对象刚刚的一次自旋操作成功过,那么认为这次自旋成功的可能性会高,就多自旋几次;反之,就少自旋甚至不自旋。

  • Java 7 之后不能控制是否开启自旋功能 。



5.4.4 偏向锁

static final Object obj = new Object();

public static void m1() {
    synchronized( obj ) {
        // 同步块 A
        m2();
    }
}

public static void m2() {
    synchronized( obj ) {
        // 同步块 B
        m3();
    }
}

public static void m3() {
    synchronized( obj ) {
        // 同步块 C
    }
}

在这里插入图片描述


如上图,轻量级锁在没有竞争时(就自己这个线程),每次重入仍然需要执行 CAS 操作。

Java 6 中引入了偏向锁来做进一步优化:只有第一次使用

CAS 将线程 ID 设置到对象的 Mark Word 头,之后发现这个线程 ID 是自己的就表示没有竞争,不用重新 CAS

在这里插入图片描述

一个对象创建时:
	如果开启了偏向锁(默认开启),那么对象创建后,markword 值为 0x05 即最后 3 位为 101,这时它的thread、epoch、age 都为 0
	偏向锁是默认是延迟的,不会在程序启动时立即生效,如果想避免延迟,可以加 VM 参数 -XX:BiasedLockingStartupDelay=0 来禁用延迟
	如果没有开启偏向锁,那么对象创建后,markword 值为 0x01 即最后 3 位为 001,这时它的 hashcode、age 都为 0,第一次用到 hashcode 时才会赋值


5.4.1 撤销偏向锁

1、调用了对象的 hashCode,

但偏向锁的对象 MarkWord 中存储的是线程 id,如果调用 hashCode 会导致偏向锁被撤销


轻量级锁会在锁记录中记录 hashCode

重量级锁会在 Monitor 中记录 hashCode

2、当有其它线程使用偏向锁对象时,会将偏向锁升级为轻量级锁 。

3、调用 wait/notify



5.4.2 批量重偏向

如果对象虽然被多个线程访问,但没有竞争,这时偏向了线程 T1 的对象仍有机会重新偏向 T2,重偏向会重置对象的 Thread ID

当撤销偏向锁阈值超过 20 次后,jvm 会这样觉得,我是不是偏向错了呢,于是会在给这些对象加锁时重新偏向至加锁线程



5.4.3 批量撤销

当撤销偏向锁阈值超过 40 次后,jvm 会这样觉得,自己确实偏向错了,根本就不该偏向。于是整个类的所有对象都会变为不可偏向的,新建的对象也是不可偏向的 。



5.4.5 锁消除

@Fork(1)
@BenchmarkMode(Mode.AverageTime)
@Warmup(iterations=3)
@Measurement(iterations=5)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
public class MyBenchmark {
    
    static int x = 0;
    
    @Benchmark
    public void a() throws Exception {
        x++;
    }
    
    
    @Benchmark // JIT即时编译器会进行锁消除优化
    public void b() throws Exception {
    Object o = new Object();
        synchronized (o) {
       	 x++;
        }
    }
}



6、Wait/Notify



6.1 Wait/Notify的简单介绍

  • obj.wait() 让进入 object 监视器的线程到 waitSet 等待

  • obj.notify() 在 object 上正在 waitSet 等待的线程中

    挑一个

    唤醒

  • obj.notifyAll() 让 object 上正在 waitSet 等待的线程全部唤醒

注意:它们都是线程之间进行协作的手段,都属于 Object 对象的方法。

必须获得此对象的锁

,才能调用这几个方法 。

package com.yyds.juc.monitor;

import lombok.extern.slf4j.Slf4j;
import static java.lang.Thread.sleep;

@Slf4j(topic = "c.MTest8")
public class MTest8 {
    final static Object obj = new Object();
    public static void main(String[] args) throws InterruptedException {
        new Thread(() -> {
            synchronized (obj) {
                log.debug("执行....");
                try {
                    obj.wait(); // 让线程在obj上一直等待下去
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.debug("其它代码....");
            }
        },"t1").start();
        new Thread(() -> {
            synchronized (obj) {
                log.debug("执行....");
                try {
                    obj.wait(); // 让线程在obj上一直等待下去
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.debug("其它代码....");
            }
        },"t2").start();

        // 主线程两秒后执行
        sleep(2000);
        log.debug("唤醒 obj 上其它线程");
        synchronized (obj) {
            obj.notify(); // 唤醒obj上一个线程
           // obj.notifyAll(); // 唤醒obj上所有等待线程
        }
    }
}
# notify 的一种结果
14:27:24.172 c.MTest8 [t1] - 执行....
14:27:24.188 c.MTest8 [t2] - 执行....
14:27:26.182 c.MTest8 [main] - 唤醒 obj 上其它线程
14:27:26.182 c.MTest8 [t1] - 其它代码....

# notifyAll 的结果
14:25:12.212 c.MTest8 [t1] - 执行....
14:25:12.212 c.MTest8 [t2] - 执行....
14:25:14.214 c.MTest8 [main] - 唤醒 obj 上其它线程
14:25:14.214 c.MTest8 [t2] - 其它代码....
14:25:14.214 c.MTest8 [t1] - 其它代码....


wait() 方法会释放对象的锁,进入 WaitSet 等待区,从而让其他线程就机会获取对象的锁。无限制等待,直到notify 为止



wait(long n) 有时限的等待, 到 n 毫秒后结束等待,或是被 notify


sleep(long n) 和 wait(long n) 的区别

  1. sleep 是 Thread 方法,而 wait 是 Object 的方法

  2. sleep 不需要强制和 synchronized 配合使用,但

    wait 需要和 synchronized 一起用


  3. sleep 在睡眠的同时,不会释放对象锁的,但 wait 在等待的时候会释放对象锁

  4. 它们状态 TIMED_WAITING



6.2 如何正确使用Wait/Notify



6.2.1 原始代码

package com.yyds.juc.monitor;

import lombok.extern.slf4j.Slf4j;

import static java.lang.Thread.sleep;

@Slf4j(topic = "c.MTest9")
public class MTest9 {
    static final  Object room = new Object();
    static boolean hasCigarette = false;
    static boolean hasTakeout = false;

    public static void main(String[] args) throws InterruptedException {

        new Thread(() -> {
            synchronized (room){
                log.debug("有烟没?【{}】",hasCigarette);

                if(!hasCigarette){
                    log.debug("没有烟,先休息一会!");
                    try {
                        sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

                log.debug("有烟没?【{}】",hasCigarette);
                if(hasCigarette){
                    log.debug("可以干活了!");
                }
            }
        },"tom").start();


        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                synchronized (room) {
                    log.debug("可以开始干活了");
                }
            }, "其它人").start();
        }

        sleep(1000);
        new Thread(() -> {
            // 这里能不能加 synchronized (room)?
            hasCigarette = true;
            log.debug("烟到了噢!");
        }, "送烟的").start();
    }
}
14:48:48.419 c.MTest9 [tom] - 有烟没?【false】
14:48:48.419 c.MTest9 [tom] - 没有烟,先休息一会!
14:48:49.422 c.MTest9 [送烟的] - 烟到了噢!
14:48:50.421 c.MTest9 [tom] - 有烟没?【true】
14:48:50.421 c.MTest9 [tom] - 可以干活了!
14:48:50.421 c.MTest9 [其它人] - 可以开始干活了
14:48:50.421 c.MTest9 [其它人] - 可以开始干活了
14:48:50.421 c.MTest9 [其它人] - 可以开始干活了
14:48:50.421 c.MTest9 [其它人] - 可以开始干活了
14:48:50.421 c.MTest9 [其它人] - 可以开始干活了

# 其它干活的线程,都要一直阻塞,效率太低
# tom线程必须睡足 2s 后才能醒来,就算烟提前送到,也无法立刻醒来
# 加了 synchronized (room) 后,就好比tom在里面反锁了门睡觉,烟根本没法送进门,main 没加synchronized 就好像 main 线程是翻窗户进来的

# 下面使用 wait - notify 机制进行改进



6.2.2 改进代码-版本2

package com.yyds.juc.monitor;

import static java.lang.Thread.sleep;

import lombok.extern.slf4j.Slf4j;


@Slf4j(topic = "c.MTest9V2")
public class MTest9V2 {
    static final  Object room = new Object();
    static boolean hasCigarette = false;
    static boolean hasTakeout = false;

    public static void main(String[] args) throws InterruptedException {

        new Thread(() -> {
            synchronized (room){
                log.debug("有烟没?【{}】",hasCigarette);

                if(!hasCigarette){
                    log.debug("没有烟,先休息一会!");
                    try {
                        room.wait(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

                log.debug("有烟没?【{}】",hasCigarette);
                if(hasCigarette){
                    log.debug("可以干活了!");
                }
            }
        },"tom").start();


        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                synchronized (room) {
                    log.debug("可以开始干活了");
                }
            }, "其它人").start();
        }

        sleep(1000);
        new Thread(() -> {
            synchronized (room) { 
                hasCigarette = true;
                log.debug("烟到了噢!");
                // 唤醒
                room.notify();
            }

        }, "送烟的").start();
    }
}

14:58:59.780 c.MTest9V2 [tom] - 有烟没?【false】
14:58:59.780 c.MTest9V2 [tom] - 没有烟,先休息一会!
14:58:59.780 c.MTest9V2 [其它人] - 可以开始干活了
14:58:59.780 c.MTest9V2 [其它人] - 可以开始干活了
14:58:59.780 c.MTest9V2 [其它人] - 可以开始干活了
14:58:59.780 c.MTest9V2 [其它人] - 可以开始干活了
14:58:59.795 c.MTest9V2 [其它人] - 可以开始干活了
14:59:00.782 c.MTest9V2 [送烟的] - 烟到了噢!
14:59:00.782 c.MTest9V2 [tom] - 有烟没?【true】
14:59:00.782 c.MTest9V2 [tom] - 可以干活了!


# 解决了其它干活的线程阻塞的问题
# 但如果有其它线程也在等待条件呢?



6.2.3 改进代码-版本3

package com.yyds.juc.monitor;

import lombok.extern.slf4j.Slf4j;

import static java.lang.Thread.sleep;


@Slf4j(topic = "c.MTest9V3")
public class MTest9V3 {
    static final  Object room = new Object();
    static boolean hasCigarette = false;
    static boolean hasTakeout = false;

    public static void main(String[] args) throws InterruptedException {

        new Thread(() -> {
            synchronized (room){
                log.debug("有烟没?【{}】",hasCigarette);

                if(!hasCigarette){
                    log.debug("没有烟,先休息一会!");
                    try {
                        room.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

                log.debug("有烟没?【{}】",hasCigarette);
                if(hasCigarette){
                    log.debug("可以干活了!");
                }else {
                    log.debug("没干成活...");
                }
            }
        },"tom").start();


        new Thread(() -> {
            synchronized (room){
                log.debug("有外卖没?【{}】",hasTakeout);

                if(!hasTakeout){
                    log.debug("没有外卖,先休息一会!");
                    try {
                        room.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

                log.debug("有外卖没?【{}】",hasTakeout);
                if(hasTakeout){
                    log.debug("可以干活了!");
                }else {
                    log.debug("没干成活...");
                }
            }
        },"jerry").start();


        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                synchronized (room) {
                    log.debug("可以开始干活了");
                }
            }, "其它人").start();
        }

        sleep(1000);
        new Thread(() -> {
            synchronized (room) {
                hasTakeout = true;
                log.debug("外卖到了噢!");
                // 唤醒(随机唤醒一个)
                room.notify();
            }

        }, "送外卖的").start();
    }
}
15:06:23.726 c.MTest9V3 [tom] - 有烟没?【false】
15:06:23.726 c.MTest9V3 [tom] - 没有烟,先休息一会!
15:06:23.726 c.MTest9V3 [其它人] - 可以开始干活了
15:06:23.726 c.MTest9V3 [其它人] - 可以开始干活了
15:06:23.726 c.MTest9V3 [其它人] - 可以开始干活了
15:06:23.726 c.MTest9V3 [其它人] - 可以开始干活了
15:06:23.726 c.MTest9V3 [其它人] - 可以开始干活了
15:06:23.726 c.MTest9V3 [jerry] - 有外卖没?【false】
15:06:23.726 c.MTest9V3 [jerry] - 没有外卖,先休息一会!
15:06:24.741 c.MTest9V3 [送外卖的] - 外卖到了噢!
15:06:24.741 c.MTest9V3 [tom] - 有烟没?【false】
15:06:24.741 c.MTest9V3 [tom] - 没干成活...

# notify 只能随机唤醒一个 WaitSet 中的线程,这时如果有其它线程也在等待,那么就可能唤醒不了正确的线程,称之为【虚假唤醒】
# 解决方法,改为 notifyAll



6.2.3 改进代码-版本4

package com.yyds.juc.monitor;

import lombok.extern.slf4j.Slf4j;

import static java.lang.Thread.sleep;


@Slf4j(topic = "c.MTest9V4")
public class MTest9V4 {
    static final  Object room = new Object();
    static boolean hasCigarette = false;
    static boolean hasTakeout = false;

    public static void main(String[] args) throws InterruptedException {

        new Thread(() -> {
            synchronized (room){
                log.debug("有烟没?【{}】",hasCigarette);

                if(!hasCigarette){
                    log.debug("没有烟,先休息一会!");
                    try {
                        room.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

                log.debug("有烟没?【{}】",hasCigarette);
                if(hasCigarette){
                    log.debug("可以干活了!");
                }else {
                    log.debug("没干成活...");
                }
            }
        },"tom").start();


        new Thread(() -> {
            synchronized (room){
                log.debug("有外卖没?【{}】",hasTakeout);

                if(!hasTakeout){
                    log.debug("没有外卖,先休息一会!");
                    try {
                        room.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

                log.debug("有外卖没?【{}】",hasTakeout);
                if(hasTakeout){
                    log.debug("可以干活了!");
                }else {
                    log.debug("没干成活...");
                }
            }
        },"jerry").start();


        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                synchronized (room) {
                    log.debug("可以开始干活了");
                }
            }, "其它人").start();
        }

        sleep(1000);
        new Thread(() -> {
            synchronized (room) {
                hasTakeout = true;
                log.debug("外卖到了噢!");
                // 全部唤醒
                room.notifyAll();
            }

        }, "送外卖的").start();
    }
}
15:09:32.757 c.MTest9V4 [tom] - 有烟没?【false】
15:09:32.757 c.MTest9V4 [tom] - 没有烟,先休息一会!
15:09:32.757 c.MTest9V4 [其它人] - 可以开始干活了
15:09:32.757 c.MTest9V4 [其它人] - 可以开始干活了
15:09:32.757 c.MTest9V4 [其它人] - 可以开始干活了
15:09:32.757 c.MTest9V4 [其它人] - 可以开始干活了
15:09:32.757 c.MTest9V4 [其它人] - 可以开始干活了
15:09:32.757 c.MTest9V4 [jerry] - 有外卖没?【false】
15:09:32.757 c.MTest9V4 [jerry] - 没有外卖,先休息一会!
15:09:33.759 c.MTest9V4 [送外卖的] - 外卖到了噢!
15:09:33.759 c.MTest9V4 [jerry] - 有外卖没?【true】
15:09:33.759 c.MTest9V4 [jerry] - 可以干活了!
15:09:33.759 c.MTest9V4 [tom] - 有烟没?【false】
15:09:33.759 c.MTest9V4 [tom] - 没干成活...

# 用 notifyAll 仅解决某个线程的唤醒问题,但使用 if + wait 判断仅有一次机会,一旦条件不成立,就没有重新判断的机会了
# 解决方法,用 while + wait,当条件不成立,再次 wait



6.2.3 改进代码-版本5

package com.yyds.juc.monitor;

import lombok.extern.slf4j.Slf4j;

import static java.lang.Thread.sleep;


@Slf4j(topic = "c.MTest9V5")
public class MTest9V5 {
    static final  Object room = new Object();
    static boolean hasCigarette = false;
    static boolean hasTakeout = false;

    public static void main(String[] args) throws InterruptedException {

        new Thread(() -> {
            synchronized (room){
                log.debug("有烟没?【{}】",hasCigarette);

                while (!hasCigarette){
                    log.debug("没有烟,先休息一会!");
                    try {
                        room.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

                log.debug("有烟没?【{}】",hasCigarette);
                if(hasCigarette){
                    log.debug("可以干活了!");
                }else {
                    log.debug("没干成活...");
                }
            }
        },"tom").start();


        new Thread(() -> {
            synchronized (room){
                log.debug("有外卖没?【{}】",hasTakeout);

                while(!hasTakeout){
                    log.debug("没有外卖,先休息一会!");
                    try {
                        room.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

                log.debug("有外卖没?【{}】",hasTakeout);
                if(hasTakeout){
                    log.debug("可以干活了!");
                }else {
                    log.debug("没干成活...");
                }
            }
        },"jerry").start();


        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                synchronized (room) {
                    log.debug("可以开始干活了");
                }
            }, "其它人").start();
        }

        sleep(1000);
        new Thread(() -> {
            synchronized (room) {
                hasTakeout = true;
                log.debug("外卖到了噢!");
                // 全部唤醒
                room.notifyAll();
            }

        }, "送外卖的").start();
    }
}
15:13:19.224 c.MTest9V5 [tom] - 有烟没?【false】
15:13:19.224 c.MTest9V5 [tom] - 没有烟,先休息一会!
15:13:19.224 c.MTest9V5 [其它人] - 可以开始干活了
15:13:19.224 c.MTest9V5 [其它人] - 可以开始干活了
15:13:19.224 c.MTest9V5 [其它人] - 可以开始干活了
15:13:19.224 c.MTest9V5 [其它人] - 可以开始干活了
15:13:19.224 c.MTest9V5 [其它人] - 可以开始干活了
15:13:19.224 c.MTest9V5 [jerry] - 有外卖没?【false】
15:13:19.224 c.MTest9V5 [jerry] - 没有外卖,先休息一会!
15:13:20.224 c.MTest9V5 [送外卖的] - 外卖到了噢!
15:13:20.224 c.MTest9V5 [jerry] - 有外卖没?【true】
15:13:20.224 c.MTest9V5 [jerry] - 可以干活了!
15:13:20.224 c.MTest9V5 [tom] - 没有烟,先休息一会!

总结一下:模板如下

synchronized(lock) {
    while(条件不成立) {
    	lock.wait();
    }
	// 干活
}

//另一个线程
synchronized(lock) {
    lock.notifyAll();
}



6.3 保护性暂停



6.3.1 基本概念

保护性暂停,即 Guarded Suspension,

用在一个线程等待另一个线程的执行结果



要点

  • 有一个结果需要从一个线程传递到另一个线程,让他们关联同一个 GuardedObject

  • 如果有结果不断从一个线程到另一个线程那么可以使用消息队列(见生产者/消费者)

  • JDK 中,join 的实现、Future 的实现,采用的就是此模式

  • 因为要等待另一方的结果,因此归类到同步模式

在这里插入图片描述



6.3.2 简单实现

package com.yyds.juc.monitor;

public class GuardedObject {

    private Object response;
    private final Object lock = new Object();
    /**
     * 获取response
     */
    public Object get(){
        synchronized (lock){
            // 条件不满足,就等待
            while (response == null){
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            return response;
        }
    }

    /**
     * 将数据获取给response
     */
    public void complete(Object response){
        synchronized (lock){
            // 条件满足,通知等待线程
            this.response = response;
            lock.notifyAll();
        }
    }
}
package com.yyds.juc.monitor;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class Downloader {
    public static List<String> download() throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL("https://www.baidu.com/").openConnection();
        List<String> lines = new ArrayList<>();
        try (BufferedReader reader =
                     new BufferedReader(new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        }
        return lines;
    }
}
package com.yyds.juc.monitor;

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.util.List;


@Slf4j(topic = "c.MTest0")
public class MTest0 {

    public static void main(String[] args) {

        GuardedObject guardedObject = new GuardedObject();

        // t2线程执行下载
        new Thread(() -> {
            List<String> response = null;
            try {
                response = Downloader.download();
                log.debug("download complete......");
                guardedObject.complete(response);
            } catch (IOException e) {
                e.printStackTrace();
            }

        },"t2").start();


        // t1线程获取t2线程的结果
        new Thread(() -> {
            log.debug("waiting...");
            Object response = guardedObject.get();
            log.debug("get response: [{}] lines", ((List<String>) response).size());
        },"t1").start();
    }
}
18:10:24.053 c.MTest0 [t1] - waiting...
18:10:24.977 c.MTest0 [t2] - download complete......
18:10:24.977 c.MTest0 [t1] - get response: [3] lines



6.3.3 带超时版实现

package com.yyds.juc.monitor;

import lombok.extern.slf4j.Slf4j;

@Slf4j(topic = "c.GuardedObject2")
public class GuardedObject2 {

    private Object response;
    private final Object lock = new Object();


    /**
     * 获取response
     */
    public Object get(long millis){
        synchronized (lock){

            // 1、记录最初的时间
            long begin = System.currentTimeMillis();
            // 2、已经经历的时间
            long timePassed = 0;

            // 条件不满足,就等待
            while (response == null){

                // 3、还需要等待的时间
                long waitTime = millis - timePassed;
                log.debug("waitTime: {}", waitTime);

                if(waitTime <= 0){
                    log.debug("break...");
                    break;
                }

                try {
                    lock.wait(waitTime);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // 4、如果提前被唤醒
                timePassed = System.currentTimeMillis() - begin;
                log.debug("timePassed: {}, object is null 【{}】", timePassed, response == null);
            }
            return response;
        }
    }

    /**
     * 将数据获取给response
     */
    public void complete(Object response){
        synchronized (lock){
            // 条件满足,通知等待线程
            this.response = response;
            log.debug("notify...");
            lock.notifyAll();
        }
    }
}

测试

package com.yyds.juc.monitor;

import lombok.extern.slf4j.Slf4j;

import java.util.Arrays;
import java.util.List;

import static java.lang.Thread.sleep;

@Slf4j(topic = "c.MTest0Time")
public class MTest0Time {
    public static void main(String[] args) {
        GuardedObject2 guardedObject2 = new GuardedObject2();

        // t2线程结果传输给t1
        new Thread(() -> {
            try {
                sleep(1000);
                guardedObject2.complete(null);
                sleep(1000);
                guardedObject2.complete(Arrays.asList("hello","world"));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"t2").start();


        Object response = guardedObject2.get(2500);
        if(response != null){
            log.debug("get response: [{}] lines", ((List<String>) response).size());
        }else {
            log.debug("can't get response");
        }
    }
}

// 测试结果如下
10:31:22.105 c.GuardedObject2 [main] - waitTime: 2500
10:31:23.119 c.GuardedObject2 [t2] - notify...
10:31:23.119 c.GuardedObject2 [main] - timePassed: 1014, object is nulltrue10:31:23.119 c.GuardedObject2 [main] - waitTime: 1486
10:31:24.121 c.GuardedObject2 [t2] - notify...
10:31:24.121 c.GuardedObject2 [main] - timePassed: 2016, object is nullfalse10:31:24.121 c.MTest0Time [main] - get response: [2] lines



6.3.4 join原理

join体现就是保护性暂停。

t1.join();



// 源码如下
public final void join() throws InterruptedException {
        join(0);
}


 public final synchronized void join(long millis) throws InterruptedException {
        long base = System.currentTimeMillis();
        long now = 0;

        if (millis < 0) {
            throw new IllegalArgumentException("timeout value is negative");
        }

        if (millis == 0) {
            // 调用者线程进入 t1 的 waitSet 等待, 直到 t1 运行结束
            while (isAlive()) {
                wait(0);
            }
        } else {
            while (isAlive()) {
                long delay = millis - now;
                if (delay <= 0) {
                    break;
                }
                wait(delay);
                now = System.currentTimeMillis() - base;
            }
        }
}



6.3.5 多任务GuardedObject

​ 图中 Futures 就好比居民楼一层的信箱(每个信箱有房间编号),左侧的 t0,t2,t4 就好比等待邮件的居民,右侧的 t1,t3,t5 就好比邮递员。

​ 如果需要在多个类之间使用 GuardedObject 对象,作为参数传递不是很方便,因此设计一个用来解耦的中间类,这样不仅能够解耦【结果等待者】和【结果生产者】,还能够同时支持多个任务的管理 。

在这里插入图片描述

package com.yyds.juc.monitor;

import lombok.extern.slf4j.Slf4j;

@Slf4j(topic = "c.GuardedObject3")
public class GuardedObject3 {

    // 新增 id 用来标识 Guarded Object
    private int id;

    public GuardedObject3(int id) {
        this.id = id;
    }

    public int getId() {
        return id;
    }

    private Object response;

    /**
     * 获取response
     */
    public Object get(long millis){
        synchronized (this){

            // 1、记录最初的时间
            long begin = System.currentTimeMillis();
            // 2、已经经历的时间
            long timePassed = 0;

            // 条件不满足,就等待
            while (response == null){

                // 3、还需要等待的时间
                long waitTime = millis - timePassed;
                log.debug("waitTime: {}", waitTime);

                if(waitTime <= 0){
                    log.debug("break...");
                    break;
                }

                try {
                    this.wait(waitTime);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // 4、如果提前被唤醒
                timePassed = System.currentTimeMillis() - begin;
                log.debug("timePassed: {}, object is null 【{}】", timePassed, response == null);
            }
            return response;
        }
    }

    /**
     * 将数据获取给response
     */
    public void complete(Object response){
        synchronized (this){
            // 条件满足,通知等待线程
            this.response = response;
            log.debug("notify...");
            this.notifyAll();
        }
    }
}
package com.yyds.juc.monitor;

import java.util.Hashtable;
import java.util.Map;
import java.util.Set;

/**
 * 中间解耦的类
 */
public class MailBoxes {

    private static Map<Integer, GuardedObject3> boxes = new Hashtable<>();

    private static int id = 1;

    // 产生唯一 id
    private static synchronized int generateId() {
        return id++;
    }

    public static GuardedObject3 getGuardedObject(int id) {
        return boxes.remove(id);
    }

    public static GuardedObject3 createGuardedObject() {
        GuardedObject3 go = new GuardedObject3(generateId());
        boxes.put(go.getId(), go);
        return go;
    }

    public static Set<Integer> getIds() {
        return boxes.keySet();
    }
}
package com.yyds.juc.monitor;

import lombok.extern.slf4j.Slf4j;

@Slf4j(topic = "c.MPeople")
public class MPeople extends Thread {

    @Override
    public void run() {
        // 收信
        GuardedObject3 guardedObject = MailBoxes.createGuardedObject();
        log.debug("开始收信 id:{}", guardedObject.getId());
        Object mail = guardedObject.get(5000);
        log.debug("收到信 id:{}, 内容:{}", guardedObject.getId(), mail);
    }

}



package com.yyds.juc.monitor;

import lombok.extern.slf4j.Slf4j;

@Slf4j(topic = "c.MPostman")
public class MPostman extends Thread {
    private int id;
    private String mail;
    public MPostman(int id, String mail) {
        this.id = id;
        this.mail = mail;
    }
    @Override
    public void run() {
        GuardedObject3 guardedObject = MailBoxes.getGuardedObject(id);
        log.debug("送信 id:{}, 内容:{}", id, mail);
        guardedObject.complete(mail);
    }
}

测试类

package com.yyds.juc.monitor;

import static java.lang.Thread.sleep;

public class MuliGuadeTest {
    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 3; i++) {
            new MPeople().start();
        }

        sleep(1000);

        for (Integer id : MailBoxes.getIds()) {
            new MPostman(id, "内容" + id).start();
        }
    }
}

// 测试结果如下
11:20:37.183 c.MPeople [Thread-0] - 开始收信 id:1
11:20:37.183 c.MPeople [Thread-2] - 开始收信 id:2
11:20:37.183 c.MPeople [Thread-1] - 开始收信 id:3
11:20:37.186 c.GuardedObject3 [Thread-2] - waitTime: 5000
11:20:37.186 c.GuardedObject3 [Thread-0] - waitTime: 5000
11:20:37.186 c.GuardedObject3 [Thread-1] - waitTime: 5000
11:20:38.195 c.MPostman [Thread-3] - 送信 id:3, 内容:内容3
11:20:38.195 c.MPostman [Thread-4] - 送信 id:2, 内容:内容2
11:20:38.195 c.GuardedObject3 [Thread-3] - notify...
11:20:38.195 c.GuardedObject3 [Thread-4] - notify...
11:20:38.195 c.MPostman [Thread-5] - 送信 id:1, 内容:内容1
11:20:38.195 c.GuardedObject3 [Thread-2] - timePassed: 1009, object is nullfalse11:20:38.195 c.GuardedObject3 [Thread-5] - notify...
11:20:38.195 c.GuardedObject3 [Thread-1] - timePassed: 1009, object is nullfalse11:20:38.195 c.MPeople [Thread-2] - 收到信 id:2, 内容:内容2
11:20:38.195 c.MPeople [Thread-1] - 收到信 id:3, 内容:内容3
11:20:38.195 c.GuardedObject3 [Thread-0] - timePassed: 1009, object is nullfalse11:20:38.195 c.MPeople [Thread-0] - 收到信 id:1, 内容:内容1



6.4 生产者/消费者

  • 与前面的保护性暂停中的 GuardObject 不同,不需要产生结果和消费结果的线程一一对应

  • 消费队列可以用来平衡生产和消费的线程资源

  • 生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据

  • 消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据

  • JDK 中各种阻塞队列,采用的就是这种模式

在这里插入图片描述

代码实现如下

package com.yyds.juc.produce;

public class Message {
    private int id;
    private Object message;


    public Message(int id, Object message) {
        this.id = id;
        this.message = message;
    }
    public int getId() {
        return id;
    }

    public Object getMessage() {
        return message;
    }
}
package com.yyds.juc.produce;

import lombok.extern.slf4j.Slf4j;

import java.util.LinkedList;

@Slf4j(topic = "c.MessageQueue")
public class MessageQueue {

    private LinkedList<Message> queue;

    private int capacity;

    public MessageQueue(int capacity) {
        this.capacity = capacity;
        this.queue = new LinkedList<>();
    }

    public Message take() {
        synchronized (queue) {
            while (queue.isEmpty()) {
                log.debug("没货了, wait");
                try {
                    queue.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            Message message = queue.removeFirst();
            queue.notifyAll();
            return message;
        }
    }

    public void put(Message message) {
        synchronized (queue) {
            while (queue.size() == capacity) {
                log.debug("库存已达上限, wait");
                try {
                    queue.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            queue.addLast(message);
            queue.notifyAll();
        }
    }

}
package com.yyds.juc.produce;

import com.yyds.juc.monitor.Downloader;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.util.List;

@Slf4j(topic = "c.ProConTest")
public class ProConTest {
    public static void main(String[] args) {
        // 创建容量为2的队列
        MessageQueue messageQueue = new MessageQueue(2);

        // 4 个生产者线程, 下载任务
        for (int i = 0; i < 4; i++) {
            int id = i;
            new Thread(() -> {
                try {
                    log.debug("download...");
                    List<String> response = Downloader.download();
                    log.debug("try put message({})", id);
                    messageQueue.put(new Message(id, response));
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }, "生产者" + i).start();
        }


        // 1 个消费者线程, 处理结果
        new Thread(() -> {
            while (true) {
                Message message = messageQueue.take();
                List<String> response = (List<String>) message.getMessage();
                log.debug("take message({}): [{}] lines", message.getId(), response.size());
            }
        }, "消费者").start();
        
    }
}
// 测试结果如下
11:39:46.911 c.ProConTest [生产者0] - download...
11:39:46.911 c.MessageQueue [消费者] - 没货了, wait
11:39:46.911 c.ProConTest [生产者3] - download...
11:39:46.911 c.ProConTest [生产者1] - download...
11:39:46.911 c.ProConTest [生产者2] - download...
11:39:47.853 c.ProConTest [生产者2] - try put message(2)
11:39:47.853 c.ProConTest [生产者1] - try put message(1)
11:39:47.853 c.ProConTest [生产者3] - try put message(3)
11:39:47.853 c.ProConTest [生产者0] - try put message(0)
11:39:47.855 c.MessageQueue [生产者0] - 库存已达上限, wait
11:39:47.855 c.MessageQueue [生产者3] - 库存已达上限, wait
11:39:47.855 c.ProConTest [消费者] - take message(2): [3] lines
11:39:47.855 c.MessageQueue [生产者0] - 库存已达上限, wait
11:39:47.855 c.ProConTest [消费者] - take message(1): [3] lines
11:39:47.856 c.ProConTest [消费者] - take message(3): [3] lines
11:39:47.856 c.ProConTest [消费者] - take message(0): [3] lines
11:39:47.856 c.MessageQueue [消费者] - 没货了, wait



7、pack、unpack

// 暂停当前线程
LockSupport.park();

// 恢复某个线程的运行
LockSupport.unpark(暂停线程对象)

与 Object 的 wait & notify 相比

  • wait,notify 和 notifyAll 必须配合 Object Monitor 一起使用,而 park,unpark 不必

  • park & unpark 是以线程为单位来【阻塞】和【唤醒】线程,而 notify 只能随机唤醒一个等待线程,notifyAll是唤醒所有等待线程,就不那么【精确】

  • park & unpark 可以先 unpark,而 wait & notify 不能先 notify



7.1 先pack后unpack

在这里插入图片描述

1. 当前线程调用 Unsafe.park() 方法
2. 检查 _counter ,本情况为 0,这时,获得 _mutex 互斥锁
3. 线程进入 _cond 条件变量阻塞
4. 设置 _counter = 0  

在这里插入图片描述

1. 调用 Unsafe.unpark(Thread_0) 方法,设置 _counter 为 1
2. 唤醒 _cond 条件变量中的 Thread_0
3. Thread_0 恢复运行
4. 设置 _counter 为 0



7.2 先unpack后pack

在这里插入图片描述

1. 调用 Unsafe.unpark(Thread_0) 方法,设置 _counter 为 1
2. 当前线程调用 Unsafe.park() 方法
3. 检查 _counter ,本情况为 1,这时线程无需阻塞,继续运行
4. 设置 _counter 为 0



8、线程的6种状态

在这里插入图片描述



情况 1 NEW –> RUNNABLE

当调用 t.start() 方法时,由 NEW –> RUNNABLE



情况 2 RUNNABLE <–> WAITING

t 线程用 synchronized(obj) 获取了对象锁后

  • 调用 obj.wait() 方法时,t 线程从 RUNNABLE –> WAITING

  • 调用 obj.notify() , obj.notifyAll() , t.interrupt() 时

    • 竞争锁成功,t 线程从 WAITING –> RUNNABLE

    • 竞争锁失败,t 线程从 WAITING –> BLOCKED



情况 3 RUNNABLE <–> WAITING

  • 当前线程调用 t.join() 方法时,当前线程从 RUNNABLE –> WAITING

    • 注意是当前线程在t 线程对象的监视器上等待
  • t 线程运行结束,或调用了当前线程的 interrupt() 时,当前线程从 WAITING –> RUNNABLE



情况 4 RUNNABLE <–> WAITING

  • 当前线程调用 LockSupport.park() 方法会让当前线程从 RUNNABLE –> WAITING

  • 调用 LockSupport.unpark(目标线程) 或调用了线程 的 interrupt() ,会让目标线程从 WAITING –>

    RUNNABLE



情况 5 RUNNABLE <–> TIMED_WAITING

t 线程用 synchronized(obj) 获取了对象锁后

  • 调用 obj.wait(long n) 方法时,t 线程从 RUNNABLE –> TIMED_WAITING

  • t 线程等待时间超过了 n 毫秒,或调用 obj.notify() , obj.notifyAll() , t.interrupt() 时

    • 竞争锁成功,t 线程从 TIMED_WAITING –> RUNNABLE

    • 竞争锁失败,t 线程从 TIMED_WAITING –> BLOCKED



情况 6 RUNNABLE <–> TIMED_WAITING

  • 当前线程调用 t.join(long n) 方法时,当前线程从 RUNNABLE –> TIMED_WAITING

    • 注意是当前线程在t 线程对象的监视器上等待
  • 当前线程等待时间超过了 n 毫秒,或t 线程运行结束,或调用了当前线程的 interrupt() 时,当前线程从

    TIMED_WAITING –> RUNNABLE



情况 7 RUNNABLE <–> TIMED_WAITING

  • 当前线程调用 Thread.sleep(long n) ,当前线程从 RUNNABLE –> TIMED_WAITING

  • 当前线程等待时间超过了 n 毫秒,当前线程从 TIMED_WAITING –> RUNNABLE



情况 8 RUNNABLE <–> TIMED_WAITING

  • 当前线程调用 LockSupport.parkNanos(long nanos) 或 LockSupport.parkUntil(long millis) 时,当前线

    程从 RUNNABLE –> TIMED_WAITING

  • 调用 LockSupport.unpark(目标线程) 或调用了线程 的 interrupt() ,或是等待超时,会让目标线程从

    TIMED_WAITING–> RUNNABLE



情况 9 RUNNABLE <–> BLOCKED

  • t 线程用 synchronized(obj) 获取了对象锁时如果竞争失败,从 RUNNABLE –> BLOCKED

  • 持 obj 锁线程的同步代码块执行完毕,会唤醒该对象上所有 BLOCKED 的线程重新竞争,如果其中 t 线程竞争

    成功,从 BLOCKED –> RUNNABLE ,其它失败的线程仍然 BLOCKED



情况 10 RUNNABLE <–> TERMINATED

当前线程所有代码运行完毕,进入 TERMINATED



9、死锁、活锁、饥饿



9.1 死锁

一个线程需要同时获取多把锁,这时就容易发生死锁 。如下代码,就产生了死锁。

package com.yyds.juc.lock;

import lombok.extern.slf4j.Slf4j;
import static java.lang.Thread.sleep;

@Slf4j(topic = "c.DeadLockTest")
public class DeadLockTest {
    public static void main(String[] args) {

        Object A = new Object();
        Object B = new Object();

        Thread t1 = new Thread(() -> {
            synchronized (A) {
                log.debug("lock A");
                try {
                    sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                synchronized (B) {
                    log.debug("lock B");
                    log.debug("操作...");
                }
            }
        }, "t1");

        Thread t2 = new Thread(() -> {
            synchronized (B) {
                log.debug("lock B");
                try {
                    sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                synchronized (A) {
                    log.debug("lock A");
                    log.debug("操作...");
                }
            }
        }, "t2");

        t1.start();
        t2.start();
    }
}

// 测试结果如下
21:47:10.154 c.DeadLockTest [t1] - lock A
21:47:10.154 c.DeadLockTest [t2] - lock B
# 定位死锁
# 检测死锁可以使用 jconsole工具,或者使用 jps 定位进程 id,再用 jstack 定位死锁

D:\juc\src\main\java\com\yyds\juc\monitor>jps
88304 Launcher
91744 DeadLockTest
92320 Jps
91716 KotlinCompileDaemon
41752 RemoteMavenServer36
43036

D:\juc\src\main\java\com\yyds\juc\monitor>jstack 91744
2023-03-08 21:50:08
Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.102-b14 mixed mode):

"DestroyJavaVM" #14 prio=5 os_prio=0 tid=0x0000000002853800 nid=0x16640 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"t2" #13 prio=5 os_prio=0 tid=0x000000001fcf5800 nid=0x149ec waiting for monitor entry [0x00000000202cf000]
   java.lang.Thread.State: BLOCKED (on object monitor)              # t2线程阻塞
        at com.yyds.juc.lock.DeadLockTest.lambda$main$1(DeadLockTest.java:38)
        - waiting to lock <0x000000076c8b0030> (a java.lang.Object) # 等待锁0x000000076c8b0030
        - locked <0x000000076c8b0040> (a java.lang.Object)          # t2锁住了0x000000076c8b0040
        at com.yyds.juc.lock.DeadLockTest$$Lambda$2/706277948.run(Unknown Source)
        at java.lang.Thread.run(Thread.java:745)

"t1" #12 prio=5 os_prio=0 tid=0x000000001fcf5000 nid=0x16174 waiting for monitor entry [0x00000000201cf000]
   java.lang.Thread.State: BLOCKED (on object monitor)               # t1线程阻塞
        at com.yyds.juc.lock.DeadLockTest.lambda$main$0(DeadLockTest.java:23)
        - waiting to lock <0x000000076c8b0040> (a java.lang.Object)  # 等待锁0x000000076c8b0040
        - locked <0x000000076c8b0030> (a java.lang.Object)           # t2锁住了0x000000076c8b0030
        at com.yyds.juc.lock.DeadLockTest$$Lambda$1/687241927.run(Unknown Source)
        at java.lang.Thread.run(Thread.java:745)

......


Found one Java-level deadlock:
=============================
"t2":
  waiting to lock monitor 0x000000001c4beae8 (object 0x000000076c8b0030, a java.lang.Object),
  which is held by "t1"
"t1":
  waiting to lock monitor 0x000000001c4c0f58 (object 0x000000076c8b0040, a java.lang.Object),
  which is held by "t2"

Java stack information for the threads listed above:
===================================================
"t2":
        at com.yyds.juc.lock.DeadLockTest.lambda$main$1(DeadLockTest.java:38)
        - waiting to lock <0x000000076c8b0030> (a java.lang.Object)
        - locked <0x000000076c8b0040> (a java.lang.Object)
        at com.yyds.juc.lock.DeadLockTest$$Lambda$2/706277948.run(Unknown Source)
        at java.lang.Thread.run(Thread.java:745)
"t1":
        at com.yyds.juc.lock.DeadLockTest.lambda$main$0(DeadLockTest.java:23)
        - waiting to lock <0x000000076c8b0040> (a java.lang.Object)
        - locked <0x000000076c8b0030> (a java.lang.Object)
        at com.yyds.juc.lock.DeadLockTest$$Lambda$1/687241927.run(Unknown Source)
        at java.lang.Thread.run(Thread.java:745)

Found 1 deadlock.

在这里插入图片描述



9.2 活锁

活锁出现在两个线程互相改变对方的结束条件,最后谁也无法结束。

package com.yyds.juc.lock;

import lombok.extern.slf4j.Slf4j;

import static java.lang.Thread.sleep;

@Slf4j(topic = "c.LiveLockTest")
public class LiveLockTest {

    static volatile int count = 10;
    static final Object lock = new Object();

    public static void main(String[] args) {
        new Thread(() -> {
             // 期望减到 0 退出循环
            while (count > 0) {
                try {
                    sleep(200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                count--;
                log.debug("count: {}", count);
            }
        }, "t1").start();

        
        new Thread(() -> {
            // 期望超过 20 退出循环
            while (count < 20) {
                try {
                    sleep(200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                count++;
                log.debug("count: {}", count);
            }
        }, "t2").start();
    }
}



9.3 饥饿

使用顺序加锁的方式,解决之前的死锁问题。

下面图展示了死锁问题

在这里插入图片描述

通过顺序加锁进行解决

在这里插入图片描述


饥饿是一个线程由于优先级太低,始终得不到 CPU 调度执行,也不能够结束 。

顺序加锁可能会产生饥饿。



10、ReentrantLock

// 获取锁
reentrantLock.lock();
try {
	// 临界区
} finally {
	// 释放锁
	reentrantLock.unlock();
}

相对于 synchronized 它具备如下特点

  • 可中断

  • 可以设置超时时间

  • 可以设置为公平锁

  • 支持多个条件变量


    不过,与 synchronized 一样,都支持可重入



10.1 可重入


可重入是指同一个线程如果首次获得了这把锁,那么因为它是这把锁的拥有者,因此有权利再次获取这把锁


如果是不可重入锁,那么第二次获得锁时,自己也会被锁挡住

package com.yyds.juc.lock;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.locks.ReentrantLock;

@Slf4j(topic = "c.ReentrantLockTest")
public class ReentrantLockTest {
    static ReentrantLock lock = new ReentrantLock();
    public static void main(String[] args) {
        method1();
    }
    public static void method1() {
        lock.lock();
        try {
            log.debug("execute method1");
            method2();
        } finally {
            lock.unlock();
        }
    }
    public static void method2() {
        lock.lock();
        try {
            log.debug("execute method2");
            method3();
        } finally {
            lock.unlock();
        }
    }
    public static void method3() {
        lock.lock();
        try {
            log.debug("execute method3");
        } finally {
            lock.unlock();
        }
    }
}
// 测试结果
22:25:22.766 c.ReentrantLockTest [main] - execute method1
22:25:22.767 c.ReentrantLockTest [main] - execute method2
22:25:22.767 c.ReentrantLockTest [main] - execute method3



10.2 可打断

package com.yyds.juc.lock;


import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.locks.ReentrantLock;

import static java.lang.Thread.sleep;

@Slf4j(topic = "c.ReentrantLockInterrupt")
public class ReentrantLockInterrupt {

    public static void main(String[] args) {

        ReentrantLock lock = new ReentrantLock();

        Thread t1 = new Thread(() -> {
            log.debug("启动...");
            try {
                // 注意如果是不可中断模式,那么即使使用了 interrupt 也不会让等待中断
                lock.lockInterruptibly();
            } catch (InterruptedException e) {
                e.printStackTrace();
                log.debug("等锁的过程中被打断");
                return;
            }
        },"t1");

        lock.lock();
        log.debug("获得了锁");
        t1.start();
        try {
            sleep(1000);
            t1.interrupt();
            log.debug("执行打断");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

// 测试结果如下
11:17:34.554 c.ReentrantLockInterrupt [main] - 获得了锁
11:17:34.554 c.ReentrantLockInterrupt [t1] - 启动...
11:17:35.555 c.ReentrantLockInterrupt [main] - 执行打断
11:17:35.555 c.ReentrantLockInterrupt [t1] - 等锁的过程中被打断
java.lang.InterruptedException
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:898)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1222)
	at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335)
	at com.yyds.juc.lock.ReentrantLockInterrupt.lambda$main$0(ReentrantLockInterrupt.java:20)
	at java.lang.Thread.run(Thread.java:745)



10.3 锁超时

package com.yyds.juc.lock;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

import static java.lang.Thread.sleep;

@Slf4j(topic = "c.ReentrantLockOverTime")
public class ReentrantLockOverTime {
    public static void main(String[] args) throws InterruptedException {

        ReentrantLock lock = new ReentrantLock();
        
        Thread t1 = new Thread(() -> {

            log.debug("启动...");
            try {
                if(!lock.tryLock(2L, TimeUnit.SECONDS)){
                    log.debug("获取锁超过2S,获取失败,返回");
                    return;
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            try {
                log.debug("获得了锁");
            } finally {
                lock.unlock();
            }
        },"t1");

        // 主线程先获取锁
        lock.lock();
        log.debug("获得了锁");
        t1.start();
        try {
          sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }

    }
}
// 测试结果如下
11:37:26.249 c.ReentrantLockOverTime [main] - 获得了锁
11:37:26.249 c.ReentrantLockOverTime [t1] - 启动...
11:37:28.263 c.ReentrantLockOverTime [t1] - 获取锁超过2S,获取失败,返回



10.3.1 哲学家就餐问题

有五位哲学家,围坐在圆桌旁。

  • 他们只做两件事,思考和吃饭,思考一会吃口饭,吃完饭后接着思考。

  • 吃饭时要用两根筷子吃,桌上共有 5 根筷子,每位哲学家左右手边各有一根筷子。

  • 如果筷子被身边的人拿着,自己就得等待


筷子类

package com.yyds.juc.lock;

/**
 * 筷子类
 */
public class Chopstick {
    String name;

    public Chopstick(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "筷子{" + name + '}';
    }
}


哲学家类

package com.yyds.juc.lock;

import lombok.extern.slf4j.Slf4j;

/**
 * 哲学家类
 */
@Slf4j(topic = "c.Philosopher")
public class Philosopher extends Thread {
    Chopstick left;
    Chopstick right;

    public Philosopher(String name, Chopstick left, Chopstick right) {
        super(name);
        this.left = left;
        this.right = right;
    }

    private void eat() throws InterruptedException {
        log.debug("eating...");
        sleep(1000);
    }

    @Override
    public void run()  {
        while (true) {
            // 获得左手筷子
            synchronized (left) {
                // 获得右手筷子
                synchronized (right) {
                    // 吃饭
                    try {
                        eat();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
               // 放下右手筷子
            }
            // 放下左手筷子
        }
    }
}


测试类

package com.yyds.juc.lock;


public class PhilosopherTest {
    public static void main(String[] args) {
        Chopstick c1 = new Chopstick("1");
        Chopstick c2 = new Chopstick("2");
        Chopstick c3 = new Chopstick("3");
        Chopstick c4 = new Chopstick("4");
        Chopstick c5 = new Chopstick("5");
        new Philosopher("苏格拉底", c1, c2).start();
        new Philosopher("柏拉图", c2, c3).start();
        new Philosopher("亚里士多德", c3, c4).start();
        new Philosopher("赫拉克利特", c4, c5).start();
        new Philosopher("阿基米德", c5, c1).start();
    }
}
// 会发现执行后,出现死锁



10.3.2 利用 tryLock解决哲学家就餐问题


筷子类继承ReentrantLock

package com.yyds.juc.lock;

import java.util.concurrent.locks.ReentrantLock;

/**
 * 筷子类
 */
public class ChopstickV2 extends ReentrantLock {
    String name;

    public ChopstickV2(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "筷子{" + name + '}';
    }
}


哲学家类利用tryLock

package com.yyds.juc.lock;

import lombok.extern.slf4j.Slf4j;

/**
 * 哲学家类
 */
@Slf4j(topic = "c.PhilosopherV2")
public class PhilosopherV2 extends Thread {
    ChopstickV2 left;
    ChopstickV2 right;

    public PhilosopherV2(String name, ChopstickV2 left, ChopstickV2 right) {
        super(name);
        this.left = left;
        this.right = right;
    }

    private void eat() throws InterruptedException {
        log.debug("eating...");
        sleep(1000);
    }

    @Override
    public void run()  {
        while (true) {
            // 尝试获得左手筷子
            if(left.tryLock()){
                try {
                    // 尝试获取右手筷子
                    if(right.tryLock()){
                        try {
                            eat();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        } finally {
                            right.unlock();
                        }
                    }
                } finally {
                    // 右手筷子没有获取,就释放左手筷子
                    left.unlock();
                }
            }
        }
    }
}


测试类

package com.yyds.juc.lock;


public class PhilosopherTestV2 {
    public static void main(String[] args) {
        ChopstickV2 c1 = new ChopstickV2("1");
        ChopstickV2 c2 = new ChopstickV2("2");
        ChopstickV2 c3 = new ChopstickV2("3");
        ChopstickV2 c4 = new ChopstickV2("4");
        ChopstickV2 c5 = new ChopstickV2("5");
        new PhilosopherV2("苏格拉底", c1, c2).start();
        new PhilosopherV2("柏拉图", c2, c3).start();
        new PhilosopherV2("亚里士多德", c3, c4).start();
        new PhilosopherV2("赫拉克利特", c4, c5).start();
        new PhilosopherV2("阿基米德", c5, c1).start();
    }
}



10.4 条件变量

synchronized 中也有条件变量,就是那个 waitSet 休息室,当条件不满足时进入 waitSet 等待

ReentrantLock 的条件变量比 synchronized 强大之处在于,它是支持

多个条件变量

的,这就好比

  • synchronized 是那些不满足条件的线程都在一间休息室等消息

  • 而 ReentrantLock 支持多间休息室,有专门等烟的休息室、专门等早餐的休息室、唤醒时也是按休息室来唤醒


使用要点:

  • await 前需要获得锁

  • await 执行后,会释放锁,进入 conditionObject 等待

  • await 的线程被唤醒(或打断、或超时)取重新竞争 lock 锁

  • 竞争 lock 锁成功后,从 await 后继续执行

package com.yyds.juc.lock;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

import static java.lang.Thread.sleep;

@Slf4j(topic = "c.ReentracntLockCondition")
public class ReentracntLockCondition {

    static ReentrantLock lock = new ReentrantLock();
    static Condition waitCigaretteQueue = lock.newCondition();
    static Condition waitbreakfastQueue = lock.newCondition();
    static volatile boolean hasCigrette = false;
    static volatile boolean hasBreakfast = false;


    public static void main(String[] args) throws InterruptedException {

        new Thread(() -> {
            try {
                lock.lock();
                while (!hasCigrette) {
                    try {
                        waitCigaretteQueue.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

                log.debug("等到了烟");
            } finally {
                lock.unlock();
            }
        }, "烟").start();


        new Thread(() -> {
            try {
                lock.lock();

                while (!hasBreakfast) {
                    try {
                        waitbreakfastQueue.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

                log.debug("等到了早餐");
            } finally {
                lock.unlock();
            }
        }, "早餐").start();


        sleep(1000);
        sendBreakfast();
        sleep(1000);
        sendCigarette();
    }

    private static void sendCigarette() {
        lock.lock();
        try {
            log.debug("送烟来了");
            hasCigrette = true;
            waitCigaretteQueue.signal();
        } finally {
            lock.unlock();
        }
    }

    private static void sendBreakfast() {
        lock.lock();
        try {
            log.debug("送早餐来了");
            hasBreakfast = true;
            waitbreakfastQueue.signal();
        } finally {
            lock.unlock();
        }
    }
}
// 测试结果如下
15:49:12.650 c.ReentracntLockCondition [main] - 送早餐来了
15:49:12.652 c.ReentracntLockCondition [早餐] - 等到了早餐
15:49:13.655 c.ReentracntLockCondition [main] - 送烟来了
15:49:13.655 c.ReentracntLockCondition [] - 等到了烟



10.5 线程之间的顺序控制



10.5.1 通过wait/notify固定顺序控制


先 2 后 1 打印

package com.yyds.juc.order;

import lombok.extern.slf4j.Slf4j;

@Slf4j(topic = "c.WaitNotifyOrder")
public class WaitNotifyOrder {

    static Object obj = new Object();
    // t2 运行标记, 代表 t2 是否执行过
    static boolean t2runed = false;

    public static void main(String[] args) {

        new Thread(() -> {
            synchronized (obj){
                // t2没有执行过,就等待
                while (!t2runed){
                    try {
                        obj.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
            log.debug("1");
        },"t1").start();


        new Thread(() -> {
            log.debug("2");
            synchronized (obj){
                // 修改运行标记
                t2runed = true;
                // 通知 obj 上等待的线程(可能有多个,因此需要用 notifyAll)
                obj.notifyAll();
            }
        },"t2").start();

    }
}
// 测试结果如下
17:40:08.113 c.WaitNotifyOrder [t2] - 2
17:40:08.115 c.WaitNotifyOrder [t1] - 1



10.5.2 通过park/unpark固定顺序控制


先 2 后 1 打印

package com.yyds.juc.order;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.locks.LockSupport;

@Slf4j(topic = "c.ParkUnparkOrder")
public class ParkUnparkOrder {
    public static void main(String[] args) {

        /**
         * 没有『许可』时,当前线程暂停运行;
         * 有『许可』时,用掉这个『许可』,当前线程恢复运行
         */
        Thread t1 =  new Thread(() -> {
            // 暂停当前线程
            LockSupport.park();
            log.debug("t1");
        },"t1");


        /**
         * 给线程 t1 发放『许可』(多次连续调用 unpark 只会发放一个『许可』)
         */
        Thread t2 =   new Thread(() -> {
            log.debug("t2");
            LockSupport.unpark(t1);
        },"t1");


        t1.start();
        t2.start();
    }
}
// 测试结果如下
17:33:37.742 c.ParkUnparkOrder [t1] - t2
17:33:37.742 c.ParkUnparkOrder [t1] - t1



10.5.3 通过wait/notify交替输出

线程 1 输出 a 5 次,线程 2 输出 b 5 次,线程 3 输出 c 5 次。现在要求输出 abcabcabcabcabc 怎么实现

package com.yyds.juc.order;


public class SyncWaitNotify {

    private int flag;
    private int loopNumber;

    public SyncWaitNotify(int flag, int loopNumber) {
        this.flag = flag;
        this.loopNumber = loopNumber;
    }

    public void print(int waitFlag, int nextFlag, String str) {
        for (int i = 0; i < loopNumber; i++) {
            synchronized (this) {
                while (this.flag != waitFlag) {
                    try {
                        this.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.print(str);
                flag = nextFlag;
                this.notifyAll();
            }
        }
    }
}
package com.yyds.juc.order;

import lombok.extern.slf4j.Slf4j;

@Slf4j(topic = "c.WaitNotifyOneByOne")
public class WaitNotifyOneByOne {



  public static void main(String[] args) {

      SyncWaitNotify syncWaitNotify = new SyncWaitNotify(1, 5);
      new Thread(() -> {
          syncWaitNotify.print(1, 2, "a");
      }).start();
      new Thread(() -> {
          syncWaitNotify.print(2, 3, "b");
      }).start();
      new Thread(() -> {
          syncWaitNotify.print(3, 1, "c");
      }).start();


  }
}
// 测试结果如下
abcabcabcabcabc



10.5.4 通过ReentrantLock交替输出

package com.yyds.juc.order;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ReentrantOneByOne {
    public static void main(String[] args) throws InterruptedException {


        AwaitSignal as = new AwaitSignal(5);
        Condition aWaitSet = as.newCondition();
        Condition bWaitSet = as.newCondition();
        Condition cWaitSet = as.newCondition();

        new Thread(() -> {
            as.print("a", aWaitSet, bWaitSet);
        }).start();

        new Thread(() -> {
            as.print("b", bWaitSet, cWaitSet);
        }).start();

        new Thread(() -> {
            as.print("c", cWaitSet, aWaitSet);
        }).start();


        // 先唤醒a线程
        Thread.sleep(1000);
        as.lock();
        try {
            aWaitSet.signal();
        }finally {
            as.unlock();
        }
        
    }
}
// 测试结果如下
abcabcabcabcabc


class AwaitSignal extends ReentrantLock{

    // 循环次数
    private int loopNumber;

    public AwaitSignal(int loopNumber) {
        this.loopNumber = loopNumber;
    }


    /**
     *
     * @param str 要打印的内容
     * @param current 当前休息室
     * @param next 下一个休息室
     */
    public void print(String str, Condition current, Condition next) {
        for (int i = 0; i < loopNumber; i++) {
            this.lock();
            try {
                current.await();
                System.out.print(str);
                next.signal();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                this.unlock();
            }
        }
    }


}



10.5.5 通过park/unpark交替输出

package com.yyds.juc.order;

import java.util.concurrent.locks.LockSupport;

public class SyncPark {
    private int loopNumber;
    private Thread[] threads;

    public SyncPark(int loopNumber) {
        this.loopNumber = loopNumber;
    }

    public void setThreads(Thread... threads) {
        this.threads = threads;
    }

    public void print(String str) {
        for (int i = 0; i < loopNumber; i++) {
            LockSupport.park();
            System.out.print(str);
            LockSupport.unpark(nextThread());
        }
    }

    private Thread nextThread() {
        Thread current = Thread.currentThread();
        int index = 0;
        for (int i = 0; i < threads.length; i++) {
            if(threads[i] == current) {
                index = i;
                break;
            }
        }
        if(index < threads.length - 1) {
            return threads[index+1];
        } else {
            return threads[0];
        }
    }

    public void start() {
        for (Thread thread : threads) {
            thread.start();
        }
        LockSupport.unpark(threads[0]);
    }

}




class TestUnpark{
    public static void main(String[] args) {
        SyncPark syncPark = new SyncPark(5);
        Thread t1 = new Thread(() -> {
            syncPark.print("a");
        });
        Thread t2 = new Thread(() -> {
            syncPark.print("b");
        });
        Thread t3 = new Thread(() -> {
            syncPark.print("c\n");
        });
        syncPark.setThreads(t1, t2, t3);
        syncPark.start();
    }
}


// 测试结果如下
abc
abc
abc
abc
abc



版权声明:本文为qq_44665283原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。