实现生产者消费者模式
生产者的功能就是往容器中生产100个数据
消费者的功能就是朝容器中消费100个数据
1. wait/notify
import java.util.Date;
import java.util.LinkedList;
/**
* synchronized + wait/notifyAll
* @author cxyxh
* @date 2021-08-02
*/
public class ProducerConsumerSynchronizedWaitNotifyAll {
public static void main(String[] args) {
//创建容器
Storage storage = new Storage();
//生产者启动
new Thread(new Producer(storage)).start();
//消费者启动
new Thread(new Consumer(storage)).start();
}
static class Producer implements Runnable{
private Storage storage;
public Producer(Storage storage) {
this.storage = storage;
}
@Override
public void run() {
for (int i = 0; i < 100; i++) {
Date date = new Date();
storage.add(date);
}
}
}
static class Consumer implements Runnable{
private Storage storage;
public Consumer(Storage storage) {
this.storage = storage;
}
@Override
public void run() {
for (int i = 0; i < 100; i++) {
Date take = storage.take();
}
}
}
static class Storage{
private Integer maxSize;
private LinkedList<Date> storage;
public Storage(Integer maxSize) {
this.maxSize = maxSize;
this.storage = new LinkedList<>();
}
public Storage() {
this.maxSize = 10;
this.storage = new LinkedList<>();
}
public synchronized Date take(){
//while 在多生产者消费者的情况下有效
while (storage.size() == 0){
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Date pop = storage.pop();
System.out.println("从存储中获取到:" + pop + "存储中还剩:" + storage.size() + "个");
notifyAll();
return pop;
}
public synchronized void add(Date date){
//while 在多生产者消费者的情况下有效
while (storage.size() == maxSize){
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
storage.push(date);
System.out.println("往存储中添加:" + date + "存储中还剩:" + storage.size() + "个");
notifyAll();
}
}
}
为什么需要在wait外面加上while判断?
在消费时,需要判断当前如果容器中已经空了的话,当前线程就进入wait状态,将锁让出来。如果存在多个消费者的时候,就有可能在storage为空的情况下唤醒其他已经进入wait状态的消费者,这些消费者将从wait状态切换到blocked状态,当前锁释放掉之后就会挨个获取锁挨个执行,就会造成多消费的情况。而存在多个生产者的时候也是一样。所以在每次被唤醒时就需要再次判断条件是否满足,如果还是不合适那就继续睡。
2. LockCondition
import java.util.Date;
import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* ReentrantLock + condition
*
* @author cxyxh
* @date 2021-08-02
*/
public class ProducerConsumerLockCondition {
public static void main(String[] args) {
Storage storage = new Storage();
new Thread(new Producer(storage)).start();
new Thread(new Consumer(storage)).start();
}
static class Producer implements Runnable {
private Storage storage;
public Producer(Storage storage) {
this.storage = storage;
}
@Override
public void run() {
for (int i = 0; i < 100; i++) {
Date date = new Date();
storage.add(date);
}
}
}
static class Consumer implements Runnable {
private Storage storage;
public Consumer(Storage storage) {
this.storage = storage;
}
@Override
public void run() {
for (int i = 0; i < 100; i++) {
Date take = storage.take();
}
}
}
static class Storage {
private Integer maxSize;
private LinkedList<Date> storage;
private static final Lock lock = new ReentrantLock();
private static final Condition empty = lock.newCondition();
private static final Condition full = lock.newCondition();
public Storage(Integer maxSize) {
this.maxSize = maxSize;
this.storage = new LinkedList<>();
}
public Storage() {
this.maxSize = 10;
this.storage = new LinkedList<>();
}
public Date take() {
//while 在多生产者消费者的情况下有效
lock.lock();
Date pop = null;
try {
//当取的时候,容量为0,就休息
while (storage.size() == 0) {
empty.await();
}
pop = storage.pop();
System.out.println("从存储中获取到:" + pop + "存储中还剩:" + storage.size() + "个");
//取走了一个,就叫醒另外一个
full.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
return pop;
}
public void add(Date date) {
//while 在多生产者消费者的情况下有效
lock.lock();
try {
//存的时候,如果满了,则休息
while (storage.size() == maxSize) {
full.await();
}
storage.push(date);
//取走一个就叫醒消费者
System.out.println("往存储中添加:" + date + "存储中还剩:" + storage.size() + "个");
empty.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
}
3. Semaphore
信号量:可以声明拥有多少个资源,当资源等于0时再进行acquire就会陷入等待状态
import java.util.Date;
import java.util.LinkedList;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* Semaphore
* @author cxyxh
* @date 2021-08-02
*/
public class ProducerConsumerSemaphore {
public static void main(String[] args) {
Storage storage = new Storage();
new Thread(new Producer(storage)).start();
new Thread(new Consumer(storage)).start();
}
static class Producer implements Runnable {
private Storage storage;
public Producer(Storage storage) {
this.storage = storage;
}
@Override
public void run() {
for (int i = 0; i < 100; i++) {
Date date = new Date();
storage.add(date);
}
}
}
static class Consumer implements Runnable {
private Storage storage;
public Consumer(Storage storage) {
this.storage = storage;
}
@Override
public void run() {
for (int i = 0; i < 100; i++) {
Date take = storage.take();
}
}
}
static class Storage {
private Integer maxSize;
private LinkedList<Date> storage;
//声明一个锁,资源只有一个
Semaphore monitor = new Semaphore(1);
//声明两个控制生产者消费者的信号量
Semaphore full = new Semaphore(maxSize);
Semaphore empty = new Semaphore(0);
public Storage(Integer maxSize) {
this.maxSize = maxSize;
this.storage = new LinkedList<>();
}
public Storage() {
this.maxSize = 10;
this.storage = new LinkedList<>();
}
public Date take() {
Date pop = null;
try {
//尝试判断可以从存储中获取
empty.acquire();
//获取锁
monitor.acquire();
pop = storage.pop();
System.out.println("从存储中获取到:" + pop + "存储中还剩:" + storage.size() + "个");
//取走了一个,就叫醒另外一个
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//释放锁
monitor.release();
//释放掉full中的一个资源,等待在full.acquire的线程就可以执行下去了
full.release();
}
return pop;
}
public void add(Date date) {
try {
//添加的时候,判断是否可以添加
full.acquire();
//如果可以尝试获取锁
monitor.acquire();
//添加
storage.push(date);
System.out.println("往存储中添加:" + date + "存储中还剩:" + storage.size() + "个");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//释放锁
monitor.release();
//释放empty中一个资源,等待在empty.acquire的线程就可以继续了
empty.release();
}
}
}
}
版权声明:本文为qq_39935047原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。