令牌桶 java_高并发系统限流操作之令牌桶实现可变TPS控制

  • Post author:
  • Post category:java


年前有个需求,批量请求供应商API,要有限流操作,并支持TPS与并发数可配置,那时候简单的查了查资料,任务结束就过去了,最近又有个限流的小需求,所以又翻出了以前的代码。

本次简单记录一下令牌桶的实现:

令牌桶算法的原理是系统会以一个恒定的速度往桶里放入令牌,而如果请求需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务。

1c337676f55e2e783f98b6fc7619d807.png

令牌桶算法.png

实现思路:

用LinkedBlockingQueue作为装令牌的桶,Executors.newSingleThreadScheduledExecutor()作为定时器定时将令牌放入桶中,使用构建者模式的代码风格。忘了以前在哪抄的了,就这样吧。

贴上核心代码:

import java.util.concurrent.Executors;

import java.util.concurrent.LinkedBlockingQueue;

import java.util.concurrent.ScheduledExecutorService;

import java.util.concurrent.TimeUnit;

/**

* TokenBucket

*/

public class TokenBucket {

/**

* 每秒最多请求数量

*/

private int maxFlowRate;

/**

* 每秒平均请求数量

*/

private int avgFlowRate;

/**

* 队列来缓存桶数量

*/

private LinkedBlockingQueue tokenQueue;

/**

* 由定时任务持续生成令牌。这样的问题在于会极大的消耗系统资源,如,某接口需要分别对每个用户做访问频率限制。

* 假设系统中存在6W用户,则至多需要开启6W个定时任务来维持每个桶中的令牌数,这样的开销是巨大的。

* 可以做成延迟计算的形式,每次请求令牌的时候,看当前时间是否晚与下一次生成令牌的时间,计算该段时间的令牌数,

* 加入令牌桶,更新数据。

*/

private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();

/**

* The Mutex do not use directly.

*/

private volatile Object mutexDoNotUseDirectly = new Object();

/**

* The Is start.

*/

private volatile boolean isStart = false;

/**

* The constant A_CHAR.

*/

private static final Byte A_CHAR = ‘a’;

/**

* Instantiates a new Token bucket.

*/

private TokenBucket() {

}

/**

* New builder token bucket.

*

* @return the token bucket

*/

public static TokenBucket newBuilder() {

return new TokenBucket();

}

/**

* 每秒内最大请求数量设置

*

* @param maxFlowRate 每秒内最大请求数量

* @return 当前令牌同

*/

public TokenBucket maxFlowRate(int maxFlowRate) {

this.maxFlowRate = maxFlowRate;

return this;

}

/**

* 每秒平均请求数量设置

*

* @param avgFlowRate 每秒平均请求数量

* @return 当前令牌同

*/

public TokenBucket avgFlowRate(int avgFlowRate) {

this.avgFlowRate = avgFlowRate;

return this;

}

/**

* 构造者模式

*

* @return the token bucket

*/

public TokenBucket build() {

//初始化

init();

//返回当前对象

return this;

}

/**

* 初始化

*/

private void init() {

//初始化桶队列大小

if (maxFlowRate > 0) {

tokenQueue = new LinkedBlockingQueue<>(maxFlowRate);

}

//初始化令牌生产者

TokenProducer tokenProducer = new TokenProducer(avgFlowRate, this);

//每秒执行一次增加令牌操作

scheduledExecutorService.scheduleAtFixedRate(tokenProducer, 0, 1, TimeUnit.SECONDS);

//系统启动

isStart = true;

}

/**

* 停止任务

*/

public void stop() {

isStart = false;

scheduledExecutorService.shutdown();

}

/**

* 查看任务是否执行

*

* @return the boolean

*/

public boolean isStarted() {

return isStart;

}

/**

* 增加令牌

*

* @param tokenNum the token num

*/

private void addTokens(Integer tokenNum) {

// 若是桶已经满了,就不再家如新的令牌

for (int i = 0; i < tokenNum; i++) {

tokenQueue.offer(A_CHAR);

}

}

/**

* 获取令牌

*

* true:获取到1个令牌,非阻塞

*

* false:未获取到令牌,非阻塞

*

* @return boolean

*/

public boolean tryAcquire() {

synchronized (mutexDoNotUseDirectly) {

// 否存在足够的桶数量

if (tokenQueue.size() > 0) {

//队列不为空时返回队首值并移除,队列为空时返回null。非阻塞立即返回。

Byte poll = tokenQueue.poll();

if (poll != null) {

//获取到令牌

return true;

}

}

}

//未获取到令牌

return false;

}

/**

* 令牌生产者

*/

private class TokenProducer implements Runnable {

/**

* 每次加入令牌的数量

*/

private int tokenNum;

/**

* 当前令牌桶

*/

private TokenBucket tokenBucket;

/**

* 令牌生产者构造方法

*

* @param tokenNum 每次加入令牌的数量

* @param tokenBucket 当前令牌桶

*/

private TokenProducer(int tokenNum, TokenBucket tokenBucket) {

this.tokenNum = tokenNum;

this.tokenBucket = tokenBucket;

}

@Override

public void run() {

//增加令牌

tokenBucket.addTokens(tokenNum);

}

}

}



版权声明:本文为weixin_31001855原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。