public static <K,V> Map<K,V> synchronizedMap(Map<K,V> m) { return new SynchronizedMap<>(m); } /** * @serial include */ private static class SynchronizedMap<K,V> implements Map<K,V>, Serializable { private static final long serialVersionUID = 1978198479659022715L; private final Map<K,V> m; // Backing Map //互斥锁 final Object mutex; // Object on which to synchronize SynchronizedMap(Map<K,V> m) { this.m = Objects.requireNonNull(m); mutex = this; } SynchronizedMap(Map<K,V> m, Object mutex) { this.m = m; this.mutex = mutex; } public int size() { synchronized (mutex) {return m.size();} } public boolean isEmpty() { synchronized (mutex) {return m.isEmpty();} } public boolean containsKey(Object key) { synchronized (mutex) {return m.containsKey(key);} } public boolean containsValue(Object value) { synchronized (mutex) {return m.containsValue(value);} } public V get(Object key) { synchronized (mutex) {return m.get(key);} } public V put(K key, V value) { synchronized (mutex) {return m.put(key, value);} } public V remove(Object key) { synchronized (mutex) {return m.remove(key);} } public void putAll(Map<? extends K, ? 
extends V> map) { synchronized (mutex) {m.putAll(map);} } public void clear() { synchronized (mutex) {m.clear();} } private transient Set<K> keySet; private transient Set<Map.Entry<K,V>> entrySet; private transient Collection<V> values; public Set<K> keySet() { synchronized (mutex) { if (keySet==null) keySet = new SynchronizedSet<>(m.keySet(), mutex); return keySet; } } public Set<Map.Entry<K,V>> entrySet() { synchronized (mutex) { if (entrySet==null) entrySet = new SynchronizedSet<>(m.entrySet(), mutex); return entrySet; } } public Collection<V> values() { synchronized (mutex) { if (values==null) values = new SynchronizedCollection<>(m.values(), mutex); return values; } } public boolean equals(Object o) { if (this == o) return true; synchronized (mutex) {return m.equals(o);} } public int hashCode() { synchronized (mutex) {return m.hashCode();} } public String toString() { synchronized (mutex) {return m.toString();} } // Override default methods in Map @Override public V getOrDefault(Object k, V defaultValue) { synchronized (mutex) {return m.getOrDefault(k, defaultValue);} } @Override public void forEach(BiConsumer<? super K, ? super V> action) { synchronized (mutex) {m.forEach(action);} } @Override public void replaceAll(BiFunction<? super K, ? super V, ? extends V> function) { synchronized (mutex) {m.replaceAll(function);} } @Override public V putIfAbsent(K key, V value) { synchronized (mutex) {return m.putIfAbsent(key, value);} } @Override public boolean remove(Object key, Object value) { synchronized (mutex) {return m.remove(key, value);} } @Override public boolean replace(K key, V oldValue, V newValue) { synchronized (mutex) {return m.replace(key, oldValue, newValue);} } @Override public V replace(K key, V value) { synchronized (mutex) {return m.replace(key, value);} } @Override public V computeIfAbsent(K key, Function<? super K, ? 
extends V> mappingFunction) { synchronized (mutex) {return m.computeIfAbsent(key, mappingFunction);} } @Override public V computeIfPresent(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) { synchronized (mutex) {return m.computeIfPresent(key, remappingFunction);} } @Override public V compute(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) { synchronized (mutex) {return m.compute(key, remappingFunction);} } @Override public V merge(K key, V value, BiFunction<? super V, ? super V, ? extends V> remappingFunction) { synchronized (mutex) {return m.merge(key, value, remappingFunction);} } private void writeObject(ObjectOutputStream s) throws IOException { synchronized (mutex) {s.defaultWriteObject();} } }
/*
 * Segment: a ReentrantLock subclass that holds one hash table of the map.
 * Only the field declarations are shown in this excerpt; the class body is
 * not closed here.
 *
 *   table      - array of bucket chains; each slot holds the head of a
 *                linked list of HashEntry nodes (volatile, not serialized)
 *   count      - number of elements stored in this segment (not serialized)
 *   modCount   - number of operations that changed the table, such as put
 *                or remove (not serialized)
 *   threshold  - element count above which the table is resized
 *                (not serialized)
 *   loadFactor - load factor of this segment's table
 */
static final class Segment<K,V> extends ReentrantLock implements Serializable { transient volatile HashEntry<K,V>[] table; transient int count; transient int modCount; transient int threshold; final float loadFactor;
- count:Segment中元素的数量
- modCount:对table的大小造成影响的操作的数量(比如put或者remove操作)
- threshold:扩容阈值
- table:链表数组,数组中的每一个元素代表了一个链表的头部
- loadFactor:负载因子
/**
 * One node in a bucket's singly linked chain.
 *
 * <p>{@code hash} and {@code key} never change after construction;
 * {@code value} and {@code next} are {@code volatile} fields.
 *
 * @param <K> key type
 * @param <V> value type
 */
static final class HashEntry<K,V> {
    /** Hash code of {@link #key}, fixed at construction. */
    final int hash;
    /** The key of this entry, fixed at construction. */
    final K key;
    /** The current value; may be overwritten in place. */
    volatile V value;
    /** The following node in the chain, or {@code null} at the tail. */
    volatile HashEntry<K,V> next;

