Go实现并发缓存的示例代码
作者:ethannotlazy
并发不安全的 Memo
首先用一个例子演示函数记忆:
// A Memo caches the results of calling a Func. type Memo struct { f Func cache map[string]result } // Func is the type of the function to memoize. type Func func(key string) (interface{}, error) type result struct { value interface{} err error } func New(f Func) *Memo { return &Memo{f: f, cache: make(map[string]result)} } // NOTE: not concurrency-safe! func (memo *Memo) Get(key string) (interface{}, error) { res, ok := memo.cache[key] if !ok { res.value, res.err = memo.f(key) memo.cache[key] = res } return res.value, res.err }
其中函数f
是一个重量级的计算函数,调用它的代价很大,所以要将结果缓存到一个map
中加快每次调用。这就是函数记忆。
每次调用Get
,将从memo
里查询结果,如果没查到,就要调用函数f
计算结果,再将它记录到缓存中。
以上实现Get
方法在没有使用同步的情况下更新了缓存cache
,整个Get
函数不是并发安全的。
安全但伪并发的 Memo
考虑每次调用Get
方法都加锁:
type Memo struct { f Func mu sync.Mutex // guards cache cache map[string]result } // Get is concurrency-safe. func (memo *Memo) Get(key string) (value interface{}, err error) { memo.mu.Lock() res, ok := memo.cache[key] if !ok { res.value, res.err = memo.f(key) memo.cache[key] = res } memo.mu.Unlock() return res.value, res.err }
由于每次调用都请求互斥锁,Get
又将并行的请求操作串行化了。
会导致多余计算的 Memo
考虑以下改进:
func (memo *Memo) Get(key string) (value interface{}, err error) { memo.mu.Lock() res, ok := memo.cache[key] memo.mu.Unlock() if !ok { res.value, res.err = memo.f(key) // Between the two critical sections, several goroutines // may race to compute f(key) and update the map. memo.mu.Lock() memo.cache[key] = res memo.mu.Unlock() } return res.value, res.err }
该版本分两次获取锁:第一次用于查询缓存,第二次用于在查询无结果时进行更新。
在理想情况下,我们应该避免这种额外的处理。这个功能有时被称为重复抑制(duplication suppression)。
通过通道进行重复抑制
在第四个版本的缓存中,我们为每个entry
新加了一个通道ready
。在设置完entry
的result
字段后,通道会关闭,正在等待的goroutine会收到广播,就可以从entry
中读取结果了。
// Func is the type of the function to memoize. type Func func(string) (interface{}, error) type result struct { value interface{} err error } type entry struct { res result ready chan struct{} // closed when res is ready } func New(f Func) *Memo { return &Memo{f: f, cache: make(map[string]*entry)} } type Memo struct { f Func mu sync.Mutex // guards cache cache map[string]*entry //现在缓存返回的是一个entry } func (memo *Memo) Get(key string) (value interface{}, err error) { memo.mu.Lock() e := memo.cache[key] if e == nil { // This is the first request for this key. // This goroutine becomes responsible for computing // the value and broadcasting the ready condition. e = &entry{ready: make(chan struct{})} memo.cache[key] = e memo.mu.Unlock() e.res.value, e.res.err = memo.f(key) close(e.ready) // broadcast ready condition } else { // This is a repeat request for this key. memo.mu.Unlock() <-e.ready // wait for ready condition } return e.res.value, e.res.err }
当Get
函数发现缓存memo
中没有记录时,它构造一个entry
放到缓存中,但这时key
对应的结果还未计算。
这时,如果其他goroutine调用了Get
函数查询同样的key
时,它会到达<-e.ready
语句并因等待通道数据而阻塞。只有当计算结束,负责计算结果的goroutine将通道关闭后,其它goroutine才能够得以继续执行,并查询出结果。
- 当一个goroutine试图查询一个不存在的结果时,它创建一个
entry
放到缓存中,并解锁,然后调用f
进行计算。计算完成后更新相应的entry
就可以将ready
通道关闭; - 当一个goroutine试图查询一个已经存在的结果时,他应该立即放弃锁,并等待查到的
entry
的通道的关闭。
「通过通信共享内存」的另一设计
以上介绍了共享变量并上锁的方法,另一种方案是通信顺序进程。
在新的设计中,map
变量限制在一个监控goroutine中,而Get
的调用者则改为发送消息。
// Func is the type of the function to memoize. type Func func(key string) (interface{}, error) // A result is the result of calling a Func. type result struct { value interface{} err error } type entry struct { res result ready chan struct{} // closed when res is ready } // A request is a message requesting that the Func be applied to key. type request struct { key string response chan<- result // the client wants a single result } type Memo struct{ requests chan request } // New returns a memoization of f. Clients must subsequently call Close. func New(f Func) *Memo { memo := &Memo{requests: make(chan request)} go memo.server(f) return memo } func (memo *Memo) Get(key string) (interface{}, error) { response := make(chan result) memo.requests <- request{key, response} res := <-response return res.value, res.err } func (memo *Memo) Close() { close(memo.requests) } //!-get //!+monitor func (memo *Memo) server(f Func) { cache := make(map[string]*entry) for req := range memo.requests { e := cache[req.key] if e == nil { // This is the first request for this key. e = &entry{ready: make(chan struct{})} cache[req.key] = e go e.call(f, req.key) // call f(key) } go e.deliver(req.response) } } func (e *entry) call(f Func, key string) { // Evaluate the function. e.res.value, e.res.err = f(key) // Broadcast the ready condition. close(e.ready) } func (e *entry) deliver(response chan<- result) { // Wait for the ready condition. <-e.ready // Send the result to the client. response <- e.res }
Get
方法创建一个response
通道,并将它放在一个请求里,然后把它发送给监控goroutine,然后从自己创建的response
通道中读取。
监控goroutine(即server
方法)不断从request
通道中读取,直至该通道被关闭。对于每个请求,它先从缓存中查询,如果没找到则创建并插入一个新的entry
:
监控goroutine先创建一个entry
放到缓存中,然后它调用go e.call(f, req.key)
创建一个gorouitne来计算结果、关闭ready
通道。与此同时它调用go e.deliver(req.response)
等待ready
通道关闭,并将结果发送到response
通道中;
如果监控goroutine
直接从缓存找到了结果,那么根据key
查到的entry
已经包含一个已经关闭的通道,它调用go e.deliver(req.response)
就可以直接将结果放到response
通道中。
总结起来,server
负责了从请求通道中读取请求,对于未完成计算的key
,它创建新的goroutine执行计算任务,随后通过请求中附带的resp
通道答复请求。
更进一步的改造,可以限制进行计算的goroutine数量、通过context
包控制server
的生命周期等。
到此这篇关于Go实现并发缓存的示例代码的文章就介绍到这了,更多相关Go 并发缓存内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!