最新消息:雨落星辰是一个专注网站SEO优化、网站SEO诊断、搜索引擎研究、网络营销推广、网站策划运营及站长类的自媒体原创博客

JUC并发—8.并发安全集合一

网站源码admin2浏览0评论

JUC并发—8.并发安全集合一

大纲

1.JDK 1.7的HashMap的死循环与数据丢失

2.ConcurrentHashMap的并发安全

3.ConcurrentHashMap的设计介绍

4.ConcurrentHashMap的put操作流程

5.ConcurrentHashMap的Node数组初始化

6.ConcurrentHashMap对Hash冲突的处理

7.ConcurrentHashMap的并发扩容机制

8.ConcurrentHashMap的分段锁统计元素数据

9.ConcurrentHashMap的查询操作是否涉及锁

10.ConcurrentHashMap中红黑树的使用

1.JDK 1.7的HashMap的死循环与数据丢失

(1)JDK 1.7的HashMap工作原理

(2)JDK 1.7的HashMap并发下导致的环形链表

(3)环形链表引发的死循环与数据丢失

(4)JDK 1.7和JDK 1.8的HashMap对比

(5)并发安全的集合

(1)JDK 1.7的HashMap工作原理

一.Hash算法

put(key, value) => 对key执行Hash算法 => 根据Hash值用类似取模的算法 => 定位数组的某一个元素 => 如果数组元素为空,则将value存放在该数组元素里。

二.Hash冲突

如果两个key的Hash值,经过取模算法定位到数组的同一个位置,此时就会用链表处理这种Hash冲突。

三.数组扩容

如果数组元素达到了:数组大小 * loadFactor(0.75),此时就会数组扩容。扩容时会按照倍数扩容,首先创建一个两倍大小的新数组。然后遍历原来的数组元素,对每个元素的key值进行Hash运算。接着将Hash运算后的Hash值对新数组大小进行取模,定位到新数组位置。

(2)JDK 1.7的HashMap并发下导致的环形链表

多线程并发操作HashMap时,可能会在扩容过程中形成一个环形链表。比如两个线程同时插入一个元素,而此时恰好两个线程同时触发了数组扩容。那么在数组扩容的过程中,就可能会形成一个环形链表。

下面是JDK1.7中HashMap扩容的核心源码。进行数组扩容时,会使用头插法来进行链表迁移。如果并发执行的两个线程同时使用头插法进行链表迁移,那么就有可能形成一个环形链表。

代码语言:javascript代码运行次数:0运行复制
//JDK1.7的HashMap扩容的核心方法
void transfer(Entry[] newTable) {
    Entry[] src = table;//旧的数组
    int newCapacity = newTable.length;
    for (int j = 0; j < src.length; j++) {
        Entry<K,V> e = src[j];
        if (e != null) {
            src[j] = null;
            do {
                Entry<K,V> next = e.next;
                //线程1执行到这里,假设此时的链表为:newTable[i] = <k1,v1> -> <k2,v2>
                //那么可知:e = <k1,v1>,next = <k2,v2>
                //恰好此时CPU发生了上下文切换,于是切换到线程2去执行扩容
                //线程2扩容时处理完链表的这两个节点后,newTable[i]就变成了:<k2,v2> -> <k1,v1>
                //然后CPU又切换回线程1来执行,由于此时e = <k1,v1>,那么后续代码对e.next赋值后,e就成为环形链表了:
                //也就是e = <k1,v1> -> <k2,v2> -> <k1,v1>,最后e又赋值给newTable[i]
                int i = indexFor(e.hash, newCapacity);//在新数组的位置
                //头插法:刚开始newTable[i]为null,后来newTable[i]变为<k1,v1>;
                //然后当e=<k2,v2>时,这里e设为<k2,v2> -> <k1,v1>,并又赋值给newTable[i]
                //接着遍历链表当e=<k3,v3>时,这里e设为<k3,v3> -> <k2,v2> -> <k1,v1> ...
                e.next = newTable[i];
                newTable[i] = e;
                e = next;
            } while (e != null);
        }
    }
}

(3)环形链表引发的死循环与数据丢失

一.环形链表导致死循环

假如执行get(k3)时,k3的Hash取模算法定位到环形链表的位置。于是开始遍历环形链表,但由于环形链表里没有k3的值,所以会导致在环形链表中无法找到k3对应的值进行返回。这样就导致了一直在环形链表中进行死循环,无法退出遍历。最后导致CPU飙升,线上系统被这个get操作卡死。

二.环形链表导致丢失数据

上面例子就导致了从出发是无法找到的,因此这条数据就永久丢失了,甚至会被垃圾回收掉。

(4)JDK 1.7和JDK 1.8的HashMap对比

在JDK1.7中,HashMap采用数组 + 链表的数据结构来存储数据。在多个线程并发扩容时,可能会造成环形链表最终导致死循环和数据丢失。

在JDK1.8中,HashMap采用数组 + 链表 + 红黑树的数据结构来存储数据,并且优化了JDK1.7中的数组扩容方案,解决了死循环和数据丢失的问题。但是在并发场景下调用put()方法时,有可能会存在数据覆盖的问题。

(5)并发安全的集合

比如HashTable使用synchronized来保证线程的安全性,比如Collections.synchronizedMap可以把一个线程不安全的Map,通过synchronized的方式,将其变成安全的。

但是这些方法在线程竞争激烈的情况下,效率都比较低。因为它们都是在方法层面上使用了synchronized实现的锁机制,从而导致不管是put操作还是get操作都需要去竞争同一把锁。

ConcurrentHashMap既能保证并发安全,性能也好于HashTable等集合。

2.ConcurrentHashMap的并发安全

(1)如何理解ConcurrentHashMap的并发安全

(2)ConcurrentHashMap在复合操作中的安全问题

(3)ConcurrentMap可解决复合操作的安全问题

(4)ConcurrentMap支持lambda表达式操作

(1)如何理解ConcurrentHashMap的并发安全

只能保证多线程并发执行时,容器中的数据不会被破坏。无法保证涉及多个线程的复合操作的正确性,复合操作会有并发安全问题。

(2)ConcurrentHashMap在复合操作中的安全问题

假设需要通过一个ConcurrentHashMap来记录每个用户的访问次数。如果指定用户已经有访问次数的记录,则进行递增,否则添加新访问记录。

如下代码在多线程并发调用时,会存在并发安全问题。虽然ConcurrentHashMap对于数据操作本身是安全的,但这里是复合操作,也就是"读—修改—写",而这三个操作作为一个整体却不是原子的。所以当多个线程访问同一个用户时,很可能会覆盖相互操作的结果,从而造成该用户的访问记录次数少于实际访问次数。

