文章目录
1、保护性暂停模式
1.1、概述
旨在用一个线程等待另一个线程的执行结果,因为要等待另一方的结果,因此归类到同步模式
-
有一个结果需要从一个线程传递到另一个线程,让他们关联同一个
GuardedObject
-
如果有结果不断从一个线程到另一个线程,保护性暂停模式便不再适用,可以使用消息队列(见生产者/消费者)
JDK
中,
join
的实现、
Future
的实现,采用的就是此模式
1.2、单任务版GuardedObject
class GuardedObject {
/**
* 对应的结果对象
*/
private Object response;
/**
* 阻塞获取结果
*
* @return Object 返回获取到的结果
*/
public Object get() {
synchronized (this) {
while (response == null) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return response;
}
}
/**
* 带超时时间的阻塞获取结果
*
* @param timeout 超时时间
* @return Object 返回的结果
*/
public Object get(long timeout) {
synchronized (this) {
long beginTime = System.currentTimeMillis();
long passTime = 0;
while (response == null) {
long waitTime = timeout - passTime;
if (waitTime <= 0) {
break;
}
try {
// 重点,避免虚假唤醒导致睡眠时间延长
this.wait(waitTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
passTime = System.currentTimeMillis() - beginTime;
}
return response;
}
}
/**
* 完成后赋值
*
* @param response 响应对象
*/
public void complete(Object response) {
synchronized (this) {
this.response = response;
}
}
}
为什么要讲这个模式,观察一下
Thread
中的
join
方法,你就明白了
是不是很熟悉?
1.3、多任务版GuardedObject
图中
Futures
就好比居民楼一层的信箱(每个信箱有房间编号),左侧的
t0
,
t2
,
t4
就好比等待邮件的居民,右侧的
t1
,
t3
,
t5
就好比邮递员
如果需要在多个类之间使用
GuardedObject
对象,作为参数传递不是很方便,因此设计一个用来解耦的中间类,这样不仅能够解耦【结果等待者】和【结果生产者】,还能够同时支持多个任务的管理
@Slf4j
public class Test {
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 3; i++) {
new People().start();
}
TimeUnit.SECONDS.sleep(1);
for (Integer id : Mailboxes.getIds()) {
new Postman(id, "内容" + id).start();
}
}
}
@Slf4j
class People extends Thread {
@Override
public void run() {
// 收信
GuardedObject guardedObject = Mailboxes.createGuardedObject();
log.info("开始收信 id:{}", guardedObject.getId());
Object mail = guardedObject.get(5000);
log.info("收到信 id:{}, 内容:{}", guardedObject.getId(), mail);
}
}
@Slf4j
class Postman extends Thread {
private final int id;
private final String mail;
public Postman(int id, String mail) {
this.id = id;
this.mail = mail;
}
@Override
public void run() {
GuardedObject guardedObject = Mailboxes.getGuardedObject(id);
log.info("送信 id:{}, 内容:{}", id, mail);
guardedObject.complete(mail);
}
}
class Mailboxes {
private static final Map<Integer, GuardedObject> BOXES = new Hashtable<>();
private static final AtomicInteger ID = new AtomicInteger(1);
/**
* 产生唯一 id
*
* @return int id
*/
private static synchronized int generateId() {
return ID.incrementAndGet();
}
public static GuardedObject getGuardedObject(int id) {
return BOXES.remove(id);
}
public static GuardedObject createGuardedObject() {
GuardedObject go = new GuardedObject(generateId());
BOXES.put(go.getId(), go);
return go;
}
public static Set<Integer> getIds() {
return BOXES.keySet();
}
}
@Data
@NoArgsConstructor
@AllArgsConstructor
class GuardedObject {
/**
* 标识 Guarded Object
*/
private int id;
public GuardedObject(int id) {
this.id = id;
}
/**
* 对应的结果对象
*/
private Object response;
/**
* 阻塞获取结果
*
* @return Object 返回获取到的结果
*/
public Object get() {
synchronized (this) {
while (response == null) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return response;
}
}
/**
* 带超时时间的阻塞获取结果
*
* @param timeout 超时时间
* @return Object 返回的结果
*/
public Object get(long timeout) {
synchronized (this) {
long beginTime = System.currentTimeMillis();
long passTime = 0;
while (response == null) {
long waitTime = timeout - passTime;
if (waitTime <= 0) {
break;
}
try {
// 重点,避免虚假唤醒导致睡眠时间延长
this.wait(waitTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
passTime = System.currentTimeMillis() - beginTime;
}
return response;
}
}
/**
* 完成后赋值
*
* @param response 响应对象
*/
public void complete(Object response) {
synchronized (this) {
this.response = response;
}
}
}
2、生产者消费者模式
2.1、概述
与前面的保护性暂停中的
GuardObject
不同,不需要产生结果和消费结果的线程一一对应,所以生产者消费者模型应该归类到异步模型
-
消费队列可以用来平衡生产和消费的线程资源 -
生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据 -
消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据
JDK
中各种阻塞队列,采用的就是这种模式
2.2、代码实现
@Slf4j
public class Test {
public static void main(String[] args) {
MessageQueue queue = new MessageQueue(2);
for (int i = 0; i < 3; i++) {
int id = i;
new Thread(() -> {
queue.put(new Message(id, "值" + id));
}, "生产者" + i).start();
}
new Thread(() -> {
while (true) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
Message message = queue.take();
log.info("消费者拿到消息 {}", message);
}
}, "消费者").start();
}
}
// 消息队列类 , java 线程之间通信
@Slf4j
class MessageQueue {
/**
* 消息的队列集合
*/
private final LinkedList<Message> list = new LinkedList<>();
/**
* 队列容量
*/
private final int capcity;
public MessageQueue(int capcity) {
this.capcity = capcity;
}
/**
* 获取消息
*
* @return Message 消息
*/
public Message take() {
// 检查队列是否为空
synchronized (list) {
while (list.isEmpty()) {
try {
log.info("队列为空, 消费者线程等待");
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 从队列头部获取消息并返回
Message message = list.removeFirst();
log.info("已消费消息 {}", message);
list.notifyAll();
return message;
}
}
/**
* 存入消息
*
* @param message 消息
*/
public void put(Message message) {
synchronized (list) {
// 检查对象是否已满
while (list.size() == capcity) {
try {
log.info("队列已满, 生产者线程等待");
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 将消息加入队列尾部
list.addLast(message);
log.info("已生产消息 {}", message);
list.notifyAll();
}
}
}
/**
* 构造方法初始化后,不允许修改属性,保证线程安全
*/
final class Message {
private final int id;
private final Object value;
public Message(int id, Object value) {
this.id = id;
this.value = value;
}
public int getId() {
return id;
}
public Object getValue() {
return value;
}
@Override
public String toString() {
return "Message{" +
"id=" + id +
", value=" + value +
'}';
}
}
3、终止模式之两阶段终止
在一个线程
T1
中如何优雅的终止线程
T2
?也就是说,
T1
终止
T2
后,还需要留给
T2
处理后续事情的机会。
我们不能直接使用
T1.stop()
方法来直接停止线程,因为这个方法会立刻杀死线程,如果此时
T1
正好锁住了一个共享资源,那么它死后锁将永远得不到释放,使得其他线程永远无法获得锁
-
其中一种做法便是使用借助
interrupt
方法来实现
/**
* @Author 两米以下皆凡人
* @create 2022/2/7 15:23
*/
@Slf4j
public class TwoPhaseTermination {
/**
* 两阶段停止监控线程
*/
private Thread monitorThread;
/**
* 开启线程
*/
public void start() {
monitorThread = new Thread(() -> {
Thread curThread = Thread.currentThread();
while (true) {
if (!curThread.isInterrupted()) {
log.info("正常运行。。");
} else {
log.info("料理后事。。");
break;
}
log.info("执行监控。。");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
//在监控线程sleep的时候打断,会抛出InterruptedException异常,然后打断标记清除,所以需要重新设置好打断标记
curThread.interrupt();
}
}
}, "monitor");
monitorThread.start();
}
/**
* 优雅的停止线程
*/
public void stop() {
monitorThread.interrupt();
}
}
//测试
TwoPhaseTermination twoPhaseTermination = new TwoPhaseTermination();
twoPhaseTermination.start();
log.info("开始执行..");
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
twoPhaseTermination.stop();
log.info("准备结束..");
如上方法确实能够正确运行,但是实现起来多少有点繁琐,还需要考虑线程的打断标记,正常情况下,打断后,标记就会为
true
,如果是
sleep
的时候被打断,因为会重置打断标记,使我们不得不重新设置打断标记,对于程序维护来说比较麻烦
-
volatile
实现
@Slf4j
public class TwoPhaseTermination {
/**
* 两阶段停止监控线程
*/
private Thread monitorThread;
private volatile boolean stopTag;
/**
* 开启线程
*/
public void start() {
stopTag = false;
monitorThread = new Thread(() -> {
while (true) {
if (!stopTag) {
log.info("正常运行。。");
} else {
log.info("料理后事。。");
break;
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("执行监控。。");
}
}, "monitor");
monitorThread.start();
}
/**
* 优雅的停止线程
*/
public void stop() {
stopTag = true;
//如果正好在睡觉直接打断
monitorThread.interrupt();
}
}
4、同步模式之犹豫Balking
Balking
(犹豫)模式用在一个线程发现另一个线程或本线程已经做了某一件相同的事,那么本线程就无需再做了,直接结束返回,就拿两阶段终止模式的代码举例,如果我们现在监控了某一个系统资源的回收,只需要使用一个线程就可以了,那如果多个线程调用多次
start
方法,那么每次都会创建一个新的线程去监控,这样是不合理的,所以就可以使用一个开始标记来判断是否需要执行
start
方法
-
但是我们还能使用
volatile
关键字去做吗?答案是否定的,因为这里可能涉及到多个线程对开始标记进行操作,还是不能保证原子性,所以我们只好使用
synchronized
关键字来实现了
@Slf4j
public class TwoPhaseTermination {
/**
* 两阶段停止监控线程
*/
private Thread monitorThread;
private volatile boolean startTag = false;
private boolean stopTag;
/**
* 开启线程
*/
public void start() {
synchronized (TwoPhaseTermination.class) {
if (startTag) {
log.info("启动过了么 {} ", startTag);
return;
}
startTag = true;
}
startTag = true;
stopTag = false;
monitorThread = new Thread(() -> {
while (true) {
if (!stopTag) {
log.info("正常运行。。");
} else {
log.info("料理后事。。");
break;
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("执行监控。。");
}
}, "monitor");
monitorThread.start();
log.info("正在启动:");
}
/**
* 优雅的停止线程
*/
public void stop() {
stopTag = true;
//如果正好在睡觉直接打断
monitorThread.interrupt();
}
}
其实懒加载的单例模式,就是这个原理
public final class Singleton {
private Singleton() {
}
private static Singleton INSTANCE = null;
public static synchronized Singleton getInstance() {
if (INSTANCE != null) {
return INSTANCE;
}
INSTANCE = new Singleton();
return INSTANCE;
}
}
5、双重检查加锁单例模式
首先看一下懒汉式的单例模式
public final class Singleton {
private Singleton() {
}
private static Singleton INSTANCE = null;
public static synchronized Singleton getInstance() {
if (INSTANCE != null) {
return INSTANCE;
}
INSTANCE = new Singleton();
return INSTANCE;
}
}
加载静态方法上的
synchronized
实际上是给类
class
加的锁
public final class Singleton {
private Singleton() {
}
private static Singleton INSTANCE = null;
public static Singleton getInstance() {
synchronized{
if (INSTANCE != null) {
return INSTANCE;
}
INSTANCE = new Singleton();
return INSTANCE;
}
}
}
上述实现有一个问题:对于第一次进入该方法,加锁后实例化,没有问题,但是后续已经创建完成了,每次
getInstance
都需要加锁,性能太低了,所以我们便有了如下改进
public final class Singleton {
private Singleton() {
}
private static Singleton INSTANCE = null;
public static Singleton getInstance() {
if(INSTANCE == null) {
synchronized{
if (INSTANCE == null) { // t1
INSTANCE = new Singleton();
}
}
}
return INSTANCE;
}
}
但是上述代码,还是存在一定的问题,我们一起来观察它对应的字节码
0: getstatic #2 // Field INSTANCE:Lcom/phz/test/Singleton;
3: ifnonnull 37
6: ldc #3 // 获得类对象 class com/phz/test/Singleton,用于加锁
8: dup // 复制引用地址
9: astore_0 // 存储一份,用于解锁
10: monitorenter // 创建 Monitor 开始进入同步代码块
11: getstatic #2 // Field INSTANCE:Lcom/phz/test/Singleton;
14: ifnonnull 27
17: new #3 // class com/phz/test/Singleton
20: dup
21: invokespecial #4 // Method "<init>":()V
24: putstatic #2 // Field INSTANCE:Lcom/phz/test/Singleton;
27: aload_0 // 把锁对象拿出来,用于解锁
28: monitorexit
29: goto 37
32: astore_1
33: aload_0
34: monitorexit
35: aload_1
36: athrow
37: getstatic #2 // Field INSTANCE:Lcom/phz/test/Singleton;
40: areturn
其中
- 17 表示创建对象,将对象引用入栈
- 20 表示复制一份对象引用,因为调用构造方法还会消耗一份
- 21 表示利用一个对象引用,调用构造方法,消耗一份
-
24 表示利用一个对象引用,赋值给
static
INSTANCE
,将最后一份引用消耗
但是 JIT 可能会优化为:先执行 24,再执行 21。如果两个线程 t1,t2 按如下时间序列执行:
按照代码来看就是,第一个线程进入,由于
INSTANCE
还为空,所以获取到了锁,代码执行到了
INSTANCE = new Singleton();
此时先将引用传递给了
INSTANCE
,但是还没有执行
new
操作(或者操作太多,时间有点长),此时第二个线程也来了,此时判断
INSTANCE
已经不为空了,就直接
return
,然后去使用这个空对象,最后第一个线程才执行
new
操作,然后释放锁
这个问题出现的原因,就是因为指令的重排序,所以我们为了保证程序的正确性,需要禁止指令重排序,我们可以给
INSTANCE
变量加上
volatile
关键字
6、异步模式之工作线程
6.1、定义
让有限的工作线程(
Worker Thread
)来轮流异步处理无限多的任务。也可以将其归类为分工模式,它的典型实现就是线程池,也体现了经典设计模式中的享元模式。
注意,不同任务类型应该使用不同的线程池,这样能够避免饥饿,并能提升效率
6.2、饥饿
固定大小线程池会有饥饿现象
- 两个工人是同一个线程池中的两个线程
-
他们要做的事情是:为客人点餐和到后厨做菜,这是两个阶段的工作
- 客人点餐:必须先点完餐,等菜做好,上菜,在此期间处理点餐的工人必须等待(也就是说)
- 后厨做菜:没啥说的,做就是了
-
比如工人
A
处理了点餐任务,接下来它要等着工人
B
把菜做好,然后上菜,他俩也配合的蛮好 -
但现在同时来了两个客人,这个时候工人
A
和工人
B
都去处理点餐了,这时没人做饭了,饥饿
static final List<String> MENU = Arrays.asList("地三鲜", "宫保鸡丁", "辣子鸡丁", "烤鸡翅");
static Random RANDOM = new Random();
static String cooking() {
return MENU.get(RANDOM.nextInt(MENU.size()));
}
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.execute(() -> {
log.info("处理点餐...");
Future<String> f = executorService.submit(() -> {
log.info("做菜");
return cooking();
});
try {
log.info("上菜: {}", f.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
executorService.execute(() -> {
log.info("处理点餐...");
Future<String> f = executorService.submit(() -> {
log.info("做菜");
return cooking();
});
try {
log.info("上菜: {}", f.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
}
使用
jconsole
工具检测一下
可以发现,这并不是死锁,只是由于线程数不足,导致的饥饿
如何解决呢?简单想,那就是增加几个线程(工人),但是这不能解决根本上的问题,不同任务类型应该使用不同的线程池,只有这样才能避免饥饿,并且还能够提升效率
ExecutorService waiterPool = Executors.newFixedThreadPool(1);
ExecutorService cookPool = Executors.newFixedThreadPool(1);
waiterPool.execute(() -> {
log.info("处理点餐...");
Future<String> f = cookPool.submit(() -> {
log.info("做菜");
return cooking();
});
try {
log.info("上菜: {}", f.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
waiterPool.execute(() -> {
log.info("处理点餐...");
Future<String> f = cookPool.submit(() -> {
log.info("做菜");
return cooking();
});
try {
log.info("上菜: {}", f.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
}
6.3、创建多少线程池合适
- 过小会导致程序不能充分地利用系统资源、容易导致饥饿
- 过大会导致更多的线程上下文切换,占用更多内存
6.3.1、CPU 密集型运算
通常采用
CPU
核数 + 1 能够实现最优的
CPU
利用率,+ 1 是保证当线程由于页缺失故障(操作系统)或其它原因导致暂停时,额外的这个线程就能顶上去,保证
CPU
时钟周期不被浪费
6.3.2、I/O 密集型运算
CPU
不总是处于繁忙状态,例如,当你执行业务计算时,这时候会使用
CPU
资源,但当你执行
I/O
操作时、远程
RPC
调用时,包括进行数据库操作时,这时候
CPU
就闲下来了,你可以利用多线程提高它的利用率。
-
经验公式如下
线程数 = 核数 * 期望
CPU
利用率 * 总时间(
CPU
计算时间 + 等待时间) /
CPU
计算时间
-
例如
4
核
CPU
计算时间是
50%
,其它等待时间是
50%
,期望
CPU
被
100%
利用,套用公式
4 * 100% * 100% / 50% = 8
-
例如
4
核
CPU
计算时间是
10%
,其它等待时间是
90%
,期望
CPU
被
100%
利用,套用公式
4 * 100% * 100% / 10% = 40
7、享元模式
对于像
String
这样的不可变类,如果我们有修改需求的时候,本身并不是修改其内容,而是创建了新的对象,这样虽然能够避免共享带来的线程安全问题,但是也带来一个严重的问题,那就是对象创建的太频繁,对象创建的太多,为了解决这个矛盾,便引入这个享元模式
7.1、定义
英文名称:
Flyweight pattern
. 当需要重用数量有限的同一类对象时,通过与其他类似对象共享尽可能多的数据来最小化内存使用的对象,属于
GOF
23种设计模式中的一种结构性设计模式
- wikipedia: A flyweight is an object that minimizes memory usage by sharing as much data as possible with other similar objects
出自 “Gang of Four” design patterns
归类 Structual patterns
7.2、体现
在
JDK
中
Boolean
,
Byte
,
Short
,
Integer
,
Long
,
Character
等包装类提供了
valueOf
方法
-
例如
Long
的
valueOf
会缓存
-128~127
之间的
Long
对象,在这个范围之间会重用对象,大于这个范围,才会新建
Long
对象:
public static Long valueOf(long l) {
final int offset = 128;
if (l >= -128 && l <= 127) { // will cache
return LongCache.cache[(int)l + offset];
}
return new Long(l);
}
private static class LongCache {
private LongCache(){}
static final Long cache[] = new Long[-(-128) + 127 + 1];
static {
for(int i = 0; i < cache.length; i++)
cache[i] = new Long(i - 128);
}
}
注意以下几点:
-
Byte
,
Short
,
Long
缓存的范围都是
-128~127
-
Character
缓存的范围是
0~127
-
Integer
的默认范围也是
-128~127
,但是其最小值不能变,最大值可以通过虚拟机参数
-Djava.lang.Integer.IntegerCache.high
来改变 -
Boolean
缓存了
TRUE
和
FALSE
String 也使用了享元模式,具体体现可以参考其串池的原理,不展开讲了,然后还有 BigInteger,BigDecimal 等,因为都是不可变类,所以都用了享元模式
4.3、自定义连接池
/**
* @author 两米以下皆凡人
*/
@Slf4j
public class Test {
@SneakyThrows
public static void main(String[] args) {
MockPool mockPool = new MockPool(5);
for (int i = 0; i < 10; i++) {
new Thread(() -> {
MockConnection connection = mockPool.getConnection();
System.out.println("拿到连接:" + connection.getConnectionName());
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("归还连接:" + connection.getConnectionName());
mockPool.free(connection);
}).start();
}
}
}
final class MockPool {
private final MockConnection[] connections;
private final AtomicIntegerArray state;
private static final Integer DEFAULT_POOL_SIZE = 5;
public MockPool() {
connections = new MockConnection[DEFAULT_POOL_SIZE];
state = new AtomicIntegerArray(DEFAULT_POOL_SIZE);
initPool();
}
public MockPool(Integer poolSize) {
connections = new MockConnection[poolSize];
state = new AtomicIntegerArray(poolSize);
initPool();
}
private void initPool() {
for (int i = 0; i < connections.length; i++) {
connections[i] = new MockConnection("连接-" + (i + 1));
}
}
@SneakyThrows
public MockConnection getConnection() {
for (; ; ) {
for (int i = 0; i < connections.length; i++) {
if (state.get(i) == 0) {
if (state.compareAndSet(i, 0, 1)) {
return connections[i];
}
}
}
synchronized (this) {
System.out.println("无可用连接,等待...");
wait();
}
}
}
public void free(MockConnection connection) {
for (int i = 0; i < connections.length; i++) {
if (connections[i] == connection) {
state.set(i, 0);
synchronized (this) {
notifyAll();
}
return;
}
}
System.out.println("要释放的连接不存在");
}
}
@Data
@NoArgsConstructor
@AllArgsConstructor
class MockConnection {
private String connectionName;
}