深入理解ConcurrentHashMap

本文主要介绍了 ConcurrentHashMap 在 JDK1.8 和 JDK1.7 中的主要原理和结构。参考见文末。

前言

使用 HashMap

在高并发情况下为什么需要使用 ConcurrentHashMap 而不直接使用 HashMap,也就是说 HashMap 在并发环境下存在说明缺点?

下面这篇文章介绍了 JDK7 中的 HashMap 在多线程操作下可能存在的并发死链问题,简单概括就是 HashMap 中的 Rehash 在在并发的情况下可能出现循环死链问题,导致 Entry 的 next 节点永远不为空,就会产生死循环获取 Entry。详见下面的资料:

当然,使用 HashMap 存在的问题不仅仅是死链问题。

使用 HashTable

HashTable容器使用synchronized来保证线程安全,但在线程竞争激烈的情况下HashTable 的效率非常低下。

因为当一个线程访问 HashTable 的同步方法时,就将整个表锁足,以至于其他线程想要访问表就会进入阻塞或者轮询状态,效率第。

既然 HashTable 存在锁粒度过大的问题,有没有说明办法可以把锁粒度降低呢?我们可以使用 ConcurrentHashMap。

使用 ConcurrentHashMap

ConcurrentHashMap 是 JUC 包提供的线程安全集合类, Concurrent 类型的容器有以下特点

  • 内部很多操作采用 CAS 机制,一般可以提供较高的吞吐量
  • 弱一致性:
    • 遍历时弱一致性,例如,当利用迭代器遍历时,如果容器发生修改,迭代器仍然可以继续进行遍 历,这时内容是旧的
    • 求大小弱一致性,size 操作未必是 100% 准确
    • 读取弱一致性

ConcurrentHashMap 中不仅仅采用了 CAS 机制,还提供了锁分段的技术来提高并发访问率。

HashTable容器在竞争激烈的并发环境下表现出效率低下的原因是所有访问HashTable的线程都必须竞争同一把锁,假如容器里有多把锁,每一把锁用于锁容器其中一部分数据,那么当多线程访问容器里不同数据段的数据时,线程间就不会存在锁竞争,从而可以有效提高并发访问效率,这就是ConcurrentHashMap所使用的锁分段技术。首先将数据分成一段一段地存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问。

在 JDK7 和 JDK8 中 ConcurrentHashMap 的实现有较大不同,下面我将从 JDK7 中讲起,最后介绍 JDK8 中的实现。

JDK7 基于分段锁的 ConcurrentHashMap

结构

ConcurrentHashMap 是由 Segment 数组结构和 HashEntry 数组结构组成。Segment 继承自 ReentranLock,在 ConcurrentHashMap 里扮演锁的角色,HashEntry 则用于存储键值对数据。

一个 ConcurrentHashMap 里包含一个 Segment 数组,Segment 的结构和 HashMap 类似,是一种数组和链表结构, 一个 Segment 里包含一个 HashEntry 数组,每个 HashEntry 是一个链表结构的元素。Segment 中的 HashEntry 数组可以后期扩充。

//ConcurrentHashMap中真正存储数据的对象 HashEntry 的结构
static final class HashEntry<K,V> {
    final int hash; //通过运算,得到的键的hash值
    final K key; // 存入的键
    volatile V value; //存入的值
    volatile HashEntry<K,V> next; //记录下一个元素,形成单向链表

单个 Segment 的结构
单个 Segment 的结构

默认的 Segement 有 16 个所以最多可以支持 16 个线程并发操作,可以在初始化时指定为其他值,初始化之后就不能再修改。

每个 Segment 守护者一个 HashEntry 数组里的元素,当对 HashEntry 数组的数据进行修改时,必须首先获得它对应的 Segment 锁。但是而在 JAVA8之后采用 CAS 无锁算法,这种乐观操作在完成前进行判断,如果符合预期结果才给予执行,对并发操作提供良好的优化。

有些方法需要跨段,比如size()和containsValue(),它们可能需要锁定整个表而而不仅仅是某个段,这需要按顺序锁定所有段,操作完毕后,又按顺序释放所有段的锁。

初始化

代码恐惧症患者可以直接跳过全参构造方法的源代码。

1、空参构造方法:

//空参构造
public ConcurrentHashMap() {
    //调用本类的带参构造
    //DEFAULT_INITIAL_CAPACITY = 16 定义ConcurrentHashMap存放元素的容量
    //DEFAULT_LOAD_FACTOR = 0.75f 负载因子,在判断扩容的时候用到,默认是0.75
    //DEFAULT_CONCURRENCY_LEVEL = 16 定义ConcurrentHashMap中Segment[]的大小,也就是分段锁的个数
    this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}

2、全参构造方法:

public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
   
    int sshift = 0;
    int ssize = 1;
    //ssize。计算Segment[]的大小,保证是2的幂次方数
    while (ssize < concurrencyLevel) {
        ++sshift;
        ssize <<= 1;
    }
    //这两个值用于后面计算Segment[]的角标
    this.segmentShift = 32 - sshift;
    this.segmentMask = ssize - 1;
    
    //计算每个Segment中存储元素的个数
    int c = initialCapacity / ssize;
    if (c * ssize < initialCapacity)
        ++c;
    //最小Segment中存储元素的个数为2
    int cap = MIN_SEGMENT_TABLE_CAPACITY;
    ////矫正每个Segment中存储元素的个数,保证是2的幂次方,最小为2
    while (cap < c)
        cap <<= 1;
    //创建一个Segment对象,作为其他Segment对象的模板
    Segment<K,V> s0 =
        new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                         (HashEntry<K,V>[])new HashEntry[cap]);
    Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
    //利用Unsafe类,将创建的Segment对象存入0角标位置。 SBASE是Segment数组在内存中的偏移量
    UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
    this.segments = ss;
}

3、结论

