前言
面试中常常问到Hashtable、HashMap和ConcurrentHashMap的区别。大家都知道HashMap是线程不安全的,Hashtable和ConcurrentHashMap是线程安全的。Hashtable保证线程安全的方法,基本都是在操作集合的方法上加synchronized关键字,我们有必要知道ConcurrentHashMap底层实现和如何保证线程安全性。ConcurrentHashMap引入了分段锁的概念,对于不同Bucket的操作不需要全局锁来保证线程安全。
建议学习ConcurrentHashMap之前,先学习Hashtable和HashMap,并且要有一些并发的基础。
在看源码的时候一定要注意,本文讲的是JDK1.8的源码,ConcurrentHashMap在JDK1.8相比之前的版本有很大变化,1.8版本的源码达到6000+行,而1.6的源码只有一千多行。

在JDK1.6的实现中,使用的是一个segments数组来存储。
final Segment<K,V>[] segments;

JDK1.8中是使用Node数组来存储。
transient volatile Node<K,V>[] table;
JDK1.8中,Segment类只有在序列化和反序列化时才会被用到。

ConcurrentHashMap数据结构

其底层数据结构实现如下。

可以看到,桶中的结构可能是链表,也可能是红黑树,红黑树是为了提高查找效率。


ConcurrentHashMap的继承关系
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
    implements ConcurrentMap<K,V>, Serializable

其内部框架图如下。

ConcurrentHashMap为什么高效?
Hashtable低效主要是因为所有访问Hashtable的线程都争夺一把锁。如果容器有很多把锁,每一把锁控制容器中的一部分数据,那么当多个线程访问容器里的不同部分的数据时,线程之前就不会存在锁的竞争,这样就可以有效的提高并发的访问效率。
这也正是ConcurrentHashMap使用的分段锁技术。将ConcurrentHashMap容器的数据分段存储,每一段数据分配一个Segment(锁),当线程占用其中一个Segment时,其他线程可正常访问其他段数据。

static class Segment<K,V> extends ReentrantLock implements Serializable {
        private static final long serialVersionUID = 2249069246763182397L;
        final float loadFactor;
        Segment(float lf) { this.loadFactor = lf; }
    }


ConcurrentHashMap的用到的重要内部类

