Go语言并发之道学习十二 限速器

  • Post author:
  • Post category:其他


限速器-限制api接口、硬盘、网络访问速度

package main

import (
	"os"
	"sync"
	"log"
	"fmt"
	"sort"
	"time"
	"context"

	"golang.org/x/time/rate"
)

/*
大多数的限速都是基于令牌桶算法的:
如果要访问资源,你必须拥有资源的访问令牌,没有令牌的请求会被拒绝。
假设这些令牌存储在一个等待被检索使用的桶中。
桶的深度为d ,表示一个桶可以容纳d 个访问令牌。(如:d 为5,则可以存放5 个令牌)
每当你需要访问资料时,都会在桶中删除一个令牌。
如果你的存储桶包含5 个令牌,前5 次访问没有问题,操作正常进行。
但是到了第6 次尝试时,就没有访问令牌可用,你的请求必须排队等待,直到令牌可用,或者被拒绝操作。
*/

func main() {
	fmt.Println("simpleLimit")
	simpleLimit()
	fmt.Println()
	fmt.Println("multiLimit")
	multiLimit()
}

type APIConnection struct {
	rateLimiter *rate.Limiter
}

func (a *APIConnection) ReadFile(ctx context.Context)error{
	if err := a.rateLimiter.Wait(ctx); err != nil{
		return err
	}
	//我们登陆限速器有足够的令牌来完成我们的请求
	return nil
}
func (a *APIConnection) ResulveAddress(ctx context.Context) error{
	if err := a.rateLimiter.Wait(ctx); err != nil{
		return err
	}
	//我们登陆限速器有足够的令牌来完成我们的请求
	return nil
}
func Open() *APIConnection{
	return &APIConnection{
		rateLimiter:rate.NewLimiter(rate.Limit(1),1),
	}
	//在这里,我们将所有api连接的速率限制设置为每秒一次
}

//单一速率限制
func simpleLimit(){
	defer log.Println("done.")
	log.SetOutput(os.Stdout)
	log.SetFlags(log.Ltime|log.LUTC)

	apiConnection := Open()
	var wg sync.WaitGroup
	wg.Add(20)

	for i:=0;i<10;i++{
		go func() {
			defer wg.Done()
			err := apiConnection.ReadFile(context.Background())
			if err != nil{
				log.Printf("cannot readfile: %v\n",err)
			}
			log.Printf("readfile\n")
		}()
	}

	for i:=0;i<10;i++{
		go func() {
			defer wg.Done()
			err := apiConnection.ResulveAddress(context.Background())
			if err != nil {
				log.Printf("cannot resolveaddress: %v\n",err)
			}
			log.Printf("resolveaddress")
		}()
	}

	wg.Wait()

	/*
	下面我们使用 golang.org/x/time/rate 包中的令牌桶限速器实现限速

	·Limit定义了某些事件的最大频率
	·Limit表示为每秒事件数
	·zero Limit不允许发生任何事件
	type Limit float64

	·NewLimiter 返回一个新的Limit
	·他允许事件速率为r ,并允许最大数为b 的token
	·r是我们之前说的速率 ,b 是桶深度
	func NewLimiter(r Limit,b int)*Limiter

	·每一个都将事件之间的最小事件间隔转换为一个Limit
	func Every(interval time.Duration) Limit

	·获取间隔时间,如果想针对每次操作的间隔时间进行测量,而不是请求的间隔长度,我们可以这样写:
	rate.Limit(events/timePeriod.Seconds())

	·获取间隔时间,如果不想每次都手动输入,可以使用Every 函数,将返回rate.Inf
	func Per(eventCount int,duration time.Duration) rate.Limit{
		return rate.Every(duration/time.Duration(eventCount))
	}

	·在创建rate.Limiter之后,我们将使用他来阻塞我们的请求,直到获得访问令牌。
	·Wait是WaitN(ctx,1)的缩写
	func (lim *Limiter) Wait(ctx context.Context)

	·WaitN 会执行直到有n 个事件发生
	·如果n 超过Limiter 的突发大小,context被取消,或者预期等待时间超过
	·context的deadline ,他会返回一个错误
	func (lim *Limiter) WaitN(ctx context.Context,n int) (err error)
	*/
}

//速率限制组
/*
在生产中,我们可能会想要建立多层次的限制:
用细粒度的控制来限制每秒的请求,
用粗粒度的控制来限制每分钟、每小时或每天的请求

为此,我们把不同粒度的限速器独立,然后把他们组合成一个限速器组来管理
*/
type RateLimiter interface {
	Wait(context.Context) error
	Limit() rate.Limit
}
type multiLimiterStruct struct {
	limiters []RateLimiter
}
func MultiLimiter(limiters ...RateLimiter) *multiLimiterStruct{
	byLimit := func(i,j int) bool {
		return limiters[i].Limit() < limiters[j].Limit()
	}
	sort.Slice(limiters,byLimit)
	return &multiLimiterStruct{limiters:limiters}
}
func (l *multiLimiterStruct) Wait(ctx context.Context) error{
	for _,l := range l.limiters{
		if err := l.Wait(ctx);err != nil{
			return err
		}
	}
	return nil
}