代码语言:javascript代码运行次数:0运行复制
public class Demo {
    private static final ConcurrentHashMap<String, Long> USER_ACCESS_COUNT = new ConcurrentHashMap<>(64);
    
		public static void main(String[] args) throws InterruptedException {
        Long accessCount = USER_ACCESS_COUNT.get("张三");
        if (accessCount == null) {
            USER_ACCESS_COUNT.put("张三", 1L);
        } else {
            USER_ACCESS_COUNT.put("张三", accessCount + 1);
        }
    }
}

(3)ConcurrentMap可解决复合操作的安全问题

虽然ConcurrentHashMap是并发安全的,但对于其复合操作需要特别关注。上述复合操作的安全问题的解决方案是,可以对复合操作加锁,也可以使用ConcurrentMap接口来解决复合操作的安全问题。

ConcurrentMap是一个支持并发访问的Map集合,相当于在原本的Map集合上新增了一些方法来扩展Map的功能。

ConcurrentMap接口定义的如下4个方法,都能满足原子性的,可以用在ConcurrentHashMap的复合操作场景中。

代码语言:javascript代码运行次数:0运行复制
//A java.util.Map providing thread safety and atomicity guarantees.
public interface ConcurrentMap<K, V> extends Map<K, V> {
    ...
    //向ConcurrentHashMap集合插入数据
    //如果插入数据的key不存在于集合中,则保存当前数据并且返回null
    //如果key已经存在,则返回存在的key对应的value
    V putIfAbsent(K key, V value);
    
    //根据key和value来删除ConcurrentHashMap集合中的元素
    //该删除操作必须保证key和value完全匹配,删除成功则返回true,否则返回false
    boolean remove(Object key, Object value);
    
    //根据key和oldValue来替换ConcurrentHashMap中已经存在的值,新的值是newValue
    //该替换操作必须保证key和oldValue玩去匹配,替换成功则返回true,否则返回false
    boolean replace(K key, V oldValue, V newValue);
    
    //和replace(key, oldValue, newValue)不同之处在于,少了对oldValue的判断
    //如果替换成功,则返回替换之前的value,否则返回null
    V replace(K key, V value);
    ...
}

因此,可以基于ConcurrentMap提供的接口对上述Demo进行改造。将原来ConcurrentHashMap第一次的put()方法替换为putIfAbsent()方法,将原来ConcurrentHashMap修改用的put()方法替换为replace()方法。由于putIfAbsent()方法和replace()方法都能保证原子性,所以并发安全了。同时增加一个while(true)方法以实现一个类似自旋的操作,确保操作成功。

代码语言:javascript代码运行次数:0运行复制
public class KeyUtil {
    private static final ConcurrentHashMap<String, Long> USER_ACCESS_COUNT = new ConcurrentHashMap<>(64);

    public static void main(String[] args) throws InterruptedException {
        while (true) {
            Long accessCount = USER_ACCESS_COUNT.get("张三");
            if (accessCount == null) {
                if (USER_ACCESS_COUNT.putIfAbsent("张三", 1L) == null) {
                    break;
                }
            } else {
                if (USER_ACCESS_COUNT.replace("张三", accessCount, accessCount + 1)) {
                    break;
                }
            }
        }
    }
}

(4)ConcurrentMap支持lambda表达式操作

一puteIfAbsent()方法

该方法通过判断传入的key是否存在来对ConcurrentMap进行数据初始化。如果key存在,则不做任何处理。如果key不存在,则调用mappingFunction计算出value值添加到Map。

代码语言:javascript代码运行次数:0运行复制
//如果张三这个用户不存在,则下面代码会初始化张三这个用户的值为1
USER_ACCESS_COUNTputeIfAbsent("张三", k -> 1L);

二puteIfPresent()方法

该方法对已经存在的key对应的value值进行修改。如果key不存在,则返回null。如果key存在,则调用mappingFunction计算出value值修改Map。

代码语言:javascript代码运行次数:0运行复制
//如果要对张三这个已经存在的用户的value值进行修改,可以使用如下代码:
USER_ACCESS_COUNTputeIfPresent("张三", (k,v) -> v + 1);

三pute()方法

compute()方法是computeIfAbsent()方法和computeIfPresent()方法的结合体。不管key是否存在,都会调用mappingFunction计算出value值。如果key存在,则对value进行修改。如果key不存在,则进行初始化处理。

代码语言:javascript代码运行次数:0运行复制
//如果张三这个用户不存在,则下面代码会初始化张三这个用户的值为1
USER_ACCESS_COUNTputeIfAbsent("张三", k -> 1L);

//如果要对张三这个已经存在的用户的value值进行修改,可以使用如下代码:
USER_ACCESS_COUNTputeIfPresent("张三", (k,v) -> v + 1);

//如果张三这个用户存在,则对其value加1,否则初始化其值为1
USER_ACCESS_COUNTpute("张三", (k,v) -> (v == null) ? 1L : v + 1);

3.ConcurrentHashMap的设计介绍

(1)JDK1.8相比于JDK1.7的改进

(2)ConcurrentHashMap的设计思想

(3)ConcurrentHashMap的数据结构定义

(1)JDK1.8相比于JDK1.7的改进

一.取消了segment分段设计,直接使用Node数组来保存数据

采用Node数组元素作为锁的粒度,进一步减少并发冲突的范围和概率。

二.引入红黑树设计

红黑树降低了极端情况下查询某个结点数据的时间复杂度,从O(n)降低到了O(logn),提升了查找性能。

(2)ConcurrentHashMap的设计思想

一.通过对数组元素加锁来降低锁的粒度

二.多线程进行并发扩容

三.高低位迁移方法

四.链表转红黑树及红黑树转链表

五.分段锁来实现数据统计

(3)ConcurrentHashMap的数据结构定义

ConcurrentHashMap采用Node数组来存储数据,数组长度默认为16。Node表示数组中的一个具体的数据结点,并且实现了Map.Entry接口。Node的key和val属性,表示实际存储的key和value。Node的hash属性,表示当前key对应的hash值。Node的next属性,表示如果是链表结构,则指向下一个Node结点。

当链表长度大于等于8 + Node数组长度大于64时,链表会转为红黑树,红黑树的存储使用TreeNode来实现。

