`
BrokenDreams
  • 浏览: 248717 次
  • 性别: Icon_minigender_1
  • 来自: 北京
博客专栏
68ec41aa-0ce6-3f83-961b-5aa541d59e48
Java并发包源码解析
浏览量:97828
社区版块
存档分类
最新评论

Jdk1.6 JUC源码解析(25)-ConcurrentHashMap

阅读更多

Jdk1.6 JUC源码解析(25)-ConcurrentHashMap

作者:大飞

 

功能简介:
  • ConcurrentHashMap是一种线程安全的HashMap。相对于HashTable和Collections.synchronizedMap(),ConcurrentHashMap具有更好的性能和伸缩性,是由于其使用了分段锁的策略,将内部数据分为多个段,每个段单独加锁,而不是整个HashMap加锁,这样能减少很多不必要的锁争用。
 
源码分析:
  • ConcurrentHashMap实现了ConcurrentMap接口,先简单了解下这个接口:

 

public interface ConcurrentMap<K, V> extends Map<K, V> {
    /**
     * 如果map中已经存在给定的key,返回map中key对应的value;
     * 如果不存在给定的key,插入给定的key和value。
     * 这个是一个原子操作,逻辑相当于:
     *   if (!map.containsKey(key))
     *       return map.put(key, value);
     *   else
     *       return map.get(key);
     */
    V putIfAbsent(K key, V value);
    /**
     * 如果map中存在给定的key,并且map中对应的value也等于给定的value,
     * 那么删除这个key和value。
     * 这是一个原子操作,逻辑相当于:
     *   if (map.containsKey(key) && map.get(key).equals(value)) {
     *       map.remove(key);
     *       return true;
     *   } else return false;
     */
    boolean remove(Object key, Object value);
    /**
     * 如果map中存在给定的key,并且map中对应的value也等于给定的oldValue,
     * 那么将这个key对应的value替换成newValue。
     * 这是一个原子操作,逻辑相当于:
     *   if (map.containsKey(key) && map.get(key).equals(oldValue)) {
     *       map.put(key, newValue);
     *       return true;
     *   } else return false;
     */
    boolean replace(K key, V oldValue, V newValue);
    /**
     * 如果map中已经存在给定的key, 
     * 那么将这个key对应的value替换成给定的value。
     * 这是一个原子操作,逻辑相当于:
     *   if (map.containsKey(key)) {
     *       return map.put(key, value);
     *   } else return null;
     */
    V replace(K key, V value);
}
       ConcurrentMap扩展了Map接口,定义了上面4个原子操作方法。

 

 
  • 接下来看下ConcurrentHashMap的内部结构:

 

    /**
     * segment数组, 每一个segment都是一个hash table。
     */
    final Segment<K,V>[] segments;
 

 

 

       重点看下segment的实现吧,首先看数据结构:
    static final class Segment<K,V> extends ReentrantLock implements Serializable {
        private static final long serialVersionUID = 2249069246763182397L;
        /**
         * 记录segment(哈希表)中的元素数量。
         * 另一个重要角色就是其他操作会利用count的volatile读写来保证可见性,避免使用锁。
         */
        transient volatile int count;
        /**
         * 统计跟踪修改,用来保证一些批量操作的一致性。
         * 比如统计所有segment元素个数时,如果统计过程发现modCount变化
         * 那么需要重试。
         */
        transient int modCount;
        /**
         * 当哈希表的容量超过了这个阀值,表会扩容,里面的元素会重新散列。
         * 这个值一般是:capacity * loadFactor
         */
        transient int threshold;
        /**
         * 存放数组的哈希表。
         */
        transient volatile HashEntry<K,V>[] table;
        /**
         * 哈希表的加载因子。
         * @serial
         */
        final float loadFactor;
 
 

       再看下segment的构造方法:

        Segment(int initialCapacity, float lf) {
            loadFactor = lf;
            setTable(HashEntry.<K,V>newArray(initialCapacity));
        }
        
        void setTable(HashEntry<K,V>[] newTable) {
            threshold = (int)(newTable.length * loadFactor);
            table = newTable;
        }
    static final class HashEntry<K,V> {
        final K key;
        final int hash;
        volatile V value;
        final HashEntry<K,V> next;
        HashEntry(K key, int hash, HashEntry<K,V> next, V value) {
            this.key = key;
            this.hash = hash;
            this.next = next;
            this.value = value;
        }
	    @SuppressWarnings("unchecked")
	    static final <K,V> HashEntry<K,V>[] newArray(int i) {
	        return new HashEntry[i];
	    }
    }

 

 

       可见,构造segment时需要一个初始容量和一个加载因子,segment内部会创建一个长度为初始容量大小的HashEntry数组。
 
       如果对哈希表数据结构比较熟悉的话会知道,哈希表内部一般会有初始容量ic和加载因子lf,当哈希表中的元素数量达到(ic * lf)的时候,就会触发哈希表进行rehash。这有什么影响呢?假设哈希表使用链表法来解决哈希冲突,那么如果加载因子太大,会导致哈希表中每个桶里面的链表平均长度过长,这样会影响查询性能;但如果加载因子过小,又会浪费太多内存空间。所以也是一种时间和空间的权衡,需要按实际情况来选择合适的加载因子。
 
       最后看下ConcurrentHashMap的构造方法:
    /* ---------------- Constants -------------- */
    //默认segment中hashTable长度。
    static final int DEFAULT_INITIAL_CAPACITY = 16;
    //默认加载因子。
    static final float DEFAULT_LOAD_FACTOR = 0.75f;
    //默认table的并发级别,其实就是segment数组长度。
    static final int DEFAULT_CONCURRENCY_LEVEL = 16;
    //table的最大容量。
    static final int MAXIMUM_CAPACITY = 1 << 30;
    //允许的最大的segment数组长度。
    static final int MAX_SEGMENTS = 1 << 16; 
    /**
     * 在size和containsValue方法中,加锁之前的尝试操作次数。
     */
    static final int RETRIES_BEFORE_LOCK = 2;
    /* ---------------- Fields -------------- */
    /**
     * 计算segment下标的掩码。一个key的hash code高位(由segmentShift确定)用来确定segment下标。
     */
    final int segmentMask;
    /**
     * segment下标的位移值。
     */
    final int segmentShift;

    public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS; //concurrencyLevel不能超过最大值
        // Find power-of-two sizes best matching arguments
        int sshift = 0;
        int ssize = 1;
        while (ssize < concurrencyLevel) { 
            ++sshift;
            ssize <<= 1; //ssize最后是比concurrencyLevel大的最小的2的幂。
        }
        /*
         * 假设传入的concurrencyLevel是50,
         * 那么ssize就是64,sshift就是6,segmentMask就是 00000000 00000000 00000000 00111111*/
        segmentShift = 32 - sshift;
        segmentMask = ssize - 1; 
        this.segments = Segment.newArray(ssize);
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        int c = initialCapacity / ssize;
        if (c * ssize < initialCapacity)
            ++c;
        int cap = 1;
        while (cap < c)
            cap <<= 1; //cap其实就是比总体容量平均分到每个segment的数量大的最小的2的幂...有点绕,
        for (int i = 0; i < this.segments.length; ++i)
            this.segments[i] = new Segment<K,V>(cap, loadFactor); //把segment都初始化一下。
    }

    public ConcurrentHashMap(int initialCapacity, float loadFactor) {
        this(initialCapacity, loadFactor, DEFAULT_CONCURRENCY_LEVEL);
    }

    public ConcurrentHashMap(int initialCapacity) {
        this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
    }
       可以看到构造方法中计算了几个重要的数:segment掩码、segment位移值、segment数组长度和segment内部哈希表容量,注意后两个都是2的幂,想到了什么? a & (b - 1)吧哈哈。
 
       这里做个小结:ConcurrentMap内部包含一个segment的数组;而segment本身又是一个哈希表,并且自带锁;内部哈希表使用链表法解决哈希冲突,每个数组元素是一个单链表。
 
  • 现在从插入和获取操作切入,来理解源码。先看下插入操作:
    public V put(K key, V value) {
        if (value == null)
            throw new NullPointerException();
        int hash = hash(key.hashCode());
        return segmentFor(hash).put(key, hash, value, false);
    }

       注意到,put过程中,首先要根据key的hashCode,再次算一个hash值出来;其次是要根据这个hash值来确定一个segment,然后把key-value存到这个segment里面。

    /**
     * Applies a supplemental hash function to a given hashCode, which
     * defends against poor quality hash functions.  This is critical
     * because ConcurrentHashMap uses power-of-two length hash tables,
     * that otherwise encounter collisions for hashCodes that do not
     * differ in lower or upper bits.
     */
    private static int hash(int h) {
        // Spread bits to regularize both segment and index locations,
        // using variant of single-word Wang/Jenkins hash.
        h += (h <<  15) ^ 0xffffcd7d;
        h ^= (h >>> 10);
        h += (h <<   3);
        h ^= (h >>>  6);
        h += (h <<   2) + (h << 14);
        return h ^ (h >>> 16);
    }
    /**
     * Returns the segment that should be used for key with given hash
     * @param hash the hash code for the key
     * @return the segment
     */
    final Segment<K,V> segmentFor(int hash) {
        return segments[(hash >>> segmentShift) & segmentMask];
    }

       首先看下这个hash算法,它相当于在key本身的hashCode上做了加强,再次hash一次,使得hash值更加散列。这样做的原因是因为ConcurrentHashMap中哈希表的长度都是2的幂,会增加一些冲突几率,比如两个hashCode高位不同但低位相同,对哈希表长度取模时正好忽略了这些高位,造成冲突。这里是采用了Wang/Jenkins哈希算法的一个变种,更多相关信息可以google之。

       接下来是确定segment的步骤,在上面ConcurrentHashMap的构造方法中我们看到,sshift和segmentMask有个关系,如果sshift=6,那么segmentMask后面就有6个为1的bit。 其实这里是用hash值除去低sshift位剩余的高位,来确定segment的下标。
 
       定位到了segment,继续看segment中怎么put元素的:
        V put(K key, int hash, V value, boolean onlyIfAbsent) {
            lock();// 加锁
            try {
                int c = count;
                if (c++ > threshold) // ensure capacity
                    rehash(); // 如果添加一个元素后,超过扩容阀值,那么进行rehash。
                HashEntry<K,V>[] tab = table;
                int index = hash & (tab.length - 1); // 对hash取模算出key对应的哈希表的桶的下标。
                HashEntry<K,V> first = tab[index]; // 找出桶的第一个节点
                HashEntry<K,V> e = first;
                while (e != null && (e.hash != hash || !key.equals(e.key)))
                    e = e.next; // 遍历一下桶里的单链表,看看有没有相同的。
                V oldValue;
                if (e != null) {
                    // 如果找到了相同的,记录旧值
                    oldValue = e.value;
                    if (!onlyIfAbsent) // 并且有覆盖标识
                        e.value = value; // 那么覆盖这个值
                }
                else {
                    // 如果没找到相同的。
                    oldValue = null;
                    ++modCount; // 因为会改变哈希表元素个数,所以modCount累加。
                    // 将元素设置为桶内新的第一个节点。
                    tab[index] = new HashEntry<K,V>(key, hash, first, value); 
                    count = c; // 注意这里做了一个volatile写。
                }
                return oldValue; // 返回旧值。
            } finally {
                unlock(); // 解锁。
            }
        }
       代码也比较容易理解,注意有rehash的情况,看下rehash方法:
        void rehash() {
            HashEntry<K,V>[] oldTable = table;
            int oldCapacity = oldTable.length;
            if (oldCapacity >= MAXIMUM_CAPACITY)
                return; //不能超过最大容量。
            /*
             * 将哈希表中所有桶里的节点重新分配到新哈希表中。
             * 由于使用的容量是2的幂,所有一部分节点会分配到新哈希表中相同 
             * 下标的桶里,这样我们就可以重用这些节点,而无需重新创建。
             * 按照统计数据,在默认的加载因子下,大约只有六分之一的节点在 
             * 哈希表扩容的时候需要拷贝(重新创建对象)。
             */
            HashEntry<K,V>[] newTable = HashEntry.newArray(oldCapacity<<1);
            threshold = (int)(newTable.length * loadFactor);
            int sizeMask = newTable.length - 1;
            for (int i = 0; i < oldCapacity ; i++) {
                HashEntry<K,V> e = oldTable[i];
                if (e != null) {
                    HashEntry<K,V> next = e.next;
                    int idx = e.hash & sizeMask;
                    if (next == null)
                        newTable[idx] = e; // 链表上唯一的节点,直接复制到新table。
                    else {
                        // 重用从尾部往前能定位到新table中同一个桶的,最长的连续节点。
                        HashEntry<K,V> lastRun = e;
                        int lastIdx = idx;
                        for (HashEntry<K,V> last = next;
                             last != null;
                             last = last.next) {
                            int k = last.hash & sizeMask;
                            if (k != lastIdx) {
                                lastIdx = k;
                                lastRun = last;
                            }
                        }
                        newTable[lastIdx] = lastRun;
                        // 其他的节点就copy过去了。
                        for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                            int k = p.hash & sizeMask;
                            HashEntry<K,V> n = newTable[k];
                            newTable[k] = new HashEntry<K,V>(p.key, p.hash,
                                                             n, p.value);
                        }
                    }
                }
            }
            table = newTable;
        }
 
 

       put的实现看完了,继续看下从ConcurrentHashMap中get的实现:

    public V get(Object key) {
        int hash = hash(key.hashCode());
        return segmentFor(hash).get(key, hash);
    }

       流程还是先算hash值,然后确定到segment,然后调用segment的get方法:

        V get(Object key, int hash) {
            if (count != 0) { // 这里做一个volatile读
                HashEntry<K,V> e = getFirst(hash); //获取相应桶的第一个节点。
                while (e != null) {
                    if (e.hash == hash && key.equals(e.key)) {
                        V v = e.value;
                        if (v != null)
                            return v; //如果是相同的key,返回value。
                        return readValueUnderLock(e); // recheck
                    }
                    e = e.next;
                }
            }
            return null;
        }
        HashEntry<K,V> getFirst(int hash) {
            HashEntry<K,V>[] tab = table;
            return tab[hash & (tab.length - 1)];
        }
        /**
         * 加锁读value。如果value为nul的情况下调用这个方法。
         * 只有在编译器将HashEntry的初始化和其赋值给table的指令重排才会 
         * 出现这种情况,这在内存模型下是合法的,但从没发生过。
         */
        V readValueUnderLock(HashEntry<K,V> e) {
            lock();
            try {
                return e.value;
            } finally {
                unlock();
            }
        }

 

 

  • 理解了put和get过程,其他方法也很好理解了:

 

        boolean containsKey(Object key, int hash) {
            if (count != 0) { // read-volatile
                HashEntry<K,V> e = getFirst(hash);
                while (e != null) {
                    if (e.hash == hash && key.equals(e.key))
                        return true;
                    e = e.next;
                }
            }
            return false;
        }

 

        V remove(Object key, int hash, Object value) {
            lock();
            try {
                int c = count - 1;
                HashEntry<K,V>[] tab = table;
                int index = hash & (tab.length - 1);
                HashEntry<K,V> first = tab[index];
                HashEntry<K,V> e = first;
                while (e != null && (e.hash != hash || !key.equals(e.key)))
                    e = e.next;
                V oldValue = null;
                if (e != null) {
                    V v = e.value;
                    if (value == null || value.equals(v)) {
                        oldValue = v;
                        // All entries following removed node can stay
                        // in list, but all preceding ones need to be
                        // cloned.
                        ++modCount;
                        HashEntry<K,V> newFirst = e.next;
                        for (HashEntry<K,V> p = first; p != e; p = p.next)
                            newFirst = new HashEntry<K,V>(p.key, p.hash,
                                                          newFirst, p.value);
                        tab[index] = newFirst;
                        count = c; // write-volatile
                    }
                }
                return oldValue;
            } finally {
                unlock();
            }
        }

       我们会发现所有的写操作最后都会写一下count,而且所有的读操作最前面都会读一下count,由于count是volatile修饰的,所以这样相当于加了内存屏障(volatile写和后面的volatile读不能重排),保证了读操作能够看到最新的写的变化。

       以上理解有偏差,感谢@不待人亲指正。

       仔细看源码注释发现:

       All (synchronized) write operations should write to the "count" field after structurally changing any bin.

       也就是说只有bin(HashEntry链)的结构变化之后才会写count(覆盖的情况不会写count)。

       所以这里纠正一下:所有改变bin结构的写操作都会写一下count,可以保证HashEntry的可见性(因为无论是添加还是删除,bin起始的HashEntry都会发生变化,由于HashEntry的next域是不变的,所以删除时需要将目标HashEntry之前的Entry都拷贝一下)。

       而覆盖旧值的情况下不会写count,因为HashEntry的value本身也是volatile的,可以保证自身的可见性。

 
  • 我们上面还看到了有一个RETRIES_BEFORE_LOCK值,看看这个值起什么作用:
    public int size() {
        final Segment<K,V>[] segments = this.segments;
        long sum = 0;
        long check = 0;
        int[] mc = new int[segments.length];
        // Try a few times to get accurate count. On failure due to
        // continuous async changes in table, resort to locking.
        for (int k = 0; k < RETRIES_BEFORE_LOCK; ++k) {
            check = 0;
            sum = 0;
            int mcsum = 0;
            for (int i = 0; i < segments.length; ++i) {
                sum += segments[i].count;
                mcsum += mc[i] = segments[i].modCount;
            }
            if (mcsum != 0) {
                for (int i = 0; i < segments.length; ++i) {
                    check += segments[i].count;
                    if (mc[i] != segments[i].modCount) {
                        check = -1; // force retry
                        break;
                    }
                }
            }
            if (check == sum)
                break;
        }
        if (check != sum) { // Resort to locking all segments
            sum = 0;
            for (int i = 0; i < segments.length; ++i)
                segments[i].lock();
            for (int i = 0; i < segments.length; ++i)
                sum += segments[i].count;
            for (int i = 0; i < segments.length; ++i)
                segments[i].unlock();
        }
        if (sum > Integer.MAX_VALUE)
            return Integer.MAX_VALUE;
        else
            return (int)sum;
    }

        上面size的过程就是,累加所有segment中的count,如果过程中segment中元素数量发生了变化,那么重试。如果重试了RETRIES_BEFORE_LOCK次(默认是2)都不行,那么将所有segment加锁,然后累加count,然后再解锁。在containsValue里面也是这么玩儿的,代码就不贴了。

 
  • 其他代码也很容易看懂了,就分析到这里。最后注意下,ConcurrentHashMap也提供了Key和Value的集合视图,它们和ConcurrentHashMap共享一份数据,它们的迭代器是弱一致的。
 
       ConcurrentHashMap的代码解析完毕!
 
 
分享到:
评论
2 楼 BrokenDreams 2015-12-07  
budairenqin 写道
博主思路清晰易懂     , 很牛逼, 大部分JUC系列的文章我都看了一遍, 学习了.

以下这句我感觉稍微不够严谨:

我们会发现所有的写操作最后都会写一下count,而且所有的读操作最前面都会读一下count,由于count是volatile修饰的,所以这样相当于加了内存屏障(volatile写和后面的volatile读不能重排),保证了读操作能够看到最新的写的变化。

应该是只有添加新元素时才对 count 进行write-volatile, 这是为了保证HashEntry的可见性.
而覆盖HashEntry.value的时候, 也就是count不增加的情况下, HashEntry.value本身就是volatile修饰, value具有可见性


感谢不待人亲指正  之前的理解确实有误,已经改正。 希望这样的讨论越来越多哈
1 楼 budairenqin 2015-12-07  
博主思路清晰易懂     , 很牛逼, 大部分JUC系列的文章我都看了一遍, 学习了.

以下这句我感觉稍微不够严谨:

我们会发现所有的写操作最后都会写一下count,而且所有的读操作最前面都会读一下count,由于count是volatile修饰的,所以这样相当于加了内存屏障(volatile写和后面的volatile读不能重排),保证了读操作能够看到最新的写的变化。

应该是只有添加新元素时才对 count 进行write-volatile, 这是为了保证HashEntry的可见性.
而覆盖HashEntry.value的时候, 也就是count不增加的情况下, HashEntry.value本身就是volatile修饰, value具有可见性

相关推荐

Global site tag (gtag.js) - Google Analytics