当前位置: 首页 > 编程笔记 >

Java从同步容器到并发容器的操作过程

曾光誉
2023-03-14
本文向大家介绍Java从同步容器到并发容器的操作过程,包括了Java从同步容器到并发容器的操作过程的使用技巧和注意事项,需要的朋友参考一下

引言

容器是Java基础类库中使用频率最高的一部分,Java集合包中提供了大量的容器类来帮组我们简化开发,我前面的文章中对Java集合包中的关键容器进行过一个系列的分析,但这些集合类都是非线程安全的,即在多线程的环境下,都需要其他额外的手段来保证数据的正确性,最简单的就是通过synchronized关键字将所有使用到非线程安全的容器代码全部同步执行。这种方式虽然可以达到线程安全的目的,但存在几个明显的问题:首先编码上存在一定的复杂性,相关的代码段都需要添加锁。其次这种一刀切的做法在高并发情况下性能并不理想,基本相当于串行执行。JDK1.5中为我们提供了一系列的并发容器,集中在java.util.concurrent包下,用来解决这两个问题,先从同步容器说起。

同步容器Vector和HashTable

为了简化代码开发的过程,早期的JDK在java.util包中提供了Vector和HashTable两个同步容器,这两个容器的实现和早期的ArrayList和HashMap代码实现基本一样,不同在于Vector和HashTable在每个方法上都添加了synchronized关键字来保证同一个实例同时只有一个线程能访问,部分源码如下:

//Vector
public synchronized int size() {};
public synchronized E get(int index) {};
//HashTable 
public synchronized V put(K key, V value) {};
public synchronized V remove(Object key) {};

通过对每个方法添加synchronized,保证了多次操作的串行。这种方式虽然使用起来方便了,但并没有解决高并发下的性能问题,与手动锁住ArrayList和HashMap并没有什么区别,不论读还是写都会锁住整个容器。其次这种方式存在另一个问题:当多个线程进行复合操作时,是线程不安全的。可以通过下面的代码来说明这个问题:

public static void deleteVector(){
 int index = vectors.size() - 1;
 vectors.remove(index);
}

代码中对Vector进行了两步操作,首先获取size,然后移除最后一个元素,多线程情况下如果两个线程交叉执行,A线程调用size后,B线程移除最后一个元素,这时A线程继续remove将会抛出索引超出的错误。

那么怎么解决这个问题呢?最直接的修改方案就是对代码块加锁来防止多线程同时执行:

public static void deleteVector(){
 synchronized (vectors) {
  int index = vectors.size() - 1;
  vectors.remove(index);
 }
}

如果上面的问题通过加锁来解决没有太直观的影响,那么来看看对vectors进行迭代的情况:

public static void foreachVector(){
 synchronized (vectors) {
  for (int i = 0; i < vectors.size(); i++) {
   System.out.println(vectors.get(i).toString());
  }
 }
}

为了避免多线程情况下在迭代的过程中其他线程对vectors进行了修改,就不得不对整个迭代过程加锁,想象这么一个场景,如果迭代操作非常频繁,或者vectors元素很大,那么所有的修改和读取操作将不得不在锁外等待,这将会对多线程性能造成极大的影响。那么有没有什么方式能够很好的对容器的迭代操作和修改操作进行分离,在修改时不影响容器的迭代操作呢?这就需要java.util.concurrent包中的各种并发容器了出场了。

并发容器CopyOnWrite

CopyOnWrite--写时复制容器是一种常用的并发容器,它通过多线程下读写分离来达到提高并发性能的目的,和前面我们讲解StampedLock时所用的解决方案类似:任何时候都可以进行读操作,写操作则需要加锁。不同的是,在CopyOnWrite中,对容器的修改操作加锁后,通过copy一个新的容器来进行修改,修改完毕后将容器替换为新的容器即可。

