当前位置: 首页 > 工具软件 > JUCA > 使用案例 >

学习juca:Striped64(1.8)

单于亮
2023-12-01

Striped64类

Striped64是java1.8 juca中新增的多个累加器类的基础类。它的基本思想其实与并发数据结构的发展息息相关:

  • 最原始的并发数据结构使用粗粒度的阻塞锁。如HashTable,直接将并行转换为串行,性能很差(在那个时代,它并没有错,“需要优化时才进行优化”)。
  • 然后的想法是改进锁的粒度,仍然使用阻塞锁,但对加锁范围进行限定。如ConcurrentHashMap(1.6,1.7),使用了与CPU个数有关的桶,每个桶单独一个锁,但线程增多,锁争用的情况仍然明显。
  • 再然后是抛弃阻塞锁,因为阻塞锁的睡眠唤醒机制需要陷入内核,直接使用底层的硬件机制如CAS,进行无锁同步。如AtomicXXX系列类。
  • 现在是避免线程争用,无论是锁还是CAS,如本文要探讨的Striped64,以及ConcurrentHashMap(1.8)。让每个线程尽量使用不同的单元,从而避免争用导致的性能下降。

当然具体情况还要具体分析,阻塞锁,无锁(CAS,自旋锁),避免线程争用(单元个数)都需要在考虑实际的情况下使用,比如:

  • 阻塞锁,阻塞锁实在自旋锁的基础上发明的,是为了解决多个等待线程自旋所造成的CPU浪费,而引入了等待线程队列、wait()和awake()系统原语。如果临界区内的工作任务较重,就比较适合使用阻塞锁而非自选锁。比如ConcurrentHashMap。
  • 无锁,采用CPU为实现锁提供的底层机制,是对锁使用的一种回调。如果临界区内的工作任务较轻,如只是加减数,就比较适合使用无锁。比如AtomicXXX。
  • 线程争用。ConcurrentHashMap(1.8)因为其结构性质,天然适合对每个插槽单独进行同步处理,因此它的同步基本单元可以近乎无限增长(当然存在上限,但一般不用考虑),而与CPU个数大小没有太大关系。Striped64却没必要这么做,过度的分散并不会明显的增长性能,反而影响汇总操作。

设计思想

具体的设计思想可以参考其注释:

这个类维护一个懒惰初始化的原子更新变量的表,外加一个额外的“base”字段。table的大小是2的幂。索引使用掩码下的每个线程的hash code。这个类中几乎所有的声明都是包私有的,由子类直接访问。

表项是Cell类,AtomicLong填充(通过@sun.misc.Contended)的一个变体,以减少缓存争用。对于大多数原子类来说,填充是多余的,因为它们通常不规则地分散在内存中,因此不会相互干扰。但是驻留在数组中的原子对象将倾向于彼此相邻放置,因此在没有这种预防措施的情况下,最常见的情况是共享缓存行(具有巨大的负面性能影响)。

因为Cells相对较大,所以我们避免创建Cells,直到需要它们。当没有争用时,将对base字段进行所有更新。第一次争用时(base更新时CAS失败),该表被初始化为大小2。在进一步竞争时,表的大小加倍,直到达到大于或等于CPU数量的2的最近幂。表插槽保持为空(null),直到需要为止。

单个自旋锁(“cellsBusy”)用于初始化表,调整其大小,以及用新的Cells填充插槽。不需要阻塞锁;当锁不可用时,线程会尝试其他插槽(或base)。在这些重试过程中,竞争加剧,局部性降低,但这仍然优于替代方案。

通过ThreadLocalRandom维护的Thread.probe字段用作每个线程的哈希代码。我们让它们保持未初始化(为零)(如果它们以这种方式出现),直到它们在插槽0竞争。然后将它们初始化为通常不会与其他值冲突的值。执行更新操作时,竞争和/或表冲突由失败的情况指示。发生冲突时,如果表的大小小于容量,那么它的大小将加倍,除非其他线程持有锁。如果散列槽为空,并且锁可用,则会创建一个新的Cells。否则,如果插槽存在,将尝试CAS。重试通过“双散列”进行,使用二级散列(Marsaglia XorShift)来尝试找到空闲插槽。

表的大小是有上限的,因为当线程多于CPU时,假设每个线程都绑定到一个CPU,就会有一个完美的散列函数将线程映射到插槽,从而消除冲突。当我们达到容量时,我们通过随机改变冲突线程的哈希代码来搜索这个映射。因为搜索是随机的,冲突只有通过CAS失败才知道,收敛可能会很慢,而且因为线程通常不会永远绑定到CPU,所以根本不会发生。然而,尽管有这些限制,在这些情况下观察到的竞争率通常很低。

