# LongAdder原理及出现背景

# AtomicInteger、AtomicLong的弊端

要搞清楚AtomicInteger、AtomicInteger的弊端,先要弄明白,它们干了什么事情;

要搞清楚干了什么事情,即 AtomicInteger、AtomicInteger的原理;

要明白原理,那么最简单、粗暴、也高效地方式就是查阅源码啦,接下来,让我们走进源码

我们以AtomicInteger为例,通过源码查看,其核心代码如下:

image-20230117164109995

再去查看Unsafe.javagetAndAddInt的方法

image-20230117164128431

发现是个do-while循环,先根据对象及偏移量,获取对象在该偏移量的值,赋值给int v

然后执行 compareAndSwapInt(o, offset, v, newValue) 方法,(对象在该偏移量的值)与v进行比对

若相等,则(对象在该偏移量的地址)存放(v + delta)的值 返回 true

若不相等,则 返回 false;所以该方法就是将比对值,然后再交换值,而加上do-while就是比对不成功后,再重试的操作

我们主要关注 compareAndSwapInt 的内部实现,于是再跟进去

image-20230117164147181

发现该方法是个native方法,用到了JNI的规范,于是再源码中需要找到Unsafe.cpp文件 查找compareAndSwapInt

image-20230117164213668

oop p = JNIHandles::resolve(obj); // 将jobject obj 强转为了 oop p

image-20230117164239588

jint* addr = (jint *) index_oop_from_field_offset_long(p, offset); // 获取对象的首地址+偏移地址 绝对地址

(jint)(Atomic::cmpxchg(x, addr, e)) == e要弄明白这个,要先弄明白 Atomic::cmpxchg(x, addr, e),于是,跟进去

image-20230117164319508

原来它这个内联汇编inline,于是按照内联汇编的语法,我们将输出项、输入项逐一解释;

输出项:<: "=a" (exchange_value)>的意思是将使用eax寄存器作为输出结果,即:执行完该指令后,会将eax寄存器的值赋值到exchange_value,其位置表示为<%0>

接下来是输入项<: "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)>,其中<r>代表任意一个通用寄存器,<a>代表eax寄存器,按照顺序 依次的位置表示为<%1,%2,%3,%4>

于是将其待入:<LOCK_IF_MP(mp) "cmpxchgl exchange_value,(dest)">;``LOCK_IF_MP(mp)// 判断当前执行的平台是不是SMP架构的,若是则返回 lock; 加入lock指令前缀;所以我们关注 cmpxchg 指令到底干了什么事情即可

通过查看Intel开发手册,了解到该指令,是将eax中的值与首操作数比较:

1、如果相等,第2操作数的值装载到首操作数,zf置1。(相当于相减为0,所以0标志位 置位)

2、如果不等,首操作数的值装载到eax寄存器,并将zf清0

了解该指令后,对号入座即可,eax的值:compare_value,首操作数:(dest),第2操作数:exchange_value

compare_value(expected期望值)和(dest)(内存中的值)相等,说明没有其它线程将其变更,则exchange_value 赋值到(dest)即将 更新值 exchange_value 赋值到内存地址指向的值中,最后 eax的值compare_value 赋值到 exchange_value 返回,该值一定==e,所以返回true

compare_value(expected期望值)和(dest)(内存中的值)不等,说明有其它线程将其变更,则 (dest)赋值到eax中,最后 eax的值 (dest)赋值到 exchange_value返回,该值一定!=e,所以返回false

综上所述:得出如下总结

AtomicInteger的原理是多个线程竞争地访问lock;cmpxchg指令,但是lock指令前缀,确定了只能允许一个线程访问执行cmpxchg指令,若比对成功则更新值,比对失败则自旋地尝试下一次,直到比对成功,结束

至此,明白了AtomicInteger的原理后,再来分析一下它的弊端;也是CAS的弊端

随着CPU的核心数增多,大量的线程在等待着获取执行cmpxchg指令的锁,但是只能一个线程陷入进去,那么其它线程只能空转,可能造成CPU飙百的现象,这就是CAS的弊端,也是AtomicInteger的弊端;