这种方式的好处显而易见:通过copy一个新的容器来进行修改,这样读操作就不需要加锁,可以并发读,因为在读的过程中是采用的旧的容器,即使新容器做了修改对旧容器也没有影响,同时也很好的解决了迭代过程中其他线程修改导致的并发问题。

JDK中提供的并发容器包括CopyOnWriteArrayList和CopyOnWriteArraySet,下面通过CopyOnWriteArrayList的部分源码来理解这种思想:

//添加元素
public boolean add(E e) {
 //独占锁
 final ReentrantLock lock = this.lock;
 lock.lock();
 try {
  Object[] elements = getArray();
  int len = elements.length;
  //复制一个新的数组newElements
  Object[] newElements = Arrays.copyOf(elements, len + 1);
  newElements[len] = e;
  //修改后指向新的数组
  setArray(newElements);
  return true;
 } finally {
  lock.unlock();
 }
}
public E get(int index) {
 //未加锁,直接获取
 return get(getArray(), index);
}

代码很简单,在add操作中通过一个共享的ReentrantLock来获取锁,这样可以防止多线程下多个线程同时修改容器内容。获取锁后通过Arrays.copyOf复制了一个新的容器,然后对新的容器进行了修改,最后直接通过setArray将原数组引用指向了新的数组,避免了在修改过程中迭代数据出现错误。get操作由于是读操作,未加锁,直接读取就行。

CopyOnWriteArraySet类似,这里不做过多讲解。

CopyOnWrite容器虽然在多线程下使用是安全的,相比较Vector也大大提高了读写的性能,但它也有自身的问题。

首先就是性能,在讲解ArrayList的文章中提到过,ArrayList的扩容由于使用了Arrays.copyOf每次都需要申请更大的空间以及复制现有的元素到新的数组,对性能存在一定影响。CopyOnWrite容器也不例外,每次修改操作都会申请新的数组空间,然后进行替换。所以在高并发频繁修改容器的情况下,会不断申请新的空间,同时会造成频繁的GC,这时使用CopyOnWrite容器并不是一个好的选择。

其次还有一个数据一致性问题,由于在修改中copy了新的数组进行替换,同时旧数组如果还在被使用,那么新的数据就不能被及时读取到,这样就造成了数据不一致,如果需要强数据一致性,CopyOnWrite容器也不太适合。

并发容器ConcurrentHashMap

ConcurrentHashMap容器相较于CopyOnWrite容器在并发加锁粒度上有了更大一步的优化,它通过修改对单个hash桶元素加锁的达到了更细粒度的并发控制。在了解ConcurrentHashMap容器之前,推荐大家先阅读我之前对HashMap源码分析的文章--Java集合(5)一 HashMap与HashSet,因为在底层数据结构上,ConcurrentHashMap和HashMap都使用了数组+链表+红黑树的方式,只是在HashMap的基础上添加了并发相关的一些控制,所以这里只对ConcurrentHashMap中并发相关代码做一些分析。

 还是先从ConcurrentHashMap的写操作开始,这里就是put方法:

