简介
本章介绍令牌桶Token Bucket算法在流量限速场景的原理,以及C++实现和相关测试验证。
常见的限流算法有计数限流,固定窗口限流,滑动窗口限流,漏桶算法限流和令牌桶算法限流。令牌桶算法是限流算法的一种,其原理是系统会以一个恒定的速度往桶里放入固定数量的令牌,而如果请求需要被处理,则需要先从桶里获取对应令牌,当桶里没有令牌可取时,则拒绝服务。
令牌桶算法可应用于多种场景,本章是针对网络流控制限制场景的使用,对外发的网络数据进行控制,使数据以长期的平均速率外发,并运行一个瞬时的最高流量。
原理
定义:令牌桶提供了一种机制,限制流的平均速率,并允许流中达到所需的突发级别。
需求:项目的需求是文件下载的服务器端限流,下载请求过来后(请求其他模块有限制,这里忽略这个问题),在独立线程读取文件并发送数据到该请求的socket上,要求发送到网络的总数据要限流,但是数据不能丢弃,可以延迟等待发送。
实现:
- 定时定量向令牌桶投入令牌,令牌桶有容量限制,令牌桶容量满了则丢弃不再投入令牌。
- 在发送报文时,如果令牌桶的令牌数量小于要发送的网络数据大小,则等待令牌桶的数量足够再发送;如果令牌桶的令牌数量大于等于要发送的网络数据大小,则报文直接发送,并减少令牌桶中对应数量的令牌。
结论:
-
限速速率不一定是匀速的,但长期速率是一定的。
在令牌桶满的情况下,如果有突发流量过来,会瞬时消耗掉令牌桶的令牌,此时的理论上限速率为:令牌桶的容量+速率。比如令牌桶的容量为60M,限速速度为50MB/s,那此时的速率最高可到110MB/s的速度。
-
单次发送的数据不能大于令牌容量,否则获取不到令牌。
-
令牌的获取线程安全,可多线程获取。
实现
类图如下:
具体代码如下:
-
CountSemaphore是信号量的封装。
信号量的容量即令牌桶的容量,提供获取令牌和投递令牌2个操作。该对象封装可作为信号量的公共库使用。
代码分析如下:
- acquire获取令牌的操作中,使用锁保护数据正确性,使用条件等待令牌足够才继续往下执行。
- release增加令牌数量,并通知其他等待条件的线程继续执行。
- 因std::mutex是非公平锁,所以获取到锁的线程是随机的,但长期来看是公平的。
具体代码如下:
-
countsemaphore.h
#ifndef COUNTSEMAPHORE_H #define COUNTSEMAPHORE_H #include <mutex> #include <condition_variable> #include <climits> class CountSemaphore { public: CountSemaphore(unsigned long long initCount, unsigned long long maxCount); bool acquire(unsigned long long count = 1); void release(unsigned long long count = 1); private: std::mutex m_mtx; std::condition_variable m_cv; // 当前可用数量 unsigned long long m_updateCount = 0; // 最大数量 unsigned long long m_maxCount = ULLONG_MAX; }; #endif // COUNTSEMAPHORE_H
-
countsemaphore.cpp
#include "countsemaphore.h" CountSemaphore::CountSemaphore(unsigned long long initCount, unsigned long long maxCount) : m_updateCount(initCount > maxCount ? maxCount : initCount) , m_maxCount(maxCount) { } bool CountSemaphore::acquire(unsigned long long count) { std::unique_lock<std::mutex> lck(m_mtx); // 获取的数量大于最大值,不可能成功 if (count > m_maxCount) { return false; } m_cv.wait(lck, [&]() -> bool { return m_updateCount >= count; }); m_updateCount -= count; return true; } void CountSemaphore::release(unsigned long long count) { std::unique_lock<std::mutex> lck(m_mtx); auto tobeCount = m_updateCount + count; if (tobeCount > m_maxCount) { m_updateCount = m_maxCount; } else { m_updateCount = tobeCount; } m_cv.notify_all(); }
-
TokenSpeedLimiter是令牌桶的封装。
包含令牌桶的限速速度,令牌的投递时间间隔和令牌桶的容量。提供开始和结束投递操作和获取令牌的操作。
其中投递的时间间隔以毫秒为单位,越小速率越均匀。
-
tokenspeedlimiter.h
#ifndef TOKENSPEEDLIMITER_H #define TOKENSPEEDLIMITER_H #include "countsemaphore.h" #include <thread> class TokenSpeedLimiter { public: TokenSpeedLimiter(unsigned long long speed, unsigned long long capacity, unsigned long long deliveryIntervalMs); void begin(); void end(); bool acquireToken(unsigned long long tokenCount); private: void workingThread(); private: // 限速速度(字节/s) unsigned long long m_limitSpeed; // 令牌投递时间间隔(毫秒) unsigned long long m_deliveryIntervalMs; // 信号量 CountSemaphore m_semaphore; // 是否运行 bool m_runing = false; // 线程 std::shared_ptr<std::thread> m_thread = nullptr; }; #endif // TOKENSPEEDLIMITER_H
-
tokenspeedlimiter.cpp
#include "tokenspeedlimiter.h" #include <functional> TokenSpeedLimiter::TokenSpeedLimiter(unsigned long long speed, unsigned long long capacity, unsigned long long deliveryIntervalMs) : m_limitSpeed(speed) , m_deliveryIntervalMs(deliveryIntervalMs) , m_semaphore(0, capacity) { } void TokenSpeedLimiter::begin() { if (m_runing) { return; } m_runing = true; m_thread.reset(new std::thread(std::bind(&TokenSpeedLimiter::workingThread, this))); } void TokenSpeedLimiter::end() { m_runing = false; if (m_thread != nullptr) { m_thread->join(); m_thread = nullptr; } } bool TokenSpeedLimiter::acquireToken(unsigned long long tokenCount) { return m_semaphore.acquire(tokenCount); } void TokenSpeedLimiter::workingThread() { auto lastTime = std::chrono::steady_clock::now(); while(m_runing) { // 延时定时投递 std::this_thread::sleep_for(std::chrono::milliseconds(m_deliveryIntervalMs)); // 计算投递时间差 auto curTime = std::chrono::steady_clock::now(); auto elapsedMs = std::chrono::duration<double, std::milli>(curTime - lastTime).count(); lastTime = curTime; // 根据时间差计算投递令牌的数量(除以1000换算成毫秒投递数量,然后再乘以毫秒时间差) auto tokens = m_limitSpeed * elapsedMs / 1000; // 投递令牌 m_semaphore.release((unsigned long long)tokens); } }
-
-
main.cpp
包含令牌桶对象的调用及测试结果打印。
- sendDatatoNet线程模拟多线程发送数据。
- statisticNetwork统计流量结果和每个线程发送数据的百分比。
#include <QCoreApplication> #include "tokenspeedlimiter.h" #include <iostream> #include <map> #include <sstream> #include <iomanip> // 网络发送字节数,用于统计 unsigned long long sendCount = 0; std::mutex mutexCount; std::map<unsigned int, unsigned long long> mapTheadIdCount; // 网络数据发送测试线程函数 void sendDatatoNet(TokenSpeedLimiter* speedLimiter) { // 每次发送的数据包大小 const int sizeOnePacket = 2 * 1024; while(true) { // 获取令牌 if (!speedLimiter->acquireToken(sizeOnePacket)) { continue; } // 统计总的发送包数量 std::unique_lock<std::mutex> lck(mutexCount); sendCount += sizeOnePacket; // 统计每个线程发送的数包 auto threadId = std::this_thread::get_id(); auto theId = *(unsigned int *)&threadId; auto it = mapTheadIdCount.find(theId); if (it != mapTheadIdCount.end()) { it->second += sizeOnePacket; } else { mapTheadIdCount.insert(std::make_pair(theId, sizeOnePacket)); } } } void statisticNetwork() { auto lastTime = std::chrono::steady_clock::now(); while(true) { // 1秒统计一次 std::this_thread::sleep_for(std::chrono::milliseconds(1000)); // 计算投递时间差 auto curTime = std::chrono::steady_clock::now(); auto elapsedMs = std::chrono::duration<double, std::milli>(curTime - lastTime).count(); lastTime = curTime; // 打印总速率 std::unique_lock<std::mutex> lck(mutexCount); if (elapsedMs > 0) { // * 1000 / elapsedMs为毫秒转换为秒 auto curSpeed = (double)sendCount * 1000 / 1024 / 1024 / elapsedMs; std::cout << "speed: " << curSpeed << " MB/s" << std::endl; } // 打印每个线程发送的百分比 std::cout << "thread send count: "; for (auto it: mapTheadIdCount) { std::cout << it.first << "(" << std::setfill(' ') << std::setw(2) << 100 * it.second / sendCount << "%),"; } std::cout << std::endl; mapTheadIdCount.clear(); sendCount = 0; } } int main(int argc, char *argv[]) { // 构造限速器:限速50M/s,容量为6M,间隔10ms投递令牌;当前的流量峰值为56M(50的速度 + 6M的容量)左右 TokenSpeedLimiter speedLimiter(50 * 1024 * 1024, 50 * 1024 * 1024 / 10 * 1.2, 10); speedLimiter.begin(); // 延时5秒,填满令牌桶容量 std::this_thread::sleep_for(std::chrono::milliseconds(5000)); // 启动网络发送线程 for (int i = 0; i < 10; ++i) { new std::thread(sendDatatoNet, &speedLimiter); } // 启动统计 statisticNetwork(); return 0; }
测试输出如下以及分析如下:
- 因启动令牌桶对象后,延迟5秒才开始发送数据,所以第一条的速度达到了55M/s,其与(50M+6M)B/s的速度计算基本对应。
- 后续发送速度保持再50MB/s左右,和设置的限速速度保持一致。
- 从输出来看,数据发送的线程获取令牌长期来看是公平的。
speed: 55.9062 MB/s thread send count: 356( 9%),1644( 8%),2312(11%),11112(12%),13472( 7%),14696( 6%),20588( 7%),21096(13%),22080(10%),22500(11%), speed: 49.3314 MB/s thread send count: 356( 8%),1644(11%),2312(10%),11112(12%),13472( 7%),14696( 9%),20588( 9%),21096(13%),22080( 9%),22500( 7%), speed: 50.6701 MB/s thread send count: 356( 7%),1644(11%),2312(16%),11112( 7%),13472(11%),14696( 9%),20588( 7%),21096( 6%),22080(10%),22500(11%), speed: 49.9935 MB/s thread send count: 356( 7%),1644(10%),2312( 6%),11112(12%),13472(10%),14696( 6%),20588(16%),21096( 6%),22080(11%),22500(11%), speed: 49.3036 MB/s thread send count: 356(13%),1644(11%),2312( 8%),11112( 6%),13472( 9%),14696(10%),20588( 9%),21096( 9%),22080(11%),22500(10%), speed: 50.6954 MB/s thread send count: 356(11%),1644(10%),2312(14%),11112(10%),13472( 9%),14696( 4%),20588(10%),21096( 8%),22080(10%),22500( 9%), speed: 49.2754 MB/s thread send count: 356(12%),1644( 8%),2312( 5%),11112(11%),13472(13%),14696( 9%),20588( 9%),21096(11%),22080( 8%),22500( 9%), speed: 50.0361 MB/s thread send count: 356(13%),1644( 8%),2312( 7%),11112(10%),13472( 9%),14696( 7%),20588(12%),21096(10%),22080(10%),22500( 8%),