Golang基础学习之map的示例详解
作者:IguoChan
0. 简介
哈希表是常见的数据结构,有的语言会将哈希称作字典或者映射,在Go
中,哈希就是常见的数据类型map
。哈希表提供了键值之间的映射,其读写性能是O(1)。
1. 实现原理
1.1 底层结构
hmap
在Go
中,map
的底层结构是hmap
,如下。实际上,map
类型就是一个指向一个hmap
结构体的指针,所以其可以理解为是Go
中的”引用“
类型(有的文章认为slice
也是引用类型,说实话这我不敢苟同,因为切片的拷贝切片发生的操作并不一定会完全影响原切片,譬如append
操作)。
// A header for a Go map. type hmap struct { // Note: the format of the hmap is also encoded in cmd/compile/internal/reflectdata/reflect.go. // Make sure this stays in sync with the compiler's definition. count int // # live cells == size of map. Must be first (used by len() builtin) flags uint8 B uint8 // log_2 of # of buckets (can hold up to loadFactor * 2^B items) noverflow uint16 // approximate number of overflow buckets; see incrnoverflow for details hash0 uint32 // hash seed buckets unsafe.Pointer // array of 2^B Buckets. may be nil if count==0. oldbuckets unsafe.Pointer // previous bucket array of half the size, non-nil only when growing nevacuate uintptr // progress counter for evacuation (buckets less than this have been evacuated) extra *mapextra // optional fields }
以上字段中,含义我们都可以按照注释理解,我们需要着重关注buckets
、oldbuckets
和extra
几个字段。bucket
就是我们常说的”桶“,一个桶中最多装8个key-value
对,我们也可以理解为8个槽。
bmap
以下runtime.bmap
定义的bucket
的结构体,可以看到,其只是存储了8个tophash
值,即8个key
的哈希的高 8 位,通过比较不同键的哈希的高 8 位可以减少访问键值对次数以提高性能。
// A bucket for a Go map. type bmap struct { // tophash generally contains the top byte of the hash value // for each key in this bucket. If tophash[0] < minTopHash, // tophash[0] is a bucket evacuation state instead. tophash [bucketCnt]uint8 // Followed by bucketCnt keys and then bucketCnt elems. // NOTE: packing all the keys together and then all the elems together makes the // code a bit more complicated than alternating key/elem/key/elem/... but it allows // us to eliminate padding which would be needed for, e.g., map[int64]int8. // Followed by an overflow pointer. }
因为哈希表中可能存储不同类型的键值对,所以键值对的空间大小只能在实际编译时进行推导,在编译时,bmap
结构体会被以下结构所替代,参考cmd/compile/internal/reflectdata.MapBucketType。可以发现,在内存排列上,没有采取key1/elem1/key2/elem2...
的排列,而是将所有的key
存放在一起,所有的value
存放在一起,这是为了避免键值的类型间隔排列带来的内存对齐问题,反而更加浪费内存。
type bmap struct { topbits [8]uint8 keys [8]keytype elems [8]elemtype overflow uintptr
需要注意的是,如果keys
和elems
没有指针,map
实现可以在旁边保留一个溢出指针列表,以便可以将buckets
标记为没有指针,这样就可以避免在GC
时扫描整个map
。 在这种情况下,overflow
字段的类型是uintptr
;否则,其类型就是unsafe.Pointer
。而这个溢出的指针列表就是hmap
中的extra
字段,其类型定义如下。其实,extra
字段就是为了优化GC
而设计的。
// mapextra holds fields that are not present on all maps. type mapextra struct { // If both key and elem do not contain pointers and are inline, then we mark bucket // type as containing no pointers. This avoids scanning such maps. // However, bmap.overflow is a pointer. In order to keep overflow buckets // alive, we store pointers to all overflow buckets in hmap.extra.overflow and hmap.extra.oldoverflow. // overflow and oldoverflow are only used if key and elem do not contain pointers. // overflow contains overflow buckets for hmap.buckets. // oldoverflow contains overflow buckets for hmap.oldbuckets. // The indirection allows to store a pointer to the slice in hiter. overflow *[]*bmap oldoverflow *[]*bmap // nextOverflow holds a pointer to a free overflow bucket. nextOverflow *bmap }
1.2 map创建
map
在代码中的创建有多种方式,其函数类似于make(map[KT]VT, hint intType)
,hint
并不能认为是map
的容量,只能说是给map
创建传入的一个提示大小,不填时默认为0.
var map1 = map[int]int{ 1: 1, } func makeMapIntInt() map[int]int { return make(map[int]int) } func makeMapIntIntWithHint(hint int) map[int]int { return make(map[int]int, hint) } func main() { _ = map1 map2 := make(map[int]int) _ = map2 map3 := makeMapIntInt() _ = map3 map4 := make(map[int]int, 9) _ = map4 map5 := makeMapIntIntWithHint(9) _ = map5 map6 := make(map[int]int, 53) _ = map6 map7 := makeMapIntIntWithHint(53) _ = map7
如上,通过运行go tool compile -S main.go > main.i
得到汇编代码以及调试,可以总结如下:
当创建的map
被分配到栈上,且其hint
小于等于bucketCnt = 8
时(map2
),会被采取如下优化:
MOVD $""..autotmp_28-1200(SP), R16
MOVD $""..autotmp_28-1072(SP), R0
STP.P (ZR, ZR), 16(R16)
CMP R0, R16
BLE 44
PCDATA $1, ZR
CALL runtime.fastrand(SB)
当创建的map
被分配到堆上且其hint
小于等于8时,不管是通过字面量初始化(map1
)还是通过make
函数初始化(map3
),其都将调用makemap_small
函数创建;
当创建的map
的hint
大于8,且小于等于52(此时是hmap
中B=3
时的最大装载量)时,其将调用 makemap
函数完成初始化,且extra
字段是nil
,且会在堆上分配buckets
;
当hint
大于52(即hmap.B ≥ 4
)时,其将调用 makemap
函数完成初始化,且extra
字段不为nil
,且会在堆上分配buckets
;
func makemap64(t *maptype, hint int64, h *hmap) *hmap // makemap_small implements Go map creation for make(map[k]v) and // make(map[k]v, hint) when hint is known to be at most bucketCnt // at compile time and the map needs to be allocated on the heap. func makemap_small() *hmap // makemap implements Go map creation for make(map[k]v, hint). // If the compiler has determined that the map or the first bucket // can be created on the stack, h and/or bucket may be non-nil. // If h != nil, the map can be created directly in h. // If h.buckets != nil, bucket pointed to can be used as the first bucket. func makemap(t *maptype, hint int, h *hmap) *hmap
接下来,我们具体分析一下map
创建时所做的事情,即分析makemap_small
和makemap
函数到底做了什么。
hint=0并新增一个元素 如上所述,当调用make
创建map
时不传入hint
,调用的是makemap_small
函数,其实这个函数做的事情特别简单,就是在堆上创建了一个hmap
对象,初始化了哈希种子。
func makemap_small() *hmap { h := new(hmap) h.hash0 = fastrand() return h }
在写操作的时候,会判断这个hmap
对象的buckets
是否为空,如果是空的,那么就会创建一个bucket
,如下图片,可以很好地展现以下代码创建的map
对象的内存结构。
m := make(map[int]int) m[1] = 1
hint=53 前面说过,当hint
大于52时,会调用makemap
函数,并且生成溢出桶,下面,我们就借助这种情况,好好分析一下makemap
函数。
func makemap(t *maptype, hint int, h *hmap) *hmap { mem, overflow := math.MulUintptr(uintptr(hint), t.bucket.size) if overflow || mem > maxAlloc { hint = 0 } // initialize Hmap if h == nil { h = new(hmap) } h.hash0 = fastrand() // Find the size parameter B which will hold the requested # of elements. // For hint < 0 overLoadFactor returns false since hint < bucketCnt. B := uint8(0) for overLoadFactor(hint, B) { B++ } h.B = B // allocate initial hash table // if B == 0, the buckets field is allocated lazily later (in mapassign) // If hint is large zeroing this memory could take a while. if h.B != 0 { var nextOverflow *bmap h.buckets, nextOverflow = makeBucketArray(t, h.B, nil) if nextOverflow != nil { h.extra = new(mapextra) h.extra.nextOverflow = nextOverflow } } return h }
makemap
函数会首先判断设置的hint
大小是不是超出了限制,比如超过了最大允许申请内存,或者最大指针数,如果超出了的话,会将hint
置为0,所以可以看出,map
创建时的hint
是个建议值;然后,会通过overLoadFactor
函数判断对于hint
大小的map
,根据6.5
的装载因子,大致需要多少个bucket
,从而确定h.B
这个参数;最后会根据h.B
参数和运行时的表类型参数t
确定需要为buckets
申请多少内存,以及是否需要申请溢出桶。以下代码的hint=53
,计算出来的h.B=4
,所以需要24个桶,同时也会分配溢出桶。
m := make(map[int]int, 53)
值得注意的是,上面两种不同的桶(可分为正常桶和溢出桶,可以看出2hmap.B
指的是正常桶的数目,不包括溢出桶)在内存中是连续存储的,只是用不同的指针指向而已,其中,extra.nextOverflow
指向的是下一个能用的溢出桶,而extra.overflow
和extra.oldoverflow
在map
的key-value
都是非指针类型时起作用,用于存储指向溢出桶的指针,优化GC
。
1.3 写操作
对于map
而言,不管是修改key
对应的value
还是设置value
,对其都是写操作,在运行时,大致会调用runtime.mapassign
函数,不过,Go SDK
包对一些特殊的key
值做了优化操作,比如:
key类型 | 插入函数 | 备注 |
---|---|---|
uint32 | runtime.mapassign_fast32 | |
uint64 | runtime.mapassign_fast64 | int类型时也会用这个函数 |
string | runtime.mapassign_faststr |
这里,我们还是分析基础的runtime.mapassign
函数,鉴于函数太长,我们分段解析函数。
if h == nil { panic(plainError("assignment to entry in nil map")) } ... if h.flags&hashWriting != 0 { throw("concurrent map writes") } hash := t.hasher(key, uintptr(h.hash0)) // Set hashWriting after calling t.hasher, since t.hasher may panic, // in which case we have not actually done a write. h.flags ^= hashWriting
以上,mapassign
会做map
的空值校验和并发写校验,这里也说明,map
是并发不安全的;并且在hash
之后再置标志位的行,代码也做了解释:即hasher
函数可能panic
,这种情况下并没有在写入(but
,我并没有十分理解,panic
了也没有recover
,程序都崩溃了,还能咋地?再说,并发写的时候,两个协程同时执行到取hash步骤,可能导致throw
那一行无法触发呀!)
again: bucket := hash & bucketMask(h.B) if h.growing() { growWork(t, h, bucket) } b := (*bmap)(add(h.buckets, bucket*uintptr(t.bucketsize))) top := tophash(hash) var inserti *uint8 var insertk unsafe.Pointer var elem unsafe.Pointer
以上代码中,bucketMask
函数会根据h.B
的大小,返回不同的掩码,说白了,就是根据bucket
的数目生成掩码,其实就是从最低位开始数B
个1。可以说,上述代码中的bucket
其实就是桶序号(从0开始)。这时候还要检查一下是否在扩容,如果是的话,需要先执行扩容操作。接着,会根据前面的桶序号生成指向这个桶的指针b
。最后声明三个指针,inserti
表示目标元素的在桶中的索引,insertk
和 elem
分别表示键值对的地址。
bucketloop: for { for i := uintptr(0); i < bucketCnt; i++ { if b.tophash[i] != top { if isEmpty(b.tophash[i]) && inserti == nil { inserti = &b.tophash[i] insertk = add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize)) elem = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize)) } if b.tophash[i] == emptyRest { break bucketloop } continue } k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize)) if t.indirectkey() { k = *((*unsafe.Pointer)(k)) } if !t.key.equal(key, k) { continue } // already have a mapping for key. Update it. if t.needkeyupdate() { typedmemmove(t.key, k, key) } elem = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize)) goto done } ovf := b.overflow(t) if ovf == nil { break } b = ovf }
以上代码,接下来就是在桶内寻找空隙或者原有的key值进行插入或者修改,基本逻辑就是,循环遍历这个桶的八个槽,通过tophash
判断,效率可能会高一些,如果未匹配且这个槽是空的状态(可能是刚初始化的空,即tophash[i]
值为0,也有可能是被删除后的空,即tophash[i]
的值为1),我们先讲以上三个指针赋值到此槽对应的位置;如果是后者,即是未被使用过的槽,那直接跳出循环,将此key-value
插入到这个位置(因为不可能存在其他的槽插入过这个键值)。如果找到了,那么更新数据就好,这里不赘述。
值得注意的是,如果将整个桶都找遍了,还是没有找到,那么会通过b.overflow(t)
检查是否有溢出桶在此桶后面,如果有的话,会继续搜寻;如果没有,则在后续判断是否需要扩容,或者是否需要新建溢出桶。
// Did not find mapping for key. Allocate new cell & add entry. // If we hit the max load factor or we have too many overflow buckets, // and we're not already in the middle of growing, start growing. if !h.growing() && (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) { hashGrow(t, h) goto again // Growing the table invalidates everything, so try again } if inserti == nil { // The current bucket and all the overflow buckets connected to it are full, allocate a new one. newb := h.newoverflow(t, b) inserti = &newb.tophash[0] insertk = add(unsafe.Pointer(newb), dataOffset) elem = add(insertk, bucketCnt*uintptr(t.keysize)) } // store new key/elem at insert position if t.indirectkey() { kmem := newobject(t.key) *(*unsafe.Pointer)(insertk) = kmem insertk = kmem } if t.indirectelem() { vmem := newobject(t.elem) *(*unsafe.Pointer)(elem) = vmem } typedmemmove(t.key, insertk, key) *inserti = top h.count++
以上代码,都是在原先所有的桶中没有找到的一些处理,首先是通过overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)
来判断map
是否需要扩容,这里涉及到两种扩容条件,分别是装载因子过高和溢出桶过多,只要满足一种,都将引起扩容,并且返回到again
标记处进行扩容处理,之后再进行一次主流程。扩容的机制在后面介绍。
如果不需要进行扩容,那么就需要在现有桶的链表后(这里需要提及的是,Go
中的map
使用拉链法解哈希冲突[相关知识可以参考文末补充内容])新增一个溢出桶,然后分配我们的数据未知,其思路也很简单,如果预先申请了空余的溢出桶,那使用这个桶,如果没有,那么申请一个桶,并且设置一些参数和标志等。
done: if h.flags&hashWriting == 0 { throw("concurrent map writes") } h.flags &^= hashWriting if t.indirectelem() { elem = *((*unsafe.Pointer)(elem)) } return elem
以上,最后一段就是标志位的处理,并且返回找到的value
的地址,在其他函数中对这段地址进行赋值操作等,此不赘述了。
1.4 读操作
v := m[k] // 如果存在对应 v,则返回 v;如果不存在,则返回 对应零值 v, ok := m[k] // 如果存在对应 v,则返回 v, true;如果不存在,则返回 对应零值, false
我们都知道,map
读取操作有以上两种方式,那对应的runtime
函数也应该有两种方式,分别是mapaccess1
和mapaccess2
,前者只返回值,后者返回值和是否存在,其他没有什么区别,同理,针对一些类型,Go SDK
也做了对应优化:
key类型 | 读取函数1 | 读取函数2 | 备注 |
---|---|---|---|
uint32 | runtime.mapaccess1_fast32 | runtime.mapaccess2_fast32 | |
uint64 | runtime.mapaccess1_fast64 | runtime.mapaccess2_fast64 | int类型时也会用这个函数 |
string | runtime.mapaccess1_faststr | runtime.mapaccess2_faststr |
下面,我们以mapaccess1
为例,分析一下map
的读操作。
if h == nil || h.count == 0 { if t.hashMightPanic() { t.hasher(key, 0) // see issue 23734 } return unsafe.Pointer(&zeroVal[0]) } if h.flags&hashWriting != 0 { throw("concurrent map read and map write") }
以上代码,当表为空时,直接返回零值,当有并发写操作时,报panic
。我们把中间一部分和扩容相关的内容留待后续讲解,直接看下面的代码。
bucketloop: for ; b != nil; b = b.overflow(t) { for i := uintptr(0); i < bucketCnt; i++ { if b.tophash[i] != top { if b.tophash[i] == emptyRest { break bucketloop } continue } k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize)) if t.indirectkey() { k = *((*unsafe.Pointer)(k)) } if t.key.equal(key, k) { e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize)) if t.indirectelem() { e = *((*unsafe.Pointer)(e)) } return e } } } return unsafe.Pointer(&zeroVal[0])
和写操作一样,在确定了需要读取的桶之后,有以上这个循环函数,我们先看内循环,如果在槽i
不匹配且该槽未被使用过,说明其后的槽也肯定没有使用过,说明这个key
不可能在表中,可以直接返回零值。而如果不满足则一个一个找,本桶找完以后还会通过外循环去找溢出桶(如果有的话),找到了就返回;如果最后还没找到,说明不存在,则返回零值。
1.5 for-range操作
在map
的迭代操作中,其依托于以下结构体,我们需要关注的是key
、elem
和startBucket
、offset
两对参数需要关注一下。
// A hash iteration structure. // If you modify hiter, also change cmd/compile/internal/reflectdata/reflect.go // and reflect/value.go to match the layout of this structure. type hiter struct { key unsafe.Pointer // Must be in first position. Write nil to indicate iteration end (see cmd/compile/internal/walk/range.go). elem unsafe.Pointer // Must be in second position (see cmd/compile/internal/walk/range.go). t *maptype h *hmap buckets unsafe.Pointer // bucket ptr at hash_iter initialization time bptr *bmap // current bucket overflow *[]*bmap // keeps overflow buckets of hmap.buckets alive oldoverflow *[]*bmap // keeps overflow buckets of hmap.oldbuckets alive startBucket uintptr // bucket iteration started at offset uint8 // intra-bucket offset to start from during iteration (should be big enough to hold bucketCnt-1) wrapped bool // already wrapped around from end of bucket array to beginning B uint8 i uint8 bucket uintptr checkBucket uintptr }
1.5.1 注意遍历时的闭包
可以看到,hiter
作为for-range
遍历时的结构体,key
和elem
作为指向key-value
的指针,在整个遍历期间,其只有一份,所以在如下的一些场景下,可能出现错误。
m := map[int]string{ 1: "hello", 2: "world", 3: "hello", 4: "go", } wg := sync.WaitGroup{} for k, v := range m { wg.Add(1) go func() { defer wg.Done() fmt.Println(k, v) }() } wg.Wait()
最后的打印如下,并不符合最初的设计。这是因为闭包持有的是捕获变量的引用,而不是复制,而map
的遍历是始终只有一对指针在指向遍历元素(其实所有的类型遍历都是),导致最后打印的内容并不是想要的。
4 go
4 go
4 go
4 go
1.5.2 map的遍历是无序的
前面说过,map
的遍历围绕着hiter
这个结构体展开,在结构体中,startBucket
字段表示开始遍历的桶,offset
表示在这个桐中的偏移量,在hiter
的初始化函数runtime.mapiterinit
中有如下代码,可以看到,起始位置是随机的。
// decide where to start r := uintptr(fastrand()) if h.B > 31-bucketCntBits { r += uintptr(fastrand()) << 31 } it.startBucket = r & bucketMask(h.B) it.offset = uint8(r >> h.B & (bucketCnt - 1)) // iterator state it.bucket = it.startBucket
这是因为,一旦map
发生扩容,那么位置可能会变,而且如上所示,Go SDK
加入了随机值使得每次的遍历都是随机位置起始,也是为了不给程序员带来困扰。
1.6 删除操作
和读写操作一样,map
的删除操作一般而言会调用runtime.mapdelete
函数,同时也有几个特殊类型的优化操作,如下。和写操作一样,如果删除过程中发现正在扩容中,那么则会进行一次数据迁移操作。
key类型 | 删除函数 | 备注 |
---|---|---|
uint32 | runtime.mapdelete_fast32 | |
uint64 | runtime.mapdelete_fast64 | int类型时也会用这个函数 |
string | runtime.mapdelete_faststr |
删除操作的整体和之前的读操作比较类似,都是先找到位置,然后删除,删除之后,将tophash[i]
的标志位置为1;但是其中有个操作是,当这个桶没有后继的溢出桶,且以1结束,则将这些1都置为0。这是因为,前面的读写操作都有如果查找到该位置标志为0时则直接不再查找或者直接插入,这是因为,在map
的实现设计中,如果一个桶的槽标志为0,说明这个位置及之后的槽都没有被占用,且肯定没有后继的溢出桶;所以删除的时候这么设计,可以提高map
的读写效率。
// If the bucket now ends in a bunch of emptyOne states, // change those to emptyRest states. // It would be nice to make this a separate function, but // for loops are not currently inlineable. if i == bucketCnt-1 { if b.overflow(t) != nil && b.overflow(t).tophash[0] != emptyRest { goto notLast } } else { if b.tophash[i+1] != emptyRest { goto notLast } } for { b.tophash[i] = emptyRest if i == 0 { if b == bOrig { break // beginning of initial bucket, we're done. } // Find previous bucket, continue at its last entry. c := b for b = bOrig; b.overflow(t) != c; b = b.overflow(t) { } i = bucketCnt - 1 } else { i-- } if b.tophash[i] != emptyOne { break } } notLast: h.count--
值得注意的是,在删除操作中,我们并不会真正将这个桶对应的内存真正的释放,而只是将tophash[i]
标记成了emptyOne
。
1.7 扩容
在map
中,只有在写操作时,触发以下两种情况才会触发扩容,扩容会带来数据的迁移,而在写操作和删除操作时,都会判断是否在数据迁移中,如果是,那都将进行一次数据迁移工作。
overLoadFactor(h.count+1, h.B)
,判断新增一个数据(h.count+1
)导致装载因子是否超过6.5;tooManyOverflowBuckets(h.noverflow, h.B)
,当使用的溢出桶过多时,会进行一次扩容;不过此次扩容并不新增桶的个数,而是等量扩容sameSizeGrow
,sameSizeGrow
是一种特殊情况下发生的扩容,当我们持续向哈希中插入数据并将它们全部删除时,如果哈希表中的数据量没有超过阈值,就会不断积累溢出桶造成缓慢的内存泄漏。
在判断需要进行扩容操作之后,会调用runtime.hashGrow
函数,这是开始扩容的入口,在这个函数中,其实相当于做一些扩容前的准备工作,首先会判断是不是装载因子过大,如果是的话,则bigger
为1,如果不是则为0,即对应了上面的分类,如果是装载因子过大,则发生真实的扩容,即整个桶的大小翻倍(2B+1 = 2*2B);如果不是的话,那桶的大小维持不变。接下来会通过runtime.makeBucketArray
创建一组新桶和预创建的溢出桶,随后将原有的桶数组设置到 oldbuckets
上并将新的空桶设置到 buckets
上h.buckets
则指向新申请的桶。
func hashGrow(t *maptype, h *hmap) { // If we've hit the load factor, get bigger. // Otherwise, there are too many overflow buckets, // so keep the same number of buckets and "grow" laterally. bigger := uint8(1) if !overLoadFactor(h.count+1, h.B) { bigger = 0 h.flags |= sameSizeGrow } oldbuckets := h.buckets newbuckets, nextOverflow := makeBucketArray(t, h.B+bigger, nil) flags := h.flags &^ (iterator | oldIterator) if h.flags&iterator != 0 { flags |= oldIterator } // commit the grow (atomic wrt gc) h.B += bigger h.flags = flags h.oldbuckets = oldbuckets h.buckets = newbuckets h.nevacuate = 0 h.noverflow = 0 if h.extra != nil && h.extra.overflow != nil { // Promote current overflow buckets to the old generation. if h.extra.oldoverflow != nil { throw("oldoverflow is not nil") } h.extra.oldoverflow = h.extra.overflow h.extra.overflow = nil } if nextOverflow != nil { if h.extra == nil { h.extra = new(mapextra) } h.extra.nextOverflow = nextOverflow } // the actual copying of the hash table data is done incrementally // by growWork() and evacuate(). }
扩容真正的操作实际是在以下runtime.growWork
中完成的,这里有一点需要注意,hmap
有个参数是nevacuate
,作为已经扩容的bucket
的计数,所有低于这个数的桶序号(即hash后的桶序号,注意,是旧桶的序号)都已经被扩容,当nevacuate
等于旧桶数时,说明扩容结束了。
func growWork(t *maptype, h *hmap, bucket uintptr) { // make sure we evacuate the oldbucket corresponding // to the bucket we're about to use evacuate(t, h, bucket&h.oldbucketmask()) // evacuate one more oldbucket to make progress on growing if h.growing() { evacuate(t, h, h.nevacuate) } }
那是怎么保证这点的呢,在接下来看到的runtime.evacuate
中,当迁移结束,nevacuate
等于桶序号时才会调用advanceEvacuationMark
函数将计数+1,所以在runtime.growWork
函数中做了两次桶迁移,即第一次保证此次操作(写操作或者删除操作)的桶数据会迁移,如果这个桶序号和nevacuate
不相等,则利用第二次的evacuate(t, h, h.nevacuate)
保证这个计数会加一。这个过程中也不用担心桶会被重复迁移,因为if !evacuated(b)
判断条件会判断桶是否做过迁移了,只有没有做过迁移的桶才会进行操作,这里判断的标志位还是占用的tophash[0]
,有兴趣可以看看代码。
func evacuate(t *maptype, h *hmap, oldbucket uintptr) { b := (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.bucketsize))) newbit := h.noldbuckets() if !evacuated(b) { ... } if oldbucket == h.nevacuate { advanceEvacuationMark(h, t, newbit) } }
接下来可以看看以上省略号中,即真正的迁移发生了什么,runtime.evacuate
会将一个旧桶中的数据分流到两个新桶,会创建两个用于保存分配上下文的runtime.evacDst
结构体,这两个结构体分别指向了一个新桶,如果是等量扩容,那么第二个runtime.evacDst
结构体不会被创建。
// TODO: reuse overflow buckets instead of using new ones, if there // is no iterator using the old buckets. (If !oldIterator.) // xy contains the x and y (low and high) evacuation destinations. var xy [2]evacDst x := &xy[0] x.b = (*bmap)(add(h.buckets, oldbucket*uintptr(t.bucketsize))) x.k = add(unsafe.Pointer(x.b), dataOffset) x.e = add(x.k, bucketCnt*uintptr(t.keysize)) if !h.sameSizeGrow() { // Only calculate y pointers if we're growing bigger. // Otherwise GC can see bad pointers. y := &xy[1] y.b = (*bmap)(add(h.buckets, (oldbucket+newbit)*uintptr(t.bucketsize))) y.k = add(unsafe.Pointer(y.b), dataOffset) y.e = add(y.k, bucketCnt*uintptr(t.keysize)) }
接下来就是循环这个bucket
以及其后的溢出桶,有些逻辑都是一些常规逻辑,就不一一分析了,对于等量扩容,因为只有一个runtime.evacDst
对象,所以会直接通过指针复制或者typedmemmove
的值复制来复制键值对到新的桶。
for ; b != nil; b = b.overflow(t) { k := add(unsafe.Pointer(b), dataOffset) e := add(k, bucketCnt*uintptr(t.keysize)) for i := 0; i < bucketCnt; i, k, e = i+1, add(k, uintptr(t.keysize)), add(e, uintptr(t.elemsize)) { top := b.tophash[i] if isEmpty(top) { b.tophash[i] = evacuatedEmpty continue } if top < minTopHash { throw("bad map state") } k2 := k if t.indirectkey() { k2 = *((*unsafe.Pointer)(k2)) } var useY uint8 if !h.sameSizeGrow() { // Compute hash to make our evacuation decision (whether we need // to send this key/elem to bucket x or bucket y). hash := t.hasher(k2, uintptr(h.hash0)) if h.flags&iterator != 0 && !t.reflexivekey() && !t.key.equal(k2, k2) { // If key != key (NaNs), then the hash could be (and probably // will be) entirely different from the old hash. Moreover, // it isn't reproducible. Reproducibility is required in the // presence of iterators, as our evacuation decision must // match whatever decision the iterator made. // Fortunately, we have the freedom to send these keys either // way. Also, tophash is meaningless for these kinds of keys. // We let the low bit of tophash drive the evacuation decision. // We recompute a new random tophash for the next level so // these keys will get evenly distributed across all buckets // after multiple grows. useY = top & 1 top = tophash(hash) } else { if hash&newbit != 0 { useY = 1 } } } if evacuatedX+1 != evacuatedY || evacuatedX^1 != evacuatedY { throw("bad evacuatedN") } b.tophash[i] = evacuatedX + useY // evacuatedX + 1 == evacuatedY dst := &xy[useY] // evacuation destination if dst.i == bucketCnt { dst.b = h.newoverflow(t, dst.b) dst.i = 0 dst.k = add(unsafe.Pointer(dst.b), dataOffset) dst.e = add(dst.k, bucketCnt*uintptr(t.keysize)) } dst.b.tophash[dst.i&(bucketCnt-1)] = top // mask dst.i as an optimization, to avoid a bounds check if t.indirectkey() { *(*unsafe.Pointer)(dst.k) = k2 // copy pointer } else { typedmemmove(t.key, dst.k, k) // copy elem } if t.indirectelem() { *(*unsafe.Pointer)(dst.e) = *(*unsafe.Pointer)(e) } else { typedmemmove(t.elem, dst.e, e) } dst.i++ // These updates might push these pointers past the end of the // key or elem arrays. That's ok, as we have the overflow pointer // at the end of the bucket to protect against pointing past the // end of the bucket. dst.k = add(dst.k, uintptr(t.keysize)) dst.e = add(dst.e, uintptr(t.elemsize)) } }
如果是增量扩容,假设原来的B是2,那么就是四个桶,其mask
就是0b11
,hash & 0b11
会有四种结果,最后分配到四个桶中,假设发生了增量扩容,此时用旧的桶数newbits
(4)和hash
相与,即hash & 0b100
,即相当于通过新的mask
(0b111
)的最高位来决定这个数据是分配到X
桶还是Y
桶,实现了分流(上述代码中的hash&newbit
)。当然,if h.flags&iterator != 0 && !t.reflexivekey() && !t.key.equal(k2, k2)
中对特殊情况做了处理,这里就不详述了。
值得注意的是以下代码,前面说过,只有当旧桶编号(hash
和旧mask
相与)与nevacuate
相等时,才会调用advanceEvacuationMark(h, t, newbit)
进行计数+1
,所以在runtime.growWork
中会调用两次evacuate
函数,保证小于等于nevacuate
的桶都被迁移了。
if oldbucket == h.nevacuate { advanceEvacuationMark(h, t, newbit) }
另外,在读表的时候,当判断旧桶还没有被迁移的时候,会从旧桶中取出数据。
func mapaccess1(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer { ... hash := t.hasher(key, uintptr(h.hash0)) m := bucketMask(h.B) b := (*bmap)(add(h.buckets, (hash&m)*uintptr(t.bucketsize))) if c := h.oldbuckets; c != nil { if !h.sameSizeGrow() { // There used to be half as many buckets; mask down one more power of two. m >>= 1 } oldb := (*bmap)(add(c, (hash&m)*uintptr(t.bucketsize))) if !evacuated(oldb) { b = oldb } } ... }
从上面可以看出,map
表数据的迁移是渐进式的,是在调用写、删除操作时增量进行的,不会造成瞬间性能的巨大抖动。其实这个和redis
的rehash
技术是类似的原理。
2. Map使用的一些注意事项
通过以上内容,我们知道了map
构建的基本原理,所以我们在实际工作中,使用字典表时,需要有一些注意事项。
2.1 大数据量map不使用指针作为key-value
通过上面学习,我们知道,当map
的kv
类型都不为指针时,那么GC
就不会扫描整个表,具体实现是在GC
过程中,检查runtime._type.gcdata
字段,这是个指针的bitmap
,当其为全零时,说明整个对象中无需扫描的下一级指针,从而节省时间,具体可参考深度剖析 Golang 的 GC 扫描对象实现。
// Needs to be in sync with ../cmd/link/internal/ld/decodesym.go:/^func.commonsize, // ../cmd/compile/internal/reflectdata/reflect.go:/^func.dcommontype and // ../reflect/type.go:/^type.rtype. // ../internal/reflectlite/type.go:/^type.rtype. type _type struct { size uintptr ptrdata uintptr // size of memory prefix holding all pointers hash uint32 tflag tflag align uint8 fieldAlign uint8 kind uint8 // function for comparing objects of this type // (ptr to object A, ptr to object B) -> ==? equal func(unsafe.Pointer, unsafe.Pointer) bool // gcdata stores the GC type data for the garbage collector. // If the KindGCProg bit is set in kind, gcdata is a GC program. // Otherwise it is a ptrmask bitmap. See mbitmap.go for details. gcdata *byte str nameOff ptrToThis typeOff }
为验证以上观点,我们写出如下的测试函数,测试在从10到100万数据量的情形下,以整型和整型指针作为value
类型的映射表在GC时的耗时差异。
func TestGCTimeWithoutPointer(t *testing.T) { for _, N := range Ns { runtime.GC() m1 := make(map[int]int) for i := 0; i < N; i++ { m1[i] = rand.Int() } start := time.Now() runtime.GC() delta := time.Since(start) t.Logf("GC without pointer spend %+v, when N = %d", delta, N) runtime.KeepAlive(m1) } } func TestGCTimeWithPointer(t *testing.T) { for _, N := range Ns { runtime.GC() m2 := make(map[int]*int) for i := 0; i < N; i++ { val := rand.Int() m2[i] = &val } start := time.Now() runtime.GC() delta := time.Since(start) t.Logf("GC with pointer spend %+v, when N = %d", delta, N) runtime.KeepAlive(m2) } }
测试结果如下,可以发现,在没有指针的情形下,不管表的大小是什么数量级,其GC时间几乎无差异;而在有指针的情形下,其GC
时间在100万数量级的时候已经达到了15ms,这将大大影响程序的性能。
=== RUN TestGCTimeWithoutPointer map_test.go:63: GC without pointer spend 252.208µs, when N = 10 map_test.go:63: GC without pointer spend 297.292µs, when N = 100 map_test.go:63: GC without pointer spend 438.208µs, when N = 1000 map_test.go:63: GC without pointer spend 377µs, when N = 10000 map_test.go:63: GC without pointer spend 205.666µs, when N = 100000 map_test.go:63: GC without pointer spend 380.584µs, when N = 1000000 --- PASS: TestGCTimeWithoutPointer (0.13s) === RUN TestGCTimeWithPointer map_test.go:81: GC with pointer spend 570.041µs, when N = 10 map_test.go:81: GC with pointer spend 325.708µs, when N = 100 map_test.go:81: GC with pointer spend 287.542µs, when N = 1000 map_test.go:81: GC with pointer spend 476.709µs, when N = 10000 map_test.go:81: GC with pointer spend 1.714833ms, when N = 100000 map_test.go:81: GC with pointer spend 15.756958ms, when N = 1000000 --- PASS: TestGCTimeWithPointer (0.18s)
值得注意的是,在正常桶后面跟着的溢出桶的地址会存放在hmap.extra.overflow
中,避免被GC
误伤。
这一点也同样适用于其他容器类型,比如切片、数组和通道。
2.1.1 引申1——使用字节数组代替字符串作为key
每个字符串的底层包括一个指针,用来指向其底层数组,如果一个映射值的key
类型是字符串类型,且其有一个最大长度、且最大长度较小,可设置为N
,则我们可以使用[N]byte
来代替字符串作为键值,可以避免垃圾回收时扫描整个表。当然,这是在数据量比较大的情形下考虑的优化。
2.2 清空表操作
前面说过,map
表有删除操作,但是删除后的表所占的内存空间并不会释放,除非保证后续会有很多新的条目放入到表中,否则我们使用以下方法清空映射表。
m = nil // 后续不再使用 m = make(map[K]V) // 后续继续使用
2.3 确定大小时尽量传入hint
前面说过,传入的hint
可以让Go SDK
预测这个映射表中最大的条目数量,所以我们如果已知表的大小,尽量在创建表的时候传入。
知识补充
HashMap拉链法简介
1.拉链法用途
解决hash冲突(即put操作时计算key值问题)。
2.拉链法原理
把具有相同散列地址的关键字(同义词)值放在同一个单链表中,称为同义词链表。
有m个散列地址就有m个链表,同时用指针数组A[0,1,2…m-1]存放各个链表的头指针,凡是散列地址为i的记录都以结点方式插入到以A[i]为指针的单链表中。A中各分量的初值为空指针。
3.拉链法原理解释
HashMap是一个数组,数组中的每个元素是链表。put元素进去的时候,会通过计算key的hash值来获取到一个index,根据index找到数组中的位置,进行元素插入。当新来的元素映射到冲突的数组位置时,就会插入到链表的头部。
HashMap采用拉链法将HashMap的key是转化成了hashcode,但hashcode可能重复,所以采用求交集方式解决冲突。
4.举例如下
有序集合a1={1,3,5,7,8,9},有序集合a2={2,3,4,5,6,7}
两个指针指向首元素,比较元素的大小:
(1)如果相同,放入结果集,随意移动一个指针
(2)否则,移动值较小的一个指针,直到队尾
好处:
(1)集合中的元素最多被比较一次,时间复杂度为O(n)。
(2)多个有序集合可以同时进行,这适用于多个分词的item求url_id交集。
这个方法就像一条拉链的两边齿轮,然后逐个对比,故称为拉链法。
以上就是Golang基础学习之map的示例详解的详细内容,更多关于Golang map的资料请关注脚本之家其它相关文章!