final V putVal(K key, V value, boolean onlyIfAbsent) {
 if (key == null || value == null) throw new NullPointerException();
 int hash = spread(key.hashCode()); //计算桶的hash值
 int binCount = 0;
 //循环插入元素,避免并发插入失败
 for (Node<K,V>[] tab = table;;) {
  Node<K,V> f; int n, i, fh;
  if (tab == null || (n = tab.length) == 0)
   tab = initTable();
  else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
   //如果当前桶无元素,则通过cas操作插入新节点
   if (casTabAt(tab, i, null,
       new Node<K,V>(hash, key, value, null)))
    break;     
  }
  //如果当前桶正在扩容,则协助扩容
  else if ((fh = f.hash) == MOVED)
   tab = helpTransfer(tab, f);
  else {
   V oldVal = null;
   //hash冲突时锁住当前需要添加节点的头元素,可能是链表头节点或者红黑树的根节点
   synchronized (f) { 
    if (tabAt(tab, i) == f) {
     if (fh >= 0) {
      binCount = 1;
      for (Node<K,V> e = f;; ++binCount) {
       K ek;
       if (e.hash == hash &&
        ((ek = e.key) == key ||
         (ek != null && key.equals(ek)))) {
        oldVal = e.val;
        if (!onlyIfAbsent)
         e.val = value;
        break;
       }
       Node<K,V> pred = e;
       if ((e = e.next) == null) {
        pred.next = new Node<K,V>(hash, key,
               value, null);
        break;
       }
      }
     }
     else if (f instanceof TreeBin) {
      Node<K,V> p;
      binCount = 2;
      if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
              value)) != null) {
       oldVal = p.val;
       if (!onlyIfAbsent)
        p.val = value;
      }
     }
    }
   }
   if (binCount != 0) {
    if (binCount >= TREEIFY_THRESHOLD)
     treeifyBin(tab, i);
    if (oldVal != null)
     return oldVal;
    break;
   }
  }
 }
 addCount(1L, binCount);
 return null;
}

在put元素的过程中,有几个并发处理的关键点:

•如果当前桶对应的节点还没有元素插入,通过典型的无锁cas操作尝试插入新节点,减少加锁的概率,并发情况下如果插入不成功,很容易想到自旋,也就是for (Node<K,V>[] tab = table;;)。

•如果当前桶正在扩容,则协助扩容((fh = f.hash) == MOVED)。这里是一个重点,ConcurrentHashMap的扩容和HashMap不一样,它在多线程情况下或使用多个线程同时扩容,每个线程扩容指定的一部分hash桶,当前线程扩容完指定桶之后会继续获取下一个扩容任务,直到扩容全部完成。扩容的大小和HashMap一样,都是翻倍,这样可以有效减少移动的元素数量,也就是使用2的幂次方的原因,在HashMap中也一样。

•在发生hash冲突时仅仅只锁住当前需要添加节点的头元素即可,可能是链表头节点或者红黑树的根节点,其他桶节点都不需要加锁,大大减小了锁粒度。

通过ConcurrentHashMap添加元素的过程,知道了ConcurrentHashMap容器是通过CAS + synchronized一起来实现并发控制的。这里有个额外的问题:为什么使用synchronized而不使用ReentrantLock?前面我的文章也对synchronized以及ReentrantLock的实现方式和性能做过分析,在这里我的理解是synchronized在后期优化空间上比ReentrantLock更大。

并发容器ConcurrentSkipListMap

java.util中对应的容器在java.util.concurrent包中基本都可以找到对应的并发容器:List和Set有对应的CopyOnWriteArrayList与CopyOnWriteArraySet,HashMap有对应的ConcurrentHashMap,但是有序的TreeMap或并没有对应的ConcurrentTreeMap。

为什么没有ConcurrentTreeMap呢?这是因为TreeMap内部使用了红黑树来实现,红黑树是一种自平衡的二叉树,当树被修改时,需要重新平衡,重新平衡操作可能会影响树的大部分节点,如果并发量非常大的情况下,这就需要在许多树节点上添加互斥锁,那并发就失去了意义。所以提供了另外一种并发下的有序map实现:ConcurrentSkipListMap。

ConcurrentSkipListMap内部使用跳表(SkipList)这种数据结构来实现,他的结构相对红黑树来说非常简单理解,实现起来也相对简单,而且在理论上它的查找、插入、删除时间复杂度都为log(n)。在并发上,ConcurrentSkipListMap采用无锁的CAS+自旋来控制。

跳表简单来说就是一个多层的链表,底层是一个普通的链表,然后逐层减少,通常通过一个简单的算法实现每一层元素是下一层的元素的二分之一,这样当搜索元素时从最顶层开始搜索,可以说是另一种形式的二分查找。

一个简单的获取跳表层数概率算法实现如下:

int random_level() { 
 K = 1; 
 while (random(0,1)) 
  K++; 
 
 return K; 
} 

