Map容器

HashMap

HashMap基于哈希表来实现,哈希表通过哈希计算可以快速地定位到元素的位置,这样插入,删除,查找元素的时间复杂度都是O(1),但是哈希计算有可能会产生哈希冲突,解决的办法包括拉链法,开放地址法等。

HashMap 是基于哈希表的 Map 接口的实现,以 Key-Value 的形式存在,即存储的对象是 Node(同时包含了 Key 和 Value) 。

它根据键的hashCode值存储数据,大多数情况下可以直接定位到它的值,因而具有很快的访问速度,但遍历顺序却是不确定的。 HashMap最多只允许一条记录的键为null,允许多条记录的值为null。HashMap非线程安全。

在存储结构上,HashMap是数组+链表+红黑树(JDK1.8增加了红黑树部分)实现的。

HshMap

也免不了会出现拉链过长的情况,一旦出现拉链过长,则会严重影响HashMap的性能。于是,在JDK1.8版本中,对数据结构做了进一步的优化,引入了红黑树。而当链表长度太长(默认超过8)时,链表就转换为红黑树,利用红黑树快速增删改查的特点提高HashMap的性能,其中会用到红黑树的插入、删除、查找等算法。

hashCode

由 Object 类定义的 hashCode 方法会针对不同的对象返回不同的整数,hashcode更像是说返回对象的摘要。

重写对象的equals()必须要先重写hashCode(),两个对象相等,hashCode()一定相等,但是hashCode’()相等,不一定equals().

假如只重写了euqals()而没有重写hashCode(),默认hashCode()是内存地址,那么equals()相等的两个对象都会插入到哈希表中。

hashCode在遇到使用哈希表存储的数据结构时会有用。

比如说,当数据结构的元素不允许重复。一种办法就是每次出入一个元素,一个个进行比对,但是,这样,当元素个数过多的时候,效率会很低。

于是,Java采用了哈希表的原理。 这样,我们对每个要存入集合的元素使用哈希算法算出一个值,然后根据该值计算出元素应该在数组的位置。

所以,当集合要添加新的元素时,可分为两个步骤:  

先调用这个元素的 hashCode 方法,然后根据所得到的值计算出元素应该在数组的位置。如果这个位置上没有元素,那么直接将它存储在这个位置上;

如果这个位置上已经有元素了,那么调用它的equals方法与新元素进行比较:相同的话就不存了,否则,将其存在这个位置对应的链表中(Java 中 HashSet, HashMap 和 Hashtable的实现总将元素放到链表的表头)。

存储结构

1
transient Node<K,V>[] table;

底层实现还是一个数组,存储的是Node(包含key-value)。

Node成员:

1
2
3
4
final int hash; //hash(key.hashCode())
final K key; //key
V value; //value
Node<K,V> next; //下一个节点

所以总的来说,HashMap底层是一个数组,而数组的每个元素存储的是一个链表,链表的每一个元素由键值对组成。9420a703-1f9d-42ce-808e-bcb82b56483d

构造函数

1
2
3
4
5
6
7
8
9
10
11
12
public HashMap(int initialCapacity, float loadFactor) {
if (initialCapacity < 0)
throw new IllegalArgumentException("Illegal initial capacity: " +
initialCapacity);
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
if (loadFactor <= 0 || Float.isNaN(loadFactor))
throw new IllegalArgumentException("Illegal load factor: " +
loadFactor);
this.loadFactor = loadFactor;
this.threshold = tableSizeFor(initialCapacity);
}

initialCapacity:table数组的大小。

loadFactor:table 能够使用的比例,当table使用率超过了loadFactor,进行扩容。**加载因子越大,填满的元素越多,好处是,空间利用率高了,但:冲突的机会加大了.反之,加载因子越小,填满的元素越少,好处是:冲突的机会减小了,但空间浪费多了**.默认loadFactor是0.75.因此,当我们内存比较大又对时间效率要求比较高,可以将loadFactor设置比较小,当内存比较紧张,可以将loadFactor设置比较大。

源码分析

确定数组索引位置

使用Hash算法计算出元素应该插入到数组哪一位置。

1
2
3
4
5
6
7
8
9
10
11
方法一:
static final int hash(Object key) { //jdk1.8 & jdk1.7
int h;
// h = key.hashCode() 为第一步 取hashCode值
// h ^ (h >>> 16) 为第二步 高位参与运算
return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
}
方法二:
static int indexFor(int h, int length) { //jdk1.7的源码,jdk1.8没有这个方法,但是实现原理一样的
return h & (length-1); //第三步 取模运算
}
  • 计算key的hashCode
  • key的hashcode并不一定在table的length之内,需要将其限制在length之内。
  • 最简单的方法就是取余了,但是,模运算的消耗还是比较大的。jdk采用h & (length-1)来解决,等价于h%length,但是效率更高。
  • 但是,这样还有一个问题,要是table的length很小,这样h & (length-1),h只有低位会参与运算,为了使分布更加均匀
  • 于是,h = key.hashCode()) ^ (h >>> 16),将hashCode()的高16位异或低16位

put方法

插入元素的时候,主要有三个问题解决:

  • 计算hash,确定数组索引。
  • 插入元素。
  • 处理扩容。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
