# LongAdder原理及出现背景
# AtomicInteger、AtomicLong的弊端
要搞清楚
AtomicInteger、AtomicInteger
的弊端,先要弄明白,它们干了什么事情;要搞清楚干了什么事情,即
AtomicInteger、AtomicInteger
的原理;要明白原理,那么最简单、粗暴、也高效地方式就是查阅源码啦,接下来,让我们走进源码
我们以AtomicInteger
为例,通过源码查看,其核心代码如下:
再去查看Unsafe.java
的getAndAddInt
的方法
发现是个do-while
循环,先根据对象及偏移量,获取对象在该偏移量的值,赋值给int v
然后执行 compareAndSwapInt(o, offset, v, newValue)
方法,(对象在该偏移量的值)与v进行比对
若相等,则(对象在该偏移量的地址)存放(v + delta)的值
返回 true
若不相等,则 返回 false;所以该方法就是将比对值,然后再交换值,而加上do-while
就是比对不成功后,再重试的操作
我们主要关注 compareAndSwapInt
的内部实现,于是再跟进去
发现该方法是个native
方法,用到了JNI
的规范,于是再源码中需要找到Unsafe.cpp
文件 查找compareAndSwapInt
oop p = JNIHandles::resolve(obj); // 将jobject obj 强转为了 oop p
jint* addr = (jint *) index_oop_from_field_offset_long(p, offset); // 获取对象的首地址+偏移地址 绝对地址
(jint)(Atomic::cmpxchg(x, addr, e)) == e
要弄明白这个,要先弄明白 Atomic::cmpxchg(x, addr, e)
,于是,跟进去
原来它这个内联汇编inline
,于是按照内联汇编的语法,我们将输出项、输入项逐一解释;
输出项:<: "=a" (exchange_value)>
的意思是将使用eax寄存器
作为输出结果,即:执行完该指令后,会将eax寄存器
的值赋值到exchange_value
,其位置表示为<%0>
;
接下来是输入项<: "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)>
,其中<r>
代表任意一个通用寄存器,<a>
代表eax寄存器
,按照顺序 依次的位置表示为<%1,%2,%3,%4>
;
于是将其待入:<LOCK_IF_MP(mp) "cmpxchgl exchange_value,(dest)">;``LOCK_IF_MP(mp)// 判断当前执行的平台是不是SMP架构的
,若是则返回 lock; 加入lock指令前缀;所以我们关注 cmpxchg 指令到底干了什么事情即可
通过查看Intel开发手册,了解到该指令,是将eax中的值与首操作数比较:
1、如果相等,第2操作数的值装载到首操作数,zf置1。(相当于相减为0,所以0标志位 置位)
2、如果不等,首操作数的值装载到eax寄存器,并将zf清0
了解该指令后,对号入座即可,eax
的值:compare_value
,首操作数:(dest)
,第2操作数:exchange_value
若compare_value(expected期望值)和(dest)(内存中的值)
相等,说明没有其它线程将其变更,则exchange_value
赋值到(dest)
即将 更新值 exchange_value
赋值到内存地址指向的值中,最后 eax
的值compare_value
赋值到 exchange_value
返回,该值一定==e
,所以返回true
若compare_value(expected期望值)和(dest)(内存中的值)
不等,说明有其它线程将其变更,则 (dest)
赋值到eax
中,最后 eax的值 (dest)赋值到 exchange_value
返回,该值一定!=e
,所以返回false
综上所述:得出如下总结
AtomicInteger
的原理是多个线程竞争地访问lock;cmpxchg
指令,但是lock指令
前缀,确定了只能允许一个线程访问执行cmpxchg指令
,若比对成功则更新值,比对失败则自旋地尝试下一次,直到比对成功,结束
至此,明白了
AtomicInteger
的原理后,再来分析一下它的弊端;也是CAS的弊端随着CPU的核心数增多,大量的线程在等待着获取执行
cmpxchg指令
的锁,但是只能一个线程陷入进去,那么其它线程只能空转,可能造成CPU飙百的现象,这就是CAS的弊端,也是AtomicInteger
的弊端;那么如何规避这个弊端呢?这要了解
lock指令
的原理了,根据查阅Intel开发手册描述:所以得出结论:原子性操作加上
lock指令
前缀:1、总线锁保证原子性 2、数据已在缓存行中,通过缓存一致性协议
(MESI)
保证其原子性,PS:一定是数据已在缓存行中,才会是缓存行锁,否则会退化为总线锁的 这就是lock指令
的原理通过了解了
lock指令
的原理,AtomicInteger
实现流程,就可以画出在CPU、内存、缓存行层面的示意图,如下所示:
# LongAdder 出现原因
由于弊端是多个线程去竞争一个变量A的修改,那么我们将变量A分为多个变量,然后打散到缓存行中,这样,多个线程获取不到一个变量的CAS操作,就会去竞争其它的变量的CAS操作,巧妙地分散了注意力,提高了并行度,那么这个值如何统计呢?其实很简单,就是将变量A与分出去的N个变量相加即可,这就是LongAdder出现的原因,如下图,将展示LongAdder 的实现流程的示意图
既然,明白了这些推理过程,就需要验证所想,于是,去看LongAdder的实现过程
让我们愉快地看看它的源码吧
源码查看,先看看LongAdde
r的继承结构:LongAdder -> Striped64 -> Number
查看Striped64
类,发现了Cell
内部类
发现了这个Cell
类,其实是将long value
包装了一下,然后提供cas
方法对value
进行CAS
操作,原理和AtomicInteger
差不多,底层都是Intel指令
和 cmpxchg指令
,接下来,查看 LongAdder
类中的add
方法,看看它是如何加值的,是不是我们推理的那样
观看源码,(as = cells) != null || !casBase(b = base, b + x)
语句,有如下几种情况会返回true
:
1.1、(as = cells) == null && (!casBase(b = base, b + x) == true)
(as = cells) == null
说明 cells
数组还未初始化;!casBase(b = base, b + x) == true
说明 casBase(b = base, b + x) == false
,即 尝试CAS base
变量失败了,说明不止当前线程这一个线程在操作base变量
,产生了竞争,反过来想,若是casBase
成功了,说明增加操作已成功,所以就不需要进入到if语句块
中了,直接结束即可
1.2、(as = cells) != null
说明 cells
数组已被初始化或被更新扩展等等
符合1.1和1.2中任意一个的线程,进入if
语句块后,又要进行判断:
2.1、 as == null || (m = as.length - 1) < 0
说明 cells
数组还未初始化;PS:
此时 uncontended = true
2.2、(a = as[getProbe() & m]) == null
getProbe()
是当前线程的随机种子,m
是as数组长度-1
,即m=N-1
而 getProbe() & m
其实这是个技巧,使用&与
操作 取代 求余
操作(%)
,但是前提:N
必须是2的倍数,这里就不详细展开了,所以 getProbe() & m
其实就是计算个哈希值,用于找到对应的cells
数组的下标位置,很显然该分支的意思是 对应cells
数组的cell
是null
;PS:
此时 uncontended = true
2.3、!(uncontended = a.cas(v = a.value, v + x))
对应cells
数组的cell
不是null
,但是执行a.cas操作
的失败的线程;PS:
此时 uncontended = false
符合2.1和2.2、2.3中任意一个的线程,去执行 longAccumulate(x, null, uncontended)
方法
所以可以猜一下,由于2.1是 cells
数组=null
所以需要初始化;2.2是cells
数组已初始化,但是与当前线程对应的数组下标的cell=null
,所以需要初始化cell
;2.3是 cells 数组已初始化,当前线程对应的数组下标的cell
已初始化,但是cell.cas
操作失败了,所以还需要再次执行其它的操作,所以longAccumulate
方法 就会有对 cells
数组 的初始化操作,对 当前线程对应的数组下标的cell
的初始化操作等等
最核心的就要来了,接下来就去看 longAccumulate(x, null, uncontended)
方法
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
} // 初始化线程随机数
boolean collide = false; // 哈希冲突的标志位 说明需要重新更新一下线程随机数然后再次哈希一个新的值
for (;;) {} // 死循环 你没有看错 Doug Lea 老爷子 经常利用这种写法来编写处理并发的代码
那么可以推理得出,很多线程最终会陷入到不同的分支代码中,所以不同分支就是不同的竞争点,多个线程在进行竞争
1、if ((as = cells) != null && (n = as.length) > 0)
第1个竞争点: cells已被初始化后,线程陷入执行
2、else if (cellsBusy == 0 && cells == as && casCellsBusy())
第2个竞争点: 线程CAS地获取 cellsBusy 这个锁标识,然后对 cells 数组执行初始化操作
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
该线程获取锁后,再次验证cells 有没有被更新 if(cells == as) 若为false,则释放锁 cellsBusy =0;若为true则是初始化cells数组,并将cell赋值,最后退出循环,因为new Cell(x); 已经将x值加入了,所以这里初始化完成后直接退出循环即可
3、else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x))))
第3个竞争点: 如果cells 没有初始化,或者没有获取 cellsBusy 这个锁标识的线程,去尝试casBase变量,若成功则退出循环,否则再次循环
第一个竞争点 陷入后,执行如下逻辑:
1.1、if ((a = as[(n - 1) & h]) == null)
第1.1个竞争点:当前线程对应着cells 数组下标的cell是null,需要尝试初始化cell;
2
3
4
5
6
if(cellsBusy == 0) 则执行 new Cell(x); 再尝试抢 锁标识cellsBusy casCellsBusy();
如果casCellsBusy()返回true,则抢锁成功,执行创建cell操作,进入后 再次检查 该cell有没有被其它的线程创建了,若没有,则该线程创建,创建成功后,则break;退出循环,若创建失败,或者再次检查没有通过 则continue;继续循环
其它线程怎么办呢?判断cellsBusy != 0或者 cellsBusy == 0 && !casCellsBusy() 没有抢到 锁标识cellsBusy的其它线程 执行 collide = false; 最后执行 h = advanceProbe(h); 修改当前线程的随机种子,即 不要竞争这个cell了,再随机个别的吧 不能紧着一棵树造啊
1.2、else if (!wasUncontended) wasUncontended = true;
第1.2个竞争点:这种是个编程技巧,这个标志位 是记录 调用 longAccumulate 之前 是否已经对a.cas执行过了,若wasUncontended == false:说明之前已经对a.cas执行过而且失败了,uncontended = a.cas(v = a.value, v + x) ;
而 else if (!wasUncontended) wasUncontended = true 语句的目的是为了让线程不陷入到它之后的分支中,所以当wasUncontended == false,就会进入到该分支中 else if (!wasUncontended) 并将 wasUncontended 变为true,下次循环就能让线程陷入到它之后的分支中了,从而达到控制语句流程的目的;
wasUncontended == false:说明之前已经对a.cas执行过而且失败了,所以,直接 执行 h = advanceProbe(h); 让当前线程重新选择一个cell即可
1.3、else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x))))
第1.3个竞争点:当前线程 cells数组已被初始化,对应的cell已被初始化,然后尝试cell.cas操作,成功则 break;退出循环
1.4、else if (n >= NCPU || cells != as) collide = false;
第1.4个竞争点:判断 cells的长度大于等于 CPU的核心数,或者 as已过时,不是最新的cells
最终 执行 h = advanceProbe(h); 让当前线程重新选择一个cell
1.5、else if (!collide) collide = true;
第1.5个竞争点:与 第1.2个竞争点 一样 同一种编程技巧,用于控制语句流程的,当collide == false是,不走它之后的分支语句,直接 执行 h = advanceProbe(h); 让当前线程重新选择一个cell
1.6、else if (cellsBusy == 0 && casCellsBusy())
第1.6个竞争点:执行 cells 数组 扩容操作,if(cells == as) 检查 as 不是过时的,则进行 数组扩容操作 将原来的长度扩容一倍
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
后面 释放锁 cellsBusy = 0; 执行 collide = false; 因为数组扩容,下次循环需要执行 h = advanceProbe(h); 让当前线程重新选择一个cell 最后 continue; 继续下一个循环;