代码语言:javascript代码运行次数:0运行复制
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable { 
    ...
    //The default initial table capacity.
    //Must be a power of 2 (i.e., at least 1) and at most MAXIMUM_CAPACITY.
    private static final int DEFAULT_CAPACITY = 16;
    
    //The array of bins. Lazily initialized upon first insertion.
    //Size is always a power of two. Accessed directly by iterators.
    transient volatile Node<K,V>[] table;//用来存储ConcurrentHashMap数据的Node数组
    
    //Key-value entry.  
    //This class is never exported out as a user-mutable Map.Entry 
    //(i.e., one supporting setValue; see MapEntry below), 
    //but can be used for read-only traversals used in bulk tasks.
    //Subclasses of Node with a negative hash field are special, 
    //and contain null keys and values (but are never exported).  
    //Otherwise, keys and vals are never null.
    static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;//当前key对应的hash值
        final K key;//实际存储的key
        volatile V val;//实际存储的value
        volatile Node<K,V> next;//如果是链表结构,则指向下一个Node结点
     
        Node(int hash, K key, V val, Node<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.val = val;
            this.next = next;
        }
        ...
    }
    
    //Nodes for use in TreeBins
    static final class TreeNode<K,V> extends Node<K,V> {
        TreeNode<K,V> parent;//red-black tree links
        TreeNode<K,V> left;
        TreeNode<K,V> right;
        TreeNode<K,V> prev;//needed to unlink next upon deletion
        boolean red;


        TreeNode(int hash, K key, V val, Node<K,V> next, TreeNode<K,V> parent) {
            super(hash, key, val, next);
            this.parent = parent;
        }
        ...
    }
    ...
}

4.ConcurrentHashMap的put操作流程

(1)ConcurrentHashMap的put操作流程

(2)ConcurrentHashMap和HashMap的put操作

(3)为什么ConcurrentHashMap是并发安全的

(1)ConcurrentHashMap的put操作流程

首先通过key的hashCode的高低16位的位与操作来计算key的hash值,让32位的hashCode都参与运算以降低数组大小小于32时哈希冲突的概率。

然后判断Node数组是否为空或者Node数组的长度是否为0。如果为空或者为0,则调用initTable()方法进行初始化。如果不为空,则通过hash & (n - 1)计算当前key在Node数组中的下标位置。并通过tabAt()方法获取该位置的值f,然后判断该位置的值f是否为空。

如果该位置的值f为空,则把当前的key和value封装成Node对象。然后尝试通过casTabAt()方法使用CAS设置该位置的值f为封装好的Node对象。如果CAS设置成功,则退出for循环,否则继续进行下一次for循环。

如果该位置的值f不为空,则判断Node数组是否正处于扩容中。如果是,那么当前线程就调用helpTransfer()方法进行并发扩容。如果不是,那么说明当前的key在Node数组中出现了Hash冲突。于是通过synchronized关键字,对该位置的值f进行Hash冲突处理。其实JUC还可以继续优化,比如先用CAS尝试修改哈希冲突下的链表或红黑树。如果CAS修改失败,那么再通过使用synchronized对该数组元素加锁来进行处理。

最后,会调用addCount()方法统计Node数组中的元素个数。

代码语言:javascript代码运行次数:0运行复制
public class ConcurrentHashMapDemo {
    public static void main(String[] args) {
        ConcurrentHashMap<String, String> map = new ConcurrentHashMap<String, String>();
        map.put("k1", "v1");
    }
}

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable { 
    ...
    //The array of bins. Lazily initialized upon first insertion.
    //Size is always a power of two. Accessed directly by iterators.
    transient volatile Node<K,V>[] table;
    
    //Creates a new, empty map with the default initial table size (16).
    public ConcurrentHashMap() {
        
    }
    
    //Creates a new, empty map with an initial table size 
    //accommodating the specified number of elements without the need to dynamically resize.
    //@param initialCapacity The implementation performs internal sizing to accommodate this many elements.
    public ConcurrentHashMap(int initialCapacity) {
        if (initialCapacity < 0) {
            throw new IllegalArgumentException();
        }
        int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
        this.sizeCtl = cap;
    }
    
    //Returns a power of two table size for the given desired capacity.
    private static final int tableSizeFor(int c) {
        int n = c - 1;
        n |= n >>> 1;
        n |= n >>> 2;
        n |= n >>> 4;
        n |= n >>> 8;
        n |= n >>> 16;
        return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
    }
    
    static final int spread(int h) {
        //通过hashCode的高低16位的异或运算来计算hash值,以降低数组大小比32小的时候的哈希冲突概率
        return (h ^ (h >>> 16)) & HASH_BITS;
    }
    
    //Maps the specified key to the specified value in this table.
    //Neither the key nor the value can be null.
    public V put(K key, V value) {
        return putVal(key, value, false);
    }
    
    //获取Node数组在位置i的元素,通过Unsafe类让数组中的元素具有可见性
    //虽然table变量使用了volatile修饰,但这只保证了table引用对于所有线程的可见性,还不能保证table数组中的元素的修改对于所有线程是可见的
    //因此需要通过Unsafe类的getObjectVolatile()来保证table数组中的元素的可见性
    static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
        return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
    }
    
    //CAS设置Node数组的元素为某个Node对象
    static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) {
        return UpareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
    }
    
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) {
            throw new NullPointerException();
        }
      
        //通过key的hashCode的高低16位的位与操作来计算hash值
        int hash = spread(key.hashCode());
        int binCount = 0;
       
        //这是一个没有结束条件的for循环,用来自旋
        //其中Node数组的引用赋值给了tab变量
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0) {
                //调用initTable()方法初始化Node数组
                tab = initTable();
            } else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                //如果通过CAS设置Node数组位置i的值为key/value封装的Node对象,则退出for循环
                if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) {
                    break;// no lock when adding to empty bin
                }
            } else if ((fh = f.hash) == MOVED) {
                //如果发现Node数组正处于扩容中,那么就进行并发扩容
                tab = helpTransfer(tab, f);
            } else {
                V oldVal = null;
                //处理Hash冲突
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {//如果是链表
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent) {
                                        e.val = value;
                                    }
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key, value, null);
                                    break;
                                }
                            }
                        } else if (f instanceof TreeBin) {//如果是红黑树
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent) {
                                    p.val = value;
                                }
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    //如果链表的元素大于等于8
                    if (binCount >= TREEIFY_THRESHOLD) {//TREEIFY_THRESHOLD = 8
                        treeifyBin(tab, i);//链表转红黑树
                    }
                    if (oldVal != null) {
                        return oldVal;
                    }
                    break;
                }
            }
        }
        //调用addCount()方法统计Node数组元素的个数
        addCount(1L, binCount);
        return null;
    }
    ...
}

(2)ConcurrentHashMap和HashMap的put操作

都是通过key的hashCode的高低16位的异或运算,来降低Hash冲突概率。

都是通过Hash值与数组大小-1的位与运算(取模),来定位key在数组的位置。

但ConcurrentHashMap使用了自旋 + CAS + synchronized来处理put操作,从而保证了多个线程对数组里某个key进行赋值时的效率 + 并发安全性。

