【優化技術專題】「執行緒間的高效能訊息框架」終極關注Disruptor的核心原始碼和Java8的@Contended偽共享指南
theme: smartblue
小知識,大挑戰!本文正在參與“程式設計師必備小知識”創作活動。
Disruptor原理分析
Disruptor關聯好任務處理事件後,就呼叫了disruptor.start() 方法,可以看出在呼叫了 start() 方法後,消費者執行緒就已經開啟。
啟動Disruptor
start() ->開啟 Disruptor,執行事件處理器。
java
public RingBuffer<T> start(){
checkOnlyStartedOnce();
//在前面 handleEventsWith() 方法裡新增的 handler 物件會加入到 consumerRepository 裡,這裡遍歷 consumerRepository 開啟消費者執行緒
for (final ConsumerInfo consumerInfo : consumerRepository){
//從執行緒池中獲取一個執行緒來開啟消費事件處理器。(消費者開啟監聽,一旦有生產者投遞,即可消費)
//這裡開啟的執行緒物件為BatchEventProcessor的例項
consumerInfo.start(executor)。
}
return ringBuffer。
}
關聯事件
handleEventsWith() -> createEventProcessors()呼叫的核心方法,作用是建立事件處理器。
java
@SafeVarargs
public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers){
return createEventProcessors(new Sequence[0], handlers);
}
儲存事件
將EventHandler物件繫結儲存到consumerRepository內部,並且交由BatchEventProcessor處理器進行代理執行。
java
EventHandlerGroup<T> createEventProcessors(
final Sequence[] barrierSequences,
final EventHandler<? super T>[] eventHandlers){
...
final Sequence[] processorSequences = new Sequence[eventHandlers.length];
//建立 sequence 序號柵欄
final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences)。
for (int i = 0, eventHandlersLength = eventHandlers.length。i < eventHandlersLength。i++){
final EventHandler<? super T> eventHandler = eventHandlers[i];
final BatchEventProcessor<T> batchEventProcessor = new BatchEventProcessor<>(ringBuffer, barrier, eventHandler)。
...
//這裡將消費者加入到 consumerRepository 中---ConsumerRepository
consumerRepository.add(batchEventProcessor, eventHandler, barrier)。
processorSequences[i] = batchEventProcessor.getSequence()。
}
...
}
-
handleEventsWith() 方法中,可以看到構建了一個 BatchEventProcessor(繼承了 Runnable 介面)物件,start()方法啟動的同樣也是這個物件的例項。
-
這個物件繼承自 EventProcessor ,EventProcessor 是 Disruptor 裡非常核心的一個介面,它的實現類的作用是輪詢接收RingBuffer提供的事件,並在沒有可處理事件是實現等待策略。
-
這個介面的實現類必須要關聯一個執行緒去執行,通常我們不需要自己去實現它。
BatchEventProcessor類
BatchEventProcessor:主要事件迴圈,處理 Disruptor 中的 event,擁有消費者的 Sequence。
核心私有成員變數
-
Sequence :維護當前消費者消費的 ID。
-
SequenceBarrier :序號屏障,協調消費者的消費 ID,主要作用是獲取消費者的可用序號,並提供等待策略的執行。
-
EventHandler<? super T> :消費者的消費邏輯(我們實現的業務邏輯)。
-
DataProvider :獲取消費物件。RingBuffer 實現了此介面,主要是提供業務物件。
核心方法
- processEvents():由於 BatchEventProcessor 繼承自 Runnable 介面,所以在前面啟動它後,run() 方法會執行,而 run() 方法內部則會呼叫此方法。
java
private void processEvents()
{
T event = null。
獲取當前消費者維護的序號中並+1,即下一個消費序號
long nextSequence = sequence.get() + 1L。
while (true) {
try {
//獲取可執行的最大的任務 ID,如果沒有,waitFor() 方法內會進行等待
final long availableSequence = sequenceBarrier.waitFor(nextSequence)。
if (batchStartAware != null && availableSequence >= nextSequence) {
batchStartAware.onBatchStart(availableSequence - nextSequence + 1)。
}
//不斷獲取對應位置的任務進行消費 直到上面查詢到的 availableSequence 消費完
while (nextSequence <= availableSequence) {
event = dataProvider.get(nextSequence)。
eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence)。
nextSequence++。
}
sequence.set(availableSequence)。
}
...
}
}
-
消費者事件處理器的核心程式碼,sequenceBarrier.waitFor(nextSequence) 方法內部,會比較當前消費者序號與可用序號的大小:
- 當可用序號(availableSequence)大於當前消費者序號(nextSequence),再獲取到當前可用的最大的事件序號 ID(waitFot()方法內部呼叫 sequencer.getHighestPublishedSequence(sequence, availableSequence)),進行迴圈消費。
- 可用序號是維護在 ProcessingSequenceBarrier 裡的,ProcessingSequenceBarrier 是通過 ringBuffer.newBarrier() 創建出來的。
由圖可以看出,在獲得可用序號時,SequenceBarrier 在 EventProcessor 和 RingBuffer中充當協調的角色。
多消費事件和單消費事件在dependentSequence 上的處理有一些不同,可以看下 ProcessingSequenceBarrier 的 dependentSequence 的賦值以及 get() 方法 (Util.getMinimumSequence(sequences))。
啟動過程分析之生產者
首先呼叫了 ringBuffer.next() 方法,獲取可用序號,再獲取到該序號下事先通過 Eventfactory 建立好的空事件物件,在我們對空事件物件進行賦值後,再呼叫 publish 方法將事件釋出,則消費者就可以獲取進行消費了。
生產者這裡的核心程式碼如下,這裡我擷取的是多生產者模式下的程式碼:
java
public long next(int n){
if (n < 1 || n > bufferSize) {
throw new IllegalArgumentException("n must be > 0 and < bufferSize")。
}
long current。
long next。
do{
//cursor 為生產者維護的 sequence 序列,獲取到當前可投遞的的下標,即當前投遞到該位置
current = cursor.get()。
//再+n獲取下一個下標,即下一次投遞的位置。
next = current + n。
long wrapPoint = next - bufferSize。
//目的:也是實現快讀的讀寫。gatingSequenceCache獨佔快取行
long cachedGatingSequence = gatingSequenceCache.get()。
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current){
//獲取消費者最小序號
long gatingSequence = Util.getMinimumSequence(gatingSequences, current)。
if (wrapPoint > gatingSequence) {
//如果不符合,則阻塞執行緒 1ns(park()不會有死鎖的問題)
LockSupport.parkNanos(1)。
// TODO, should we spin based on the wait strategy?
continue。
}
gatingSequenceCache.set(gatingSequence)。
}
//多個生產者時要保證執行緒安全(這裡更新的 cursor 同時也是等待策略裡的 waitFor() 方法的 cursor 引數,因此這裡更新成功後,則等待策略會通過,表示有新的任務進來,就會消費)
else if (cursor.compareAndSet(current, next)){
break。
}
}while (true);
return next。
}
cursor 物件和 Util.getMinimumSequence(gatingSequences, current) 方法,cursor 物件是生產者維護的一個生產者序號,標示當前生產者已經生產到哪一個位置以及下一個位置,它是 Sequence 類的一個例項化物件。
-
從圖裡可以看出,Sequence 繼承以及間接繼承了 RhsPadding 和 LhsPadding 類,而這倆個類都各定義了 7 個 long 型別的成員變數。
-
而 Sequence 的 get() 方法返回的也是一個 long 型別的值 value。這是上一篇文章介紹的充快取行,消除偽共享。
-
在 64 位的計算機中,單個快取行一般佔 64 個位元組,當 cpu 從換存裡取資料時,會將該相關資料的其它資料取出來填滿一個快取行,這時如果其它資料更新,則快取行快取的該資料也會失效,當下次需要使用該資料時又需要重新從記憶體中提取資料。
-
ArrayBlockingQueue 獲取資料時,很容易碰到偽共享導致快取行失效,而 Disruptor這裡當在 value 的左右各填充 7 個 long 型別的資料時,每次取都能確保該資料獨佔快取行,也不會有其他的資料更新導致該資料失效。避免了偽共享的問題( jdk 的併發包下也有一些消除偽共享的設計)。
RingBuffer:它是一個首尾相接的環狀的容器,用來在多執行緒中傳遞資料。第一張圖裡面建立 Disruptor 的多個引數其實都是用來建立 RingBuffer 的,比如生產者型別(單 or 多)、例項化工廠、容器長度、等待策略等。
簡單分析,多個生產者同時向 ringbuffer 投遞資料,假設此時倆個生產者將 ringbuffer 已經填滿,因為 sequence 的序號是自增+1(若不滿足獲取條件則迴圈掛起當前執行緒),所以生產的時候能保證執行緒安全,只需要一個 sequence 即可。
當多消費者來消費的時候,因為消費速度不同,例如消費者 1 來消費 0、1,消費者 2 消費 2、4,消費者 3 消費 3。
當消費者消費完 0 後,消費者 2 消費完 2 後,消費者 3 消費完 3 後,生產者再往佇列投遞資料時,其他位置還未被消費,會投遞到第 0 個位置, 此時再想投遞資料時,雖然消費 2 的第二個位置空缺、消費者 3 的第三個位置空缺,消費者還在消費 1 時,無法繼續投遞。因為是通過比較消費者自身維護的 sequence 的最小的序號,來進行比較。
Util.getMinimumSequence(gatingSequences, current) 方法也就無需再多說,它就是為了獲取到多個消費者的最小序號,判斷當前 ringBuffer 中的剩餘可用序號是否大於消費者最小序號,是的話,則不能投遞,需要阻塞當前執行緒(LockSupport.parkNanos(1))。
當消費者消費速度大於生產者生產者速度,生產者還未來得及往佇列寫入,或者生產者生產速度大於消費者消費速度,此時怎麼辦呢?而且上面也多次提到沒有滿足條件的消費事件時,消費者會等待,接下來說一下消費者的等待策略。
個人常用的策略:
-
BlockingWaitStrategy 使用了鎖,低效的策略。
-
SleepingWaitStrategy 對生產者執行緒的影響最小,適合用於非同步日誌類似的場景。(不加鎖空等)
-
YieldingWaitStrategy 效能最好,適合用於低延遲的系統,在要求極高效能且之間處理線數小於 cpu 邏輯核心數的場景中,推薦使用。
```java @Override public long waitFor( final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier) throws AlertException, InterruptedException{ long availableSequence。 int counter = SPIN_TRIES。//100 while ((availableSequence = dependentSequence.get()) < sequence){ counter = applyWaitMethod(barrier, counter)。 } return availableSequence。 } private int applyWaitMethod(final SequenceBarrier barrier, int counter) throws AlertException { barrier.checkAlert()。
if (0 == counter)
{
Thread.yield()。
}
else
{
--counter。
}
return counter。
}
```
Java 8 Contended註解
-
在Java 8中,可以採用@Contended在類級別上的註釋,來進行快取行填充。這樣,可以解決多執行緒情況下的偽共享衝突問題。
-
Contended可以用於類級別的修飾,同時也可以用於欄位級別的修飾,當應用於欄位級別時,被註釋的欄位將和其他欄位隔離開來,會被載入在獨立的快取行上。在欄位級別上,@Contended還支援一個“contention group”屬性(Class-Level不支援),同一group的欄位們在記憶體上將是連續(64位元組範圍內),但和其他他欄位隔離開來。
@Contended註釋的行為如下所示:
在類上應用Contended:
java
@Contended
public static class ContendedTest2 {
private Object plainField1;
private Object plainField2;
private Object plainField3;
private Object plainField4;
}
將使整個欄位塊的兩端都被填充:(以下是使用 –XX:+PrintFieldLayout的輸出)
TestContended$ContendedTest2: field layout
Entire class is marked contended
@140 --- instance fields start ---
@140 "plainField1" Ljava.lang.Object;
@144 "plainField2" Ljava.lang.Object;
@148 "plainField3" Ljava.lang.Object;
@152 "plainField4" Ljava.lang.Object;
@288 --- instance fields end ---
@288 --- instance ends ---
注意,我們使用了128 bytes的填充 – 2倍於大多數硬體快取行的大小(cache line一般為64 bytes) – 來避免相鄰扇區預取導致的偽共享衝突。
在欄位上應用Contended:
java
public static class ContendedTest1 {
@Contended
private Object contendedField1;
private Object plainField1;
private Object plainField2;
private Object plainField3;
private Object plainField4;
}
將導致該欄位從連續的欄位塊中分離開來並高效的新增填充:
TestContended$ContendedTest1: field layout
@ 12 --- instance fields start ---
@ 12 "plainField1" Ljava.lang.Object;
@ 16 "plainField2" Ljava.lang.Object;
@ 20 "plainField3" Ljava.lang.Object;
@ 24 "plainField4" Ljava.lang.Object;
@156 "contendedField1" Ljava.lang.Object; (contended, group = 0)
@288 --- instance fields end ---
@288 --- instance ends ---
註解多個欄位使他們分別被填充:
java
public static class ContendedTest4 {
@Contended
private Object contendedField1;
@Contended
private Object contendedField2;
private Object plainField3;
private Object plainField4;
}
被註解的2個欄位都被獨立地填充:
TestContended$ContendedTest4: field layout
@ 12 --- instance fields start ---
@ 12 "plainField3" Ljava.lang.Object;
@ 16 "plainField4" Ljava.lang.Object;
@148 "contendedField1" Ljava.lang.Object; (contended, group = 0)
@280 "contendedField2" Ljava.lang.Object; (contended, group = 0)
@416 --- instance fields end ---
@416 --- instance ends ---
在有些cases中,你會想對欄位進行分組,同一組的欄位會和其他欄位有訪問衝突,但是和同一組的沒有。例如,(同一個執行緒的)程式碼同時更新2個欄位是很常見的情況。
```java public static class ContendedTest5 { @Contended("updater1") private Object contendedField1;
@Contended("updater1")
private Object contendedField2;
@Contended("updater2")
private Object contendedField3;
private Object plainField5;
private Object plainField6;
}
```
記憶體佈局是:
TestContended$ContendedTest5: field layout
@ 12 --- instance fields start ---
@ 12 "plainField5" Ljava.lang.Object;
@ 16 "plainField6" Ljava.lang.Object;
@148 "contendedField1" Ljava.lang.Object; (contended, group = 12)
@152 "contendedField2" Ljava.lang.Object; (contended, group = 12)
@284 "contendedField3" Ljava.lang.Object; (contended, group = 15)
@416 --- instance fields end ---
@416 --- instance ends ---
@Contended在欄位級別,並且帶分組的情況下,是否能解決偽快取問題。
java
import sun.misc.Contended;
public class VolatileLong {
@Contended("group0")
public volatile long value1 = 0L;
@Contended("group0")
public volatile long value2 = 0L;
@Contended("group1")
public volatile long value3 = 0L;
@Contended("group1")
public volatile long value4 = 0L;
}
用2個執行緒來修改欄位
-
測試1:執行緒0修改value1和value2;執行緒1修改value3和value4;他們都在同一組中。
-
測試2:執行緒0修改value1和value3;執行緒1修改value2和value4;他們在不同組中。
測試1
```java public final class FalseSharing implements Runnable { public final static long ITERATIONS = 500L * 1000L * 1000L; private static Volatile Long volatileLong; private String groupId; public FalseSharing(String groupId) { this.groupId = groupId; } public static void main(final String[] args) throws Exception { // Thread.sleep(10000); System.out.println("starting...."); volatileLong = new VolatileLong(); final long start = System.nanoTime(); runTest(); System.out.println("duration = " + (System.nanoTime() - start)); }
private static void runTest() throws InterruptedException {
Thread t0 = new Thread(new FalseSharing("t0"));
Thread t1 = new Thread(new FalseSharing("t1"));
t0.start();
t1.start();
t0.join();
t1.join();
}
public void run() {
long i = ITERATIONS + 1;
if (groupId.equals("t0")) {
while (0 != --i) {
volatileLong.value1 = i;
volatileLong.value2 = i;
}
} else if (groupId.equals("t1")) {
while (0 != --i) {
volatileLong.value3 = i;
volatileLong.value4 = i;
}
}
}
}
public void run() { long i = ITERATIONS + 1; if (groupId.equals("t0")) { while (0 != --i) { volatileLong.value1 = i; volatileLong.value3 = i; } } else if (groupId.equals("t1")) { while (0 != --i) { volatileLong.value2 = i; volatileLong.value4 = i; } } } ```
- 完整秒殺架構的設計到技術關鍵點的“情報資訊”
- 獨一無二的「MySQL調優金字塔」相信也許你擁有了它,你就很可能擁有了全世界。
- 【MySQL技術之旅】(5)該換換你的資料庫版本了,讓我們一同迎接8.0的到來哦!(初探篇)
- ☕【Java技術指南】「Java8程式設計專題」讓你真正會用對Java新版日期時間API程式設計指南
- 【Fegin技術專題】「原生態」開啟Fegin之RPC技術的開端,你會使用原生態的Fegin嗎?(高階用法)
- 【優化技術專題】「執行緒間的高效能訊息框架」終極關注Disruptor的核心原始碼和Java8的@Contended偽共享指南
- 【優化技術專題】「執行緒間的高效能訊息框架」再次細節領略Disruptor的底層原理和優勢分析
- 【Zookeeper核心原理】Paxos協議的原理和實際執行中的應用流程分析
- ☕【Java技術指南】「JPA程式設計專題」讓你不再對JPA技術中的“持久化型註解”感到陌生了!
- Java技術開發專題系列之【Guava RateLimiter】針對於限流器的入門到精通(含原始碼分析介紹)
- ☕【Java技術指南】「JPA程式設計專題」讓你不再對JPA技術中的“持久化型註解”感到陌生了!
- 【Eureka技術指南】「SpringCloud」從原始碼層面讓你認識Eureka工作流程和運作機制(下)
- MySQL技術專題(6)這也許是你的知識盲區-MySQL主從架構以及[半同步機制]
- 優化技術專題-執行緒間的高效能訊息框架-深入淺出Disruptor的使用和原理
- ☕【Java技術指南】「併發程式設計專題」Fork/Join框架基本使用和原理探究(原理篇)
- ☕【Java技術指南】「併發程式設計專題」Guava RateLimiter針對於限流器的入門到精通(含原始碼分析介紹)
- 【優化技術專題】「溫故而知新」基於Quartz系列的任務排程框架的動態化任務實現分析
- ☕【Java技術指南】「併發程式設計專題」Guava RateLimiter針對於限流器的入門到精通(含實戰和原理分析)
- 【MySQL技術之旅】(4)這也許是你的知識盲區-[MySQL主從架構]之半同步機制
- ☕【Java技術指南】「併發程式設計專題」CompletionService框架基本使用和原理探究(基礎篇)