boolean evict) {
Node<K,V>[] tab; Node<K,V> p; int n, i;
//table是否为空,是空就创建
if ((tab = table) == null || (n = tab.length) == 0)
n = (tab = resize()).length;
//hash=hash(key),通过hash计算需要插入的数组位置,若该位置没有元素,直接创建插入
if ((p = tab[i = (n - 1) & hash]) == null)
tab[i] = newNode(hash, key, value, null);
else {
Node<K,V> e; K k;
//首个元素是否和key一样,如果相同直接覆盖value
if (p.hash == hash &&
((k = p.key) == key || (key != null && key.equals(k))))
e = p;
else if (p instanceof TreeNode)
//table[i]是一个红黑树,直接插入树
e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
else {
//遍历链表中的元素,假如链表长度大于规定值,反转成红黑树,
//遍历过程中若发现key已经存在直接覆盖value即可
for (int binCount = 0; ; ++binCount) {
if ((e = p.next) == null) {
//这是尾插法
p.next = newNode(hash, key, value, null);
if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
treeifyBin(tab, hash);
break;
}
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k))))
break;
p = e;
}
}
if (e != null) { // existing mapping for key
V oldValue = e.value;
if (!onlyIfAbsent || oldValue == null)
e.value = value;
afterNodeAccess(e);
return oldValue;
}
}
//超过threshold,进行扩容
++modCount;
if (++size > threshold)
resize();
afterNodeInsertion(evict);
return null;
}

d669d29c

扩容机制

默认扩容是之前数组的两倍,扩容操作同样需要把 oldTable 的所有键值对重新插入 newTable 中,因此这一步是很费时的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
void resize(int newCapacity) {
Entry[] oldTable = table;
int oldCapacity = oldTable.length;
if (oldCapacity == MAXIMUM_CAPACITY) {
threshold = Integer.MAX_VALUE;
return;
}
Entry[] newTable = new Entry[newCapacity];
transfer(newTable);
table = newTable;
threshold = (int)(newCapacity * loadFactor);
}

void transfer(Entry[] newTable) {
Entry[] src = table;
int newCapacity = newTable.length;
for (int j = 0; j < src.length; j++) {
Entry<K,V> e = src[j];
if (e != null) {
src[j] = null;
do {
Entry<K,V> next = e.next;
int i = indexFor(e.hash, newCapacity);
//也就是使用了单链表的头插入方式,同一位置上新元素总会被放在链表的头部位置
e.next = newTable[i];
newTable[i] = e;
e = next;
} while (e != null);
}
}
}

在JDK1.8,我们在扩充HashMap的时候,不需要像JDK1.7的实现那样重新计算hash。

1
h & (length-1)

因为我们每次扩展都是比上一次扩展了两倍,所以,length-1变化。

1
扩容两倍length-1变化:0000 0000 0000 0111->0000 0000 0000 1111

所以,我们在对hash做&运算的时候,只需要计算变化的那个位置即可,若hash值那个位置是0,那么 h & (length-1)不变,索引位置不改变;如果hash值那个位置是1,那么直接移动二次幂个位置。

一个table[i]中的key可以不一样,但是他们的hash计算是一样的。

HashMap底层为什么总是2的n次方?

但当底层数组的length为2的n次方时, h&(length - 1) 就相当于对length取模,而且速度比直接取模快得多,这是HashMap在速度上的一个优化。

这个计算方式是 :(数组长度 - 1) & hash。

数组长度是二的次方,2的2次方,二进制是 100,3次方 1000,4次方 10000。

那么按照这个规律,那么长度 - 1, 刚好是 011, 0111, 01111。这个刚好就可以当做掩码,来计算数组下标。那么就用掩码和hash做个与运算。

011 & 101010100101001001101 = 01 下标=1,数组长度=4

0111 & 101010100101001001101 = 101 下标=5,数组长度=8

01111 & 101010100101001001101 = 1101 下标=13,数组长度=16

可以发现,通过 掩码 & hash,得出的数组下标不会越界。而数组的总长度总是2的次方,就是为了方便取得掩码的。HashMap length总是2的n次方

所以,HashMap1.8相比1.7所做的优化主要包括两方面:

  • 红黑树。当链表中元素超过某个数值,将链表自动转化为红黑树。
  • resize。resize的时候不需要重新计算hash。rehash 的过程也进行了改动,基于复制的算法思想,不直接操作原链,而是定义了两条链表分别完成对原链的结点分离操作,即使是多线程的情况下也是安全的,不会产生死循环。

多线程问题

HashMap不是线程安全的。可能会出现以下并发问题。

  • put数据丢失。

    设t1执行put(“key2”, “value2”),t2执行put(“key3”, “value3”),并且已经有一个key1存在链表中。

    理想情况是这样:

但是,假设t1先执行p.next = newNode(hash, key, value, null);,但是还没有将p=key2赋值。

然后t2继续执行p.next = newNode(hash, key, value, null);,这样,p.next就会指向key3,而key2没有人指向他,就会被丢弃。

169776786ed4a7ed

HashMap JDK7与JDK8的不同

  • JDK8中的HashMap存储结构采用的是数组+链表+红黑树,当链表长度超过某个值,会自动转化为红黑树。而JDK7中的HashMap没有用红黑树。使用红黑树可以大大提高查找效率。
  • 扩容的时候。JDK需要重新计算Hash,重新插入,而JDK8中的不需要。
  • JDK7中扩容插入链表的时候,采用的是头插法,会造成

