一、ConcurrentHashMap
(1)为什么使用ConcurrentHashMap?
- HashMap是非线程安全的,在并发环境中可能会引起死循环
- HashTable是线程安全的,但是全部使用悲观锁效率非常慢
- ConcurrentHashMap采用分段的技术(降低了锁粒度)、CAS(比较并交换)、volatile关键字等来有效的提高了并发环境下的效率。
二、ConcurrentHashMap原理
在ConcurrentHashMap中通过一个Node<K,V>[]数组来保存添加到map中的键值对,而在同一个数组位置是通过链表和红黑树的形式来保存的。但是这个数组只有在第一次添加元素的时候才会初始化,否则只是初始化一个ConcurrentHashMap对象的话,只是设定了一个sizeCtl变量,这个变量用来判断对象的一些状态和是否需要扩容。
第一次添加元素的时候,默认初期长度为16,当往map中继续添加元素的时候,通过hash值跟数组长度取与来决定放在数组的哪个位置,如果出现放在同一个位置的时候,优先以链表的形式存放,在同一个位置的个数又达到了8个以上,如果数组的长度还小于64的时候,则会扩容数组。如果数组的长度大于等于64了的话,在会将该节点的链表转换成树。 通过扩容数组的方式来把这些节点给分散开。然后将这些元素复制到扩容后的新的数组中,同一个链表中的元素通过hash值的数组长度位来区分,是还是放在原来的位置还是放到扩容的长度的相同位置去 。在扩容完成之后,如果某个节点的是树,同时现在该节点的个数又小于等于6个了,则会将该树转为链表。 取元素的时候,相对来说比较简单,通过计算hash来确定该元素在数组的哪个位置,然后在通过遍历链表或树来判断key和key的hash,取出value值。三、源码
初始化
从ConcurrentHashMap部分代码中可以看到,整段代码只是设置了分段的长度(为2的整数次幂),并没有初始化segment内部的HashEntry[],采用的是懒汉方式进行初始化。
//入参为Segment数组的长度,默认为16public ConcurrentHashMap(int initialCapacity) { //初始化长度小于0,异常抛出 if (initialCapacity < 0) throw new IllegalArgumentException(); //MAXIMUM_CAPACITY 为2的30次方,如果大于2的29次方则为最大值,否则为 //大于等于initialCapacity最小2的整数次幂 int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1)); //设置初始化长度 this.sizeCtl = cap;}
get方法
static finalNode tabAt(Node [] tab, int i) { return (Node )U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);} public V get(Object key) { Node [] tab; Node e, p; int n, eh; K ek; int h = spread(key.hashCode()); if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { if ((eh = e.hash) == h) { //如果当前e的key值是我们要找的,直接返回 if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } else if (eh < 0) //如果eh<0说明当前node已经不存在,有另一个线程在进行扩容,直接去newtable中寻找 return (p = e.find(h, key)) != null ? p.val : null; //遍历链表获得Value值 while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null;}
putVal方法
final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); int hash = spread(key.hashCode());//扰动函数 int binCount = 0; for (Node[] tab = table;;) {//死循环,不断重试,直至插入成功 Node f; int n, i, fh; if (tab == null || (n = tab.length) == 0) //如果为空,进行初始化 tab = initTable(); else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //根据hash值去除对应节点的数据f,如果为null,则不会发生碰撞 //直采用CAS方式进行赋值,成功后直接返回 if (casTabAt(tab, i, null, new Node (hash, key, value, null))) break; // no lock when adding to empty bin } else if ((fh = f.hash) == MOVED) //如果当前有线程正在进行扩容,则帮助其扩容完成 tab = helpTransfer(tab, f); else { V oldVal = null; //如果当前节点不为空,则加锁插入,hash值相同的链表的头结点 synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0) { binCount = 1; //遍历该链表,如果存在key值进行覆盖val,如果没有则添加在末尾 for (Node e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node pred = e; if ((e = e.next) == null) { pred.next = new Node (hash, key, value, null); break; } } } else if (f instanceof TreeBin) { //如果当前节点为树节点则直接调用树方法插入 Node p; binCount = 2; if ((p = ((TreeBin )f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { //如果当前节点链表数量大于8则转化为shu if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } //如果是新插入元素,则进行元素个数+1 addCount(1L, binCount); return null;}