通过简单的0和1获取概率,1层的概率为50%,2层的概率为25%,3层的概率为12.5%,这样逐级递减。

一个三层的跳表添加元素的过程如下:

 插入值为15的节点:

 插入后:

 维基百科中有一个添加节点的动图,这里也贴出来方便理解:

通过分析ConcurrentSkipListMap的put方法来理解跳表以及CAS自旋并发控制:

private V doPut(K key, V value, boolean onlyIfAbsent) {
 Node<K,V> z;    // added node
 if (key == null)
  throw new NullPointerException();
 Comparator<? super K> cmp = comparator;
 outer: for (;;) {
  for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) { //查找前继节点
   if (n != null) { //查找到前继节点
    Object v; int c;
    Node<K,V> f = n.next; //获取后继节点的后继节点
    if (n != b.next) //发生竞争,两次节点获取不一致,并发导致
     break;
    if ((v = n.value) == null) { // 节点已经被删除
     n.helpDelete(b, f);
     break;
    }
    if (b.value == null || v == n) 
     break;
    if ((c = cpr(cmp, key, n.key)) > 0) { //进行下一轮查找,比当前key大
     b = n;
     n = f;
     continue;
    }
    if (c == 0) { //相等时直接cas修改值
     if (onlyIfAbsent || n.casValue(v, value)) {
      @SuppressWarnings("unchecked") V vv = (V)v;
      return vv;
     }
     break; // restart if lost race to replace value
    }
    // else c < 0; fall through
   }
   z = new Node<K,V>(key, value, n); //9. n.key > key > b.key
   if (!b.casNext(n, z)) //cas修改值 
    break;   // restart if lost race to append to b
   break outer;
  }
 }
 int rnd = ThreadLocalRandom.nextSecondarySeed(); //获取随机数
 if ((rnd & 0x80000001) == 0) { // test highest and lowest bits
  int level = 1, max;
  while (((rnd >>>= 1) & 1) != 0) // 获取跳表层级
   ++level;
  Index<K,V> idx = null;
  HeadIndex<K,V> h = head;
  if (level <= (max = h.level)) { //如果获取的调表层级小于等于当前最大层级,则直接添加,并将它们组成一个上下的链表
   for (int i = 1; i <= level; ++i)
    idx = new Index<K,V>(z, idx, null);
  }
  else { // try to grow by one level //否则增加一层level,在这里体现为Index<K,V>数组
   level = max + 1; // hold in array and later pick the one to use
   @SuppressWarnings("unchecked")Index<K,V>[] idxs =
    (Index<K,V>[])new Index<?,?>[level+1];
   for (int i = 1; i <= level; ++i)
    idxs[i] = idx = new Index<K,V>(z, idx, null);
   for (;;) {
    h = head;
    int oldLevel = h.level;
    if (level <= oldLevel) // lost race to add level
     break;
    HeadIndex<K,V> newh = h;
    Node<K,V> oldbase = h.node;
    for (int j = oldLevel+1; j <= level; ++j) //新添加的level层的具体数据
     newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j);
    if (casHead(h, newh)) {
     h = newh;
     idx = idxs[level = oldLevel];
     break;
    }
   }
  }
  // 逐层插入数据过程
  splice: for (int insertionLevel = level;;) {
   int j = h.level;
   for (Index<K,V> q = h, r = q.right, t = idx;;) {
    if (q == null || t == null)
     break splice;
    if (r != null) {
     Node<K,V> n = r.node;
     // compare before deletion check avoids needing recheck
     int c = cpr(cmp, key, n.key);
     if (n.value == null) {
      if (!q.unlink(r))
       break;
      r = q.right;
      continue;
     }
     if (c > 0) {
      q = r;
      r = r.right;
      continue;
     }
    }
    if (j == insertionLevel) {
     if (!q.link(r, t))
      break; // restart
     if (t.node.value == null) {
      findNode(key);
      break splice;
     }
     if (--insertionLevel == 0)
      break splice;
    }
    if (--j >= insertionLevel && j < level)
     t = t.down;
    q = q.down;
    r = q.right;
   }
  }
 }
 return null;
}