HashMap与HashTable的不同

  • HashTable通过同步保证线程安全,锁粒度是整张表。
  • HashTable不允许null值。
  • HashTable直接使用对象的HashCode计算hash,HashMap需要通过HashCode再计算。

    ConcurrentHashMap

JDK1.7

  • 以Segment为粒度加锁。
  • 扩容操作next是final的问题。

926638-20170809132445011-2033999443

Segment数组的意义就是将一个大的table分割成多个小的table来进行加锁,也就是上面的提到的锁分离技术,而每一个Segment元素存储的是HashEntry数组+链表,这个和HashMap的数据存储结构一样.默认的segment是16个。

注意,假设ConcurrentHashMap一共分为2^n个段,每个段中有2^m个桶,那么段的定位方式是将key的hash值的高n位与(2^n-1)相与。在定位到某个段后,再将key的hash值的低m位与(2^m-1)相与,定位到具体的桶位。

Segment

Segment 类继承于 ReentrantLock 类,从而使得 Segment 对象能充当锁的角色。每一个segment有一个table,里面存放的是HashEntry。

HashEntry

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
static final class HashEntry<K,V> {
final K key; // 声明 key 为 final 的
final int hash; // 声明 hash 值为 final 的
volatile V value; // 声明 value 被volatile所修饰
final HashEntry<K,V> next; // 声明 next 为 final 的

HashEntry(K key, int hash, HashEntry<K,V> next, V value) {
this.key = key;
this.hash = hash;
this.next = next;
this.value = value;
}

@SuppressWarnings("unchecked")
static final <K,V> HashEntry<K,V>[] newArray(int i) {
return new HashEntry[i];
}
}

key,hash,next都是final,这样,对于其他线程是可见的,value是volatile的,因此HashEntry对象几乎是不可变的,只有value是可变的,有volatile修饰,一旦改变别的线程也可以感知到。成员变量是final的问题

next是final,我们不可能从链表尾部或者是中间添加元素,只可能采用头插法

实际上,concurrentHashMap的写操作需要对Segment加锁,但是读操作不需要加锁,这就是next是final的作用

put操作

需要对Segment加锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public V put(K key, V value) {
//不允许null值
if (value == null)
throw new NullPointerException();
int hash = hash(key.hashCode());
return segmentFor(hash).put(key, hash, value, false);
}

//段的定位方式是将key的hash值的高n位与(2^n-1)相与,2^n为segment个数
final Segment<K,V> segmentFor(int hash) {
return segments[(hash >>> segmentShift) & segmentMask];
}

//将元素插入到segment的table中
V put(K key, int hash, V value, boolean onlyIfAbsent) {
lock(); // 上锁
try {
int c = count;
if (c++ > threshold) // ensure capacity
rehash();
HashEntry<K,V>[] tab = table; // table是Volatile的
int index = hash & (tab.length - 1); // 定位到段中特定的桶
HashEntry<K,V> first = tab[index]; // first指向桶中链表的表头
HashEntry<K,V> e = first;

// 检查该桶中是否存在相同key的结点
while (e != null && (e.hash != hash || !key.equals(e.key)))
e = e.next;

V oldValue;
if (e != null) { // 该桶中存在相同key的结点
oldValue = e.value;
if (!onlyIfAbsent)
e.value = value; // 更新value值
}else { // 该桶中不存在相同key的结点
oldValue = null;
++modCount; // 结构性修改,modCount加1
tab[index] = new HashEntry<K,V>(key, hash, first, value); // 创建HashEntry并将其链到表头
count = c; //write-volatile,count值的更新一定要放在最后一步(volatile变量)
}
return oldValue; // 返回旧值(该桶中不存在相同key的结点,则返回null)
} finally {
unlock(); // 在finally子句中解锁
}
}
  • put操作时,第一步看插入的是不是null值,假如是,抛异常。
  • 定位segment。将key的hash值的高n位与(2^n-1)相与,n为segment个数。
  • 元素插入到segment的table内
    • 对Segment的put操作是加锁完成的,segment实际上继承了ReentrantLock。这里的加锁操作是针对某个具体的Segment,锁定的也是该Segment而不是整个ConcurrentHashMap。 相比较于 HashTable 和由同步包装器包装的HashMap每次只能有一个线程执行读或写操作,ConcurrentHashMap 在并发访问性能上有了质的提高。在理想状态下,ConcurrentHashMap 可以支持 16 个线程执行并发写操作(如果并发级别设置为 16),及任意数量线程的读操作。
    • 查看size是否超过threshold,超过了进行扩容。
    • 定位table位置。将key的hash值的低m位与(2^m-1)相与,定位到具体位置。
    • 插入数据。就与HashMap的过程差不多。若桶内没有元素,直接插入,若有元素,遍历整个链表,若有重复key值,直接替换,没有的话,再插入元素。

rehash操作