1)ConcurrentHashMap 中保存了一个默认长度为16的 Segment[] ,每个 Segment 元素中保存了一个**默认长度为 2 的 HashEntry[] ,HashEntry[] 中每个节点代表一个链表的头节点。**我们添加的元素,是存入对应的 Segment 中的 HashEntry[] 中。所以 ConcurrentHashMap 中默认元素的长度是 32 个。

默认初始化 ConcurrentHashMap 的结构
默认初始化 ConcurrentHashMap 的结构

2) ConcurrentHashMap 没有实现懒惰初始化,空间占用不友好。

3)this.segmentShift 和 this.segmentMask 的作用是决定将 key 的 hash 结果匹配到哪个 segment。先将 hash 值无符号右移 segmentShift 位之后与 segmentMask 做 & 运算,得到的结果就是在 segment 数组的角标。

$$j=hash>>>(segmentShift)\& segmentMask$$

为什么需要右移 segmentShift位?为了避免 Hash 冲突。

⭐put 流程

1、源代码分析

代码恐惧症可以直接跳过,看结论。

/* 一: 先调用 ConcurrentHashMap 中的 put 方法*/
public V put(K key, V value) {
    Segment<K,V> s;
    // 不允许空值空键 
    if (value == null)
        throw new NullPointerException();
    //基于key,计算hash值
    int hash = hash(key);
    //因为一个键要计算两个数组的索引,为了避免冲突,这里取高位计算Segment[]的索引
    int j = (hash >>> segmentShift) & segmentMask;
    //判断该索引位的Segment对象是否创建,没有就创建
    if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
         (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
        s = ensureSegment(j);
    //调用Segmetn的put方法实现元素添加
    return s.put(key, hash, value, false);
}

/* 二: ConcurrentHashMap的ensureSegment方法*/
//创建对应索引位的Segment对象,并返回
private Segment<K,V> ensureSegment(int k) {
    final Segment<K,V>[] ss = this.segments;
    long u = (k << SSHIFT) + SBASE; // raw offset
    Segment<K,V> seg;
    //获取,如果为null,即创建
    if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
        //以0角标位的Segment为模板
        Segment<K,V> proto = ss[0]; // use segment 0 as prototype
        int cap = proto.table.length;
        float lf = proto.loadFactor;
        int threshold = (int)(cap * lf);
        HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
        //获取,如果为null,即创建
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
            == null) { // recheck
            //创建
            Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
            //自旋方式,将创建的Segment对象放到Segment[]中,确保线程安全
            while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                   == null) {
                if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                    break;
            }
        }
    }
    //返回
    return seg;
}

/* 三: Segment的put方法 */
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    //尝试获取锁,获取成功,node为null,代码向下执行
    //如果有其他线程占据锁对象,那么去做别的事情,而不是一直等待,提升效率
    //scanAndLockForPut 稍后分析
    HashEntry<K,V> node = tryLock() ? null :
        scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        HashEntry<K,V>[] tab = table;
        //取hash的低位,计算HashEntry[]的索引
        int index = (tab.length - 1) & hash;
        //获取索引位的元素对象
        HashEntry<K,V> first = entryAt(tab, index);
        for (HashEntry<K,V> e = first;;) {
            //获取的元素对象不为空
            if (e != null) {
                K k;
                //如果是重复元素,覆盖原值
                if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        e.value = value;
                        ++modCount;
                    }
                    break;
                }
                //如果不是重复元素,获取链表的下一个元素,继续循环遍历链表
                e = e.next;
            }
            else { //如果获取到的元素为空
                //当前添加的键值对的HashEntry对象已经创建
                if (node != null)
                    node.setNext(first); //头插法关联即可
                else
                    //创建当前添加的键值对的HashEntry对象
                    node = new HashEntry<K,V>(hash, key, value, first);
                //添加的元素数量递增
                int c = count + 1;
                //判断是否需要扩容
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    //需要扩容
                    rehash(node);
                else
                    //不需要扩容
                    //将当前添加的元素对象,存入数组角标位,完成头插法添加元素
                    setEntryAt(tab, index, node);
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        //释放锁
        unlock();
    }
    return oldValue;
}

/* 四:Segment的scanAndLockForPut方法
 * 该方法在线程没有获取到锁的情况下,去完成HashEntry对象的创建,提升效率
*/
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
    //获取头部元素
    HashEntry<K,V> first = entryForHash(this, hash);
    HashEntry<K,V> e = first;
    HashEntry<K,V> node = null;
    int retries = -1; // negative while locating node
    while (!tryLock()) {
        //获取锁失败
        HashEntry<K,V> f; // to recheck first below
        if (retries < 0) {
            //没有下一个节点,并且也不是重复元素,创建HashEntry对象,不再遍历
            if (e == null) {
                if (node == null) // speculatively create node
                    node = new HashEntry<K,V>(hash, key, value, null);
                retries = 0;
            }
            else if (key.equals(e.key))
                //重复元素,不创建HashEntry对象,不再遍历
                retries = 0;
            else
                //继续遍历下一个节点
                e = e.next;
        }
        else if (++retries > MAX_SCAN_RETRIES) {
            //如果尝试获取锁的次数过多,直接阻塞
            //MAX_SCAN_RETRIES会根据可用cpu核数来确定
            lock();
            break;
        }
        else if ((retries & 1) == 0 &&
                 (f = entryForHash(this, hash)) != first) {
            //如果期间有别的线程获取锁,重新遍历
            e = first = f; // re-traverse if entry changed
            retries = -1;
        }
    }
    return node;
}

2、总结

put 时首先会定位到Segment,之后尝试获取 Segment 的锁,如果获取失败肯定存在其它线程竞争,如何获得锁?

  1. 自旋获取锁
  2. 重试次数达到最大重试次数后,改为阻塞锁获取

获得锁之后在 Segement 中进行插入操作,插入操作需要经历两个过程:

  1. 判断是否需要扩容。
  2. 定位添加元素的位置,放入到 HashEntry 数组。