代码语言:javascript代码运行次数:0运行复制
public class HashMap<K,V> extends AbstractMap<K,V> implements Map<K,V>, Cloneable, Serializable {
    static final int TREEIFY_THRESHOLD = 8;//链表转红黑树的阈值
    ...
    
    public V put(K key, V value) {
        return putVal(hash(key), key, value, false, true);
    }
  
    final V putVal(int hash, K key, V value, boolean onlyIfAbsent, boolean evict) {
        Node<K,V>[] tab; Node<K,V> p; int n, i;
        if ((tab = table) == null || (n = tab.length) == 0) {
            n = (tab = resize()).length;
        }
        if ((p = tab[i = (n - 1) & hash]) == null) {
            //如果通过哈希寻址算法定位到的下标为i的数组元素为空(即tab[i]为空)
            //那么就可以直接将一个新创建的Node对象放到数组的tab[i]这个位置;
            tab[i] = newNode(hash, key, value, null);
        } else {
            Node<K,V> e; K k;
            if (p.hash == hash && ((k = p.key) == key || (key != null && key.equals(k)))) {
                //通过哈希寻址算法定位到的数组位置已有Node元素
                //那么判断是否为相同的key,如果是相同的key则进行value覆盖
                e = p;
            } else if (p instanceof TreeNode) {
                //通过哈希寻址算法定位到的数组位置已有Node元素,而且不是相同的key
                //那么通过"p instanceof TreeNode)",判断数组的tab[i]元素是否是一颗红黑树
                e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
            } else {
                ...
            }
            ...
        }
        ++modCount;
        //判断数组大小size,是否已经达到了扩容阈值threshold大小
        if (++size > threshold) {
            resize();
        }
        afterNodeInsertion(evict);
        return null;
    }
    ...
}

(3)为什么ConcurrentHashMap是并发安全的

首先在初始化Node数组时,会通过自旋 + CAS去设置sizeCtl的值来获得锁。然后在put()操作时,也会通过自旋 + CAS去设置数组某个位置的值。当出现Hash冲突时,则使用synchronized关键字来修改数组某个位置的值。

5.ConcurrentHashMap的Node数组初始化

(1)调用put()方法时才初始化Node数组

(2)initTable()方法的初始化逻辑

(3)sizeCtl的状态流转

(1)调用put()方法时才初始化Node数组

Node数组的初始化过程是被动的,当调用ConcurrentHashMap.put()方法时,如果发现Node数组还没有被初始化,才会调用initTable()方法完成初始化。

(2)initTable()方法的初始化逻辑

initTable()方法和一般的初始化方法不同,因为需要考虑多线程并发情形。

首先while循环的退出条件是Node数据即table初始化成功,否则一直循环。这其实就使用到了自旋的机制,因为多个线程调用initTable()必然会竞争。而在竞争的情况下如果不采用独占锁机制,就只能通过自旋来不断重试。

然后通过sizeCtl是否小于0来判断当前是否有其他线程正在进行初始化。如果有,则通过Thread.yield()把自己变成就绪状态,释放CPU资源。如果没有,则通过CAS修改sizeCtl变量的值为-1。

如果CAS修改sizeCtl成功,则表示当前线程获取初始化Node数组的锁成功了;

如果CAS修改sizeCtl失败,则表示当前线程获取初始化Node数组的锁失败了;

对于获取锁失败的线程,会继续进入下一次while循环进行重试,这样设计是为了避免出现多个线程同时初始化Node数组。

对于获取锁成功的线程,首先会判断Node数组是否已经初始化完成。如果Node数组已经初始化完成,则退出while循环。如果Node数组还是空,则创建一个Node数组,然后赋值给table变量。并且计算下次扩容的阈值(0.75倍当前数组容量),然后赋值给sizeCtl。