扩容时进行的rehash操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
void rehash() {
HashEntry<K,V>[] oldTable = table; // 扩容前的table
int oldCapacity = oldTable.length;
if (oldCapacity >= MAXIMUM_CAPACITY) // 已经扩到最大容量,直接返回
return;
// 新创建一个table,其容量是原来的2倍
HashEntry<K,V>[] newTable = HashEntry.newArray(oldCapacity<<1);
threshold = (int)(newTable.length * loadFactor); // 新的阈值
int sizeMask = newTable.length - 1; // 用于定位桶
for (int i = 0; i < oldCapacity ; i++) {
// We need to guarantee that any existing reads of old Map can
// proceed. So we cannot yet null out each bin.
HashEntry<K,V> e = oldTable[i]; // 依次指向旧table中的每个桶的链表表头

if (e != null) { // 旧table的该桶中链表不为空
HashEntry<K,V> next = e.next;
int idx = e.hash & sizeMask; // 重哈希已定位到新桶
if (next == null) // 旧table的该桶中只有一个节点
newTable[idx] = e;
else {
// Reuse trailing consecutive sequence at same slot
HashEntry<K,V> lastRun = e;
int lastIdx = idx;
for (HashEntry<K,V> last = next;
last != null;
last = last.next) {
int k = last.hash & sizeMask;
// 寻找k值相同的子链,该子链尾节点与父链的尾节点必须是同一个
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}

// JDK直接将子链lastRun放到newTable[lastIdx]桶中
newTable[lastIdx] = lastRun;

// 对该子链之前的结点,JDK会挨个遍历并把它们复制到新桶中
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
int k = p.hash & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(p.key, p.hash,
n, p.value);
}
}
}
}
table = newTable; // 扩容完成
}

resize操作与HashMap的操作也没有多大的区别,也是将将size double,然后将元素复制到新的table中去。

但是,还有一点需要注意。

我们知道链接指针next是final的,因此看起来我们好像只能把该桶的HashEntry链中的每个节点复制到新的桶中(这意味着我们要重新创建每个节点),但事实上JDK做了一些优化。

在理论上原桶里的HashEntry链可能存在一条子链,这条子链上的节点都会被重哈希到同一个新的桶中,这样我们只要拿到该子链的头结点就可以直接把该子链放到新的桶中,从而避免了一些节点不必要的创建,提升了一定的效率。因此,JDK为了提高效率,它会首先去查找这样的一个子链,而且这个子链的尾节点必须与原hash链的尾节点是同一个(因为next是final不可变的,他只能使用头插法,所以尾部的所有元素都不能变),那么就只需要把这个子链的头结点放到新的桶中,其后面跟的一串子节点自然也就连接上了。对于这个子链头结点之前的结点,JDK会挨个遍历并把它们复制到新桶的链头(只能在表头插入元素)中。

HashMap因为next不是final,不需要新建节点,直接改指向就可以了。

get操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

public V get(Object key) {
//定位segment
int hash = hash(key.hashCode());
return segmentFor(hash).get(key, hash);
}

V get(Object key, int hash) {
if (count != 0) { // read-volatile,首先读 count 变量
HashEntry<K,V> e = getFirst(hash); // 获取桶中链表头结点
while (e != null) {
if (e.hash == hash && key.equals(e.key)) { // 查找链中是否存在指定Key的键值对
V v = e.value;
if (v != null) // 如果读到value域不为 null,直接返回
return v;
// 如果读到value域为null,说明发生了重排序,加锁后重新读取
return readValueUnderLock(e); // recheck
}
e = e.next;
}
}
return null; // 如果不存在,直接返回null
}

但是有一个情况需要特别注意,就是链中存在指定Key的键值对并且其对应的Value值为null的情况。在剖析ConcurrentHashMap的put操作时,我们就知道ConcurrentHashMap不同于HashMap,它既不允许key值为null,也不允许value值为null。但是,此处怎么会存在键值对存在且的Value值为null的情形呢?JDK官方给出的解释是,这种情形发生的场景是:初始化HashEntry时发生的指令重排序导致的,也就是在HashEntry初始化完成之前便返回了它的引用。这时,JDK给出的解决之道就是加锁重读。

size

如果我们要统计整个 ConcurrentHashMap 里元素的大小,就必须统计所有 Segment 里元素的大小后求和。Segment 里的全局变量 count 是一个 volatile 变量,那么在多线程场景下,我们是不是直接把所有 Segment 的 count 相加就可以得到整个 ConcurrentHashMap 大小了呢?

不是的,虽然相加时可以获取每个 Segment 的 count 的最新值,但是拿到之后可能累加前使用的 count 发生了变化,那么统计结果就不准了。所以最安全的做法,是在统计 size 的时候把所有 Segment 的 put,remove 和 clean 方法全部锁住,但是这种做法显然非常低效。

因为在累加 count 操作过程中,之前累加过的 count 发生变化的几率非常小,所以 ConcurrentHashMap 的做法是先尝试 2 次通过不锁住 Segment 的方式来统计各个 Segment 大小,如果统计的过程中,容器的 count 发生了变化,则再采用加锁的方式来统计所有 Segment 的大小。

那么 ConcurrentHashMap 是如何判断在统计的时候容器是否发生了变化呢?使用 modCount 变量,在 put , remove 和 clean 方法里操作元素前都会将变量 modCount 进行加 1,那么在统计 size 前后比较 modCount 是否发生变化,从而得知容器的大小是否发生变化。

也是乐观锁的原理

remove

因为next是final类型的,next不可变,所以删除元素肯定不可以直接将next指向别的元素。

