netty进阶

MpscQueue

Mpsc来自JCTools,即JAVA的高并发增强包,主要提供了一些 JDK 缺失的并发数据结构

  1. Spsc 单生产者单消费者
  2. Mpsc 多生产者单消费者
  3. Spmc 单生产者多消费者
  4. Mpmc 多生产者多消费者

Mpsc 的全称是 Multi Producer Single Consumer,多生产者单消费者,多个生产者线程通过CAS无锁操作提升性能,单个消费者不需要加锁;

Mpsc Queue 可以保证多个生产者同时访问队列是线程安全的,而且同一时刻只允许一个消费者从队列中读取数据。

MpscArrayQueue内部的环形数组容量为 2 的次幂,可以通过位运算快速定位到数组对应下标。

Netty Reactor 线程中的任务队列 taskQueue 必须满足多个生产者可以同时提交任务,所以 JCTools 提供的 Mpsc Queue 非常适合 Netty Reactor 线程模型

MpscArrayQueue为例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
//继承类的所有变量声明
// ConcurrentCircularArrayQueueL0Pad.java
long p01, p02, p03, p04, p05, p06, p07;
long p10, p11, p12, p13, p14, p15, p16, p17;

// ConcurrentCircularArrayQueue.java
// 计算数组下标的掩码
protected final long mask;
// 存放队列数据的数组
protected final E[] buffer;

// MpmcArrayQueueL1Pad.java
long p00, p01, p02, p03, p04, p05, p06, p07;
long p10, p11, p12, p13, p14, p15, p16;

// MpmcArrayQueueProducerIndexField.java
// 生产者索引
private volatile long producerIndex;

// MpscArrayQueueMidPad.java
long p01, p02, p03, p04, p05, p06, p07;
long p10, p11, p12, p13, p14, p15, p16, p17;

// MpscArrayQueueProducerLimitField.java
// 生产者索引的最大值
private volatile long producerLimit;

// MpscArrayQueueL2Pad.java
long p00, p01, p02, p03, p04, p05, p06, p07;
long p10, p11, p12, p13, p14, p15, p16;

// MpscArrayQueueConsumerIndexField.java
// 消费者索引
protected long consumerIndex;

// MpscArrayQueueL3Pad.java
long p01, p02, p03, p04, p05, p06, p07;
long p10, p11, p12, p13, p14, p15, p16, p17;

在pad类中填充了大量long的数据,其命名没有什么特殊的含义,只是起到填充的作用,这是为了解决伪共享(false sharing)问题

伪共享问题

你的行为似乎有利于共享,但是却徒增消耗

为了平衡CPU与内存的速度差异,常常会设立多层缓存机制,一般是三层,CPU 读取数据时,首先会从 L1 查找,如果未命中则继续查找 L2,如果还未能命中则继续查找 L3,最后还没命中的话只能从内存中查找,读取完成后再将数据逐级放入缓存中。

此外,多线程之间共享一份数据的时候,需要其中一个线程将数据写回主存(总线嗅探机制保证可见性),其他线程访问主存数据。

CPU 缓存由若干个缓存行(Cache Line) 组成,缓存行是 CPU 缓存可操作的最小单位

Cache Line 的大小与 CPU 架构有关,在目前主流的 64 位架构下,Cache Line 的大小通常为 64 Byte。

而 Java 中一个 long 类型是 8 Byte,所以一个 Cache Line 可以存储 8 个 long 类型变量

CPU 在加载内存数据时,会将相邻的数据一同读取到 Cache Line 中(一次加载连续的 64 个字节),这样就可以避免 CPU 频繁与内存进行交互了。

例如:如果访问一个 long 型的单独变量 a,并且还有另外一个 long 型变量 b 紧挨着它,那么当加载 a 时候将免费加载 b

伪共享就会在此出现:

  1. 假设有 A、B、C、D 四个变量,线程1尝试修改变量A,于是将A和B、C、D一起都加载到了core1的一个 Cache Line;
  2. 此时,线程2读取变量B,也将A、C、D加载到了core2的同一 Cache Line;
  3. 线程1 对变量 A 进行修改,修改完成后将变量A值写回主存,然后 CPU1 会通知 CPU2 该缓存行已经失效
  4. 线程 2 在Core2 中对变量 C 进行修改时,发现 Cache line 已经失效,所以需要重新从主存中读取数据加载到当前 Cache line 中。