之后解除之前获得的锁


ConcurrentHashMap 中 Key 和 Value 都不许为 Null。

问题:

1)是否需要扩容?如何扩容? 在插入元素前会先判断 Segment 里的 HashEntry 数组是否超过容量(threshold),如果超过阀值,数组进行扩容。扩容的时候首先会创建一个两倍于原容量的数组,然后将原数组里的元素进行再 hash 后插入到新的数组里。为了高效 ConcurrentHashMap 不会对整个容器进行扩容,而只对某个segment 进行扩容。

2)如插入元素?

  1. 计算出需要放到哪个 Segment
  2. 计算出放在哪个 HashEntry
  3. 将 Entry 头插到链表中。
//定位Segment的算法
(hash >>> segmentShift) & segmentMask;

//定位HashEntry的算法
int index = hash & (tab.length-1);

rehash 流程

发生在 put 中,因为此时已经获得了锁,因此 rehash 时不需要考虑线程安全。

1、源代码分析:

private void rehash(HashEntry<K,V> node) {
    HashEntry<K,V>[] oldTable = table;
    int oldCapacity = oldTable.length;
    //两倍容量
    int newCapacity = oldCapacity << 1;
    threshold = (int)(newCapacity * loadFactor);
    //基于新容量,创建HashEntry数组
    HashEntry<K,V>[] newTable =
        (HashEntry<K,V>[]) new HashEntry[newCapacity];
    int sizeMask = newCapacity - 1;
   	//实现数据迁移
    for (int i = 0; i < oldCapacity ; i++) {
        HashEntry<K,V> e = oldTable[i];
        if (e != null) {
            HashEntry<K,V> next = e.next;
            int idx = e.hash & sizeMask;
            if (next == null)   //  Single node on list
                //原位置只有一个元素,直接放到新数组即可
                newTable[idx] = e;
            else { // 重用一个槽位中的一段连续序列,这一段序列在新槽位中的下标相同
                //=========图一=====================
                HashEntry<K,V> lastRun = e;
                int lastIdx = idx;	
                for (HashEntry<K,V> last = next;
                     last != null;
                     last = last.next) {
                    int k = last.hash & sizeMask;
                    if (k != lastIdx) {
                        lastIdx = k;
                        lastRun = last;
                    }
                }
                //=========图一=====================
                
                //=========图二=====================
                newTable[lastIdx] = lastRun;
                //=========图二=====================
                // Clone remaining nodes
                //=========图三=====================
                for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                    V v = p.value;
                    int h = p.hash;
                    int k = h & sizeMask;
                    HashEntry<K,V> n = newTable[k];
                    //这里旧的HashEntry不会放到新数组
                    //而是基于原来的数据创建了一个新的HashEntry对象,放入新数组
                    newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                }
                //=========图三=====================
            }
        }
    }
    //采用头插法,将新元素加入到数组中
    int nodeIndex = node.hash & sizeMask; // add the new node
    node.setNext(newTable[nodeIndex]);
    newTable[nodeIndex] = node;
    table = newTable;
}

图 一
图 一

图 二
图 二

图 三
图 三

总结:

相对于HashMap的 resize,ConcurrentHashMap 的 rehash 原理类似,但是 Doug Lea 为rehash做了一定的优化,避免让所有的节点都进行复制操作:由于扩容是基于 2 的幂指来操作,假设扩容前某 HashEntry 对应到 Segment 中数组的index为i,数组的容量为 capacity,那么扩容后该 HashEntry 对应到新数组中的 index 只可能为 i 或者 i+capacity,因此大多数 HashEntry 节点在扩容前后 index 可以保持不变。

基于上述原理,可以对已有链表进行遍历,对于老的 oldTable 中的每个 HashEntry,从头结点开始遍历,找到一段连续的节点链表,这个链表满足以下要求:

  • 链表中的节点在新的 segment 数组中的位置相同
  • 链表位于 segment 数组末尾

之后直接将这串链表接到新数组上去,不用新 new 节点。

最后将其他节点放入到新的数组中,基于原有节点 new 一个新的节点。

⭐get 流程

1、源代码分析

/* get 操作直接调用给Segment的 get 方法*/
V get(Object key, int hash) {
     if (count != 0) { // read-volatile 当前桶的数据个数是否为0
         HashEntry e = getFirst(hash);  得到头节点
         while (e != null) {
             if (e.hash == hash && key.equals(e.key)) {
                 V v = e.value;
                 if (v != null)
                     return v;
                 return readValueUnderLock(e); // recheck
             }
             e = e.next;
         }
     }
     return null;
 }

2、总结

get 的步骤

  1. 计算 hash 值,找到 segment 数组中的具体位置,或我们前面用的“槽”
  2. 槽中也是一个数组,根据 hash 找到数组中具体的位置
  3. 到这里是链表了,顺着链表进行查找即可

get操作不需要锁。除非读到的值是空的才会加锁重读。


我们知道 HashTable 容器的 get 方法是需要加锁的,那么ConcurrentHashMap 的 get 操作是如何做到不加锁的呢?原因是它的 get 方法里将要使用的共享变量都定义成 volatile。

将变量定义为 volatile 变量的用处。

定义成 volatile 的变量,能够在线程之间保 持可见性,能够被多线程同时读,并且保证不会读到过期的值,但是只能被单线程写(有一种情况 可以被多线程写,就是写入的值不依赖于原值),在 get 操作里只需要读不需要写共享变量 count 和 value,所以可以不用加锁。

之所以不会读到过期的值,是根据 java 内存模型的 happen before 原则,对 volatile 字段的写入操作先于读操作,即使两个线程同时修改和获取 volatile 变量,get 操作也能拿到最新的值,这是用volatile替换锁的经典应用场景。

第一步是访问 count 变量(如果 count 为 0 直接返回 null)。可以保证读取到最新的值。