func(l *multiLimiterStruct) Limit() rate.Limit{
	return l.limiters[0].Limit()
}
/*
·type RateLimiter interface {
在这里我们定义了一个RateLimiter 接口,使MultiLimiter 可以递归的定义其他的MultiLimiter 实例

·sort.Slice(limiters,byLimit)
这里我们实现了一个优化,并根据每个RateLimiter 的Limit() 进行排序

·return l.limiters[0].Limit()
因为我们在multiLimiter 实例化时对子RateLimiter 实例进行了排序,
所以我们可以直接返回限制最多的限制器,这将是切片slice 中的第一个元素
*/
type MultiAPIConnection struct {
	networkLimit,
	diskLimit,
	apiLimit RateLimiter
}

func Per(eventCount int,duration time.Duration) rate.Limit{

	return rate.Every(duration/time.Duration(eventCount))
}

func MultiOpen() *MultiAPIConnection{
	//在这里,我们定义每秒的限制,避免突发请求
	//限制每秒钟生成2个令牌,就是每0.5秒生成一个令牌 ,最多存储有2个令牌
	secondLimit := rate.NewLimiter(Per(2,time.Second),2)

	//在这里,我们定义每分钟的限制,为用户提供初始池,每秒的限制将确保我们的系统不会被突发请求而超载
	//限制每分钟生成10个令牌,就是每6秒生成一个令牌 ,最多存储有10个令牌
	minuteLimit := rate.NewLimiter(Per(10,time.Minute),10)

	return &MultiAPIConnection{
		//然后我们组合这两个限制,并将其设置为 MultiAPIConnection 的 apiLimit 限速器。
		apiLimit:MultiLimiter(secondLimit,minuteLimit),

		//这里我们为硬盘读取设置了一个限速器,每秒只能读取一次 ,最多存储有1个令牌
		diskLimit:rate.NewLimiter(rate.Limit(1),1),

		//对于网络,我们设置了每秒3 个请求的限速 ,最多存储有3个令牌
		networkLimit:rate.NewLimiter(Per(3,time.Second),3),
	}
}
func (a *MultiAPIConnection) MultiReadFile(ctx context.Context) error{
	//当我们读取文件时,我们融合api 限速器和硬盘限速器的限制。
	err := MultiLimiter(a.apiLimit,a.diskLimit).Wait(ctx)
	if err != nil{
		return err
	}
	//假设我们在这里执行一些逻辑
	return nil
}
func (a *MultiAPIConnection) MultiResolveAddress(ctx context.Context) error{
	//当我们需要访问网络时,我们融合api 限速器和网络限速器的限制。
	err := MultiLimiter(a.apiLimit,a.networkLimit).Wait(ctx)
	if err != nil{
		return err
	}
	//假设我们在这里执行一些逻辑
	return nil
}


func multiLimit(){
	defer log.Println("multiLimit done.")
	log.SetOutput(os.Stdout)
	log.SetFlags(log.Ltime|log.LUTC)

	multiapiConnection := MultiOpen()
	var wg sync.WaitGroup
	wg.Add(20)

	for i:=0;i<10;i++{
		go func() {
			defer wg.Done()
			err := multiapiConnection.MultiReadFile(context.Background())
			if err != nil{
				log.Printf("cannot MultiReadFile: %v\n",err)
			}
			log.Printf("MultiReadFile\n")
		}()
	}

	for i:=0;i<10;i++{
		go func() {
			defer wg.Done()
			err := multiapiConnection.MultiResolveAddress(context.Background())
			if err != nil {
				log.Printf("cannot MultiResolveAddress: %v\n",err)
			}
			log.Printf("MultiResolveAddress")
		}()
	}

	wg.Wait()

	/*
	我们可以看到,客户端每秒发出两个请求,直到第11个请求,我们开始每6秒发出一次请求。
	这是因为我们耗尽了分钟级限速器的可用令牌,所以限制了请求速度。

	为什么第11个请求只延时了2秒,而不是像之后的请求那样隔了6秒,这可能有点违反直觉。
	原因是这样的,虽然我们将api 请求限制为10次/分钟,但是在一分钟内令牌增加是一个递增的过程。
	当我们第11个请求到达时,我们的分钟级线速度去已经积累了一个令牌

	*/
}



版权声明:本文为daily886原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。