对于next,只能通过新建元素通过头插法来解决。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
        /**
* Remove; match on key only if value null, else match both.
*/
V remove(Object key, int hash, Object value) {
lock(); // 加锁
try {
int c = count - 1;
HashEntry<K,V>[] tab = table;
int index = hash & (tab.length - 1); // 定位桶
HashEntry<K,V> first = tab[index];
HashEntry<K,V> e = first;
while (e != null && (e.hash != hash || !key.equals(e.key))) // 查找待删除的键值对
e = e.next;

V oldValue = null;
if (e != null) { // 找到
V v = e.value;
if (value == null || value.equals(v)) {
oldValue = v;
// All entries following removed node can stay
// in list, but all preceding ones need to be
// cloned.
++modCount;
// 所有处于待删除节点之后的节点原样保留在链表中
HashEntry<K,V> newFirst = e.next;
// 所有处于待删除节点之前的节点被克隆到新链表中
for (HashEntry<K,V> p = first; p != e; p = p.next)
newFirst = new HashEntry<K,V>(p.key, p.hash,newFirst, p.value);

tab[index] = newFirst; // 将删除指定节点并重组后的链重新放到桶中
count = c; // write-volatile,更新Volatile变量count
}
}
return oldValue;
} finally {
unlock(); // finally子句解锁
}
}

Segment的remove操作和前面提到的get操作类似,首先根据散列码找到具体的链表,然后遍历这个链表找到要删除的节点,最后把待删除节点之后的所有节点原样保留在新链表中,把待删除节点之前的每个节点克隆到新链表中。

20170527170625098

我们可以看出,删除节点C之后的所有节点原样保留到新链表中;删除节点C之前的每个节点被克隆到新链表中(它们在新链表中的链接顺序被反转了)。因此,在执行remove操作时,原始链表并没有被修改,也就是说,读线程不会受同时执行 remove 操作的并发写线程的干扰。

为何线程安全

  • 用HashEntery对象的不变性来降低读操作对加锁的需求;
  • 用Volatile变量协调读写线程间的内存可见性;value是volatile的。
  • 若读时发生指令重排序现象,则加锁重读;由于在ConcurrentHashMap中不允许用null作为键和值,所以当读线程读到某个HashEntry的value为null时,便知道产生了冲突 —— 发生了重排序现象,此时便会加锁重新读入这个value值。

next为什么设置成final的呢

当next设置成final,肯定是不可以变的,这样读和写就可以并发执行了,也就是读操作不需要加锁。

区别:

  • 定位方式。先定位到Segment,再定位到table中的位置。
  • 不允许插入null值,否则抛异常。
  • 写操作需要加锁。

JDK1.8

jdk 1.8 取消了基于 Segment 的分段锁思想,改用 CAS + synchronized 控制并发操作,在某些方面提升了性能。并且追随 1.8 版本的 HashMap 底层实现,使用数组+链表+红黑树进行数据存储。最重要的是加了一个sizeCtl字段。

e6ac01f07ca641a54ff6f17c41a386df

关键成员

table volatile Node<K,V>[] table://装载Node的数组,作为ConcurrentHashMap的数据容器,采用懒加载的方式,直到第一次插入数据的时候才会进行初始化操作,数组的大小总是为2的幂次方。

nextTable volatile Node<K,V>[] nextTable; //扩容时使用,平时为null,只有在扩容的时候才为非null

sizeCtl : -1表示正在初始化 -N则表示当前正有N-1个线程进行扩容操作。

​ 正数:table未初始化,表示需要新建数组长度;table初始化了,表示table可用容量。

一些内部类

Node:存放key-value。

TreeNode 树节点,继承于承载数据的Node类。

构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public ConcurrentHashMap(int initialCapacity) {
//1. 小于0直接抛异常
if (initialCapacity < 0)
throw new IllegalArgumentException();
//2. 判断是否超过了允许的最大值,超过了话则取最大值,否则再对该值进一步处理
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
//3. 赋值给sizeCtl
this.sizeCtl = cap;
}

private static final int tableSizeFor(int c) {
int n = c - 1;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}

需要注意的有两点。

  • ConcurrentHashMap的大小一定是2的幂次方,比如,当指定大小为18时,为了满足2的幂次方特性,实际上concurrentHashMapd的大小为2的5次方(32)。
  • 在构造方法中,并没有初始化table,这是一个懒加载,直到插入第一个元素才会初始化table。

initTable方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
// 1. 保证只有一个线程正在进行初始化操作
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
// 2. 得出数组的大小
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
// 3. 这里才真正的初始化数组
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
// 4. 计算数组中可用的大小:实际大小n*0.75(加载因子)
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
  • 当sizeCtl<0时,表明已有一个线程对其进行初始化,因此当前线程让出CPU。

  • 进行初始化的线程需要将sizeCtl设置为-1(CAS),而且数组的容量为0.75*数组实际大小n。

