生产者和消费者模式

  • Post author:
  • Post category:其他


实现生产者消费者模式

生产者的功能就是往容器中生产100个数据

消费者的功能就是朝容器中消费100个数据



1. wait/notify

import java.util.Date;
import java.util.LinkedList;

/**
 * synchronized + wait/notifyAll
 * @author cxyxh
 * @date 2021-08-02
 */
public class ProducerConsumerSynchronizedWaitNotifyAll {

    public static void main(String[] args) {
        //创建容器
        Storage storage = new Storage();
        //生产者启动
        new Thread(new Producer(storage)).start();
        //消费者启动
        new Thread(new Consumer(storage)).start();
    }

    static class Producer implements Runnable{

        private Storage storage;

        public Producer(Storage storage) {
            this.storage = storage;
        }

        @Override
        public void run() {
            for (int i = 0; i < 100; i++) {
                Date date = new Date();
                storage.add(date);
            }
        }
    }

    static class Consumer implements Runnable{
        private Storage storage;

        public Consumer(Storage storage) {
            this.storage = storage;
        }

        @Override
        public void run() {
            for (int i = 0; i < 100; i++) {
                Date take = storage.take();
            }
        }
    }

    static class Storage{

        private Integer maxSize;
        private LinkedList<Date> storage;

        public Storage(Integer maxSize) {
            this.maxSize = maxSize;
            this.storage = new LinkedList<>();
        }

        public Storage() {
            this.maxSize = 10;
            this.storage = new LinkedList<>();
        }

        public synchronized Date take(){
            //while 在多生产者消费者的情况下有效
            while (storage.size() == 0){
                try {
                    wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            Date pop = storage.pop();
            System.out.println("从存储中获取到:" + pop +  "存储中还剩:" + storage.size() + "个");
            notifyAll();
            return pop;
        }

        public synchronized void add(Date date){
            //while 在多生产者消费者的情况下有效
            while (storage.size() == maxSize){
                try {
                    wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            storage.push(date);
            System.out.println("往存储中添加:" + date + "存储中还剩:" + storage.size() + "个");
            notifyAll();
        }
    }
}

为什么需要在wait外面加上while判断?

在消费时,需要判断当前如果容器中已经空了的话,当前线程就进入wait状态,将锁让出来。如果存在多个消费者的时候,就有可能在storage为空的情况下唤醒其他已经进入wait状态的消费者,这些消费者将从wait状态切换到blocked状态,当前锁释放掉之后就会挨个获取锁挨个执行,就会造成多消费的情况。而存在多个生产者的时候也是一样。所以在每次被唤醒时就需要再次判断条件是否满足,如果还是不合适那就继续睡。



2. LockCondition

import java.util.Date;
import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * ReentrantLock + condition
 *
 * @author cxyxh
 * @date 2021-08-02
 */
public class ProducerConsumerLockCondition {

    public static void main(String[] args) {
        Storage storage = new Storage();
        new Thread(new Producer(storage)).start();
        new Thread(new Consumer(storage)).start();
    }


    static class Producer implements Runnable {

        private Storage storage;

        public Producer(Storage storage) {
            this.storage = storage;
        }

        @Override
        public void run() {
            for (int i = 0; i < 100; i++) {
                Date date = new Date();
                storage.add(date);
            }
        }
    }

    static class Consumer implements Runnable {
        private Storage storage;

        public Consumer(Storage storage) {
            this.storage = storage;
        }

        @Override
        public void run() {
            for (int i = 0; i < 100; i++) {
                Date take = storage.take();
            }
        }
    }

    static class Storage {

        private Integer maxSize;
        private LinkedList<Date> storage;

        private static final Lock lock = new ReentrantLock();
        private static final Condition empty = lock.newCondition();
        private static final Condition full = lock.newCondition();

        public Storage(Integer maxSize) {
            this.maxSize = maxSize;
            this.storage = new LinkedList<>();
        }

        public Storage() {
            this.maxSize = 10;
            this.storage = new LinkedList<>();
        }

        public Date take() {
            //while 在多生产者消费者的情况下有效
            lock.lock();
            Date pop = null;
            try {
                //当取的时候,容量为0,就休息
                while (storage.size() == 0) {
                    empty.await();
                }
                pop = storage.pop();
                System.out.println("从存储中获取到:" + pop + "存储中还剩:" + storage.size() + "个");
                //取走了一个,就叫醒另外一个
                full.signalAll();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
            return pop;
        }


        public void add(Date date) {
            //while 在多生产者消费者的情况下有效
            lock.lock();
            try {
                //存的时候,如果满了,则休息
                while (storage.size() == maxSize) {
                    full.await();
                }
                storage.push(date);
                //取走一个就叫醒消费者
                System.out.println("往存储中添加:" + date + "存储中还剩:" + storage.size() + "个");
                empty.signalAll();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }
}



3. Semaphore

信号量:可以声明拥有多少个资源,当资源等于0时再进行acquire就会陷入等待状态

import java.util.Date;
import java.util.LinkedList;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Semaphore
 * @author cxyxh
 * @date 2021-08-02
 */
public class ProducerConsumerSemaphore {

    public static void main(String[] args) {
        Storage storage = new Storage();
        new Thread(new Producer(storage)).start();
        new Thread(new Consumer(storage)).start();
    }

    static class Producer implements Runnable {

        private Storage storage;

        public Producer(Storage storage) {
            this.storage = storage;
        }

        @Override
        public void run() {
            for (int i = 0; i < 100; i++) {
                Date date = new Date();
                storage.add(date);
            }
        }
    }

    static class Consumer implements Runnable {
        private Storage storage;

        public Consumer(Storage storage) {
            this.storage = storage;
        }

        @Override
        public void run() {
            for (int i = 0; i < 100; i++) {
                Date take = storage.take();
            }
        }
    }

    static class Storage {

        private Integer maxSize;
        private LinkedList<Date> storage;
	    //声明一个锁,资源只有一个
        Semaphore monitor = new Semaphore(1);
	   //声明两个控制生产者消费者的信号量
        Semaphore full = new Semaphore(maxSize);
        Semaphore empty = new Semaphore(0);

        public Storage(Integer maxSize) {
            this.maxSize = maxSize;
            this.storage = new LinkedList<>();
        }

        public Storage() {
            this.maxSize = 10;
            this.storage = new LinkedList<>();
        }

        public Date take() {
            Date pop = null;
            try {
                //尝试判断可以从存储中获取
                empty.acquire();
                //获取锁
                monitor.acquire();
                pop = storage.pop();
                System.out.println("从存储中获取到:" + pop + "存储中还剩:" + storage.size() + "个");
                //取走了一个,就叫醒另外一个
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                //释放锁
                monitor.release();
                //释放掉full中的一个资源,等待在full.acquire的线程就可以执行下去了
                full.release();
            }
            return pop;
        }

        public void add(Date date) {
            try {
                //添加的时候,判断是否可以添加
                full.acquire();
                //如果可以尝试获取锁
                monitor.acquire();
                //添加
                storage.push(date);
                System.out.println("往存储中添加:" + date + "存储中还剩:" + storage.size() + "个");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                //释放锁
                monitor.release();
                //释放empty中一个资源,等待在empty.acquire的线程就可以继续了
                empty.release();
            }
        }
    }
}



版权声明:本文为qq_39935047原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。