代码语言:javascript代码运行次数:0运行复制
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable { 
    //sizeCtl = -1,表示当前有线程抢占到了初始化Node数组的资格,正在初始化Node数组
    //sizeCtl < -1,用sizeCtl值的二进制低16位来记录当前参与扩容的线程数量
    //sizeCtl = 0,表示Node数组未初始化,并且在ConcurrentHashMap构造方法中没有指定初始容量
    //sizeCtl > 0,如果Node数组已经初始化,那么sizeCtl表示扩容的阈值(初始容量 * 0.75),如果未初始化,则表示数组的初始容量
    private transient volatile int sizeCtl;
    private static final long SIZECTL;
  
    static {
        try {
            U = sun.misc.Unsafe.getUnsafe();//获取UnSafe对象
            Class<?> k = ConcurrentHashMap.class;
            SIZECTL = U.objectFieldOffset(k.getDeclaredField("sizeCtl"));//获取sizeCtl变量的偏移量
            ...
        } catch (Exception e) {
            throw new Error(e);
        }
    }
    ...
    
    //初始化Node数组
    //Initializes table, using the size recorded in sizeCtl.
    private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        //退出while循环的条件是Node数组即table初始化成功
        while ((tab = table) == null || tab.length == 0) {
            //判断当前是否有其他线程正在进行初始化
            if ((sc = sizeCtl) < 0) {
                //如果有,则通过Thread.yield()把自己变成就绪状态,释放CPU资源
                Thread.yield();//lost initialization race; just spin
            } else if (UpareAndSwapInt(this, SIZECTL, sc, -1)) {
                //如果没有线程正在进行初始化,则通过CAS修改sizeCtl变量的值为-1
                //如果CAS修改成功,则表示当前线程获得了初始化数组的锁
                //如果CAS修改失败,则表示当前线程获取初始化数组的锁失败
                try {
                    //再次判断Node数组是否为空,即Node数组是否已经初始化完成
                    //因为执行Thread.yield()让出CPU资源的线程必然会再次执行到这里
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        //初始化大小为n的Node数组,然后赋值给tab变量和table变量
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        //赋值给ConcurrentHashMap的全局Node数组table
                        table = tab = nt;
                        //计算下次扩容的阈值,阈值的计算是当前数组容量的0.75倍
                        sc = n - (n >>> 2);
                    }
                } finally {
                    //最后将扩容的阈值赋值给sizeCtl
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }
    ...
}

(3)sizeCtl的状态流转

一.sizeCtl = -1

表示当前有线程抢占到了初始化Node数组的资格,正在初始化Node数组。

二.sizeCtl < -1

用sizeCtl值的二进制低16位来记录当前参与扩容的线程数量。

三.sizeCtl = 0

表示Node数组未初始化,且创建ConcurrentHashMap时没有指定初始容量。

四.sizeCtl > 0

如果Node数组已经初始化,那么sizeCtl表示扩容的阈值(初始容量 * 0.75)。如果Node数组未初始化,则表示数组的初始容量。

6.ConcurrentHashMap对Hash冲突的处理

(1)Hash冲突的几个解决方案

(2)ConcurrentHashMap对Hash冲突的处理

(3)链表长度大于8时是扩容还是转化为红黑树

(1)Hash冲突的几个解决方案

一.开放寻址法

如果位置i被占用,那么就探查i+1、i+2、i+3的位置。ThreadLocal采用的就是开放寻址法。

二.链式寻址法

Hash表的每个位置都连接一个链表。当发生Hash冲突时,冲突的元素会被加入到这个位置的链表中。ConcurrentHashMap就是基于链式寻址法解决Hash冲突的。

三.再Hash法

提供多个不同的Hash函数,当发生Hash冲突时,使用第二个、第三个等。

(2)ConcurrentHashMap对Hash冲突的处理

首先使用synchronized对当前位置的Node对象f进行加锁。由于这种锁控制在数组的单个数据元素上,所以长度为16的数组理论上就可以支持16个线程并发写入数据。

然后判断当前位置的Node对象f是链表还是红黑树。如果是链表,那么就把当前的key/value封装成Node对象插入到链表的尾部。如果是红黑树,那么就调用TreeBin的putTreeVal()方法往红黑树插入结点。

最后判断链表的长度是否大于等于8,如果链表的长度大于等于8,再调用treeifyBin()方法决定是扩容数组还是将链表转化为红黑树。

代码语言:javascript代码运行次数:0运行复制
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable { 
    ...
    //The array of bins. Lazily initialized upon first insertion.
    //Size is always a power of two. Accessed directly by iterators.
    transient volatile Node<K,V>[] table;
    
    //Maps the specified key to the specified value in this table.
    //Neither the key nor the value can be null.
    public V put(K key, V value) {
        return putVal(key, value, false);
    }
    
    //获取Node数组在位置i的元素
    //虽然table变量使用了volatile修饰,但这只保证了table引用对于所有线程的可见性,还不能保证table数组中的元素的修改对于所有线程是可见的
    //因此需要通过Unsafe类的getObjectVolatile()来保证table数组中的元素的可见性
    static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
        return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
    }
    
    //CAS设置Node数组的元素为某个Node对象
    static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) {
        return UpareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
    }
    
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) {
            throw new NullPointerException();
        }
      
        //通过key的hashCode的高低16位的位与操作来计算hash值
        int hash = spread(key.hashCode());
        int binCount = 0;
       
        //这是一个没有结束条件的for循环,用来自旋
        //其中Node数组的引用赋值给了tab变量
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0) {
                //调用initTable()方法初始化Node数组
                tab = initTable();
            } else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                //如果通过CAS设置Node数组位置i的值为key/value封装的Node对象,则退出for循环
                if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) {
                    break;// no lock when adding to empty bin
                }
            } else if ((fh = f.hash) == MOVED) {
                //发现Node数组正处于扩容中,那么就进行并发扩容
                tab = helpTransfer(tab, f);
            } else {
                V oldVal = null;
                //处理Hash冲突
                synchronized (f) {//使用synchronized对当前数组位置的Node对象f进行加锁
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {//如果是链表
                            binCount = 1;//binCount用来记录链表的长度
                            //从链表的头结点开始遍历每个结点
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                //如果存在相同的key,则修改该key的value
                                if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent) {
                                        e.val = value;
                                    }
                                    break;
                                }
                                Node<K,V> pred = e;
                                //找到链表的最后一个结点
                                if ((e = e.next) == null) {
                                    //把当前的key/value封装成Node对象插入到链表的尾部
                                    pred.next = new Node<K,V>(hash, key, value, null);
                                    break;
                                }
                            }
                        } else if (f instanceof TreeBin) {//如果是红黑树
                            Node<K,V> p;
                            binCount = 2;
                            //调用TreeBin的putTreeVal()方法往红黑树插入结点
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent) {
                                    p.val = value;
                                }
                            }
                        }
                    }
                }
                //最后判断链表的长度是否大于等于8
                if (binCount != 0) {
                    //如果链表的长度大于等于8,再调用treeifyBin()方法决定是扩容数组还是转化为红黑树
                    if (binCount >= TREEIFY_THRESHOLD) {//TREEIFY_THRESHOLD = 8
                        treeifyBin(tab, i);//是扩容数组还是转化为红黑树
                    }
                    if (oldVal != null) {
                        return oldVal;
                    }
                    break;
                }
            }
        }
        //调用addCount()方法统计Node数组元素的个数
        addCount(1L, binCount);
        return null;
    }
    ...
}

(3)链表长度大于8时是扩容还是转化为红黑树

当链表长度 >= 8时ConcurrentHashMap会对链表采用两种方式进行优化。

方式一:对数组进行扩容

当数组长度 <= 64,且链表长度 >= 8时,优先选择对数组进行扩容。

方式二:把链表转化为红黑树

当数组长度 > 64,且链表长度 >= 8时,会将链表转化为红黑树。

treeifyBin()方法的作用是根据相关阈值来决定是扩容还是把链表转为红黑树。

代码语言:javascript代码运行次数:0运行复制
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable { 
    static final int MIN_TREEIFY_CAPACITY = 64;
    ...
    //Replaces all linked nodes in bin at given index unless table is too small, in which case resizes instead.
    private final void treeifyBin(Node<K,V>[] tab, int index) {
        Node<K,V> b; int n, sc;
        if (tab != null) {
            //如果当前数组的长度小于64,则调用tryPresize()方法进行数组扩容
            if ((n = tab.length) < MIN_TREEIFY_CAPACITY) {
                tryPresize(n << 1);//数组扩容
            } else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
                synchronized (b) {
                    if (tabAt(tab, index) == b) {
                        TreeNode<K,V> hd = null, tl = null;
                        for (Node<K,V> e = b; e != null; e = e.next) {
                            //构建一个TreeNode并插入红黑树中
                            TreeNode<K,V> p = new TreeNode<K,V>(e.hash, e.key, e.val, null, null);
                            if ((p.prev = tl) == null) {
                                hd = p;
                            } else {
                                tl.next = p;
                            }
                            tl = p;
                        }
                        setTabAt(tab, index, new TreeBin<K,V>(hd));
                    }
                }
            }
        }
    }
    ...
}

7.ConcurrentHashMap的并发扩容机制

(1)ConcurrentHashMap扩容的前置操作

(2)ConcurrentHashMap并发扩容的机制

(3)ConcurrentHashMap并发扩容的流程

(1)ConcurrentHashMap扩容的前置操作

ConcurrentHashMap的tryPresize()方法用于处理数组扩容前的前置操作,该方法主要分为四部分。

第一部分:

首先通过tableSizeFor()方法计算传入size的最小的2的幂次方。