put方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
//1. 计算key的hash值
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
//2. 如果当前table还没有初始化先调用initTable方法将tab进行初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
//3. tab中索引为i的位置的元素为null,则直接使用CAS将值插入即可
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
//4. 当前正在扩容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
//5. 当前为链表,在链表中插入新的键值对
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 6.当前为红黑树,将新的键值对插入到红黑树中
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// 7.插入完键值对后再根据实际大小看是否需要转换成红黑树
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
//8.对当前容量大小进行检查,如果超过了临界值(实际大小*加载因子)就需要扩容
addCount(1L, binCount);
return null;
}
  • ConcurrentHashMap不接受null值,假如传入的null值,直接抛异常。

  • 计算hash,将key的hashCode的低16位于高16位进行异或运算

  • 初始化table。

  • 计算table中索引位置。(n - 1) & hash

  • 确定好数组的索引i后,就可以可以tabAt()方法(这是线程安全的,调用了Unsafe的方法),获取该位置的元素情况。

  • 假如没有元素,直接使用casTabAt方法将新值插入。

  • 如果当前节点不为null,且该节点为特殊节点(forwardingNode)的话(该节点hash=-1),就说明当前concurrentHashMap正在进行扩容操作.

  • 插入新值。需要加锁

    • 如果是链表节点(fh>0); 1.在链表中如果找到了与待插入的键值对的key相同的节点,就直接覆盖即可;2. 如果直到找到了链表的末尾都没有找到的话,就直接将待插入的键值对追加到链表的末尾即可。
    • 当table[i]为红黑树的根节点,在红黑树中插入新值。
  • 插入完新节点后,查看链表长度是否超过设定值,是否需要将其转成红黑树。

  • 对当前容量大小进行检查,如果超过了临界值(实际大小*加载因子)就需要扩容。

transfer

ConcurrentHashMap主要采用并发扩容的方式,给每个线程分配一定的区间,每个线程扩容自己所分配到的那段,当执行完自己的那一段,若还没有扩容完成,可以继续申请区间,直到扩容完毕。

  • 区间怎么定,根据CPU来平均分配,保证每个线程分配时均匀的。

  • 第一个线程扩容时,会将sizeCtl设置为-1,再来一个,sizeCtl再减去1,当线程执行完扩容,sizeCtl+1.这样,就可以判断是否有线程再扩容。

  • 当线程拿到某个桶的时候,会将一个ForwardingNode,这样,其他线程再看到这个占位符的时候,就会helptransfer。

  • 那么线程如何处理自己的区间呢?

    • 会有一个bound参数,这个参数指的是该线程此次可以处理的区间的最小下标,超过这个下标,就需要重新领取区间或者结束扩容。

    • 获取某个桶位置的元素,假如是null,那么放入ForwardingNode进行占位,这样,当别的线程往里面插元素的时候,看到这个,就会执行helpTransfer。

    • 假如桶里元素不是null,每次处理自己的桶的时候,需要加锁。如果这个桶是链表,那么就将这个链表根据 length 取于拆成两份,取于结果是 0 的放在新表的低位,取于结果是 1 放在新表的高位。红黑树也类似。

      1637856068bf02ed

当第一个线程扩容会将sizeCtl设置为-1,

通过给每个线程分配桶区间,避免线程间的争用,通过为每个桶节点加锁,避免 putVal 方法导致数据不一致。同时,在扩容的时候,也会将链表拆成两份,这点和 HashMap 的 resize 方法类似。