这里的插入方法很复杂,可以分为3大步来理解:第一步获取前继节点后通过CAS来插入节点;第二步对level层数进行判断,如果大于最大层数,则插入一层;第三步插入对应层的数据。整个插入过程全部通过CAS自旋的方式保证并发情况下的数据正确性。

总结

JDK中提供了丰富的并发容器供我们使用,文章中介绍的也并不全面,重点是要通过了解各种并发容器的原理,明白他们各自独特的使用场景。这里简单做个总结:当并发读远多于修改的场景下需要使用List和Set时,可以考虑使用CopyOnWriteArrayList和CopyOnWriteArraySet;当需要并发使用<Key, Value>键值对存取数据时,可以使用ConcurrentHashMap;当要保证并发<Key, Value>键值对有序时可以使用ConcurrentSkipListMap。

以上所述是小编给大家介绍的Java从同步容器到并发容器的操作过程,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对小牛知识库网站的支持!

 类似资料:
  • 容器是 Docker 又一核心概念。 简单的说,容器是独立运行的一个或一组应用,以及它们的运行态环境。对应的,虚拟机可以理解为模拟运行的一整套操作系统(提供了运行态环境和其他系统环境)和跑在上面的应用。 本章将具体介绍如何来管理一个容器,包括创建、启动和停止等。

  • 一个容器是一种用来存储数据对象的机制。一个帐户可 以有很多容器,但容器名称必须是唯一的。这个API允 许客户端创建一个容器,设置访问控制和元数据,检索 一个容器的内容,和删除一个容器。因为这个 API 的 请求涉及到一个特定用户的帐户相关信息,因此在这个 API 中的所有请求都必须经过身份验证,除非一个容器 的访问控制故意设置为公开访问。(即允许匿名的请求)。 Note Amazon S3 API

  • 本节解释了关于Hana容器的几个重要概念:如何创建它们,元素的生命周期和其他问题。 创建容器 虽然在C++中创建对象的通常方式是使用它的构造函数,但异构编程使事情更复杂。 实际上,在大多数情况下,人们对(或甚至不知道)要创建的异构容器的实际类型不感兴趣。 另一方面,虽然可以明确地写出该类型,但是这样做是多余的或繁琐的。 因此,Hana使用从std::make_tuple借用的不同方法来创建新容器。

  • 本章主要内容 等待事件 带有期望的等待一次性事件 在限定时间内等待 使用同步操作简化代码 在上一章中,我们看到各种在线程间保护共享数据的方法。当你不仅想要保护数据,还想对单独的线程进行同步。例如,在第一个线程完成前,可能需要等待另一个线程执行完成。通常情况下,线程会等待一个特定事件的发生,或者等待某一条件达成(为true)。这可能需要定期检查“任务完成”标识,或将类似的东西放到共享数据中,但这与理

  • 我有一个docker-composer.yml,它正在设置两个服务:和。作为服务的node.js服务器使用连接到PostgreSQL数据库;而服务是一个PostgreSQL映像。 在服务器启动时,它尝试连接到数据库,但获得超时。 服务输出: -由服务中的命令使用。 服务输出: 从主机运行的脚本的输出: 此输出表示连接成功。 我无法从容器到达容器,因为连接超时,但我可以从主机计算机到达容器,因为连接

  • 假设我们使用GitHub操作构建并发布应用程序的容器映像。我将选择ASP.NET Core作为应用程序的技术堆栈,尽管这不太重要。 我想讨论两种不同的方法: 1.“外部构建”:在GitHub Actions runner中构建/编译应用程序,将输出复制到容器映像中 例如,我们的GitHub操作工作流文件可能如下所示... ...有一个简单的Dockerfile像这样: 2.“内置”:内置一个容器,