第二部分:

然后判断Node数组是否已初始化,如果还没初始化则要先进行初始化。初始化时会计算扩容阈值为数组大小的0.75倍 + 将扩容阈值赋值给sizeCtl。

第三部分:

如果Node数组已经初始化,则判断是否需要进行扩容。如果Node数组已经被其他线程完成扩容,则当前线程退出循环,无需扩容。如果Node数组已达到最大容量,则无法再进行扩容,也需退出循环。

第四部分:

调用transfer()方法开始执行扩容操作。如果sizeCtl < 0,说明此时已经有其他线程在执行扩容了。如果sizeCtl >= 0,说明此时没有其他线程进行扩容。当前线程都会先通过CAS成功设置sizeCtl后,再调用transfer()方法来扩容。

代码语言:javascript代码运行次数:0运行复制
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable { 
    ...
    //Returns a power of two table size for the given desired capacity.
    private static final int tableSizeFor(int c) {
        int n = c - 1;
        n |= n >>> 1;
        n |= n >>> 2;
        n |= n >>> 4;
        n |= n >>> 8;
        n |= n >>> 16;
        return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
    }

    //Tries to presize table to accommodate the given number of elements.
    private final void tryPresize(int size) {
        //一.首先通过tableSizeFor()方法计算传入size的最小的2的幂次方
        int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(size + (size >>> 1) + 1);
        int sc;
        while ((sc = sizeCtl) >= 0) {
            Node<K,V>[] tab = table; int n;
            //二.判断Node数组是否已经初始化,如果还没初始化,需要先进行初始化
            if (tab == null || (n = tab.length) == 0) {
                n = (sc > c) ? sc : c;
                if (UpareAndSwapInt(this, SIZECTL, sc, -1)) {
                    try {
                        if (table == tab) {
                            @SuppressWarnings("unchecked")
                            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                            table = nt;
                            sc = n - (n >>> 2);//扩容阈值为数组大小的0.75倍
                        }
                    } finally {
                        sizeCtl = sc;//将扩容阈值赋值给sizeCtl
                    }
                }
            }
            //三.如果Node数组已经初始化,则判断是否需要进行扩容
            else if (c <= sc || n >= MAXIMUM_CAPACITY) {
                //c <= sc,说明Node数组已经被其他线程完成扩容了,不需要再进行扩容
                //n >= MAXIMUM_CAPACITY,说明Node数组已达到最大容量,无法再进行扩容
                break;
            }
            //四.调用transfer()方法开始执行扩容操作
            else if (tab == table) {
                int rs = resizeStamp(n);
                //如果sc < 0,说明此时已经有其他线程在执行扩容了
                //于是当前线程可以先通过CAS成功设置sizeCtl的值后,再调用transfer()方法协助扩容
                if (sc < 0) {
                    Node<K,V>[] nt;
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) { 
                        break;
                    }
                    if (UpareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                        transfer(tab, nt);
                    }
                }
                //如果sc >= 0,说明此时没有其他线程进行扩容
                //于是当前线程也是先通过CAS成功设置sizeCtl的值后,再调用transfer()方法进行扩容
                else if (UpareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) {
                    transfer(tab, null);
                }
            }
        }
    }
    ...
}

(2)ConcurrentHashMap并发扩容的机制

一.ConcurrentHashMap中的扩容设计

二.多线程并发扩容的原理

一.ConcurrentHashMap中的扩容设计

扩容就是创建一个2倍原大小的数组,然后把原数组的数据迁移到新数组中。但多线程环境下的扩容,需要考虑其他线程会同时往数组添加元素的情况。如果简单地对扩容过程增加一把同步锁,保证扩容过程不存在其他线程操作,那么就会对性能的损耗特别大,特别是数据量比较大时,阻塞的线程会很多。

首先使用CAS来实现计算每个线程的迁移区间。然后使用synchronized把锁粒度控制到每个数组元素上。如果数组有16个元素就有16把锁,如果数组有32个元素就有32把锁。接着如果线程A在进行数组扩容时,线程B要修改数组的某个元素f。那么就让修改元素的线程加入迁移,从而实现多线程并发扩容来提高效率。等数组扩容完成后,线程B才继续去修改元素f。最后通过高低位迁移逻辑计算出高位链和低位链,大大减少了数据迁移次数。

二.多线程并发扩容的原理

当存在多个线程并发扩容及数据迁移时,默认会给每个线程分配一个区间。这个区间的默认长度是16,每个线程会负责自己区间内的数据迁移工作。如果只有两个线程对长度为64的数组迁移数据,则每个线程要做2次迁移,迁移过程会依赖transferIndex来更新每个线程的迁移区间。

(3)ConcurrentHashMap并发扩容的流程

ConcurrentHashMap的transfer()方法用于处理数组扩容时的流程细节,该方法主要分为五部分:

第一部分:创建扩容后的数组

第二部分:计算当前线程的数据迁移区间

第三部分:更新扩容标记advance

第四部分:开始数据迁移和扩容

第五部分:完成迁移后的判断

第一部分:创建扩容后的数组

这部分代码主要做两件事情:

一.计算每个线程处理的迁移区间长度,默认是16。

二.初始化一个新的数组nt,赋值给方法入参nextTab和全局变量nextTable。该数组的长度是原数组的2倍,并且设置transferIndex的值为为原数组大小。

代码语言:javascript代码运行次数:0运行复制
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable { 
    //The next table to use; non-null only while resizing.
    private transient volatile Node<K,V>[] nextTable;
    ...
    
    //Moves and/or copies the nodes in each bin to new table. 
    //tab是原数组,nextTab是扩容后的数组
    private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        int n = tab.length, stride;
        //计算每个线程处理的迁移区间长度,默认是16
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) {
            stride = MIN_TRANSFER_STRIDE;//subdivide range
        }
        //初始化一个新的数组nt,赋值给方法入参nextTab和全局变量nextTable,该数组的长度是原数组的2倍
        //并且设置transferIndex的值为为原数组大小
        if (nextTab == null) {//initiating
            try {
                @SuppressWarnings("unchecked")
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];//扩容为2n
                nextTab = nt;//将创建的扩容数组赋值给nextTab
            } catch (Throwable ex) {//try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab;//将创建的扩容数组赋值给nextTable
            transferIndex = n;//设置transferIndex为原来的数组大小
        }
        ...
    }
}

第二部分:计算当前线程的数据迁移区间

下面的while循环会计算每个执行到此处的线程需要负责的数据迁移区间。假设当前数组长度是32,需要扩容到64。那么此时transferIndex = 32,nextn = 64,n = 32。