当一个Cell的散列到它的线程一旦终止时,以及在表大小加倍导致没有线程在扩展掩码下散列到它的情况下,该Cell可能会变成未使用。我们不会试图检测或移除这些Cell,因为考虑对于长期运行的情况,观察到的竞争级别会再次出现,因此最终会再次需要这些Cell。短时间的未使用,并不重要。

具体实现

Striped64类的核心是其定义了进行数据更新的逻辑方法longAccumulatedoubleAccumulate方法,后者方法是对前者方法的复制和修改,因此这里只介绍前一个方法。

longAccumulate本身的逻辑并不复杂,只是因为使用无锁,使用了死循环,并包含了大量的检测代码,因为在理解过程中,一般不需要对具体的无锁技巧进行深度理解(如果水平到了也可以这样做),只需要结合其设计思想,理解设计思想即可。下面是注释的源码:

    /**
     处理涉及初始化、调整大小、创建Cell,和/或争用的更新情况。请参见上面的解释。
     这种方法存在乐观重试代码的常见非模块化问题,依赖于重新检查的读取集。
     *
     * @param x 值
     * @param fn 更新函数,如果为null,则默认为add
     * @param 如果在调用前CAS已经失败
     */
    final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {
        int h;
        if ((h = getProbe()) == 0) {//如果未初始化hash,初始化
            ThreadLocalRandom.current();
            h = getProbe();
            wasUncontended = true;//因为未初始化的hash,不算CAS失败
        }
        boolean collide = false;                //记录是否发生了碰撞,如果上个插槽不为空,为true
        for (;;) {
            Cell[] as; Cell a; int n; long v;
            if ((as = cells) != null && (n = as.length) > 0) {//普遍情况,cells不为空
                if ((a = as[(n - 1) & h]) == null) {//cell为空
                    if (cellsBusy == 0) {       //锁没有被使用
                        Cell r = new Cell(x);   //乐观的创建,其实是将创建代码取出临界区,提高性能
                        if (cellsBusy == 0 && casCellsBusy()) {//加锁成功
                            boolean created = false;
                            try {               //重新检查,并尝试插入
                                Cell[] rs; int m, j;
                                if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                cellsBusy = 0;
                            }
                            if (created)//插入成功,退出循环(方法)
                                break;
                            continue;//失败,可能多种原因,再循环
                        }
                    }
                    collide = false;//获取锁失败,认为没有碰撞,rehash后再循环
                }
                else if (!wasUncontended)       //CAS已经失败,rehash后再循环
                    wasUncontended = true;
                else if (a.cas(v = a.value, ((fn == null) ? v + x ://尝试CAS,如果成功退出循环(方法)
                                             fn.applyAsLong(v, x))))
                    break;
                else if (n >= NCPU || cells != as)//cells已经最大,或者cells大小发生了调整
                    collide = false;            //不需要调整大小,rehash后再循环
                else if (!collide)//更新碰撞状态,要求下次调整大小,rehash后再循环
                    collide = true;
                else if (cellsBusy == 0 && casCellsBusy()) {//加锁,失败,rehash后再循环
                    try {
                        //尝试调整大小,可能已经被调整
                        if (cells == as) { 
                            Cell[] rs = new Cell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            cells = rs;
                        }
                    } finally {
                        cellsBusy = 0;
                    }
                    collide = false;           //更新碰撞状态
                    continue;                   //再循环
                }
                h = advanceProbe(h);//rehash
            }
            else if (cellsBusy == 0 && cells == as && casCellsBusy()) {//初始化,失败了再循环
                boolean init = false;
                try {                           // Initialize table
                    if (cells == as) {
                        Cell[] rs = new Cell[2];
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        init = true;
                    }
                } finally {
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
            else if (casBase(v = base, ((fn == null) ? v + x ://回退到使用base,成功退出循环(方法),失败了再循环
                                        fn.applyAsLong(v, x))))
                break; 
        }
    }

其它知识点

伪共享

在设计思想中,提到了通过@sun.misc.Contended进行填充,以防止缓存行共享问题,这是与cpu cache相关的问题,如果只是了解,可以参考这篇文章[^1]。如果你看的恍恍惚惚,最好还是去系统的学习一下计算机体系结构,以及看看《深入理解计算机系统》。

双散列,完美散列

散列表相关的专业术语,可以看看《算法导论》或是《数据结构与算法分析》了解下散列表的高级知识。

Marsaglia XorShift

伪随机数算法[^2],上面两本书同样适合阅读。

[^1] Java专家系列:CPU Cache与高性能编程 https://blog.csdn.net/karamos/article/details/80126704
[^2] https://en.wikipedia.org/wiki/Xorshift

转载于:https://www.cnblogs.com/redreampt/p/9445540.html

 类似资料: