Java基础 - ConcurrentHashmap

优质
小牛编辑
127浏览
2023-12-01

JDK1.7

ConcurrentHashMap的锁分段技术:假如容器里有多把锁,每一把锁用于锁容器其中一部分数据,那么当多线程访问容器里不同数据段的数据时,线程间就不会存在锁竞争,从而可以有效的提高并发访问效率,这就是ConcurrentHashMap所使用的锁分段技术。首先将数据分成一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问。

ConcurrentHashMap不允许Key或者Value的值为NULL

ConcurrentHashmap - 图1

Segment类

Put

将一个HashEntry放入到该Segment中,使用自旋机制,减少了加锁的可能性。

  1. final V put(K key, int hash, V value, boolean onlyIfAbsent) {
  2. HashEntry<K,V> node = tryLock() ? null :
  3. scanAndLockForPut(key, hash, value); //如果加锁失败,则调用该方法
  4. V oldValue;
  5. try {
  6. HashEntry<K,V>[] tab = table;
  7. int index = (tab.length - 1) & hash; //同hashMap相同的哈希定位方式
  8. HashEntry<K,V> first = entryAt(tab, index);
  9. for (HashEntry<K,V> e = first;;) {
  10. if (e != null) {
  11. //若不为null,则持续查找,知道找到key和hash值相同的节点,将其value更新
  12. K k;
  13. if ((k = e.key) == key ||
  14. (e.hash == hash && key.equals(k))) {
  15. oldValue = e.value;
  16. if (!onlyIfAbsent) {
  17. e.value = value;
  18. ++modCount;
  19. }
  20. break;
  21. }
  22. e = e.next;
  23. }
  24. else { //若头结点为null
  25. if (node != null) //在遍历key对应节点链时没有找到相应的节点
  26. node.setNext(first);
  27. //当前修改并不需要让其他线程知道,在锁退出时修改自然会
  28. //更新到内存中,可提升性能
  29. else
  30. node = new HashEntry<K,V>(hash, key, value, first);
  31. int c = count + 1;
  32. if (c > threshold && tab.length < MAXIMUM_CAPACITY)
  33. rehash(node); //如果超过阈值,则进行rehash操作
  34. else
  35. setEntryAt(tab, index, node);
  36. ++modCount;
  37. count = c;
  38. oldValue = null;
  39. break;
  40. }
  41. }
  42. } finally {
  43. unlock();
  44. }
  45. return oldValue;
  46. }

scanAndLockForPut

该操作持续查找key对应的节点链中是否已存在该节点,如果没有找到已存在的节点,则预创建一个新节点,并且尝试n次,直到尝试次数超出限制,才真正进入等待状态,即所谓的 自旋等待

  1. private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
  2. //根据hash值找到segment中的HashEntry节点
  3. HashEntry<K,V> first = entryForHash(this, hash); //首先获取头结点
  4. HashEntry<K,V> e = first;
  5. HashEntry<K,V> node = null;
  6. int retries = -1; // negative while locating node
  7. while (!tryLock()) { //持续遍历该哈希链
  8. HashEntry<K,V> f; // to recheck first below
  9. if (retries < 0) {
  10. if (e == null) {
  11. if (node == null) //若不存在要插入的节点,则创建一个新的节点
  12. node = new HashEntry<K,V>(hash, key, value, null);
  13. retries = 0;
  14. }
  15. else if (key.equals(e.key))
  16. retries = 0;
  17. else
  18. e = e.next;
  19. }
  20. else if (++retries > MAX_SCAN_RETRIES) {
  21. //尝试次数超出限制,则进行自旋等待
  22. lock();
  23. break;
  24. }
  25. /*当在自旋过程中发现节点链的链头发生了变化,则更新节点链的链头,
  26. 并重置retries值为-1,重新为尝试获取锁而自旋遍历*/
  27. else if ((retries & 1) == 0 &&
  28. (f = entryForHash(this, hash)) != first) {
  29. e = first = f; // re-traverse if entry changed
  30. retries = -1;
  31. }
  32. }
  33. return node;
  34. }

remove

用于移除某个节点,返回移除的节点值。

  1. final V remove(Object key, int hash, Object value) {
  2. if (!tryLock())
  3. scanAndLock(key, hash);
  4. V oldValue = null;
  5. try {
  6. HashEntry<K,V>[] tab = table;
  7. int index = (tab.length - 1) & hash;
  8. //根据这种哈希定位方式来定位对应的HashEntry
  9. HashEntry<K,V> e = entryAt(tab, index);
  10. HashEntry<K,V> pred = null;
  11. while (e != null) {
  12. K k;
  13. HashEntry<K,V> next = e.next;
  14. if ((k = e.key) == key ||
  15. (e.hash == hash && key.equals(k))) {
  16. V v = e.value;
  17. if (value == null || value == v || value.equals(v)) {
  18. if (pred == null)
  19. setEntryAt(tab, index, next);
  20. else
  21. pred.setNext(next);
  22. ++modCount;
  23. --count;
  24. oldValue = v;
  25. }
  26. break;
  27. }
  28. pred = e;
  29. e = next;
  30. }
  31. } finally {
  32. unlock();
  33. }
  34. return oldValue;
  35. }