接下来就是根据 hash 和 key 对 hash 链进行遍历找到要获取的结点,如果没有找到,直接访回 null。对 hash 链进行遍历不需要加锁的原因在于链指针 next 是 final 的。但是头指针却不是 final 的,这是通过 getFirst(hash) 方法返回,也就是存在 table数组中的值。

这使得 getFirst(hash) 可能返回过时的头结点,例如,当执行 get 方法时,刚执行完 getFirst(hash) 之后,另一个线程执行了删除操作并更新头结点,这就导致 get 方法中返回的头结点不是最新的。这是可以允许,通过对 count 变量的协调机制,get 能读取到几乎最新的数据,虽然可能不是最新的。要得到最新的数据,只有采用完全的同步,这也是 ConcurrentHashMap 弱一致性的体现。

最后,如果找到了所求的结点,判断它的值如果非空就直接返回,否则在有锁的状态下再读一次。这似乎有些费解,理论上结点的值不可能为空,这是因为 put 的时候就进行了判断,如果为空就要抛NullPointerException。空值的唯一源头就是 HashEntry 中的默认值,因为 HashEntry 中的 value 不是 final 的,非同步读取有可能读取到空值。一旦value出现null,则代表HashEntry的key/value没有映射完成就被其他线程所见,需要特殊处理。

仔细看下 put 操作的语句:tab[index] = new HashEntry(key, hash, first, value),在这条语句中,HashEntry 构造函数中对 value 的赋值以及对 tab[index] 的赋值可能被重新排序,这就可能导致结点的值为空。

这里当 v 为空时,可能是一个线程正在改变节点,而之前的 get 操作都未进行锁定,根据 bernstein 条件,读后写或写后读都会引起数据的不一致,所以这里要对这个 e 重新上锁再读一遍,以保证得到的是正确值。

V readValueUnderLock(HashEntry e) {
     lock();
     try {
         return e.value;
     } finally {
         unlock();
     }
 }

如用于统计当前 Segement 大小的 count 字段和用于存储值的 HashEntry 的 value。定义成 volatile 的变量,能够在线程之间保持可见性,能够被多线程同时读,并且保证不会读到过期的值,但是只能被单线程写(有一种情况可以被多线程写,就是写入的值不依赖于原值),在 get 操作里只需要读不需要写共享变量 count 和 value,所以可以不用加锁。

size 流程

1、源代码分析:

public int size() {
    // Try a few times to get accurate count. On failure due to
    // continuous async changes in table, resort to locking.
    final Segment<K,V>[] segments = this.segments;
    int size;
    boolean overflow; // true if size overflows 32 bits
    long sum;         // sum of **modCounts**
    long last = 0L;   // previous sum
    int retries = -1; // first iteration isn't retry
    try {
        for (;;) {
            //当第5次走到这个地方时,会将整个Segment[]的所有Segment对象锁住
            if (retries++ == RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j)
                    ensureSegment(j).lock(); // force creation
            }
            sum = 0L;
            size = 0;
            overflow = false;
            for (int j = 0; j < segments.length; ++j) {
                Segment<K,V> seg = segmentAt(segments, j);
                if (seg != null) {
                    //累加所有Segment的操作次数
                    sum += seg.modCount;
                    int c = seg.count;
                    //累加所有segment中的元素个数 size+=c
                    if (c < 0 || (size += c) < 0)
                        overflow = true;
                }
            }
            //当这次累加值和上一次累加值一样,证明没有进行新的增删改操作,返回sum
            //第一次last为0,如果有元素的话,这个for循环最少循环两次的
            if (sum == last) // 第一次判断 sum 肯定不等于 l
                break;
            //记录累加的值
            last = sum;
        }
    } finally {
        //如果之前有锁住,解锁
        if (retries > RETRIES_BEFORE_LOCK) {
            for (int j = 0; j < segments.length; ++j)
                segmentAt(segments, j).unlock();
        }
    }
    //溢出,返回int的最大值,否则返回累加的size
    return overflow ? Integer.MAX_VALUE : size;
}

2、总结

put、remove 和 get 操作只需要关心一个 Segment ,而 size 操作需要遍历所有的 Segment 才能算出整个 Map 的大小。一个简单的方案是,先锁住所有 Sgment,计算完后再解锁。但这样做,在做size 操作时,不仅无法对 Map 进行写操作,同时也无法进行读操作,不利于对 Map 的并行操作。

ConcurrentHashMap 的做法是先尝试 2 次通过不锁住 Segment 的方式来统计各个 Segment 大小,如果统计的过程中,容器的 count 发生了变化,则再采用加锁的方式来统计所有 Segment 的大小。

那么 ConcurrentHashMap 是如何判断在统计的时候容器是否发生了变化呢?使用 modCount 变量,在 put、remove 和 clean 方法里操作元素前都会将所操作的 Segment 的 modCount 变量进行加 1 ,那么在统计 size 前后比较 modCount 是否发生变化,从而得知容器的大小是否发生变化。

总流程如下:

  1. 遍历所有的 Segment
  2. 累加所有 Segment 中的操作次数
  3. 累加 Segment 中的元素数量
  4. 判断所有Segment的总修改次数是否大于上一次的总修改次数。如果大于,说明统计过程中有修改,重新统计,尝试次数+1;如果不是。说明没有修改,统计结束。
  5. 如果尝试次数超过阈值,则对每一个Segment加锁,再重新统计。
  6. 再次判断所有Segment的总修改次数是否大于上一次的总修改次数。由于已经加锁,次数一定和上次相等。
  7. 释放锁,结束

JDK8 基于 CAS 的 ConcurrentHashMap

它摒弃了 JDK7 中 Segment 的概念,Segment 有什么缺陷?每次通过 hash 确认位置时需要 2 次才能定位到当前 key 应该落在哪个槽:

  1. 通过 hash 值和 段数组长度-1 进行位运算确认当前 key 属于哪个段,即确认其在 segments 数组的位置。
  2. 再次通过 hash 值和 table 数组(即 ConcurrentHashMap 底层存储数据的数组)长度 - 1进行位运算确认其 HashEntry 桶位置。