当多个线程同时修改互相独立的变量时,如果这些变量共享同一个缓存行,就会出现写竞争,导致频繁从主存加载数据,影响性能

JCtool

常见的解决思路就是:以空间换时间,让不同线程操作的不相干变量加载到不同缓存行,避免相互影响

1
2
3
4
5
public class FalseSharingPadding {
protected long p1, p2, p3, p4, p5, p6, p7;
protected volatile long value = 0L;
protected long p9, p10, p11, p12, p13, p14, p15;
}

变量 value 前后分别填充了 7 个 long 类型的变量。

这样不论在什么情况下,都可以保证在多线程访问 value 变量时,value 与其他不相关的变量处于不同的 Cache Line

MPSC方法解析

加入元素

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// MpscArrayQueue.java
//生产者加入元素
public boolean offer(final E e) {
if (null == e)
{
throw new NullPointerException();
}

final long mask = this.mask;

// 获取生产者索引最大限制
long producerLimit = lvProducerLimit();
long pIndex;
do
{
// 获取生产者索引
pIndex = lvProducerIndex();
// 如果生产者索引达到了最大值,防止追尾
if (pIndex >= producerLimit)
{
// 消费者索引,以volatile的形式获取,保证获取的是最新的值
final long cIndex = lvConsumerIndex();
// 修改为当前消费者的索引加上数组的大小
producerLimit = cIndex + mask + 1;
// 如果依然达到了最大值,则返回false,表示队列满了,再放元素就追尾了
if (pIndex >= producerLimit)
{
return false; // 队列已满
}
else
{
soProducerLimit(producerLimit); // 更新 producerLimit
}
}
} while (!casProducerIndex(pIndex, pIndex + 1));// CAS 更新生产者索引,更新成功则退出,说明当前生产者已经占领索引值
// 计算生产者索引在数组中下标
final long offset = calcCircularRefElementOffset(pIndex, mask);
// 向数组中放入数据
soRefElement(buffer, offset, e);
return true;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//获取生产者索引最大值和修改生产者索引最大值
// 读取producerLimit
protected final long lvProducerLimit() {
// producerLimit本身就是volatile修饰的
return this.producerLimit;
}
// 保存producerLimit
protected final void soProducerLimit(long v) {
// 这个方法会加StoreStore屏障
// 会把最新值直接更新到主内存中,但其它线程不会立即可见
// 其它线程需要使用volatile语义才能读取到最新值
// 这相当于是一种延时更新的方法,比volatile语义的性能要高一些
UnsafeAccess.UNSAFE.putOrderedLong(this, P_LIMIT_OFFSET, v);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
//获取生产者索引,更新生产者索引
// 读取producerIndex
protected final long lvProducerIndex() {
// producerIndex本身就用volatile修饰了
return this.producerIndex;
}

// CAS更新producerIndex
protected final boolean casProducerIndex(long expect, long newValue) {
// CAS更新
return UnsafeAccess.UNSAFE.compareAndSwapLong(this, P_INDEX_OFFSET, expect, newValue);
}
1
2
3
4
5
6
// 读取consumerIndex
protected final long lvConsumerIndex() {
// 以volatile的形式加载consumerIndex
// 此时会从内存读取最新的值
return UnsafeAccess.UNSAFE.getLongVolatile(this, C_INDEX_OFFSET);
}
1
2
3
4
5
6
7
// 修改数组对应偏移量的值
public static <E> void soElement(E[] buffer, long offset, E e) {
// 比使用下标更新数组元素有两个优势
// 1. 使用Unsafe操作内存更新更快
// 2. 使用putOrderedObject会直接更新到主内存,而使用下标不会立马更新到主内存
UnsafeAccess.UNSAFE.putOrderedObject(buffer, offset, e);
}

MPSC

  1. 初始时,两个线程拿到的 pIndex 都等于producerIndex为0,小于 producerLimit ;
  2. 接着,两个线程都会尝试 CAS 更新 producerIndex + 1 ,必然只有一个线程能更新成功,另一个失败;
  3. 假设 Thread1 CAS 操作成功,那么它拿到的 pIndex 为0,Thread2 失败后就会重新更新 producerIndex ,然后更新成功,拿到 pIndex 为1;
  4. 最后,根据 pIndex 进行位运算,得到数组对应的下标,然后通过 UNSAFE.putOrderedObject() 方法将数据写入到数组中。

获取元素

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
//无CAS
public E poll() {
// 读取consumerIndex的值
long cIndex = this.lpConsumerIndex();
// 计算在数组中的偏移量
long offset = this.calcElementOffset(cIndex);
// 获取存储元素的数组
E[] buffer = this.buffer;
// 取元素,通过StoreStore写入队列,通过LoadLoad取出来的元素是最新值
E e = UnsafeRefArrayAccess.lvElement(buffer, offset);
if (null == e) {
// 元素入队是先更新了producerIndex的值,再把更新元素到数组中
// 如果在两者之间,进行了消费,则此处是无法获取到元素的
// 所以需要进入下面的判断
// 判断consumerIndex是否等于producerIndex
// 只有两者不相等,才可以再消费元素
if (cIndex == this.lvProducerIndex()) {
return null;
}
// 使用死循环来取元素,直到取到为止
do {
e = UnsafeRefArrayAccess.lvElement(buffer, offset);
} while(e == null);
}
// 更新取出的位置元素为null
UnsafeRefArrayAccess.spElement(buffer, offset, (Object)null);
// 修改consumerIndex的索引为新值,使用StoreStore屏障,直接更新到主内存
this.soConsumerIndex(cIndex + 1L);
// 返回出队的元素
return e;
}

因为只有一个消费者线程,所以整个 poll() 的过程没有 CAS 操作。

poll() 方法核心思路是获取消费者索引 consumerIndex,然后根据 consumerIndex 计算得出数组对应的偏移量,然后将数组对应位置的元素取出并返回,最后将 consumerIndex 移动到环形数组下一个位置。

1
2
3
4
5
6
7
8
9
10
11
12
13
protected final long lpConsumerIndex() {
// 直接返回消费者索引
return this.consumerIndex;
}
// 保存消费者索引值
protected void soConsumerIndex(long l) {
// 这个方法会加StoreStore屏障
// 会把最新值直接更新到主内存中,但其它线程不会立即可见
// 其它线程需要使用volatile语义才能读取到最新值
// 这相当于是一种延时更新的方法,比volatile语义的性能要高一些
UnsafeAccess.UNSAFE.putOrderedLong(this, C_INDEX_OFFSET, l);
}

1
2
3
4
5
6
7
8
9
public static <E> void spElement(E[] buffer, long offset, E e) {
// 更新buffer在offset处的元素值
UnsafeAccess.UNSAFE.putObject(buffer, offset, e);
}

public static <E> E lvElement(E[] buffer, long offset) {
// 获取buffer在offset处的元素值
return UnsafeAccess.UNSAFE.getObjectVolatile(buffer, offset);
}
1
2
3
4
5
6
7
8
9
//计算偏移量
public static long calcElementOffset(long index, long mask) {
// REF_ARRAY_BASE,基础地址,数组在内存中的地址
// REF_ELEMENT_SHIFT,可以简单地看作一个元素占用多少字节
// 64位系统中一个引用对象占用64位,也就是8字节,但是压缩模式下占用4字节
// index & mask 计算数组下标
// 比如数组大小为4,mask就为3时,4&3=100&011=0
return REF_ARRAY_BASE + ((index & mask) << REF_ELEMENT_SHIFT);
}
  1. Unsafe 方法
    1. **putOrderedXxx()**,使用 StoreStore 屏障,会把最新值更新到主内存,但不会立即失效其它缓存行中的数据,是一种延时更新机制;
    2. **putXxxVolatile()**,使用 StoreLoad 屏障,会把最新值更新到主内存,同时会把其它缓存行的数据失效,或者说会刷新其它缓存行的数据;
    3. **putXxx(obj, offset)**,不使用任何屏障,更新对象对应偏移量的值;
    4. **getXxxVolatile()**,使用 LoadLoad 屏障,会从主内存获取最新值;
    5. **getXxx(obj, offset)**,不使用任何屏障,读取对象对应偏移量的值;

总结:

MPSC实现高并发和高性能使用了哪些方法?

  1. LazySet 延迟更新机制:在更新producerLimit,消费者索引,和环形数组元素时使用StoreStore屏障,虽然写操作结果有纳秒级的延迟,但是由于没有立刻读取的操作,所以没有问题,且提升了性能
  2. 使用偏移量来更新数组元素,比下标性能更好
  3. 使用UNSAFE方法来直接操作内存
  4. 使用long型变量填充来避免伪共享问题

netty进阶
http://example.com/post/netty进阶.html
作者
SamuelZhou
发布于
2022年11月24日
许可协议