Node类
该类是此集合最重要的一个内部类。只要是插入ConcurrentHashMap的结点,都会被包装成Node
static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        volatile V val;
        volatile Node<K,V> next;
        Node(int hash, K key, V val, Node<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.val = val;
            this.next = next;
        }
        public final K getKey()       { return key; }
        public final V getValue()     { return val; }
        public final int hashCode()   { return key.hashCode() ^ val.hashCode(); }
        public final String toString(){ return key + \"=\" + val; }
        public final V setValue(V value) {
            throw new UnsupportedOperationException(); //不允许被修改val
        }
        public final boolean equals( o) {
            k, v, u; Map.Entry<?,?> e;
            return ((o instanceof Map.Entry) &&
                    (k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
                    (v = e.getValue()) != null &&
                    (k == key || k.equals(key)) &&
                    (v == (u = val) || v.equals(u)));
        }
        /**
         * 支持map的get方法,通过hashcode和Key来获取一个node结点
         */
        Node<K,V> find(int h, k) {
            Node<K,V> e = this;
            if (k != null) {
                do {
                    K ek;
                    if (e.hash == h &&
                        ((ek = e.key) == k || (ek != null && k.equals(ek))))
                        return e;
                } while ((e = e.next) != null);
            }
            return null;
        }
    }

TreeNode类
static final class TreeNode<K,V> extends Node<K,V> {
        TreeNode<K,V> parent;  // red-black tree s
        TreeNode<K,V> left;
        TreeNode<K,V> right;
        TreeNode<K,V> prev;    // needed to un next upon deletion
        boolean red;
        TreeNode(int hash, K key, V val, Node<K,V> next,
                 TreeNode<K,V> parent) {
            super(hash, key, val, next);
            this.parent = parent;
        }
        Node<K,V> find(int h, k) {
            return findTreeNode(h, k, null);
        }
        /**
         * Returns the TreeNode (or null if not found) for the given key
         * starting at given root.
         */
        final TreeNode<K,V> findTreeNode(int h, k, Class<?> kc) {
            if (k != null) {
                TreeNode<K,V> p = this;
                do  {
                    int ph, dir; K pk; TreeNode<K,V> q;
                    TreeNode<K,V> pl = p.left, pr = p.right;
                    if ((ph = p.hash) > h)
                        p = pl;
                    else if (ph < h)
                        p = pr;
                    else if ((pk = p.key) == k || (pk != null && k.equals(pk)))
                        return p;
                    else if (pl == null)
                        p = pr;
                    else if (pr == null)
                        p = pl;
                    else if ((kc != null ||
                              (kc = comparableClassFor(k)) != null) &&
                             (dir = compareComparables(kc, k, pk)) != 0)
                        p = (dir < 0) ? pl : pr;
                    else if ((q = pr.findTreeNode(h, k, kc)) != null)
                        return q;
                    else
                        p = pl;
                } while (p != null);
            }
            return null;
        }
    }

如果链表的数据过长是会转换为红黑树来处理。当它并不是直接转换,而是将这些链表的节点包装成TreeNode放在TreeBin对象中,然后由TreeBin完成红黑树的转换。TreeBin类的代码很长,其实TreeBin的构造方法就是一个建立红黑树的过程。

ConcurrentHashMap的初始化
ConcurrentHashMap的table初始化是在putVal时发生的。putVal时,如果发现table没有初始化,那么会调用initTable来初始化table,注意initTable在扩容时也会被执行。
private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
            if ((sc = sizeCtl) < 0) //sizeCtl为负数时,说明有其他线程在初始化
                Thread.yield();  //让出时间片
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { //把SIZECTL置为-1,接下来本线程对table进行操作
                try {
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings(\"unchecked\")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        sc = n - (n >>> 2);  //下一次扩容的阀值,大小为0.75*n
                    }
                } finally {
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }


上面的代码中,sizeCtl是一个比较重要的变量。
 /**
     * Table initialization and resizing control.  When negative, the
     * table is being initialized or resized: -1 for initialization,
     * else -(1 + the number of active resizing threads).  Otherwise,
     * when table is null, holds the initial table size to use upon
     * creation, or 0 for default. After initialization, holds the
     * next element count value upon which to resize the table.
     */
    private transient volatile int sizeCtl;

sizeCtl是一个控制标识符,用来控制table初始化和扩容操作的,在不同的地方有不同的用途,其值也不同,所代表的含义也不同:
负数代表正在进行初始化或扩容操作,-1时表示在初始化中,-N表示有N-1个线程正在进行扩容操作。
正数或0代表hash表还没有被初始化,这个数值表示初始化或下一次进行扩容的大小。

所以,如果某个线程想要初始化table或者对table扩容,需要去竞争sizeCtl这个共享变量,获得变量的线程才有许可去进行接下来的操作,没能获得的线程将会一直自旋来尝试获得这个共享变量,所以获得sizeCtl这个变量的线程在完成工作之后需要设置回来,使得其他的线程可以走出自旋进行接下来的操作。

initTable方法中我们可以看到,当线程发现sizeCtl小于0的时候,他就会让出CPU时间,而稍后再进行尝试,当发现sizeCtl不再小于0的时候,就会通过调用方法compareAndSwapInt来把sizeCtl共享变量变为-1,以告诉其他试图获得sizeCtl变量的线程,目前正在由本线程在享用该变量,在我完成我的任务之前你可以先休息一会,等会再来试试吧,我完成工作之后会释放掉的。在完成初始化table的任务之后,线程需要将sizeCtl设置成可以使得其他线程获得变量的状态。

这里需要解释一下这句代码的重要性。
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { //把SIZECTL置为-1,接下来本线程对table进行操作

在并发编程的情况下,线程A和线程B都试图初始化table,线程A优先进行了初始化,但是此时初始化还没有完成,所以table为null,但其实已经在初始化当中了,而线程B发现table为null,所以他会继续竞争sizeCtl来尝试初始化table,如果没有上面的第二次检测的代码,线程B也会进行初始化。

ConcurrentHashMap查询记录方法详解,get方法
在ConcurrentHashMap中查询一条记录首先需要知道这条记录存储的table的位置(可以看成卡槽,每个卡槽中都会有一个链表或者一棵红黑树),该位置上可能为null,如果为null,说明想要查询的记录还不存在于ConcurrentHashMap中,否则,就在该位置上的链表或者红黑树中查找记录。

public V get( key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        int h = spread(key.hashCode()); //计算key对应的hashcode
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) { // 表不为空并且表的长度大于0并且key所在的桶不为空
            if ((eh = e.hash) == h) {  //获取到的val去hash然后和前面的hashcode比较
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            else if (eh < 0) //hash值小于0时,在此桶对应的红黑树查找
                return (p = e.find(h, key)) != null ? p.val : null;
            while ((e = e.next) != null) {  //结点hash值大于0的情况,在链表中查找
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

get操作可以总结如下:
计算hash值
判断table是否为空,如果为空,直接返回null
根据hash值获取table中的Node节点(tabAt(tab, (n - 1) & h)),然后根据链表或者树形方式找到相对应的节点,返回其value值。


ConcurrentHashMap的put方法
ConcurrentHashMap的put操作与HashMap并没有多大区别,其核心思想依然是根据hash值计算节点插入在table的位置,如果该位置为空,则直接插入,否则插入到链表或者树中。但是ConcurrentHashMap会涉及到多线程情况就会复杂很多。
public V put(K key, V value) {
        return putVal(key, value, false);
    }
    /** Implementation for put and putIfAbsent */
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode()); //计算hash
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();  //初始化table
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //hash到的桶为null,直接保存结点
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);  // 有线程正在进行扩容操作,则先帮助扩容
            else {
                V oldVal = null;
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {  //fh >= 0 表示为链表,将该节点插入到链表尾部
                            binCount = 1; //binCount表示每个桶里面的node数
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;  //hash和key都一样,替换原来的val
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);  //把结点插入链表尾部
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) { //如果结点不是要插入链表,那就要插入红黑树
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)  //桶里面的节点数大于TREEIFY_THRESHOLD
                        treeifyBin(tab, i);  //如果链表长度已经达到临界值8 就需要把链表转换为树结构
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }

putVal操作可以总结如下:
一,判空,ConcurrentHashMap的key、value都不允许为null

二,计算hash。利用方法计算hash值。

三,遍历table,进行节点插入操作,过程如下:
如果table为空,则表示ConcurrentHashMap还没有初始化,则进行初始化操作:initTable()
根据hash值获取节点的位置i,若该位置为空,则直接插入,这个过程是不需要加锁的。计算f位置:i=(n - 1) & hash
如果检测到fh = f.hash == -1,则f是ForwardingNode节点,表示有其他线程正在进行扩容操作,则帮助线程一起进行扩容操作
如果f.hash >= 0 表示是链表结构,则遍历链表,如果存在当前key节点则替换value,否则插入到链表尾部。如果f是TreeBin类型节点,则按照红黑树的方法更新或者增加节点
若链表长度 >= TREEIFY_THRESHOLD(默认是8),则将链表转换为红黑树结构

四,调用addCount方法,ConcurrentHashMap的size + 1


ConcurrentHashMap的treeifyBin方法
在putVal方法中,我们可以看到如果binCount超过了界限值TREEIFY_THRESHOLD,会调用treeifyBin来把链表转化成红黑树。

 private final void treeifyBin(Node<K,V>[] tab, int index) {
        Node<K,V> b; int n, sc;
        if (tab != null) {
            if ((n = tab.length) < MIN_TREEIFY_CAPACITY)  //table的size小于最小treeify的size
                tryPresize(n << 1);  //扩容,避免同一个桶里面的结点数过多

            else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { //桶中存在结点并且结点的hash值大于等于0
                synchronized (b) {  //对桶中第一个结点加锁,也就是对单个桶进行加锁
                    if (tabAt(tab, index) == b) {
                        TreeNode<K,V> hd = null, tl = null;
                        for (Node<K,V> e = b; e != null; e = e.next) {  //遍历链表,修改成红黑树
                            TreeNode<K,V> p =
                                new TreeNode<K,V>(e.hash, e.key, e.val,
                                                  null, null);
                            if ((p.prev = tl) == null)
                                hd = p;
                            else
                                tl.next = p;
                            tl = p;
                        }
                        setTabAt(tab, index, new TreeBin<K,V>(hd));
                    }
                }
            }
        }
    }

此函数用于将桶中的数据结构转化为红黑树,其中,值得注意的是,当table的长度未达到阈值时,会进行一次扩容操作,该操作会使得触发treeifyBin操作的某个桶中的所有元素进行一次重新分配,这样可以避免某个桶中的结点数量太大。
转化的时候,要把对应的桶加锁。


ConcurrentHashMap的size方法
size 和mappingCount都可以用来返回ConcurrentHashMap的估计size,因为在进行统计的时候有其他线程正在进行插入和删除操作,所以size只能是一个估计值。当然为了这个不精确的值,ConcurrentHashMap也是操碎了心。
public int size() {
        long n = sumCount();
        return ((n < 0L) ? 0 :
                (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
                (int)n);
    }

public long mappingCount() {
        long n = sumCount();
        return (n < 0L) ? 0L : n; // ignore transient negative values
    }

从mappingCount的方法注释上可以知道,计算size的时候更推崇mappingCount方法。


size方法调用了sumCount这个方法。
final long sumCount() {
        CounterCell[] as = counterCells; CounterCell a;
        long sum = Count;  //ConcurrentHashMap中元素个数,但返回的不一定是当前Map的真实元素个数。基于CAS无锁更新
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }
sumCount()就是迭代counterCells来统计sum的过程。我们知道put操作时,肯定会影响size()。putVal时,会调用addCount方法来修改map的size,在addCount中就是通过操作counterCells来保留size的信息的。

ConcurrentHashMap的扩容操作
当ConcurrentHashMap中table元素个数达到了容量阈值(sizeCtl)时,则需要进行扩容操作。在put操作时最后一个会调用addCount(long x, int check),该方法主要做两个工作:1.更新 Count;2.检测是否需要扩容操作。
private final void addCount(long x, int check) {
        CounterCell[] as; long b, s;
        // 这里一段代码省略了,作用是更新 Count
    
        //check >= 0 :则需要进行扩容操作
        if (check >= 0) {
            Node<K,V>[] tab, nt; int n, sc;
            while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                    (n = tab.length) < MAXIMUM_CAPACITY) {
                int rs = resizeStamp(n);
                if (sc < 0) {
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                            sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                            transferIndex <= 0)
                        break;
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                //当前线程是唯一的或是第一个发起扩容的线程  此时nextTable=null
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                        (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
                s = sumCount();
            }
        }
    }

上面的代码中,有一个重要的方法transfer,这个就是核心的扩容操作。分两步:
构建一个nextTable,其大小为原来大小的两倍,这个步骤是在单线程环境下完成的
将原来table里面的内容复制到nextTable中,这个步骤是允许多线程操作的,所以性能得到提升,减少了扩容的时间消耗

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        int n = tab.length, stride;
        // 每核处理的量小于16,则强制赋值16
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range
        if (nextTab == null) {            // initiating
            try {
                @SuppressWarnings(\"unchecked\")
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];        //构建一个nextTable对象,其容量为原来容量的两倍
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab;
            transferIndex = n;
        }
        int nextn = nextTab.length;
        // 连接点指针,用于标志位(fwd的hash值为-1,fwd.nextTable=nextTab)
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
        // 当advance == true时,表明该节点已经处理过了
        boolean advance = true;
        boolean finishing = false; // to ensure sweep before committing nextTab
        for (int i = 0, bound = 0;;) {
            Node<K,V> f; int fh;
            // 控制 --i ,遍历原hash表中的节点
            while (advance) {
                int nextIndex, nextBound;
                if (--i >= bound || finishing)
                    advance = false;
                else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    advance = false;
                }
                // 用CAS计算得到的transferIndex
                else if (U.compareAndSwapInt
                        (this, TRANSFERINDEX, nextIndex,
                                nextBound = (nextIndex > stride ?
                                        nextIndex - stride : 0))) {
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                
                if (finishing) {  // 已经完成所有节点复制了
                    nextTable = null;
                    table = nextTab;        // table 指向nextTable
                    sizeCtl = (n << 1) - (n >>> 1);     // sizeCtl阈值为原来的1.5倍
                    return;     // 跳出死循环,
                }
                // CAS 更新扩容阈值,在这里面sizectl值减一,说明新加入一个线程参与到扩容操作
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    finishing = advance = true;
                    i = n; // recheck before commit
                }
            }
            // 遍历的节点为null,则放入到ForwardingNode 指针节点
            else if ((f = tabAt(tab, i)) == null)
                advance = casTabAt(tab, i, null, fwd);
            // f.hash == -1 表示遍历到了ForwardingNode节点,意味着该节点已经处理过了
            // 这里是控制并发扩容的核心
            else if ((fh = f.hash) == MOVED)
                advance = true; // already processed
            else {
                // 节点加锁
                synchronized (f) {
                    // 节点复制工作
                    if (tabAt(tab, i) == f) {
                        Node<K,V> ln, hn;
                        // fh >= 0 ,表示为链表节点
                        if (fh >= 0) {
                            // 构造两个链表  一个是原链表  另一个是原链表的反序排列
                            int runBit = fh & n;
                            Node<K,V> lastRun = f;
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                if ((ph & n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            // 在nextTable i 位置处插上链表
                            setTabAt(nextTab, i, ln);
                            // 在nextTable i + n 位置处插上链表
                            setTabAt(nextTab, i + n, hn);
                            // 在table i 位置处插上ForwardingNode 表示该节点已经处理过了
                            setTabAt(tab, i, fwd);
                            // advance = true 可以执行--i动作,遍历节点
                            advance = true;
                        }
                        // 如果是TreeBin,则按照红黑树进行处理,处理逻辑与上面一致
                        else if (f instanceof TreeBin) {
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> lo = null, loTail = null;
                            TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (Node<K,V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K,V> p = new TreeNode<K,V>
                                        (h, e.key, e.val, null, null);
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                }
                                else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            // 扩容后树节点个数若<=6,将树转链表
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                    (hc != 0) ? new TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                    (lc != 0) ? new TreeBin<K,V>(hi) : t;
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                    }
                }
            }
        }
    }

先抛开多线程的环境,在单线程的环境下,分析transfer的步骤如下:
一,为每个内核分任务,并保证其不小于16
二,检查nextTable是否为null,如果是,则初始化nextTable,使其容量为table的两倍
三,死循环遍历节点,知道finished:节点从table复制到nextTable中,支持并发,思路如下:
如果节点 f 为null,则插入ForwardingNode(采用Unsafe.compareAndSwap f方法实现),当一个线程遍历到的节点如果ForwardingNode,则继续往后遍历,如果不是,则将该节点加锁,防止其他线程进入
如果f为链表的头节点(fh >= 0),则先构造一个反序链表,然后把他们分别放在nextTable的i和i + n位置,并将ForwardingNode 插入原节点位置,代表已经处理过了
如果f为TreeBin节点,同样也是构造一个反序 ,同时需要判断是否需要进行unTreeify()操作,并把处理的结果分别插入到nextTable的i 和i+nw位置,并插入ForwardingNode 节点

四,所有节点复制完成后,则将table指向nextTable,同时更新sizeCtl = nextTable的0.75倍(如果又有结点加入map,总的结点数达到这个值,就会触发扩容)。

在多线程环境下,ConcurrentHashMap用两点来保证正确性:ForwardingNode和synchronized。当一个线程遍历到的节点如果是ForwardingNode,则继续往后遍历,如果不是,则将该节点加锁,防止其他线程进入,完成后设置ForwardingNode节点,以便要其他线程可以看到该节点已经处理过了,如此交叉进行,高效而又安全。

扩容过程如下。

参考:
http://www.cnblogs.com/leesf456/p/5453341.html
http://cmsblogs.com/?p=2283
http://www.cnblogs.com/leesf456/p/5453341.html
红黑树:http://cmsblogs.com/?p=2329
ConcurrentHashMap的弱一致性http://ifeve.com/concurrenthashmap-weakly-consistent/

HashMap、Hashtable、HashSet 和 ConcurrentHashMap 的比较http://www.54tianzhisheng.cn/2017/06/13/HashMap-Hashtable/
 

收藏 打印