于是 在JDK8 中启用了一种全新的方式实现,利用 CAS 算法。它沿用了与它同时期的 HashMap 版本的思想,底层依然由 散列表+红黑树,但是为了做到并发,又增加了很多辅助的类,例如 TreeBin,Traverser 等对象内部类。

结构

重要属性:

// sizeCtl 是一个非常重要的变量
private transient volatile int sizeCtl;

// 整个 ConcurrentHashMap 就是一个 Node[]
static class Node<K,V> implements Map.Entry<K,V> {}

// hash 表 
transient **volatile** Node<K,V>[] table;

// 扩容时的 新 hash 表 
private transient volatile Node<K,V>[] nextTable;

// 扩容时如果某个 bin 迁移完毕, 用 ForwardingNode 作为旧 table bin 的头结点 
static final class ForwardingNode<K,V> extends Node<K,V> {}

// 用在 compute 以及 computeIfAbsent 时, 用来占位, 计算完成后替换为普通 
Node static final class ReservationNode<K,V> extends Node<K,V> {}

// 作为 treebin 的头节点, 存储 root 和 first . 持有一个读写锁
static final class TreeBin<K,V> extends Node<K,V> {}

// 作为 treebin 的节点, 存储 parent, left, right 
static final class TreeNode<K,V> extends Node<K,V> {}

1)sizeCtl

  1. 为0,代表数组未初始化, 且数组的初始容量为16
  2. 为正数,如果数组未初始化,那么其记录的是数组的初始容量,如果数组已经初始化,那么其记录的是数组的扩容阈值
  3. 为-1,表示数组正在进行初始化
  4. 小于0,并且不是-1,表示数组正在扩容, -(1+n),表示此时有n个线程正在共同完成数组的扩容操

ConcurrentHashMap采用Node类作为基本的存储单元,每个键值对(key-value)都存储在一个Node中,使用了volatile关键字修饰value和next,保证并发的可见性:

2)Node

Node 是最核心的内部类,它包装了 key-value 键值对,所有插入 ConcurrentHashMap 的数据都包装在这里面。它与 HashMap 中的定义很相似,但是但是有一些差别它对 value 和 next 属性设置了 volatile 同步锁(与 JDK7 的 Segment 相同),它不允许调用 setValue 方法直接改变 Node 的 value 域,它增加了 find 方法辅助 map.get() 方法。

3)TreeNode

树节点类,另外一个核心的数据结构。包装了 Node 节点,用于实现链表连接。在链表长度过长时会转变为红黑树。使用一个 TreeBin 节点在 Node[] 中占位,指向这个红黑树根节点。

static final class TreeNode<K,V> extends Node<K,V> {
        TreeNode<K,V> parent;  // red-black tree links
        TreeNode<K,V> left;
        TreeNode<K,V> right;
        TreeNode<K,V> prev;    // needed to unlink next upon deletion
        boolean red;
....

4)TreeBin

TreeBin 中并不保存 key、value,它有一个指向 TreeNode 红黑树根节点的指针。TreeBin 放在 Node[] 数组中。另外 TreeBin 持有一个读写锁,实现红黑树的并发访问。

5)ForwardingNode

扩容节点,只是在扩容阶段使用的节点,主要作为一个标记,在处理并发时起着关键作用,有了ForwardingNodes,也是ConcurrentHashMap有了分段的特性,提高了并发效率。一个用于连接两个table的节点类。它包含一个nextTable指针,用于指向下一张表。而且这个节点的 key value next 指针全部为null,它的 hash 值为-1。

重要方法,基于 Unsafe 提供的原子操作实现多并发的无锁操作:

// 获取 Node[] 中第 i 个 Node 
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i)

// cas 修改 Node[] 中第 i 个 Node 的值, c 为旧值, v 为新值 
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v)

// 直接修改 Node[] 中第 i 个 Node 的值, v 为新值 
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v)

构造器

//没有维护任何变量的操作,如果调用该方法,数组长度默认是16
public ConcurrentHashMap() {
}

//传递进来一个初始容量,ConcurrentHashMap会基于这个值计算一个比这个值大的2的幂次方数作为初始容量
public ConcurrentHashMap(int initialCapacity) {
    if (initialCapacity < 0)
        throw new IllegalArgumentException();
    int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
               MAXIMUM_CAPACITY :
               tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
    this.sizeCtl = cap;
}

//调用三个参数的构造
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
    this(initialCapacity, loadFactor, 1);
}

//计算一个大于或者等于给定的容量值,该值是2的幂次方数作为初始容量
public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    if (initialCapacity < concurrencyLevel)   // Use at least as many bins
        initialCapacity = concurrencyLevel;   // as estimated threads
    long size = (long)(1.0 + (long)initialCapacity / loadFactor);
    int cap = (size >= (long)MAXIMUM_CAPACITY) ?
        MAXIMUM_CAPACITY : tableSizeFor((int)size);
    this.sizeCtl = cap;
}

jdk8的ConcurrentHashMap的数组初始化是在第一次添加元素时完成。

⭐get 流程

1、get 源代码

public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    int h = spread(key.hashCode());
    
    // 检查 table 是否存在,hashCode 所在的索引是有为空
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
      
        // 按照树的方式遍历 Node 查找
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
            
        // 按照链表的方式遍历 Node 查找    
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}

2、get 流程:

  • 计算 Hash 值,并由此值找到对应的槽位;
  • 如果数组是空的或者该位置为 null,那么直接返回 null 就可以了;
  • 如果该位置处的节点刚好就是我们需要的,直接返回该节点的值;
  • 如果该位置节点是红黑树或者正在扩容,就用 find 方法继续查找;
  • 否则那就是链表,就进行遍历链表查找

⭐put 流程