当前线程第一次for循环:nextIndex被transferIndex赋值为32,之后CAS修改transferIndex。CAS修改成功后,nextBound = 32 - 16 = 16,transferIndex = 16。所以bound = 16,i = 31,当前线程负责的迁移区间为[bound, i] = [16, 31]。

当前线程第二次for循环,或者有其他线程进来第一次for循环:由于此时transferIndex = 16,所以nextIndex会被transferIndex赋值为16。之后CAS修改transferIndex为0,修改成功后,nextBound = 16 - 16 = 0。所以bound = 0,i = 15,此时线程负责的迁移区间为[bound, i] = [0, 15]。

需要注意的是:每次循环都会通过if (--i >= bound || finishing)判断区间是否已迁移完成。如果已完成,则会继续进入while循环中的CAS,获取新的迁移区间。

数组从高位往低位进行迁移,比如第一次for循环,处理的区间是[16, 31]。那么就会从位置为31开始往前进行遍历,对每个数组元素进行数据迁移。

代码语言:javascript代码运行次数:0运行复制
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable { 
    //The next table to use; non-null only while resizing.
    private transient volatile Node<K,V>[] nextTable;
    ...
    
    //Moves and/or copies the nodes in each bin to new table. 
    //tab是原数组,nextTab是扩容后的数组
    private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        ...
        int nextn = nextTab.length;//扩容后的数组长度
        //继承自Node的ForwardingNode表示一个正在被迁移的Node
        //当原数组中某位置的数据完成迁移后,会对该位置设置一个ForwardingNode,表示该位置已经处理过了
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
        //advance字段用来判断是否还有待处理的数据迁移工作,也就是扩容标记
        boolean advance = true;
        boolean finishing = false; // to ensure sweep before committing nextTab
        //当前线程负责的迁移区间是[bound, i]
        for (int i = 0, bound = 0;;) {
            Node<K,V> f; int fh;
            //while循环会计算每个执行到此处的线程需要负责的数据迁移区间
            while (advance) {
                //假设当前数组长度是32,需要扩容到64;
                //那么此时transferIndex = 32,nextn = 64,n = 32;
                //刚开始循环时i = 0,nextIndex被transferIndex赋值为32
                int nextIndex, nextBound;
                if (--i >= bound || finishing) {
                    //一开始i = bound = 0,所以不会进入这里,而是进入UpareAndSwapInt()的条件中
                    //但后来bound = 16, i = 31后,就会进入这里,退出循环
                    //此后,每次--i,当i = bound = 16时,就又会进入UpareAndSwapInt()的条件中,重新获取数据迁移区间
                    advance = false;
                } else if ((nextIndex = transferIndex) <= 0) {
                    //判断当前线程是否已经分配到了新的迁移区间
                    i = -1;
                    advance = false;
                } else if (UpareAndSwapInt(this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { 
                    //如果CAS设置transferIndex从nextIndex=32变为nextIndex-16=16成功后,
                    //那么advance设置为false,退出while循环
                    //当前线程再次进入while循环或者其他线程也进入while循环,就会从transferIndex = 16开始,计算要负责的迁移区间
                    bound = nextBound;//第一次for循环nextBound = 16
                    i = nextIndex - 1;//第一次for循环i = 31
                    advance = false;
                }
            }
            ...
        }
        ...
    }
    
    //A node inserted at head of bins during transfer operations.
    static final class ForwardingNode<K,V> extends Node<K,V> {
        final Node<K,V>[] nextTable;
        ForwardingNode(Node<K,V>[] tab) {
            super(MOVED, null, null, null);
            this.nextTable = tab;
        }
        ...
    }
}

第三部分:更新扩容标记advance

如果位置i的数组元素Node为空,说明该Node对象不需要迁移。所以通过casTabAt()方法修改原数组在位置i的元素为fwd对象,这样其他线程在进行put()操作的时候就可以发现当前数组正在扩容。

如果位置i的数组元素Node的hash值为MOVED,那么说明该Node对象已经被迁移了。所以设置扩容标记位advance为true,等下次for循环时进入while循环--i。

代码语言:javascript代码运行次数:0运行复制
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable { 
    //The next table to use; non-null only while resizing.
    private transient volatile Node<K,V>[] nextTable;
    
    //The array of bins. Lazily initialized upon first insertion.
    //Size is always a power of two. Accessed directly by iterators.
    transient volatile Node<K,V>[] table;
    
    ...
    //Moves and/or copies the nodes in each bin to new table. 
    //tab是原数组,nextTab是扩容后的数组
    private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        ...
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
        ...
        //当前线程负责的迁移区间是[bound, i]
        for (int i = 0, bound = 0;;) {
            ...
            } else if ((f = tabAt(tab, i)) == null) {
                //第三部分开始:更新扩容标记advance,这样其他线程在put()的时候就可以发现当前数组正在扩容
                advance = casTabAt(tab, i, null, fwd);
            } else if ((fh = f.hash) == MOVED) {
                //设置扩容标记位advance为true,等下次for循环时进入while循环--i
                advance = true; // already processed
                //第三部分结束
            } else {
            ...
        }
        ...
    }
    
    //虽然table变量使用了volatile修饰,但这只保证了table引用对于所有线程的可见性,还不能保证table数组中的元素的修改对于所有线程是可见的 
    //因此需要通过Unsafe类的getObjectVolatile()来保证table数组中的元素的可见性
    static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
        return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
    }

    static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) {
        return UpareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
    }
}

第四部分:开始数据迁移和扩容

首先对当前要迁移的Node结点f添加同步锁synchronized,避免多线程竞争。

如果结点f的哈希值大于0,则表示Node结点f为链表或普通结点,那么此时就需要按照链表或普通结点的方式来进行数据迁移。

如果结点f属于TreeBin类型,则表示结点f为红黑树,那么此时就要按红黑树的规则进行数据迁移。

需要注意的是,数据迁移之后可能会存在红黑树转化为链表的情况,当链表长度小于等于6时,红黑树就会转化为链表。

接着使用高位链和低位链的计算方法构造高位链和低位链,遍历链表的每一个结点,计算p.hash & n的值。如果值为0,表示需要迁移,属于高位链;否则不需要迁移,属于低位链。

比如在数组长度为16的一个链表中,hash值为:4, 20, 52, 68, 84, 100。经过hash & (n - 1)得到的下标位置都是4,接着数组长度需要扩容到32。于是经过hash & (n - 1)计算,发现20, 52, 84对应的下标变成了20。这就意味着,这个链表中hash值为20, 52, 84的结点需要迁移到位置20。

最后把低位链设置到扩容后的数组的位置i,把高位链设置到位置i + n。此时当前线程已处理完位置为i的数据迁移,于是设置advance为true,让后续的for循环可以进入while循环来实现对i的递减继续迁移数据。

