详解Go语言如何实现并发安全的map
作者:水洗牛仔裤
go语言提供的数据类型中,只有channel是并发安全的,基础map并不是并发安全的。以下三种方案实现了并发安全的map。
方案一:读写锁+map
实现原理
给map添加一把读写锁,读操作加读锁进行读取;添加,更新,删除,遍历,获取长度这些操作加写锁后在进行操作。
代码实现
以下代码是并发map的实现演示:
type RWMap struct { sync.RWMutex m map[any]any } func NewGRWMap() *RWMap { return &RWMap{ m: make(map[any]any), } } func (m *RWMap) Get(k int) (any, bool) { m.RLock() defer m.RUnlock() v, existed := m.m[k] return v, existed } func (m *RWMap) Set(k any, v any) { m.Lock() defer m.Unlock() m.m[k] = v } func (m *RWMap) Delete(k any) { m.Lock() defer m.Unlock() delete(m.m, k) } func (m *RWMap) Len() int { m.RLock() defer m.RUnlock() return len(m.m) } func (m *RWMap) Each(f func(k, v any) bool) { m.RLock() defer m.RUnlock() for k, v := range m.m { if !f(k, v) { return } } }
上述代码在读的时候加了个读锁,这个读锁在sync.RWMutex中并没有使用锁,只是将 readerCount
这个字段+1。增删改是加了写锁,写锁在sync.RWMutex中每次都需要加锁。 以上可知,读写锁的加锁力度很大,当需要读多写少的情况下可以使用读写锁加map实现并发安全。
方案二:分片加锁
实现原理
分片加锁的原理就如其名字一样,将一个map分成多个片段(一个片段是一个map),每个片段有自己的锁。
代码实现
concurrent-map提供了一种高性能的解决方案:通过对内部map
进行分片,降低锁粒度,从而达到最少的锁等待时间(锁冲突)
以下是分片加锁中重要的数据类型的结构体:
var SHARD_COUNT = 32 type ConcurrentMap[K comparable, V any] struct { shards []*ConcurrentMapShared[K, V] sharding func(key K) uint32 } type ConcurrentMapShared[K comparable, V any] struct { items map[K]V sync.RWMutex }
结构如下图所示:
上述代码使用泛型实现了不同类型的map[comparable]any
。调用NewWithCustomShardingFunction
函数,传入泛型的类型参数和哈希函数,就可以得到一个并发安全的map了。
func create[K comparable, V any](sharding func(key K) uint32) ConcurrentMap[K, V] { m := ConcurrentMap[K, V]{ sharding: sharding, shards: make([]*ConcurrentMapShared[K, V], SHARD_COUNT), } for i := 0; i < SHARD_COUNT; i++ { m.shards[i] = &ConcurrentMapShared[K, V]{items: make(map[K]V)} } return m } func New[V any]() ConcurrentMap[string, V] { return create[string, V](fnv32) } func NewWithCustomShardingFunction[K comparable, V any](sharding func(key K) uint32) ConcurrentMap[K, V] { return create[K, V](sharding) }
添加过程
func (m ConcurrentMap[K, V]) Set(key K, value V) { shard := m.GetShard(key) shard.Lock() shard.items[key] = value shard.Unlock() } func (m ConcurrentMap[K, V]) GetShard(key K) *ConcurrentMapShared[K, V] { return m.shards[uint(m.sharding(key))%uint(SHARD_COUNT)] }
大致流程如上图所示:
- 调用sharding得到哈希值。
- 对哈希值取模于切片长度,得到对应的分片map。
- 对map加写锁进行操作。
其他流程大致一样,大致都是先找“坑”,再进行读写操作。也可以将分片加锁方案简单的理解为是对读写加锁的方案的升级。
方案三:sync.Map
我们先看一下1.21版本的sync.Map的结构体:
type Map struct { mu Mutex read atomic.Pointer[readOnly] dirty map[any]*entry misses int } type readOnly struct { m map[any]*entry amended bool } type entry struct { p atomic.Pointer[any] }
这个结构的关系如下图所示:
应对特殊场景的 sync.Map
在官方文档中指出,在以下两个场景中使用sync.Map,会比使用读写锁+map,的性能好:
- 只会增长的缓存系统中,一个key只写入一次而被读很多次;
- 多个goroutine为不相交的读,写和重写的键值对。
sync.Map的操作流程
读Load()
func (x *Pointer[T]) Load() *T { return (*T)(LoadPointer(&x.v)) } func (m *Map) loadReadOnly() readOnly { if p := m.read.Load(); p != nil { return *p } return readOnly{} } func (m *Map) Load(key any) (value any, ok bool) { read := m.loadReadOnly() e, ok := read.m[key] if !ok && read.amended { m.mu.Lock() read = m.loadReadOnly() e, ok = read.m[key] if !ok && read.amended { e, ok = m.dirty[key] m.missLocked() } m.mu.Unlock() } if !ok { return nil, false } return e.load() } func (e *entry) load() (value any, ok bool) { p := e.p.Load() if p == nil || p == expunged { return nil, false } return *p, true } func (m *Map) missLocked() { m.misses++ if m.misses < len(m.dirty) { return } m.read.Store(&readOnly{m: m.dirty}) m.dirty = nil m.misses = 0 }
读流程:
先去read中读,有数据直接读取e.load()
结束;没有加锁,去dirty中读,e换成dirty的,调用m.missLocked()
,判断dirty是否存在这个key,不存在return nil, false
;存在e.load()
.
!ok && read.amended
这个判断是在read不存在key并且dirty存在read中没有的数据时为true。 m.missLocked()
记录miss的次数,当miss的次数大于m.dirty
的长度时将dirty数据给read,dirty清空,miss重置为0。
写Store()
func (m *Map) Store(key, value any) { _, _ = m.Swap(key, value) } func (m *Map) Swap(key, value any) (previous any, loaded bool) { read := m.loadReadOnly() if e, ok := read.m[key]; ok { if v, ok := e.trySwap(&value); ok { if v == nil { return nil, false } return *v, true } } m.mu.Lock() read = m.loadReadOnly() if e, ok := read.m[key]; ok { if e.unexpungeLocked() { // The entry was previously expunged, which implies that there is a // non-nil dirty map and this entry is not in it. m.dirty[key] = e } if v := e.swapLocked(&value); v != nil { loaded = true previous = *v } } else if e, ok := m.dirty[key]; ok { if v := e.swapLocked(&value); v != nil { loaded = true previous = *v } } else { if !read.amended { // We're adding the first new key to the dirty map. // Make sure it is allocated and mark the read-only map as incomplete. m.dirtyLocked() m.read.Store(&readOnly{m: read.m, amended: true}) } m.dirty[key] = newEntry(value) } m.mu.Unlock() return previous, loaded } func (m *Map) dirtyLocked() { if m.dirty != nil { return } read := m.loadReadOnly() m.dirty = make(map[any]*entry, len(read.m)) for k, e := range read.m { if !e.tryExpungeLocked() { m.dirty[k] = e } } }
写的流程:
先去read看key是否存在;存在:如果key的value值为expunged
,返回false,走dirty操作;否则,使用cas原子操作直接赋值,结束流程。
返回false,走dirty操作:先加锁,再走一次read,看是否存在key。
- read存在,使用
e.unexpungeLocked()
使用cas将entry设置为nil,若cas成功,将dirty中的entry设置为nil。使用cas设置value。 - read不存在,dirty存在,使用cas设置value。
- 以上都不满足(read,dirty都不存在),判断read中是否缺少数据,缺少时给dirty添加key-value;不缺少时调用
m.dirtyLocked()
,将read中的数据更新到dirty中,将其中删除的数据设置为expunged
,之后将read的amended
设置为true,最后给dirty添加key-value。
解锁,结束。
删Delete()
func (m *Map) Delete(key any) { m.LoadAndDelete(key) } func (m *Map) LoadAndDelete(key any) (value any, loaded bool) { read := m.loadReadOnly() e, ok := read.m[key] if !ok && read.amended { m.mu.Lock() read = m.loadReadOnly() e, ok = read.m[key] if !ok && read.amended { e, ok = m.dirty[key] delete(m.dirty, key) m.missLocked() } m.mu.Unlock() } if ok { return e.delete() } return nil, false } func (e *entry) delete() (value any, ok bool) { for { p := e.p.Load() if p == nil || p == expunged { return nil, false } if e.p.CompareAndSwap(p, nil) { return *p, true } } }
删除流程:
先看read中是否存在,存在,直接调用e.delete()
结束
不存在,且read中缺少数据,加锁,再次查看read,存在:解锁,调用e.delete()
结束;不存在:删除dirty中的key,再调用m.missLocked(),解锁,若dirty中存在并删除了,还需要调用e.delete()
,若dirty不存在key,return结束。
遍历Range()
func (m *Map) Range(f func(key, value any) bool) { read := m.loadReadOnly() if read.amended { m.mu.Lock() read = m.loadReadOnly() if read.amended { read = readOnly{m: m.dirty} m.read.Store(&read) m.dirty = nil m.misses = 0 } m.mu.Unlock() } for k, e := range read.m { v, ok := e.load() if !ok { continue } if !f(k, v) { break } } }
遍历流程: 获取read,若read的数据全,遍历read,若数据不全,加锁,将dirty数据更新到read中,并将dirty值为nil,misses置0,再遍历read。
sync.Map安全并发实现
sync.Map在实现并发问题的同时提升性能的几个优化:
- 用空间换时间,使用两个map,一个无锁,一个有锁,减少加锁对性能的影响。
- 优先从无锁的map中读取,更新,删除。
- 动态调整,miss次数多之后,将加锁map数据给无锁map。
- 延迟删除,删除一个键值只是进行了软删除,在动态调整时会进行硬删除。
- double check,访问有锁map,加锁后再次检查无锁map,是否有数据。
到此这篇关于详解Go语言如何实现并发安全的map的文章就介绍到这了,更多相关Go实现并发安全map内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!