1、源码分析

public V put(K key, V value) {
    return putVal(key, value, false);
}
/* put 调用 putVal 方法*/
final V putVal(K key, V value, boolean onlyIfAbsent) {
    //如果有空值或者空键,直接抛异常
    if (key == null || value == null) throw new NullPointerException();
    //基于key计算hash值,并进行一定的扰动
    int hash = spread(key.hashCode());
    //记录某个桶上元素的个数,如果超过8个,会转成红黑树
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        //如果数组还未初始化,先对数组进行初始化
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
	    //如果hash计算得到的桶位置没有元素,利用cas将元素添加
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            //cas+自旋(和外侧的for构成自旋循环),保证元素添加安全
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        //如果hash计算得到的桶位置元素的hash值为MOVED,证明正在扩容,那么协助扩容
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            //hash计算的桶位置元素不为空,且当前没有处于扩容操作,进行元素添加
            V oldVal = null;
            //对当前桶进行加锁,保证线程安全,执行元素添加操作
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    //普通链表节点
                    if (fh >= 0) {
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    //树节点,将元素添加到红黑树中
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) {
                //链表长度大于/等于8,将链表转成红黑树
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                //如果是重复键,直接将旧值返回
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    //添加的是新元素,维护集合长度,并判断是否要进行扩容操作
    addCount(1L, binCount);
    return null;
}

/* 初始化底层数组 */
private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    //cas+自旋,保证线程安全,对数组进行初始化操作
    while ((tab = table) == null || tab.length == 0) {
        //如果sizeCtl的值(-1)小于0,说明此时正在初始化, 让出cpu
        if ((sc = sizeCtl) < 0)
            Thread.yield(); // lost initialization race; just spin
        //cas修改sizeCtl的值为-1,修改成功,进行数组初始化,失败,继续自旋
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                if ((tab = table) == null || tab.length == 0) {
                    //sizeCtl为0,取默认长度16,否则去sizeCtl的值
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    //基于初始长度,构建数组对象
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    //计算扩容阈值,并赋值给sc
                    sc = n - (n >>> 2);
                }
            } finally {
                //将扩容阈值,赋值给sizeCtl
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

2、通过以上源码,我们可以看到,当需要添加元素时,会针对当前元素所对应的桶位进行加锁操作,这样一方面保证元素添加时,多线程的安全,同时对某个桶位加锁不会影响其他桶位的操作,进一步提升多线程的并发效率。

put 操作图示

3、put 流程:

  • 判断Node[] 数组是否初始化,没有则进行初始化操作
  • 通过 hash 定位数组的索引坐标,是否有 Node 节点,如果没有则使用 CAS 进行添加(链表的头节点),添加失败则进入下次循环。
  • 检查到内部正在扩容,就帮助它一块扩容(详见下文)。
  • 如果 f != null ,则使用 synchronized 锁住 f 元素(链表/红黑二叉树的头元素)
    • 如果是 Node (链表结构)则执行链表的添加操作
    • 如果是 TreeBin (树形结构)则执行树添加操作。
  • 判断链表长度已经达到临界值 8 ,当然这个 8 是默认值,允许自定义,当节点数超过这个值就需要把链表转换为树结构。

transfer 流程

transfer 过程发生在扩容过中。扩容操作是 CHM 中难也是最重要的部分。

什么时候会触发扩容?

  1. 在调用 addCount 方法增加集合元素计数后发现当前集合元素个数到达扩容阈值时就会触发扩容
  2. 扩容状态下其他线程对集合进行插入、修改、删除、合并、compute 等操作时遇到 ForwardingNode 节点会触发扩容
  3. putAll 批量插入或者插入节点后发现存在链表长度达到 8 个或以上,但数组长度为 64 以下时会触发扩容

注意:桶上链表长度达到 8 个或者以上,并且数组长度为 64 以下时只会触发扩容而不会将链表转为红黑树 。

1、源码分析:

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    //如果是多cpu,那么每个线程划分任务,最小任务量是16个桶位的迁移
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range
    //如果是扩容线程,此时新数组为null
    if (nextTab == null) {            // initiating
        try {
            @SuppressWarnings("unchecked")
            //两倍扩容创建新数组
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        //记录线程开始迁移的桶位,从后往前迁移,指向最右边
        transferIndex = n;
    }
    //记录新数组的末尾
    int nextn = nextTab.length;
    //已经迁移的桶位,会用这个节点占位(这个节点的hash值为-1--MOVED)
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    boolean advance = true;
    boolean finishing = false; // to ensure sweep before committing nextTab
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
        while (advance) {
            int nextIndex, nextBound;
            //i记录当前正在迁移桶位的索引值
            //bound记录下一次任务迁移的开始桶位
            
            //--i >= bound 成立表示当前线程分配的迁移任务还没有完成
            if (--i >= bound || finishing)
                advance = false;
            //没有元素需要迁移 -- 后续会去将扩容线程数减1,并判断扩容是否完成
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            //计算下一次任务迁移的开始桶位,并将这个值赋值给transferIndex
            else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        //如果没有更多的需要迁移的桶位,就进入该if
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            //扩容结束后,保存新数组,并重新计算扩容阈值,赋值给sizeCtl
            if (finishing) {
                nextTable = null;
                table = nextTab;
                sizeCtl = (n << 1) - (n >>> 1);
                return;
            }
		   //扩容任务线程数减1
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                //判断当前所有扩容任务线程是否都执行完成
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
                //所有扩容线程都执行完,标识结束
                finishing = advance = true;
                i = n; // recheck before commit
            }
        }
        //当前迁移的桶位没有元素,直接在该位置添加一个fwd节点
        else if ((f = tabAt(tab, i)) == null)
            advance = casTabAt(tab, i, null, fwd);
        //当前节点已经被迁移
        else if ((fh = f.hash) == MOVED)
            advance = true; // already processed
        else {
            //当前节点需要迁移,加锁迁移,保证多线程安全
            //此处迁移逻辑和jdk7的ConcurrentHashMap相同,不再赘述
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    Node<K,V> ln, hn;
                    if (fh >= 0) {
                        int runBit = fh & n;
                        Node<K,V> lastRun = f;
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        if (runBit == 0) {
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun;
                            ln = null;
                        }
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                                hn = new Node<K,V>(ph, pk, pv, hn);
                        }
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                    else if (f instanceof TreeBin) {
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null;
                        TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>
                                (h, e.key, e.val, null, null);
                            if ((h & n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            }
                            else {
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                            (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                            (lc != 0) ? new TreeBin<K,V>(hi) : t;
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                }
            }
        }
    }
}

推荐阅读:ConcurrentHashMap1.8 - 扩容详解:

ConcurrentHashMap1.8 - 扩容详解_ZOKEKAI的博客-CSDN博客_concurrenthashmap扩容

扩容 多线程效率提升

多线程协助扩容的操作会在两个地方被触发:

① 当添加元素时,发现添加的元素对用的桶位为 fwd 节点,就会先去协助扩容,然后再添加元素

② 当添加完元素后,判断当前元素个数达到了扩容阈值,此时发现sizeCtl的值小于0,并且新数组不为空,这个时候,会去协助扩容

采用分而治之的思想,在 ConcurrentHashMap 中采用的是分段扩容法,即每个线程负责一段,默认最小是 16,也就是说如果 ConcurrentHashMap 中只有 16 个槽位,那么就只会有一个线程参与扩容。如果大于 16 则根据当前 CPU 数来进行分配,最大参与扩容线程数不会超过 CPU 数。

其扩容的过程可描述为:

  • 首先扩容过程的中,节点首先移动到过渡表 nextTable ,所有节点移动完毕时替换散列表 table
  • 移动时先将散列表定长等分,然后逆序依次领取任务扩容,设置 sizeCtl 标记正在扩容;
  • 移动完成一个哈希桶或者遇到空桶时,将其标记为 ForwardingNode 节点,并指向 nextTable ;
  • 后有其他线程在操作哈希表时,遇到 ForwardingNode 节点,则先帮助扩容(继续领取分段任务),扩容完成后再继续之前的操作;

图形化表示如下:

1、源码分析

1.1、元素未添加,先协助扩容,扩容完之后再添加元素

final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        //发现此处为fwd节点,协助扩容,扩容结束后,再循环回来添加元素
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        
        //省略代码
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
    Node<K,V>[] nextTab; int sc;
    if (tab != null && (f instanceof ForwardingNode) &&
        (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
        int rs = resizeStamp(tab.length);
        while (nextTab == nextTable && table == tab &&
               (sc = sizeCtl) < 0) {
            if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                sc == rs + MAX_RESIZERS || transferIndex <= 0)
                break;
            if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                //扩容,传递一个 非null的 nextTab
                transfer(tab, nextTab);
                break;
            }
        }
        return nextTab;
    }
    return table;
}