那么如何规避这个弊端呢?这要了解lock指令的原理了,根据查阅Intel开发手册描述:

image-20230117164030146

所以得出结论:原子性操作加上lock指令前缀:

1、总线锁保证原子性 2、数据已在缓存行中,通过缓存一致性协议(MESI)保证其原子性,PS:一定是数据已在缓存行中,才会是缓存行锁,否则会退化为总线锁的 这就是lock指令的原理

通过了解了lock指令的原理,AtomicInteger实现流程,就可以画出在CPU、内存、缓存行层面的示意图,如下所示:

image-20230117163123687

# LongAdder 出现原因

由于弊端是多个线程去竞争一个变量A的修改,那么我们将变量A分为多个变量,然后打散到缓存行中,这样,多个线程获取不到一个变量的CAS操作,就会去竞争其它的变量的CAS操作,巧妙地分散了注意力,提高了并行度,那么这个值如何统计呢?其实很简单,就是将变量A与分出去的N个变量相加即可,这就是LongAdder出现的原因,如下图,将展示LongAdder 的实现流程的示意图

image-20230117164647565

既然,明白了这些推理过程,就需要验证所想,于是,去看LongAdder的实现过程

让我们愉快地看看它的源码吧

源码查看,先看看LongAdder的继承结构:LongAdder -> Striped64 -> Number

查看Striped64类,发现了Cell 内部类

image-20230117164829091

发现了这个Cell类,其实是将long value 包装了一下,然后提供cas方法对value进行CAS操作,原理和AtomicInteger 差不多,底层都是Intel指令cmpxchg指令,接下来,查看 LongAdder类中的add方法,看看它是如何加值的,是不是我们推理的那样

image-20230117164855986

观看源码,(as = cells) != null || !casBase(b = base, b + x) 语句,有如下几种情况会返回true

1.1、(as = cells) == null && (!casBase(b = base, b + x) == true)

(as = cells) == null说明 cells 数组还未初始化;!casBase(b = base, b + x) == true说明 casBase(b = base, b + x) == false,即 尝试CAS base变量失败了,说明不止当前线程这一个线程在操作base变量,产生了竞争,反过来想,若是casBase成功了,说明增加操作已成功,所以就不需要进入到if语句块中了,直接结束即可

1.2、(as = cells) != null 说明 cells 数组已被初始化或被更新扩展等等

符合1.1和1.2中任意一个的线程,进入if语句块后,又要进行判断:

2.1、 as == null || (m = as.length - 1) < 0 说明 cells 数组还未初始化;PS:此时 uncontended = true

2.2、(a = as[getProbe() & m]) == null

getProbe()是当前线程的随机种子,mas数组长度-1,即m=N-1getProbe() & m 其实这是个技巧,使用&与操作 取代 求余操作(%),但是前提:N必须是2的倍数,这里就不详细展开了,所以 getProbe() & m 其实就是计算个哈希值,用于找到对应的cells 数组的下标位置,很显然该分支的意思是 对应cells数组的cellnullPS:此时 uncontended = true

2.3、!(uncontended = a.cas(v = a.value, v + x)) 对应cells数组的cell不是null,但是执行a.cas操作的失败的线程;PS:此时 uncontended = false

符合2.1和2.2、2.3中任意一个的线程,去执行 longAccumulate(x, null, uncontended) 方法

所以可以猜一下,由于2.1是 cells 数组=null 所以需要初始化;2.2是cells 数组已初始化,但是与当前线程对应的数组下标的cell=null,所以需要初始化cell;2.3是 cells 数组已初始化,当前线程对应的数组下标的cell已初始化,但是cell.cas操作失败了,所以还需要再次执行其它的操作,所以longAccumulate方法 就会有对 cells 数组 的初始化操作,对 当前线程对应的数组下标的cell的初始化操作等等

最核心的就要来了,接下来就去看 longAccumulate(x, null, uncontended) 方法

image-20230117164647565

int h;
if ((h = getProbe()) == 0) {
 ThreadLocalRandom.current(); // force initialization
 h = getProbe();
 wasUncontended = true;

} // 初始化线程随机数
boolean collide = false; // 哈希冲突的标志位 说明需要重新更新一下线程随机数然后再次哈希一个新的值