Clear

要首先对整个segment加锁,然后将每一个HashEntry都设置为null

  1. final void clear() {
  2. lock();
  3. try {
  4. HashEntry<K,V>[] tab = table;
  5. for (int i = 0; i < tab.length ; i++)
  6. setEntryAt(tab, i, null);
  7. ++modCount;
  8. count = 0;
  9. } finally {
  10. unlock();
  11. }
  12. }

Put

  1. public V put(K key, V value) {
  2. Segment<K,V> s;
  3. if (value == null)
  4. throw new NullPointerException();
  5. int hash = hash(key); //求出key的hash值
  6. int j = (hash >>> segmentShift) & segmentMask;
  7. //求出key在segments数组中的哪一个segment中
  8. if ((s = (Segment<K,V>)UNSAFE.getObject
  9. (segments, (j << SSHIFT) + SBASE)) == null)
  10. s = ensureSegment(j); //使用unsafe操作取出该segment
  11. return s.put(key, hash, value, false); //向segment中put元素
  12. }

Get

  1. public V get(Object key) {
  2. Segment<K,V> s;
  3. HashEntry<K,V>[] tab;
  4. int h = hash(key); //找出对应的segment的位置
  5. long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
  6. if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
  7. (tab = s.table) != null) { //使用Unsafe获取对应的Segmen
  8. for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
  9. (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
  10. e != null; e = e.next) { //找出对应的HashEntry,从头开始遍历
  11. K k;
  12. if ((k = e.key) == key || (e.hash == h && key.equals(k)))
  13. return e.value;
  14. }
  15. }
  16. return null;
  17. }

Size

求出所有的HashEntry的数目,先尝试的遍历查找、计算2遍,如果两遍遍历过程中整个Map没有发生修改(即两次所有Segment实例中modCount值的和一致),则可以认为整个查找、计算过程中Map没有发生改变。否则,需要对所有segment实例进行加锁、计算、解锁,然后返回。

  1. public int size() {
  2. final Segment<K,V>[] segments = this.segments;
  3. int size;
  4. boolean overflow; // true if size overflows 32 bits
  5. long sum; // sum of modCounts
  6. long last = 0L; // previous sum
  7. int retries = -1; // first iteration isn't retry
  8. try {
  9. for (;;) {
  10. if (retries++ == RETRIES_BEFORE_LOCK) {
  11. for (int j = 0; j < segments.length; ++j)
  12. ensureSegment(j).lock(); // force creation
  13. }
  14. sum = 0L;
  15. size = 0;
  16. overflow = false;
  17. for (int j = 0; j < segments.length; ++j) {
  18. Segment<K,V> seg = segmentAt(segments, j);
  19. if (seg != null) {
  20. sum += seg.modCount;
  21. int c = seg.count;
  22. if (c < 0 || (size += c) < 0)
  23. overflow = true;
  24. }
  25. }
  26. if (sum == last)
  27. break;
  28. last = sum;
  29. }
  30. } finally {
  31. if (retries > RETRIES_BEFORE_LOCK) {
  32. for (int j = 0; j < segments.length; ++j)
  33. segmentAt(segments, j).unlock();
  34. }
  35. }
  36. return overflow ? Integer.MAX_VALUE : size;
  37. }

JDK1.8

在JDK1.8中对ConcurrentHashmap做了两个改进:

  • 取消segments字段,直接采用transient volatile HashEntry<K,V>[] table保存数据,采用table数组元素作为锁,从而实现了对每一行数据进行加锁,进一步减少并发冲突的概率

  • 将原先 table数组+单向链表 的数据结构,变更为 table数组+单向链表+红黑树 的结构。对于hash表来说,最核心的能力在于将key hash之后能均匀的分布在数组中。如果hash之后散列的很均匀,那么table数组中的每个队列长度主要为0或者1。但实际情况并非总是如此理想,虽然ConcurrentHashMap类默认的加载因子为0.75,但是在数据量过大或者运气不佳的情况下,还是会存在一些队列长度过长的情况,如果还是采用单向列表方式,那么查询某个节点的时间复杂度为O(n);因此,对于个数超过8(默认值)的列表,jdk1.8中采用了红黑树的结构,那么查询的时间复杂度可以降低到O(logN),可以改进性能。