第五部分:完成迁移后的判断

如果数据迁移完成了,则把扩容后的数组赋值给table。如果还没完成数据迁移,则通过CAS修改并发扩容的线程数。

代码语言:javascript代码运行次数:0运行复制
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable { 
    //The next table to use; non-null only while resizing.
    private transient volatile Node<K,V>[] nextTable;
    ...
    
    //Moves and/or copies the nodes in each bin to new table. 
    private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        //第一部分开始:创建扩容后的数组
        int n = tab.length, stride;//n就是原数组大小
        //计算每个线程处理的迁移区间长度,默认是16
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) {
            stride = MIN_TRANSFER_STRIDE;//subdivide range
        }
        //初始化一个新的数组nt,赋值给方法入参nextTab和全局变量nextTable,该数组的长度是原数组的2倍
        //并且设置transferIndex的值为为原数组大小
        if (nextTab == null) {//initiating
            try {
                @SuppressWarnings("unchecked")
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];//扩容为2n
                nextTab = nt;//将创建的扩容数组赋值给nextTab
            } catch (Throwable ex) {//try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab;//将创建的扩容数组赋值给nextTable
            transferIndex = n;//设置transferIndex为原来的数组大小
        }
        //第一部分结束
        //第二部分开始:计算当前线程的数据迁移区间
        int nextn = nextTab.length;//扩容后的数组长度
        //继承自Node的ForwardingNode表示一个正在被迁移的Node
        //当原数组中某位置的数据完成迁移后,会对该位置设置一个ForwardingNode,表示该位置已经处理过了
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
        //advance字段用来判断是否还有待处理的数据迁移工作
        boolean advance = true;
        boolean finishing = false; // to ensure sweep before committing nextTab
       
        //当前线程负责的迁移区间是[bound, i]
        for (int i = 0, bound = 0;;) {
            Node<K,V> f; int fh;
            //while循环会计算每个执行到此处的线程需要负责的数据迁移区间
            while (advance) {
                //假设当前数组长度是32,需要扩容到64;
                //那么此时transferIndex = 32,nextn = 64,n = 32;
                //刚开始循环时i = 0,nextIndex被transferIndex赋值为32
                int nextIndex, nextBound;
                if (--i >= bound || finishing) {
                    //一开始i = bound = 0,所以不会进入这里,而是进入UpareAndSwapInt()的条件中
                    //但后来bound = 16, i = 31后,就会进入这里,退出循环
                    //此后,每次--i,当i = bound = 16时,就又会进入UpareAndSwapInt()的条件中,重新获取数据迁移区间
                    advance = false;
                } else if ((nextIndex = transferIndex) <= 0) {
                    //判断当前线程是否已经分配到了新的迁移区间
                    i = -1;
                    advance = false;
                } else if (UpareAndSwapInt(this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { 
                    //如果CAS设置transferIndex从nextIndex=32变为nextIndex-16=16成功后,
                    //那么advance设置为false,退出while循环
                    //当前线程再次进入while循环或者其他线程也进入while循环,就会从transferIndex = 16开始,计算要负责的迁移区间
                    bound = nextBound;//第一次for循环nextBound = 16
                    i = nextIndex - 1;//第一次for循环i = 31
                    advance = false;
                }
            }
            //第二部分结束
            if (i < 0 || i >= n || i + n >= nextn) {
                //第五部分开始:完成迁移后的判断
                int sc;
                //如果数据迁移完成了,则把扩容后的数组赋值给table
                if (finishing) {
                    nextTable = null;
                    table = nextTab;
                    sizeCtl = (n << 1) - (n >>> 1);
                    return;
                }
                //如果还没完成数据迁移,则通过CAS修改并发扩容的线程数
                if (UpareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) {
                        return;
                    }
                    finishing = advance = true;
                    i = n; // recheck before commit
                }
                //第五部分结束
            } else if ((f = tabAt(tab, i)) == null) {
                //第三部分开始:更新扩容标记advance,这样其他线程在put()的时候就可以发现当前数组正在扩容
                advance = casTabAt(tab, i, null, fwd);
            } else if ((fh = f.hash) == MOVED) {
                //设置扩容标记位advance为true,等下次for循环时进入while循环--i
                advance = true; // already processed
                //第三部分结束
            } else {
                //第四部分开始:开始数据迁移和扩容
                synchronized (f) {//首先对当前要迁移的Node结点f添加同步锁synchronized,避免多线程竞争
                    if (tabAt(tab, i) == f) {
                        Node<K,V> ln, hn;
                        //如果fh >= 0,则表示Node结点f为链表或普通结点,此时需要按照链表或普通结点的方式来进行数据迁移
                        if (fh >= 0) {
                            int runBit = fh & n;
                            Node<K,V> lastRun = f;
                            //for循环遍历链表,计算出当前链表最后一个需要迁移或者不需要迁移的结点位置
                            //遍历链表的每一个结点,计算p.hash & n,如果值为0,表示需要迁移,否则不需要迁移
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            } else {
                                hn = lastRun;
                                ln = null;
                            }
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                if ((ph & n) == 0) {
                                    //将ln作为参数,以ln为基础构造低位链,不需要迁移
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                } else {
                                    //将hn作为参数,以hn为基础构造高位链,需要迁移
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                                }
                            }
                            //把低位链设置到扩容后的数组的位置i
                            setTabAt(nextTab, i, ln);
                            //把高位链设置到扩容后的数组的位置i + n
                            setTabAt(nextTab, i + n, hn);
                            //当原数组中某位置的数据完成迁移后,会对该位置设置一个ForwardingNode,表示该位置已经处理过了
                            setTabAt(tab, i, fwd);
                            //当前线程已处理完位置为i的数据的迁移,于是设置advance为true,让后续的for循环继续进入while循环来实现对i的递减
                            advance = true;
                        } else if (f instanceof TreeBin) {
                            //如果f instanceof TreeBin,则表示结点f为红黑树,需要按照红黑树的规则进行数据迁移
                            //需要注意的是,数据迁移之后可能会存在红黑树转化为链表的情况,当链表长度小于等于6时,红黑树就会转化为链表
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> lo = null, loTail = null;
                            TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (Node<K,V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K,V> p = new TreeNode<K,V>(h, e.key, e.val, null, null);
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null) {
                                        lo = p;
                                    } else {
                                        loTail.next = p;
                                    }
                                    loTail = p;
                                    ++lc;
                                } else {
                                    if ((p.prev = hiTail) == null) {
                                        hi = p;
                                    } else {
                                        hiTail.next = p;
                                    }
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t;
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                    }
                }
                //第四部分结束
            }
        }
    }
    ...
}

与本文相关的文章

发布评论

评论列表(0)

  1. 暂无评论