【優化技術專題】「執行緒間的高效能訊息框架」終極關注Disruptor的核心原始碼和Java8的@Contended偽共享指南

語言: CN / TW / HK

theme: smartblue

小知識,大挑戰!本文正在參與“程式設計師必備小知識”創作活動。

Disruptor原理分析

Disruptor關聯好任務處理事件後,就呼叫了disruptor.start() 方法,可以看出在呼叫了 start() 方法後,消費者執行緒就已經開啟。

啟動Disruptor

start() ->開啟 Disruptor,執行事件處理器。

java public RingBuffer<T> start(){ checkOnlyStartedOnce(); //在前面 handleEventsWith() 方法裡新增的 handler 物件會加入到 consumerRepository 裡,這裡遍歷 consumerRepository 開啟消費者執行緒 for (final ConsumerInfo consumerInfo : consumerRepository){ //從執行緒池中獲取一個執行緒來開啟消費事件處理器。(消費者開啟監聽,一旦有生產者投遞,即可消費) //這裡開啟的執行緒物件為BatchEventProcessor的例項 consumerInfo.start(executor)。 } return ringBuffer。 }

關聯事件

handleEventsWith() -> createEventProcessors()呼叫的核心方法,作用是建立事件處理器。

java @SafeVarargs public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers){ return createEventProcessors(new Sequence[0], handlers); }

儲存事件

將EventHandler物件繫結儲存到consumerRepository內部,並且交由BatchEventProcessor處理器進行代理執行。