for (;;) {} // 死循环 你没有看错 Doug Lea 老爷子 经常利用这种写法来编写处理并发的代码

那么可以推理得出,很多线程最终会陷入到不同的分支代码中,所以不同分支就是不同的竞争点,多个线程在进行竞争

1if ((as = cells) != null && (n = as.length) > 0)1个竞争点: cells已被初始化后,线程陷入执行

2else if (cellsBusy == 0 && cells == as && casCellsBusy())2个竞争点: 线程CAS地获取 cellsBusy 这个锁标识,然后对 cells 数组执行初始化操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

image-20230117165439180

该线程获取锁后,再次验证cells 有没有被更新 if(cells == as) 若为false,则释放锁 cellsBusy =0;若为true则是初始化cells数组,并将cell赋值,最后退出循环,因为new Cell(x); 已经将x值加入了,所以这里初始化完成后直接退出循环即可
3else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x))))3个竞争点: 如果cells 没有初始化,或者没有获取 cellsBusy 这个锁标识的线程,去尝试casBase变量,若成功则退出循环,否则再次循环
第一个竞争点 陷入后,执行如下逻辑:
1.1if ((a = as[(n - 1) & h]) == null)1.1个竞争点:当前线程对应着cells 数组下标的cell是null,需要尝试初始化cell;
1
2
3
4
5
6

image-20230117165505331

if(cellsBusy == 0) 则执行 new Cell(x); 再尝试抢 锁标识cellsBusy casCellsBusy();
如果casCellsBusy()返回true,则抢锁成功,执行创建cell操作,进入后 再次检查 该cell有没有被其它的线程创建了,若没有,则该线程创建,创建成功后,则break;退出循环,若创建失败,或者再次检查没有通过 则continue;继续循环
其它线程怎么办呢?判断cellsBusy != 0或者 cellsBusy == 0 && !casCellsBusy() 没有抢到 锁标识cellsBusy的其它线程 执行 collide = false; 最后执行 h = advanceProbe(h); 修改当前线程的随机种子,即 不要竞争这个cell了,再随机个别的吧 不能紧着一棵树造啊
1.2else if (!wasUncontended) wasUncontended = true;1.2个竞争点:这种是个编程技巧,这个标志位 是记录 调用 longAccumulate 之前 是否已经对a.cas执行过了,若wasUncontended == false:说明之前已经对a.cas执行过而且失败了,uncontended = a.cas(v = a.value, v + x) ;
而 else if (!wasUncontended) wasUncontended = true 语句的目的是为了让线程不陷入到它之后的分支中,所以当wasUncontended == false,就会进入到该分支中 else if (!wasUncontended) 并将 wasUncontended 变为true,下次循环就能让线程陷入到它之后的分支中了,从而达到控制语句流程的目的;
wasUncontended == false:说明之前已经对a.cas执行过而且失败了,所以,直接 执行 h = advanceProbe(h); 让当前线程重新选择一个cell即可
1.3else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x))))1.3个竞争点:当前线程 cells数组已被初始化,对应的cell已被初始化,然后尝试cell.cas操作,成功则 break;退出循环
1.4else if (n >= NCPU || cells != as) collide = false;1.4个竞争点:判断 cells的长度大于等于 CPU的核心数,或者 as已过时,不是最新的cells
最终 执行 h = advanceProbe(h); 让当前线程重新选择一个cell
1.5else if (!collide) collide = true;1.5个竞争点:与 第1.2个竞争点 一样 同一种编程技巧,用于控制语句流程的,当collide == false是,不走它之后的分支语句,直接 执行 h = advanceProbe(h); 让当前线程重新选择一个cell
1.6else if (cellsBusy == 0 && casCellsBusy())1.6个竞争点:执行 cells 数组 扩容操作,if(cells == as) 检查 as 不是过时的,则进行 数组扩容操作 将原来的长度扩容一倍
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

image-20230117165530415

后面 释放锁 cellsBusy = 0; 执行 collide = false; 因为数组扩容,下次循环需要执行 h = advanceProbe(h); 让当前线程重新选择一个cell 最后 continue; 继续下一个循环;
1