多线程 1 的最快操作
扫描二维码
随时随地手机看文章
public class LongAdder {
private long count = 0L;
public void add() {
count ;
}
}
可以加锁去实现,但效率太低。public class LongAdder {
private long count = 0L;
public void add() {
synchronized(this){
count ;
}
}
可以用原子类这种乐观锁实现,比加 synchronized 锁效率高很多。public class LongAdder {
private AtomicLong count = new AtomicLong(0L);
public void add() {
count.incrementAndGet();
}
}
当然,更高级的玩法也可以自己调用 UNSAFE 模拟原子类里的 CAS 操作,但实际上就是把原子类的源码给展开了。(v = count 应该放在循环里)public class LongAdder {
private volatile long count = 0L;
public void add() {
boolean success = false;
int v = count;
while(!success) {
success = UNSAFE.compareAndSwapLong(
LongAdder.class, countOffset, v, v 1);
}
}
}
这几段多线程 1 的代码如果看不明白,可以找些资料把这块的基础补一下哈,本文就不赘述了,我们继续。关于这个多线程 1 操作,有没有效率更高的办法呢?分析需求
我们先别急着想,怎么把它变快,一头扎到技术实现上。同我们接一个新产品的需求一样,我们首先分析一下,这个产品提出这个需求的核心目的是什么,有时候往往可以使问题简化。我们想一个极端的场景,成百上千个线程一直连续不断对这个 count 进行 1 操作,一直加上个一年,一年后,我们只需要看一下最终的值是多少,即可。整个功能就是这样,加一年,最后看那么一下。我们看看之前的原子类 1 的代码。
public class LongAdder {
private AtomicLong count = new AtomicLong(0L);
public void add() {
count.incrementAndGet();
}
}
每时每刻都将 1 的操作真真正正计算了一遍,并赋值给 count。但我们只是一年后要读取这个 count 值一次,显然,中间这一年对 count 值准确地计算出结果,就是不必要的。而恰恰是因为每次都要准确计算出它的结果,导致多线程之间发生了竞争,浪费了资源。那思路就打开了!设计思路
我们事先搞出多个这种 count 变量,并且用某种方式让不同线程对应到不同 count 变量上。你看这样,如果仅仅有四个线程,就完全不存在线程竞争的问题,每个线程操作唯一的变量。过一段时间后, 获取最终的值,只需要把它们加和即可。这样,获取 count 值的复杂度增加了,需要做个加和操作,但却是整个过程完全没有线程竞争。牺牲读性能,换取写性能。用空间换时间。你看,即使一个小小的多线程 1 操作的设计,也存在架构思维中的 trade off 思想,这在我之前两篇架构文章中多次提到。正所谓,不存在完美的算法,我们都只是在做平衡,牺牲这个,才能换取那个。
具体实现
设计思路中,我们尽可能把问题简化,才能得到一个大方向。现在我们要具体设计了,就要把刚刚懒得思考的问题,拿出来了,这个过程的确比较痛苦。
懒加载
首先,我们当然希望,整个过程都不存在线程竞争。这样我们一开始就创建了那么多 count,并且把线程一一映射过去,假如本来他们共同对同一个共享变量 1 就不会产生竞争,那这种方式就有很大问题了:1. 浪费了空间2. 多了线程映射的算法逻辑3. 最终获取值时还要加和得不偿失呀。所以我们采用懒加载的办法,一开始,仍然是对同一个共享变量 1,等真正出现竞争了,再开始启用更多的 count。我们把一开始使用的唯一共享变量叫做 base,把之后开启的多个变量叫做 Cell 类,放在一个 Cell[] 数组里。Cell 类里只有一个变量就是 value,存储累加过程中的值。数组扩容
一开始,这个 Cell 数组是空的。等 base 变量出现了一次竞争失败的情况,就初始化这个 Cell[] 数组,第一次里面放两个 Cell。此时,如果只有三个线程 1,就可以保证不会发生竞争。但如果此时又来了一个线程,导致了竞争,即 CAS 失败,那么可以扩容 Cell[] 数组。可以注意到我画的,Cell 数组初始大小为 2,之后扩容也是翻倍的方式,不知道你有没有想到些什么,我们接着往下看。线程映射绑定
刚刚,我们一直默认,线程和 Cell 数组中的每个 Cell 是一一对应的关系,可是怎么做到这一点呢?我们在每个线程中,维护一个局部变量,这个变量属于这个线程,这个变量的值根据 Cell[] 数组的大小哈希取模,就可以映射到其中一个 Cell 上了。那同线程绑定的这个局部变量是怎么来的呢?别担心,JDK 已经帮我们设计好了,这就是 Thread 类里的变量 probe。public class Thread implements Runnable {
...
int threadLocalRandomProbe;
...
}
但是我们不能直接获取,需要借助 ThreadLocalRandom 类的如下办法获取。static final int getProbe() {
return UNSAFE.getInt(Thread.currentThread(), PROBE);
}
当然,获取出的这个值,可能哈希取模后也会发生冲突。没关系,请注意,这只是哈希取模冲突,也就是多个线程可能要对同一个 Cell 里的 value 进行 CAS 1 操作,但不一定会产生竞争。所以,发生哈希取模冲突后,先直接尝试 CAS 1 操作,如果能成功,就没那么多事了。但假如恰好,CAS 的时候又发生了竞争,导致操作失败怎么办?还好,可以用这种方式为该线程的 probe 重新赋值。
probe = 新的值,自己生成一个;
UNSAFE.putInt(Thread.currentThread(), PROBE, probe);
重新赋值后的 probe,再次经过哈希取模后,就不会和之前的冲突了。但很不幸,假如再冲突怎么办?那就再次尝试 CAS 1 操作。但假如又很不幸,CAS 1 操作又失败了,要不要继续重新赋值 probe 呢?要,不过,此时说明竞争已经很激烈了。简单说就是,这个 Cell 数组有点拥挤了,此时我们选择将数组扩容!扩容大家还记得吧,就是上一节中的。这就又回到了上一节中的步骤,如此循环往复。当然,扩容也要有个限度,我们规定,数组大小超过 CPU 核心数后,就不再扩容了。CPU 核心数,可以用如下方式获取。
Runtime.getRuntime().availableProcessors();
如果再发生冲突和竞争的情况,那就不断重新赋值 probe,不断尝试 CAS。有同学可能会说,那一直不成功咋办呢?别忘了,如果不使用我们这个 LongAdder,仅仅用一个原子类不断 1,失败的概率是更高的,我们已经通过将线程分散到不同 Cell,降低了发生竞争失败的概率了。
执行流程
至此,设计思路和实现过程,就都搞定了,我们来看一下整个流程。1. 最开始只有一个 base 变量,多个线程和谐地进行 CAS 1 操作。2. 直到有一天,两个线程发生了竞争,即其中一个线程 CAS 时失败了,那么就创建一个大小为 2 的 Cell[] 数组,用线程私有的局部变量 probe 取模,映射到一个 Cell 上,对其 CAS 1 操作。3. 不过假如线程 probe 取模后,发现那个 Cell 已经被绑定过了,不要紧,先 CAS 1 试一试。4. 但如果没试成功,说明此处有竞争,那重新计算一下线程的 probe 值,映射到一个新的 Cell 上。5. 如果此时又冲突,并且 CAS 1 又失败,那么将 Cell[] 数组扩容。6. 最后当要获取最终的累计和时,用 base 的值,加上所有 Cell[] 数组里的 value 值,得出一个和,返回给调用方。这个破玩意,其实 JDK 中早有实现,又是 Doug Lea 大神写的一个类,名为LongAdder
LongAdder
通过我们刚刚眼花缭乱的分析,再看 Doug Lea 大神的 LongAdder 就非常容易了。我们看最核心的 add 方法,这是最外层的逻辑,很容易理解。
public class LongAdder extends Striped64 implements Serializable {
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b x)) {
// 已经初始化了 Cell 数组
// 或者对 base 变量 CAS 1 操作失败
// 就走到这里了
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
// 下面这行就是 probe 取模操作
(a = as[getProbe()