而如果有新的线程想 put 数据时,也会帮助其扩容。鬼斧神工,令人赞叹。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
/**
* Moves and/or copies the nodes in each bin to new table. See
* above for explanation.
*
* transferIndex 表示转移时的下标,初始为扩容前的 length。
*
* 我们假设长度是 32
*/
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
// 将 length / 8 然后除以 CPU核心数。如果得到的结果小于 16,那么就使用 16。
// 这里的目的是让每个 CPU 处理的桶一样多,避免出现转移任务不均匀的现象,如果桶较少的话,默认一个 CPU(一个线程)处理 16 个桶
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range 细分范围 stridea:TODO
// 新的 table 尚未初始化
if (nextTab == null) { // initiating
try {
// 扩容 2 倍
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
// 更新
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
// 扩容失败, sizeCtl 使用 int 最大值。
sizeCtl = Integer.MAX_VALUE;
return;// 结束
}
// 更新成员变量
nextTable = nextTab;
// 更新转移下标,就是 老的 tab 的 length
transferIndex = n;
}
// 新 tab 的 length
int nextn = nextTab.length;
// 创建一个 fwd 节点,用于占位。当别的线程发现这个槽位中是 fwd 类型的节点,则跳过这个节点。
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
// 首次推进为 true,如果等于 true,说明需要再次推进一个下标(i--),反之,如果是 false,那么就不能推进下标,需要将当前的下标处理完毕才能继续推进
boolean advance = true;
// 完成状态,如果是 true,就结束此方法。
boolean finishing = false; // to ensure sweep before committing nextTab
// 死循环,i 表示下标,bound 表示当前线程可以处理的当前桶区间最小下标
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
// 如果当前线程可以向后推进;这个循环就是控制 i 递减。同时,每个线程都会进入这里取得自己需要转移的桶的区间
while (advance) {
int nextIndex, nextBound;
// 对 i 减一,判断是否大于等于 bound (正常情况下,如果大于 bound 不成立,说明该线程上次领取的任务已经完成了。那么,需要在下面继续领取任务)
// 如果对 i 减一大于等于 bound(还需要继续做任务),或者完成了,修改推进状态为 false,不能推进了。任务成功后修改推进状态为 true。
// 通常,第一次进入循环,i-- 这个判断会无法通过,从而走下面的 nextIndex 赋值操作(获取最新的转移下标)。其余情况都是:如果可以推进,将 i 减一,然后修改成不可推进。如果 i 对应的桶处理成功了,改成可以推进。
if (--i >= bound || finishing)
advance = false;// 这里设置 false,是为了防止在没有成功处理一个桶的情况下却进行了推进
// 这里的目的是:1. 当一个线程进入时,会选取最新的转移下标。2. 当一个线程处理完自己的区间时,如果还有剩余区间的没有别的线程处理。再次获取区间。
else if ((nextIndex = transferIndex) <= 0) {
// 如果小于等于0,说明没有区间了 ,i 改成 -1,推进状态变成 false,不再推进,表示,扩容结束了,当前线程可以退出了
// 这个 -1 会在下面的 if 块里判断,从而进入完成状态判断
i = -1;
advance = false;// 这里设置 false,是为了防止在没有成功处理一个桶的情况下却进行了推进
}// CAS 修改 transferIndex,即 length - 区间值,留下剩余的区间值供后面的线程使用
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;// 这个值就是当前线程可以处理的最小当前区间最小下标
i = nextIndex - 1; // 初次对i 赋值,这个就是当前线程可以处理的当前区间的最大下标
advance = false; // 这里设置 false,是为了防止在没有成功处理一个桶的情况下却进行了推进,这样对导致漏掉某个桶。下面的 if (tabAt(tab, i) == f) 判断会出现这样的情况。
}
}// 如果 i 小于0 (不在 tab 下标内,按照上面的判断,领取最后一段区间的线程扩容结束)
// 如果 i >= tab.length(不知道为什么这么判断)
// 如果 i + tab.length >= nextTable.length (不知道为什么这么判断)
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) { // 如果完成了扩容
nextTable = null;// 删除成员变量
table = nextTab;// 更新 table
sizeCtl = (n << 1) - (n >>> 1); // 更新阈值
return;// 结束方法。
}// 如果没完成
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {// 尝试将 sc -1. 表示这个线程结束帮助扩容了,将 sc 的低 16 位减一。
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)// 如果 sc - 2 不等于标识符左移 16 位。如果他们相等了,说明没有线程在帮助他们扩容了。也就是说,扩容结束了。
return;// 不相等,说明没结束,当前线程结束方法。
finishing = advance = true;// 如果相等,扩容结束了,更新 finising 变量
i = n; // 再次循环检查一下整张表
}
}
else if ((f = tabAt(tab, i)) == null) // 获取老 tab i 下标位置的变量,如果是 null,就使用 fwd 占位。
advance = casTabAt(tab, i, null, fwd);// 如果成功写入 fwd 占位,再次推进一个下标
else if ((fh = f.hash) == MOVED)// 如果不是 null 且 hash 值是 MOVED。
advance = true; // already processed // 说明别的线程已经处理过了,再次推进一个下标
else {// 到这里,说明这个位置有实际值了,且不是占位符。对这个节点上锁。为什么上锁,防止 putVal 的时候向链表插入数据
synchronized (f) {
// 判断 i 下标处的桶节点是否和 f 相同
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;// low, height 高位桶,低位桶
// 如果 f 的 hash 值大于 0 。TreeBin 的 hash 是 -2
if (fh >= 0) {
// 对老长度进行与运算(第一个操作数的的第n位于第二个操作数的第n位如果都是1,那么结果的第n为也为1,否则为0)
// 由于 Map 的长度都是 2 的次方(000001000 这类的数字),那么取于 length 只有 2 种结果,一种是 0,一种是1
// 如果是结果是0 ,Doug Lea 将其放在低位,反之放在高位,目的是将链表重新 hash,放到对应的位置上,让新的取于算法能够击中他。
int runBit = fh & n;
Node<K,V> lastRun = f; // 尾节点,且和头节点的 hash 值取于不相等
// 遍历这个桶
for (Node<K,V> p = f.next; p != null; p = p.next) {
// 取于桶中每个节点的 hash 值
int b = p.hash & n;
// 如果节点的 hash 值和首节点的 hash 值取于结果不同
if (b != runBit) {
runBit = b; // 更新 runBit,用于下面判断 lastRun 该赋值给 ln 还是 hn。
lastRun = p; // 这个 lastRun 保证后面的节点与自己的取于值相同,避免后面没有必要的循环
}
}
if (runBit == 0) {// 如果最后更新的 runBit 是 0 ,设置低位节点
ln = lastRun;
hn = null;
}
else {
hn = lastRun; // 如果最后更新的 runBit 是 1, 设置高位节点
ln = null;
}// 再次循环,生成两个链表,lastRun 作为停止条件,这样就是避免无谓的循环(lastRun 后面都是相同的取于结果)
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
// 如果与运算结果是 0,那么就还在低位
if ((ph & n) == 0) // 如果是0 ,那么创建低位节点
ln = new Node<K,V>(ph, pk, pv, ln);
else // 1 则创建高位
hn = new Node<K,V>(ph, pk, pv, hn);
}
// 其实这里类似 hashMap
// 设置低位链表放在新链表的 i
setTabAt(nextTab, i, ln);
// 设置高位链表,在原有长度上加 n
setTabAt(nextTab, i + n, hn);
// 将旧的链表设置成占位符
setTabAt(tab, i, fwd);
// 继续向后推进
advance = true;
}// 如果是红黑树
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
// 遍历
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
// 和链表相同的判断,与运算 == 0 的放在低位
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
} // 不是 0 的放在高位
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
// 如果树的节点数小于等于 6,那么转成链表,反之,创建一个新的树
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
// 低位树
setTabAt(nextTab, i, ln);
// 高位数
setTabAt(nextTab, i + n, hn);
// 旧的设置成占位符
setTabAt(tab, i, fwd);
// 继续向后推进
advance = true;
}
}
}
}
}
}
  • 将nextTable扩容两倍。

  • 确定索引 i。

  • 获取 i 中的元素。假如为null,直接插入一个占位forwardingNode,这样,其他线程再putVal的时候,看到这个占位符,会进行helpTransfer。

  • 假如元素为forwardingNode类型,说明这个位置处理过了,直接跳过。

  • 除此之外,进行该桶内数据的转移。整个过程需要加锁。

    • 桶内元素为链表,那么就将这个链表根据 length 取于拆成两份,取于结果是 0 的放在新表的低位,取于结果是 1 放在新表的高位。

      1637856068bf02ed

    • 桶内元素为红黑树也是类似这样进行处理。

    • 将该位置插入占位符forwarding Node。

