Go实现并发缓存

  • Post author:
  • Post category:其他




并发不安全的 Memo

首先用一个例子演示

函数记忆

// A Memo caches the results of calling a Func.
type Memo struct {
	f     Func
	cache map[string]result
}

// Func is the type of the function to memoize.
type Func func(key string) (interface{}, error)

type result struct {
	value interface{}
	err   error
}

func New(f Func) *Memo {
	return &Memo{f: f, cache: make(map[string]result)}
}

// NOTE: not concurrency-safe!
func (memo *Memo) Get(key string) (interface{}, error) {
	res, ok := memo.cache[key]
	if !ok {
		res.value, res.err = memo.f(key)
		memo.cache[key] = res
	}
	return res.value, res.err
}

其中函数

f

是一个重量级的计算函数,调用它的代价很大,所以要将结果缓存到一个

map

中加快每次调用。这就是函数记忆。

每次调用

Get

,将从

memo

里查询结果,如果没查到,就要调用函数

f

计算结果,再将它记录到缓存中。

以上实现

Get

方法在没有使用同步的情况下更新了缓存

cache

,整个

Get

函数

不是并发安全的



安全但伪并发的 Memo

考虑每次调用

Get

方法都加锁:

type Memo struct {
	f     Func
	mu    sync.Mutex // guards cache
	cache map[string]result
}

// Get is concurrency-safe.
func (memo *Memo) Get(key string) (value interface{}, err error) {
	memo.mu.Lock()
	res, ok := memo.cache[key]
	if !ok {
		res.value, res.err = memo.f(key)
		memo.cache[key] = res
	}
	memo.mu.Unlock()
	return res.value, res.err
}

由于每次调用都请求互斥锁,

Get

又将并行的请求操作串行化了。



会导致多余计算的 Memo

考虑以下改进:

func (memo *Memo) Get(key string) (value interface{}, err error) {
	memo.mu.Lock()
	res, ok := memo.cache[key]
	memo.mu.Unlock()
	if !ok {
		res.value, res.err = memo.f(key)

		// Between the two critical sections, several goroutines
		// may race to compute f(key) and update the map.
		memo.mu.Lock()
		memo.cache[key] = res
		memo.mu.Unlock()
	}
	return res.value, res.err
}

该版本分两次获取锁:第一次用于查询缓存,第二次用于在查询无结果时进行更新。

在理想情况下,我们应该避免这种额外的处理。这个功能有时被称为重复抑制(duplication suppression)。



通过通道进行重复抑制

在第四个版本的缓存中,我们为每个

entry

新加了一个通道

ready

。在设置完

entry



result

字段后,通道会关闭,正在等待的goroutine会收到广播,就可以从

entry

中读取结果了。

// Func is the type of the function to memoize.
type Func func(string) (interface{}, error)

type result struct {
	value interface{}
	err   error
}

type entry struct {
	res   result
	ready chan struct{} // closed when res is ready
}

func New(f Func) *Memo {
	return &Memo{f: f, cache: make(map[string]*entry)}
}

type Memo struct {
	f     Func
	mu    sync.Mutex // guards cache
	cache map[string]*entry	//现在缓存返回的是一个entry
}

func (memo *Memo) Get(key string) (value interface{}, err error) {
	memo.mu.Lock()
	e := memo.cache[key]
	if e == nil {
		// This is the first request for this key.
		// This goroutine becomes responsible for computing
		// the value and broadcasting the ready condition.
		e = &entry{ready: make(chan struct{})}
		memo.cache[key] = e
		memo.mu.Unlock()

		e.res.value, e.res.err = memo.f(key)

		close(e.ready) // broadcast ready condition
	} else {
		// This is a repeat request for this key.
		memo.mu.Unlock()

		<-e.ready // wait for ready condition
	}
	return e.res.value, e.res.err
}



Get

函数发现缓存

memo

中没有记录时,它构造一个

entry

放到缓存中,但这时

key

对应的结果还未计算。

这时,如果其他goroutine调用了

Get

函数查询同样的

key

时,它会到达

<-e.ready

语句并因等待通道数据而阻塞。只有当计算结束,负责计算结果的goroutine将通道关闭后,其它goroutine才能够得以继续执行,并查询出结果。

  • 当一个goroutine试图查询一个不存在的结果时,它创建一个

    entry

    放到缓存中,并解锁,然后调用

    f

    进行计算。计算完成后更新相应的

    entry

    就可以将

    ready

    通道关闭;
  • 当一个goroutine试图查询一个已经存在的结果时,他应该立即放弃锁,并等待查到的

    entry

    的通道的关闭。



「通过通信共享内存」的另一设计

以上介绍了

共享变量并上锁

的方法,另一种方案是

通信顺序进程



在新的设计中,

map

变量限制在一个监控goroutine中,而

Get

的调用者则改为

发送消息

// Func is the type of the function to memoize.
type Func func(key string) (interface{}, error)

// A result is the result of calling a Func.
type result struct {
	value interface{}
	err   error
}

type entry struct {
	res   result
	ready chan struct{} // closed when res is ready
}

// A request is a message requesting that the Func be applied to key.
type request struct {
	key      string
	response chan<- result // the client wants a single result
}

type Memo struct{ requests chan request }

// New returns a memoization of f.  Clients must subsequently call Close.
func New(f Func) *Memo {
	memo := &Memo{requests: make(chan request)}
	go memo.server(f)
	return memo
}

func (memo *Memo) Get(key string) (interface{}, error) {
	response := make(chan result)
	memo.requests <- request{key, response}
	res := <-response
	return res.value, res.err
}

func (memo *Memo) Close() { close(memo.requests) }

//!-get

//!+monitor

func (memo *Memo) server(f Func) {
	cache := make(map[string]*entry)
	for req := range memo.requests {
		e := cache[req.key]
		if e == nil {
			// This is the first request for this key.
			e = &entry{ready: make(chan struct{})}
			cache[req.key] = e
			go e.call(f, req.key) // call f(key)
		}
		go e.deliver(req.response)
	}
}

func (e *entry) call(f Func, key string) {
	// Evaluate the function.
	e.res.value, e.res.err = f(key)
	// Broadcast the ready condition.
	close(e.ready)
}

func (e *entry) deliver(response chan<- result) {
	// Wait for the ready condition.
	<-e.ready
	// Send the result to the client.
	response <- e.res
}


Get

方法创建一个

response

通道,并将它放在一个请求里,然后把它发送给监控goroutine,然后从自己创建的

response

通道中读取。

监控goroutine(即

server

方法)不断从

request

通道中读取,直至该通道被关闭。对于每个请求,它先从缓存中查询,如果没找到则创建并插入一个新的

entry



监控goroutine先创建一个

entry

放到缓存中,然后它调用

go e.call(f, req.key)

创建一个gorouitne来计算结果、关闭

ready

通道。与此同时它调用

go e.deliver(req.response)

等待

ready

通道关闭,并将结果发送到

response

通道中;

如果监控

goroutine

直接从缓存找到了结果,那么根据

key

查到的

entry

已经包含一个已经关闭的通道,它调用

go e.deliver(req.response)

就可以直接将结果放到

response

通道中。

总结起来,

server

负责了从请求通道中读取请求,对于未完成计算的

key

,它创建新的goroutine执行计算任务,随后通过请求中附带的

resp

通道答复请求。

更进一步的改造,可以限制进行计算的goroutine数量、通过

context

包控制

server

的生命周期等。



版权声明:本文为weixin_43116322原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。