    /**
     * Creates a fully initialized node. Without this constructor the blank
     * {@code final} fields would never be assigned and the class would not
     * compile.
     *
     * @param hash  hash code of {@code key}
     * @param key   the entry's key
     * @param value the entry's initial value
     * @param next  the node that follows this one, or {@code null}
     */
    HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
        this.hash = hash;
        this.key = key;
        this.value = value;
        this.next = next;
    }

    /**
     * Re-links this node so that {@code n} follows it. Implemented as a
     * plain volatile write.
     *
     * @param n the new successor, or {@code null}
     */
    final void setNext(HashEntry<K,V> n) {
        next = n;
    }
}
//initialCapacity:初始容量 //loadFactor:负载因子 //concurrencyLevel:ConcurrentHashMap内部的Segment的数量 public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); //若concurrencyLevel大于MAX_SEGMENTS,则concurrencyLevel=MAX_SEGMENTS //保证最大并发不超过MAX_SEGMENTS(1<<16) if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; //求解concurrencyLevel与2的几次方最近 //如concurrencyLevel=5 则5与2^3=8最近 则sshift=8 ssize=3 int sshift = 0; int ssize = 1; while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1; } //segmentShift和segmentMask主要用于元素的hash this.segmentShift = 32 - sshift; this.segmentMask = ssize - 1; //ConcurrentHashMap初始容量不超过MAXIMUM_CAPACITY(1<<30) if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; //根据ConcurrentHashMap总容量initialCapacity除以 //Segment[]数组的长度得到单个分段锁segment中HashEntry[]的大小 int c = initialCapacity / ssize; //保证分段锁segment的总容量c不小于初始的容量 if (c * ssize < initialCapacity) ++c; int cap = MIN_SEGMENT_TABLE_CAPACITY; //cap为每个segment的初始容量,其值为离c天花板方向最近的2^n //例:c为5 cap为8 c为12 cap为16 while (cap < c) cap <<= 1; // 创建Segment Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]); Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize]; UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0] this.segments = ss; }
ConcurrentHashMap put()源码分析
public V put(K key, V value) { Segment<K,V> s; //value不能为空 if (value == null) throw new NullPointerException(); //计算key的hash值 int hash = hash(key); //无符号右移segmentShift(默认16)位 //然后& segmentMask(默认15)得到segment在内存中的位置 int j = (hash >>> segmentShift) & segmentMask; 如果Segment不存在,则调用ensureSegment方法 if ((s = (Segment<K,V>)UNSAFE.getObject (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment //初始化segment s = ensureSegment(j); //放值 return s.put(key, hash, value, false); }
Segment put()方法源码解析
final V put(K key, int hash, V value, boolean onlyIfAbsent) { // 尝试直接获取锁,获取到锁node为null, //否则调用scanAndLockForPut方法 HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value); V oldValue; try { HashEntry<K,V>[] tab = table; // 获取在tab数组中的位置 int index = (tab.length - 1) & hash; // 得到链表的头节点 HashEntry<K,V> first = entryAt(tab, index); // 遍历链表 for (HashEntry<K,V> e = first;;) { if (e != null) { K k; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value; if (!onlyIfAbsent) { e.value = value; ++modCount; } break; } e = e.next; } // 遍历到链表尾部,没有重复的key,则新插入 else { if (node != null) // 头插法,将node节点设为链表头节点 node.setNext(first); else // 为null,则新建一个节点 node = new HashEntry<K,V>(hash, key, value, first); int c = count + 1; // 若c超过阈值则扩容,并且数组长度小于MAXIMUM_CAPACITY = 1 << 30 if (c > threshold && tab.length < MAXIMUM_CAPACITY) // 扩容并进行重新hash rehash(node); else setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null; break; } } } finally { unlock(); } return oldValue; }
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) { // 获取链表头结点 HashEntry<K,V> first = entryForHash(this, hash); HashEntry<K,V> e = first; HashEntry<K,V> node = null; int retries = -1; // negative while locating node // 不断尝试获取锁 while (!tryLock()) { HashEntry<K,V> f; // to recheck first below if (retries < 0) { // 链表的头结点为null,或者遍历到链表的尾部 if (e == null) { // 这里加条件是因为,有可能已经初始化node节点了 // 结果由于头结点改变重新遍历链表 if (node == null) // speculatively create node node = new HashEntry<K,V>(hash, key, value, null); retries = 0; } // 找到相同key的节点 else if (key.equals(e.key)) retries = 0; // 没有找到key对应的节点,指向下一个节点 else e = e.next; } // 可用处理器数量大于1,MAX_SCAN_RETRIES=64,否则为1 else if (++retries > MAX_SCAN_RETRIES) { // 调用ReentrantLock中NonfairSync的lock()方法 // 执行过程中有可能不阻塞获取到锁,也有可能被阻塞 // 而不是之前的一直尝试直接获取锁 lock(); break; } // 链表的头结点发生变化,更新头结点,并重置retries值为-1 else if ((retries & 1) == 0 && (f = entryForHash(this, hash)) != first) { e = first = f; // re-traverse if entry changed retries = -1; } } return node; }