LinkedHashMap(解决Map无序问题)

HashMap是无序的,也就是说,迭代HashMap所得到的元素顺序并不是它们最初放置到HashMap的顺序。

LinkedHashMap解决了这个问题,它通过一个双向列表保证了迭代顺序。这个顺序可以是插入顺序,也可以是访问顺序,默认是插入顺序。所以,实际上,LinkedHashMap = HashMap+双向链表。

LinkedHashMap整个大致存储结构与HashMap没有差别,主要就是在各个节点之间添加了before以及after指针,使这些节点形成了一个双链表。

但是LinkedHashMap是非线程安全的。

4843132-7abca1abd714341d

红色的是一个双向链表。也就是说,当我们插入一个元素的时候,除了要插入到HashMap中,还要插入到双向链表中。

存储结构

1
2
3
4
5
6
7
//Entry除了有HashMap的成员外,还有before和after两个指针。 
static class Entry<K,V> extends HashMap.Node<K,V> {
Entry<K,V> before, after;
Entry(int hash, K key, V value, Node<K,V> next) {
super(hash, key, value, next);
}
}

构造方法

1
2
3
4
5
6
7
//这是一个三个参数的,accessOrder代表迭代顺序,他是按照插入顺序还是访问顺序来完成的
public LinkedHashMap(int initialCapacity,
float loadFactor,
boolean accessOrder) {
super(initialCapacity, loadFactor); // 调用HashMap对应的构造函数
this.accessOrder = accessOrder; // 迭代顺序的默认值
}

put方法

LinkedHashMap完全继承了HashMap的 put(Key,Value) 方法,只是对put(Key,Value)方法所调用的recordAccess方法和addEntry方法进行了重写(1.7),1.8是对new Node重写,不过也差不多,都是对插入元素这块进行了重写

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Node<K,V> newNode(int hash, K key, V value, Node<K,V> e) {
LinkedHashMap.Entry<K,V> p =
new LinkedHashMap.Entry<K,V>(hash, key, value, e);
linkNodeLast(p);
return p;
}
//添加到双链表尾部
private void linkNodeLast(LinkedHashMap.Entry<K,V> p) {
LinkedHashMap.Entry<K,V> last = tail;
tail = p;
if (last == null)
head = p;
else {
p.before = last;
last.after = p;
}
}

相比HashMap而言,LinkedHashMap在向哈希表添加一个键值对的同时,也会将其链入到它所维护的双向链表中,以便设定迭代顺序。

get 方法

get方法跟HahMap也差不多,只不过多了一步。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public V get(Object key) {
Node<K,V> e;
if ((e = getNode(hash(key), key)) == null)
return null;
//假如是该LinkedHashMap按照访问顺序迭代,需要将访问过的元素放到队尾
if (accessOrder)
afterNodeAccess(e);
return e.value;
}

void afterNodeAccess(Node<K,V> e) { // move node to last
LinkedHashMap.Entry<K,V> last;
if (accessOrder && (last = tail) != e) {
LinkedHashMap.Entry<K,V> p =
(LinkedHashMap.Entry<K,V>)e, b = p.before, a = p.after;
p.after = null;
if (b == null)
head = a;
else
b.after = a;
if (a != null)
a.before = b;
else
last = b;
if (last == null)
head = p;
else {
p.before = last;
last.after = p;
}
tail = p;
++modCount;
}
}

总结

  • LinkedHashMap是继承于HashMap,是基于HashMap和双向链表来实现的。
  • HashMap无序;LinkedHashMap有序,可分为插入顺序和访问顺序两种。
  • LinkedHashMap是线程不安全的

WeakHashMap

如果map里面的key只有map本身引用时,就会将key对应的Entry清除掉。主要是因为Entry继承了WeakReference类,是弱引用。

ConcurrentSkipListMap(ToDo)

TreeMap

美团技术:HashMap in JDK8

ConcurrentHashMap in JDK7

ConcurrentHashMap in JDK8

ConcurrentHashMap in JDK8 transfer method

LinkedHashMap