Semaphore -信号量、实现限流

  • Post author:
  • Post category:其他




信号量模型

一个计数器、一个等待队列、三个方法。计数器和等待队列对外是透明的

三个方法:

Init(); 初始化计数器

down(); 计数器的值-1;如果此时计数器的值小于0,则当前线程将被阻塞,否则当前线程继续执行; – 在Semphore中是acquire()方法

up(); 计数器的值+1;如果此时计数器的值大于或等于0,则唤醒等待队列中的一个线程,并将其从等待队列中移除; – 在Semphore中是release()方法

在 Java SDK 里面,信号量模型是由 java.util.concurrent.Semaphore 实现的,Semaphore 这个类能够保证这三个方法都是原子操作。

在这里插入图片描述



简单使用信号量

static int count;
//初始化信号量
static final Semaphore s = new Semaphore(1);
//用信号量保证互斥    
static void addOne() {
  s.acquire();
  try {
    count+=1;
  } finally {
    s.release();
  }
}

上面简单实现的信号量

定义多个线程同时去调用addOne方法,该方法首先调用了Semaphore.acquire方法,因为Semaphore初始化设置了1,表示同时只允许一个线程可以通过,当线程1和线程2同时去调用addOne方法时,因为acquire方法时原子的,同时只有一个线程可以通过,假设线程1通过了,那么线程1可以继续执行,线程2则进入了Semaphore的等待队列中被阻塞。等到线程1 执行完成后会调用Semaphore.release方法,还原了Semaphore中的计数器,同时由于其计数器》=0所以,会随机释放等待队列中的线程2 ,线程就可以继续执行了。

这种方式也是一个

简单的互斥锁

的功能


Semaphore 可以允许多个线程访问一个临界区



限流器简单实现

/**
 * 简单限流器
 *
 * @author lhb
 * @date 2019-12-5
 */
public class SampleLimiting<T, R> {

    final List<T> pool;
    /**
     * 用信号量实现限流器
     */
    final Semaphore sem;
    // 构造函数
    SampleLimiting(int size, T t) {
        pool = new Vector<T>() {
        };
        for (int i = 0; i < size; i++) {
            pool.add(t);
        }
        sem = new Semaphore(size);
    }

    void exec2() {
        T t = null;
        try {
            sem.acquire();
            t = pool.remove(0);
            TestSemaphore.run((TestSemaphore) t);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            pool.add(t);
            sem.release();
        }
    }

这里生成了一个限流器,同时只允许5个线程进入,

初始化的时候会往对象池中放入5个指定的对象,每次调用exce都会从对象池中获取需要的对象,并且通过Semaphore实现限流方式,等处理结束后把对象放回对象池中,同时调用Semaphore释放等待线程

public static void main(String[] args) throws InterruptedException {
    SampleLimiting<TestSemaphore, String> stringSampleLimiting = new SampleLimiting<>(5,
        new TestSemaphore());

    List<Thread> threads = new ArrayList<>(100);

    for (int i = 0; i < 100; i++) {
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                stringSampleLimiting.exec2();
            }
        });
        threads.add(thread);
    }
    for (Thread thread : threads) {
        thread.start();
    }
}



版权声明:本文为weixin_39359648原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。