Golang

关注公众号 jb51net

关闭
首页 > 脚本专栏 > Golang > Go 并发缓存

Go实现并发缓存的示例代码

作者:ethannotlazy

高并发数据存储是现代互联网应用开发中常遇到的一大挑战,本文主要介绍了Go实现并发缓存的示例代码,具有一定的参考价值,感兴趣的可以了解一下

并发不安全的 Memo

首先用一个例子演示函数记忆

// A Memo caches the results of calling a Func.
type Memo struct {
	f     Func
	cache map[string]result
}

// Func is the type of the function to memoize.
type Func func(key string) (interface{}, error)

type result struct {
	value interface{}
	err   error
}

func New(f Func) *Memo {
	return &Memo{f: f, cache: make(map[string]result)}
}

// NOTE: not concurrency-safe!
func (memo *Memo) Get(key string) (interface{}, error) {
	res, ok := memo.cache[key]
	if !ok {
		res.value, res.err = memo.f(key)
		memo.cache[key] = res
	}
	return res.value, res.err
}

其中函数f是一个重量级的计算函数,调用它的代价很大,所以要将结果缓存到一个map中加快每次调用。这就是函数记忆。
每次调用Get,将从memo里查询结果,如果没查到,就要调用函数f计算结果,再将它记录到缓存中。
以上实现Get方法在没有使用同步的情况下更新了缓存cache,整个Get函数不是并发安全的

安全但伪并发的 Memo

考虑每次调用Get方法都加锁:

type Memo struct {
	f     Func
	mu    sync.Mutex // guards cache
	cache map[string]result
}

// Get is concurrency-safe.
func (memo *Memo) Get(key string) (value interface{}, err error) {
	memo.mu.Lock()
	res, ok := memo.cache[key]
	if !ok {
		res.value, res.err = memo.f(key)
		memo.cache[key] = res
	}
	memo.mu.Unlock()
	return res.value, res.err
}

由于每次调用都请求互斥锁,Get又将并行的请求操作串行化了。

会导致多余计算的 Memo

考虑以下改进:

func (memo *Memo) Get(key string) (value interface{}, err error) {
	memo.mu.Lock()
	res, ok := memo.cache[key]
	memo.mu.Unlock()
	if !ok {
		res.value, res.err = memo.f(key)

		// Between the two critical sections, several goroutines
		// may race to compute f(key) and update the map.
		memo.mu.Lock()
		memo.cache[key] = res
		memo.mu.Unlock()
	}
	return res.value, res.err
}

该版本分两次获取锁:第一次用于查询缓存,第二次用于在查询无结果时进行更新。
在理想情况下,我们应该避免这种额外的处理。这个功能有时被称为重复抑制(duplication suppression)。

通过通道进行重复抑制

在第四个版本的缓存中,我们为每个entry新加了一个通道ready。在设置完entryresult字段后,通道会关闭,正在等待的goroutine会收到广播,就可以从entry中读取结果了。

// Func is the type of the function to memoize.
type Func func(string) (interface{}, error)

type result struct {
	value interface{}
	err   error
}

type entry struct {
	res   result
	ready chan struct{} // closed when res is ready
}

func New(f Func) *Memo {
	return &Memo{f: f, cache: make(map[string]*entry)}
}

type Memo struct {
	f     Func
	mu    sync.Mutex // guards cache
	cache map[string]*entry	//现在缓存返回的是一个entry
}

func (memo *Memo) Get(key string) (value interface{}, err error) {
	memo.mu.Lock()
	e := memo.cache[key]
	if e == nil {
		// This is the first request for this key.
		// This goroutine becomes responsible for computing
		// the value and broadcasting the ready condition.
		e = &entry{ready: make(chan struct{})}
		memo.cache[key] = e
		memo.mu.Unlock()

		e.res.value, e.res.err = memo.f(key)

		close(e.ready) // broadcast ready condition
	} else {
		// This is a repeat request for this key.
		memo.mu.Unlock()

		<-e.ready // wait for ready condition
	}
	return e.res.value, e.res.err
}

Get函数发现缓存memo中没有记录时,它构造一个entry放到缓存中,但这时key对应的结果还未计算。
这时,如果其他goroutine调用了Get函数查询同样的key时,它会到达<-e.ready语句并因等待通道数据而阻塞。只有当计算结束,负责计算结果的goroutine将通道关闭后,其它goroutine才能够得以继续执行,并查询出结果。

「通过通信共享内存」的另一设计

以上介绍了共享变量并上锁的方法,另一种方案是通信顺序进程
在新的设计中,map变量限制在一个监控goroutine中,而Get的调用者则改为发送消息

// Func is the type of the function to memoize.
type Func func(key string) (interface{}, error)

// A result is the result of calling a Func.
type result struct {
	value interface{}
	err   error
}

type entry struct {
	res   result
	ready chan struct{} // closed when res is ready
}

// A request is a message requesting that the Func be applied to key.
type request struct {
	key      string
	response chan&lt;- result // the client wants a single result
}

type Memo struct{ requests chan request }

// New returns a memoization of f.  Clients must subsequently call Close.
func New(f Func) *Memo {
	memo := &amp;Memo{requests: make(chan request)}
	go memo.server(f)
	return memo
}

func (memo *Memo) Get(key string) (interface{}, error) {
	response := make(chan result)
	memo.requests &lt;- request{key, response}
	res := &lt;-response
	return res.value, res.err
}

func (memo *Memo) Close() { close(memo.requests) }

//!-get

//!+monitor

func (memo *Memo) server(f Func) {
	cache := make(map[string]*entry)
	for req := range memo.requests {
		e := cache[req.key]
		if e == nil {
			// This is the first request for this key.
			e = &amp;entry{ready: make(chan struct{})}
			cache[req.key] = e
			go e.call(f, req.key) // call f(key)
		}
		go e.deliver(req.response)
	}
}

func (e *entry) call(f Func, key string) {
	// Evaluate the function.
	e.res.value, e.res.err = f(key)
	// Broadcast the ready condition.
	close(e.ready)
}

func (e *entry) deliver(response chan&lt;- result) {
	// Wait for the ready condition.
	&lt;-e.ready
	// Send the result to the client.
	response &lt;- e.res
}

Get方法创建一个response通道,并将它放在一个请求里,然后把它发送给监控goroutine,然后从自己创建的response通道中读取。

监控goroutine(即server方法)不断从request通道中读取,直至该通道被关闭。对于每个请求,它先从缓存中查询,如果没找到则创建并插入一个新的entry
监控goroutine先创建一个entry放到缓存中,然后它调用go e.call(f, req.key)创建一个gorouitne来计算结果、关闭ready通道。与此同时它调用go e.deliver(req.response)等待ready通道关闭,并将结果发送到response通道中;

如果监控goroutine直接从缓存找到了结果,那么根据key查到的entry已经包含一个已经关闭的通道,它调用go e.deliver(req.response)就可以直接将结果放到response通道中。

总结起来,server负责了从请求通道中读取请求,对于未完成计算的key,它创建新的goroutine执行计算任务,随后通过请求中附带的resp通道答复请求。

更进一步的改造,可以限制进行计算的goroutine数量、通过context包控制server的生命周期等。

到此这篇关于Go实现并发缓存的示例代码的文章就介绍到这了,更多相关Go 并发缓存内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文