1.2、先添加元素再协助扩容

private final void addCount(long x, int check) {
    //省略代码
    
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
  	    //元素个数达到扩容阈值
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
            int rs = resizeStamp(n);
            //sizeCtl小于0,说明正在执行扩容,那么协助扩容
            if (sc < 0) {
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
            s = sumCount();
        }
    }
}

2、图解

扩容空间和 HashMap 一样,每次扩容都是将原空间大小左移一位,即扩大为之前的两倍。注意这里的 transferIndex 代表的就是推进下标,默认为旧数组的大小。

size 流程

当获取 Map.size 的时候,如果使用 Atomic 变量,很容易导致过度竞争,产生性能瓶颈,所以 CHM 中使用了,计数器的方式。

1、源码分析

1.1、size 方法

public int size() {
    long n = sumCount();
    return ((n < 0L) ? 0 :
            (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
            (int)n);
}

1.2、sumCount 方法

final long sumCount() {
    CounterCell[] as = counterCells; CounterCell a;
    //获取baseCount的值
    long sum = baseCount;
    if (as != null) {
        //遍历CounterCell数组,累加每一个CounterCell的value值
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

2、总结

size,元素个数保存在 baseCount 中,并发时的个数变动保存在 CounterCell[] 当中。最后统计数量时累加 即可。

count 相关流程

count 相关流程用于维护 CounterCell[] 数组。

1.1、addCount 方法:

① CounterCell数组不为空,优先利用数组中的CounterCell记录数量

② 如果数组为空,尝试对baseCount进行累加,失败后,会执行fullAddCount逻辑

③ 如果是添加元素操作,会继续判断是否需要扩容

private final void addCount(long x, int check) {
    CounterCell[] as; long b, s;
    //当CounterCell数组不为空,则优先利用数组中的CounterCell记录数量
    //或者当baseCount的累加操作失败,会利用数组中的CounterCell记录数量
    if ((as = counterCells) != null ||
        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        CounterCell a; long v; int m;
        //标识是否有多线程竞争
        boolean uncontended = true;
        //当as数组为空
        //或者当as长度为0
        //或者当前线程对应的as数组桶位的元素为空
        //或者当前线程对应的as数组桶位不为空,但是累加失败
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
            !(uncontended =
              U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
            //以上任何一种情况成立,都会进入该方法,传入的uncontended是false
            fullAddCount(x, uncontended);
            return;
        }
        if (check <= 1)
            return;
        //计算元素个数
        s = sumCount();
    }
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        //当元素个数达到扩容阈值
        //并且数组不为空
        //并且数组长度小于限定的最大值
        //满足以上所有条件,执行扩容
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
            //这个是一个很大的正数
            int rs = resizeStamp(n);
            //sc小于0,说明有线程正在扩容,那么会协助扩容
            if (sc < 0) {
                //扩容结束或者扩容线程数达到最大值或者扩容后的数组为null或者没有更多的桶位需要转移,结束操作
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                //扩容线程加1,成功后,进行协助扩容操作
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    //协助扩容,newTable不为null
                    transfer(tab, nt);
            }
            //没有其他线程在进行扩容,达到扩容阈值后,给sizeCtl赋了一个很大的负数
            //1+1=2 --》 代表此时有一个线程在扩容
            
            //rs << RESIZE_STAMP_SHIFT)是一个很大的负数
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                //扩容,newTable为null
                transfer(tab, null);
            s = sumCount();
        }
    }
}

1.2、fullAddCount 方法

① 当CounterCell数组不为空,优先对CounterCell数组中的CounterCell的value累加

② 当CounterCell数组为空,会去创建CounterCell数组,默认长度为2,并对数组中的CounterCell的value累加

③ 当数组为空,并且此时有别的线程正在创建数组,那么尝试对baseCount做累加,成功即返回,否则自旋

private final void fullAddCount(long x, boolean wasUncontended) {
    int h;
    //获取当前线程的hash值
    if ((h = ThreadLocalRandom.getProbe()) == 0) {
        ThreadLocalRandom.localInit();      // force initialization
        h = ThreadLocalRandom.getProbe();
        wasUncontended = true;
    }
    //标识是否有冲突,如果最后一个桶不是null,那么为true
    boolean collide = false;                // True if last slot nonempty
    for (;;) {
        CounterCell[] as; CounterCell a; int n; long v;
        //数组不为空,优先对数组中CouterCell的value累加
        if ((as = counterCells) != null && (n = as.length) > 0) {
            //线程对应的桶位为null
            if ((a = as[(n - 1) & h]) == null) {
                if (cellsBusy == 0) {            // Try to attach new Cell
                    //创建CounterCell对象
                    CounterCell r = new CounterCell(x); // Optimistic create
                    //利用CAS修改cellBusy状态为1,成功则将刚才创建的CounterCell对象放入数组中
                    if (cellsBusy == 0 &&
                        U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                        boolean created = false;
                        try {               // Recheck under lock
                            CounterCell[] rs; int m, j;
                            //桶位为空, 将CounterCell对象放入数组
                            if ((rs = counterCells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                rs[j] = r;
                                //表示放入成功
                                created = true;
                            }
                        } finally {
                            cellsBusy = 0;
                        }
                        if (created) //成功退出循环
                            break;
                        //桶位已经被别的线程放置了已给CounterCell对象,继续循环
                        continue;           // Slot is now non-empty
                    }
                }
                collide = false;
            }
            //桶位不为空,重新计算线程hash值,然后继续循环
            else if (!wasUncontended)       // CAS already known to fail
                wasUncontended = true;      // Continue after rehash
            //重新计算了hash值后,对应的桶位依然不为空,对value累加
            //成功则结束循环
            //失败则继续下面判断
            else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
                break;
            //数组被别的线程改变了,或者数组长度超过了可用cpu大小,重新计算线程hash值,否则继续下一个判断
            else if (counterCells != as || n >= NCPU)
                collide = false;            // At max size or stale
            //当没有冲突,修改为有冲突,并重新计算线程hash,继续循环
            else if (!collide)
                collide = true;
            //如果CounterCell的数组长度没有超过cpu核数,对数组进行两倍扩容
            //并继续循环
            else if (cellsBusy == 0 &&
                     U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                try {
                    if (counterCells == as) {// Expand table unless stale
                        CounterCell[] rs = new CounterCell[n << 1];
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        counterCells = rs;
                    }
                } finally {
                    cellsBusy = 0;
                }
                collide = false;
                continue;                   // Retry with expanded table
            }
            h = ThreadLocalRandom.advanceProbe(h);
        }
        //CounterCell数组为空,并且没有线程在创建数组,修改标记,并创建数组
        else if (cellsBusy == 0 && counterCells == as &&
                 U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
            boolean init = false;
            try {                           // Initialize table
                if (counterCells == as) {
                    CounterCell[] rs = new CounterCell[2];
                    rs[h & 1] = new CounterCell(x);
                    counterCells = rs;
                    init = true;
                }
            } finally {
                cellsBusy = 0;
            }
            if (init)
                break;
        }
        //数组为空,并且有别的线程在创建数组,那么尝试对baseCount做累加,成功就退出循环,失败就继续循环
        else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
            break;                          // Fall back on using base
    }
}

2、fullAddCount方法中,当as数组不为空的逻辑图解:

⭐总结

JDK7

在 JDK7 中 ConcurrentHashMap 保证高并发的三个方面:

  1. 用分离锁实现降低锁粒度,提升并发度
  2. 用HashEntery对象的不变性来降低执行读操作的线程在遍历链表期间对加锁的需求
  3. 通过对同一个 volatile 变量的写/读访问,协调不同线程间读/写操作的内存可见性

JDK8

在 JDK8 中,Java8 中的 ConcurrentHashMap 使用的 Synchronized 锁加 CAS 的机制。结构也由 Java7 中的 Segment 数组 + HashEntry 数组 + 链表 进化成了 Node 数组 + 链表 / 红黑树,Node 是类似于一个 HashEntry 的结构。它的冲突再达到一定大小时会转化成红黑树,在冲突小于一定数量时又退回链表。

JDK1.8为什么使用内置锁synchronized来代替重入锁ReentrantLock 有一下几点原因:

  • 因为粒度降低了,在相对而言的低粒度加锁方式,synchronized并不比ReentrantLock差,在粗粒度加锁中ReentrantLock可能通过Condition来控制各个低粒度的边界,更加的灵活,而在低粒度中,Condition的优势就没有了
  • JVM的开发团队从来都没有放弃synchronized,而且基于JVM的synchronized优化空间更大,使用内嵌的关键字比使用API更加自然
  • 在大量的数据操作下,对于JVM的内存压力,基于API的ReentrantLock会开销更多的内存,虽然不是瓶颈,但是也是一个选择依据

参考