java EventHandlerGroup<T> createEventProcessors( final Sequence[] barrierSequences, final EventHandler<? super T>[] eventHandlers){ ... final Sequence[] processorSequences = new Sequence[eventHandlers.length]; //建立 sequence 序號柵欄 final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences)。 for (int i = 0, eventHandlersLength = eventHandlers.length。i < eventHandlersLength。i++){ final EventHandler<? super T> eventHandler = eventHandlers[i]; final BatchEventProcessor<T> batchEventProcessor = new BatchEventProcessor<>(ringBuffer, barrier, eventHandler)。 ... //這裡將消費者加入到 consumerRepository 中---ConsumerRepository consumerRepository.add(batchEventProcessor, eventHandler, barrier)。 processorSequences[i] = batchEventProcessor.getSequence()。 } ... }

  • handleEventsWith() 方法中,可以看到構建了一個 BatchEventProcessor(繼承了 Runnable 介面)物件,start()方法啟動的同樣也是這個物件的例項。

  • 這個物件繼承自 EventProcessor ,EventProcessor 是 Disruptor 裡非常核心的一個介面,它的實現類的作用是輪詢接收RingBuffer提供的事件,並在沒有可處理事件是實現等待策略。

  • 這個介面的實現類必須要關聯一個執行緒去執行,通常我們不需要自己去實現它。

BatchEventProcessor類

BatchEventProcessor:主要事件迴圈,處理 Disruptor 中的 event,擁有消費者的 Sequence。

核心私有成員變數
  • Sequence :維護當前消費者消費的 ID。

  • SequenceBarrier :序號屏障,協調消費者的消費 ID,主要作用是獲取消費者的可用序號,並提供等待策略的執行。

  • EventHandler<? super T> :消費者的消費邏輯(我們實現的業務邏輯)。

  • DataProvider :獲取消費物件。RingBuffer 實現了此介面,主要是提供業務物件。

核心方法
  • processEvents():由於 BatchEventProcessor 繼承自 Runnable 介面,所以在前面啟動它後,run() 方法會執行,而 run() 方法內部則會呼叫此方法。

java private void processEvents() { T event = null。 獲取當前消費者維護的序號中並+1,即下一個消費序號 long nextSequence = sequence.get() + 1L。 while (true) { try { //獲取可執行的最大的任務 ID,如果沒有,waitFor() 方法內會進行等待 final long availableSequence = sequenceBarrier.waitFor(nextSequence)。 if (batchStartAware != null && availableSequence >= nextSequence) { batchStartAware.onBatchStart(availableSequence - nextSequence + 1)。 } //不斷獲取對應位置的任務進行消費 直到上面查詢到的 availableSequence 消費完 while (nextSequence <= availableSequence) { event = dataProvider.get(nextSequence)。 eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence)。 nextSequence++。 } sequence.set(availableSequence)。 } ... } }

  • 消費者事件處理器的核心程式碼,sequenceBarrier.waitFor(nextSequence) 方法內部,會比較當前消費者序號與可用序號的大小:

    • 當可用序號(availableSequence)大於當前消費者序號(nextSequence),再獲取到當前可用的最大的事件序號 ID(waitFot()方法內部呼叫 sequencer.getHighestPublishedSequence(sequence, availableSequence)),進行迴圈消費。
    • 可用序號是維護在 ProcessingSequenceBarrier 裡的,ProcessingSequenceBarrier 是通過 ringBuffer.newBarrier() 創建出來的。

由圖可以看出,在獲得可用序號時,SequenceBarrier 在 EventProcessor 和 RingBuffer中充當協調的角色。

多消費事件和單消費事件在dependentSequence 上的處理有一些不同,可以看下 ProcessingSequenceBarrier 的 dependentSequence 的賦值以及 get() 方法 (Util.getMinimumSequence(sequences))。

啟動過程分析之生產者

首先呼叫了 ringBuffer.next() 方法,獲取可用序號,再獲取到該序號下事先通過 Eventfactory 建立好的空事件物件,在我們對空事件物件進行賦值後,再呼叫 publish 方法將事件釋出,則消費者就可以獲取進行消費了。

生產者這裡的核心程式碼如下,這裡我擷取的是多生產者模式下的程式碼:

java public long next(int n){ if (n < 1 || n > bufferSize) { throw new IllegalArgumentException("n must be > 0 and < bufferSize")。 } long current。 long next。 do{ //cursor 為生產者維護的 sequence 序列,獲取到當前可投遞的的下標,即當前投遞到該位置 current = cursor.get()。 //再+n獲取下一個下標,即下一次投遞的位置。 next = current + n。 long wrapPoint = next - bufferSize。 //目的:也是實現快讀的讀寫。gatingSequenceCache獨佔快取行 long cachedGatingSequence = gatingSequenceCache.get()。 if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current){ //獲取消費者最小序號 long gatingSequence = Util.getMinimumSequence(gatingSequences, current)。 if (wrapPoint > gatingSequence) { //如果不符合,則阻塞執行緒 1ns(park()不會有死鎖的問題) LockSupport.parkNanos(1)。 // TODO, should we spin based on the wait strategy? continue。 } gatingSequenceCache.set(gatingSequence)。 } //多個生產者時要保證執行緒安全(這裡更新的 cursor 同時也是等待策略裡的 waitFor() 方法的 cursor 引數,因此這裡更新成功後,則等待策略會通過,表示有新的任務進來,就會消費) else if (cursor.compareAndSet(current, next)){ break。 } }while (true); return next。 }

cursor 物件和 Util.getMinimumSequence(gatingSequences, current) 方法,cursor 物件是生產者維護的一個生產者序號,標示當前生產者已經生產到哪一個位置以及下一個位置,它是 Sequence 類的一個例項化物件

  • 從圖裡可以看出,Sequence 繼承以及間接繼承了 RhsPadding 和 LhsPadding 類,而這倆個類都各定義了 7 個 long 型別的成員變數。

  • 而 Sequence 的 get() 方法返回的也是一個 long 型別的值 value。這是上一篇文章介紹的充快取行,消除偽共享。

  • 在 64 位的計算機中,單個快取行一般佔 64 個位元組,當 cpu 從換存裡取資料時,會將該相關資料的其它資料取出來填滿一個快取行,這時如果其它資料更新,則快取行快取的該資料也會失效,當下次需要使用該資料時又需要重新從記憶體中提取資料。

  • ArrayBlockingQueue 獲取資料時,很容易碰到偽共享導致快取行失效,而 Disruptor這裡當在 value 的左右各填充 7 個 long 型別的資料時,每次取都能確保該資料獨佔快取行,也不會有其他的資料更新導致該資料失效。避免了偽共享的問題( jdk 的併發包下也有一些消除偽共享的設計)。


RingBuffer:它是一個首尾相接的環狀的容器,用來在多執行緒中傳遞資料。第一張圖裡面建立 Disruptor 的多個引數其實都是用來建立 RingBuffer 的,比如生產者型別(單 or 多)、例項化工廠、容器長度、等待策略等。

簡單分析,多個生產者同時向 ringbuffer 投遞資料,假設此時倆個生產者將 ringbuffer 已經填滿,因為 sequence 的序號是自增+1(若不滿足獲取條件則迴圈掛起當前執行緒),所以生產的時候能保證執行緒安全,只需要一個 sequence 即可。

當多消費者來消費的時候,因為消費速度不同,例如消費者 1 來消費 0、1,消費者 2 消費 2、4,消費者 3 消費 3。

當消費者消費完 0 後,消費者 2 消費完 2 後,消費者 3 消費完 3 後,生產者再往佇列投遞資料時,其他位置還未被消費,會投遞到第 0 個位置, 此時再想投遞資料時,雖然消費 2 的第二個位置空缺、消費者 3 的第三個位置空缺,消費者還在消費 1 時,無法繼續投遞。因為是通過比較消費者自身維護的 sequence 的最小的序號,來進行比較。

Util.getMinimumSequence(gatingSequences, current) 方法也就無需再多說,它就是為了獲取到多個消費者的最小序號,判斷當前 ringBuffer 中的剩餘可用序號是否大於消費者最小序號,是的話,則不能投遞,需要阻塞當前執行緒(LockSupport.parkNanos(1))。

當消費者消費速度大於生產者生產者速度,生產者還未來得及往佇列寫入,或者生產者生產速度大於消費者消費速度,此時怎麼辦呢?而且上面也多次提到沒有滿足條件的消費事件時,消費者會等待,接下來說一下消費者的等待策略。

個人常用的策略:

  • BlockingWaitStrategy 使用了鎖,低效的策略。

  • SleepingWaitStrategy 對生產者執行緒的影響最小,適合用於非同步日誌類似的場景。(不加鎖空等)

  • YieldingWaitStrategy 效能最好,適合用於低延遲的系統,在要求極高效能且之間處理線數小於 cpu 邏輯核心數的場景中,推薦使用。

```java @Override public long waitFor( final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier) throws AlertException, InterruptedException{ long availableSequence。 int counter = SPIN_TRIES。//100 while ((availableSequence = dependentSequence.get()) < sequence){ counter = applyWaitMethod(barrier, counter)。 } return availableSequence。 } private int applyWaitMethod(final SequenceBarrier barrier, int counter) throws AlertException { barrier.checkAlert()。

    if (0 == counter)
    {
        Thread.yield()。
    }
    else
    {
        --counter。
    }
    return counter。
}

```

Java 8 Contended註解

  • 在Java 8中,可以採用@Contended在類級別上的註釋,來進行快取行填充。這樣,可以解決多執行緒情況下的偽共享衝突問題。

  • Contended可以用於類級別的修飾,同時也可以用於欄位級別的修飾,當應用於欄位級別時,被註釋的欄位將和其他欄位隔離開來,會被載入在獨立的快取行上。在欄位級別上,@Contended還支援一個“contention group”屬性(Class-Level不支援),同一group的欄位們在記憶體上將是連續(64位元組範圍內),但和其他他欄位隔離開來。

@Contended註釋的行為如下所示:

在類上應用Contended:

java @Contended public static class ContendedTest2 { private Object plainField1; private Object plainField2; private Object plainField3; private Object plainField4; }

將使整個欄位塊的兩端都被填充:(以下是使用 –XX:+PrintFieldLayout的輸出)

TestContended$ContendedTest2: field layout Entire class is marked contended @140 --- instance fields start --- @140 "plainField1" Ljava.lang.Object; @144 "plainField2" Ljava.lang.Object; @148 "plainField3" Ljava.lang.Object; @152 "plainField4" Ljava.lang.Object; @288 --- instance fields end --- @288 --- instance ends ---

注意,我們使用了128 bytes的填充 – 2倍於大多數硬體快取行的大小(cache line一般為64 bytes) – 來避免相鄰扇區預取導致的偽共享衝突。

在欄位上應用Contended:

java public static class ContendedTest1 { @Contended private Object contendedField1; private Object plainField1; private Object plainField2; private Object plainField3; private Object plainField4; }

將導致該欄位從連續的欄位塊中分離開來並高效的新增填充:

TestContended$ContendedTest1: field layout @ 12 --- instance fields start --- @ 12 "plainField1" Ljava.lang.Object; @ 16 "plainField2" Ljava.lang.Object; @ 20 "plainField3" Ljava.lang.Object; @ 24 "plainField4" Ljava.lang.Object; @156 "contendedField1" Ljava.lang.Object; (contended, group = 0) @288 --- instance fields end --- @288 --- instance ends ---

註解多個欄位使他們分別被填充:

java public static class ContendedTest4 { @Contended private Object contendedField1; @Contended private Object contendedField2; private Object plainField3; private Object plainField4; }

被註解的2個欄位都被獨立地填充:

TestContended$ContendedTest4: field layout @ 12 --- instance fields start --- @ 12 "plainField3" Ljava.lang.Object; @ 16 "plainField4" Ljava.lang.Object; @148 "contendedField1" Ljava.lang.Object; (contended, group = 0) @280 "contendedField2" Ljava.lang.Object; (contended, group = 0) @416 --- instance fields end --- @416 --- instance ends ---

在有些cases中,你會想對欄位進行分組,同一組的欄位會和其他欄位有訪問衝突,但是和同一組的沒有。例如,(同一個執行緒的)程式碼同時更新2個欄位是很常見的情況。

```java public static class ContendedTest5 { @Contended("updater1") private Object contendedField1;

    @Contended("updater1")
    private Object contendedField2;

    @Contended("updater2")
    private Object contendedField3;

    private Object plainField5;
    private Object plainField6;
}

```

記憶體佈局是: TestContended$ContendedTest5: field layout @ 12 --- instance fields start --- @ 12 "plainField5" Ljava.lang.Object; @ 16 "plainField6" Ljava.lang.Object; @148 "contendedField1" Ljava.lang.Object; (contended, group = 12) @152 "contendedField2" Ljava.lang.Object; (contended, group = 12) @284 "contendedField3" Ljava.lang.Object; (contended, group = 15) @416 --- instance fields end --- @416 --- instance ends ---

@Contended在欄位級別,並且帶分組的情況下,是否能解決偽快取問題。

java import sun.misc.Contended; public class VolatileLong { @Contended("group0") public volatile long value1 = 0L; @Contended("group0") public volatile long value2 = 0L; @Contended("group1") public volatile long value3 = 0L; @Contended("group1") public volatile long value4 = 0L; }

用2個執行緒來修改欄位

  • 測試1:執行緒0修改value1和value2;執行緒1修改value3和value4;他們都在同一組中。

  • 測試2:執行緒0修改value1和value3;執行緒1修改value2和value4;他們在不同組中。

測試1

```java public final class FalseSharing implements Runnable { public final static long ITERATIONS = 500L * 1000L * 1000L; private static Volatile Long volatileLong; private String groupId; public FalseSharing(String groupId) { this.groupId = groupId; } public static void main(final String[] args) throws Exception { // Thread.sleep(10000); System.out.println("starting...."); volatileLong = new VolatileLong(); final long start = System.nanoTime(); runTest(); System.out.println("duration = " + (System.nanoTime() - start)); }

private static void runTest() throws InterruptedException {
    Thread t0 = new Thread(new FalseSharing("t0"));
    Thread t1 = new Thread(new FalseSharing("t1"));
    t0.start();
    t1.start();
    t0.join();
    t1.join();
}
public void run() {
    long i = ITERATIONS + 1;
    if (groupId.equals("t0")) {
        while (0 != --i) {
            volatileLong.value1 = i;
            volatileLong.value2 = i;
        }
    } else if (groupId.equals("t1")) {
        while (0 != --i) {
            volatileLong.value3 = i;
            volatileLong.value4 = i;
        }
    }
}

}

public void run() { long i = ITERATIONS + 1; if (groupId.equals("t0")) { while (0 != --i) { volatileLong.value1 = i; volatileLong.value3 = i; } } else if (groupId.equals("t1")) { while (0 != --i) { volatileLong.value2 = i; volatileLong.value4 = i; } } } ```

「其他文章」