【優化技術專題】「執行緒間的高效能訊息框架」再次細節領略Disruptor的底層原理和優勢分析

語言: CN / TW / HK

theme: smartblue

小知識,大挑戰!本文正在參與“程式設計師必備小知識”創作活動。

Disruptor原理

首先Disruptor是為了解決高併發快取的佇列,為執行緒間通訊提供高效的效能,它是如何做到無阻塞、多生產、多消費的?

上圖簡單的畫了一下構建Disruptor的各個引數以及 ringBuffer 的構造,下面簡單的說一下。

生產者需要元件

生產者,產生訊息,並將訊息釋出到RingBuffer記憶體佇列中。

  • Event模型:從生產者傳遞給消費者的資料單位,完全由使用者定義其型別。

java @Data public class SampleEvent { private Long id。 private String sampleDataStr。 }

  • EventFactory:建立事件(任務)的工廠類。(這裡任務會建立好,儲存在記憶體中,可以看做是一個空任務)。

java public class SampleEventFactory implements EventFactory<SampleEvent> { @Override public SampleEvent newInstance() { // 例項化資料(建好空資料,等待後面初始化) return new SampleEvent()。 } }

  • RingBuffer:環形緩衝區通常被認為是Disruptor的主要實現,當前版本即3.0版本之後,RingBuffer僅負責儲存和更新通過Disruptor的資料(Event)。

    • ringBufferSize:容器的長度。( Disruptor 的核心容器是 ringBuffer,環轉陣列,有限長度)。
  • ProductType:生產者型別:單生產者、多生產者。

    • Sequencer:Sequencer是Disruptor的核心API。該介面的2個實現類(SingleProducer,MultiProducer)實現了所有併發演算法,用於在生產者和消費者之間快速,正確地傳遞資料。
  • WaitStrategy:等待策略。(當佇列裡的資料都被消費完之後,消費者和生產者之間的等待策略),等待策略確定消費者如何等待生產者將事件放入Disruptor。

  • RingBuffer:存放資料的容器。

java @Data @AllArgsConstructor public class SampleEventProducer { private RingBuffer<OrderEvent> ringBuffer。 public void sendData(long id) { //獲取下一個可用序號 long sequence = ringBuffer.next()。 try { //獲取一個空物件(沒有填充值) SampleEvent sampleEent = ringBuffer.get(sequence)。 }finally { //提交 ringBuffer.publish(sequence)。 } } }

消費者需要元件

  • Executor:消費者執行緒池,執行任務的執行緒。(每一個消費者都需要從執行緒池裡獲得執行緒去消費任務)。

  • EventProcessor:用於處理來自Disruptor的事件的主事件迴圈,並具有消費者序列的所有權。有一個名為 BatchEventProcessor表示,它包含事件迴圈的有效實現,並將回撥到使用的提供的EventHandler介面實現。

  • EventHandler:事件處理器,由使用者實現並代表Disruptor的使用者的介面,使用者客戶端實現訊息的處理機制,由客戶端具體實現。 ```java public class SampleEventHandler implements EventHandler {

    /* * 事件驅動監聽--消費者消費的主體 / @Override public void onEvent(SampleEvent event, long sequence, boolean endOfBatch) throws Exception { System.out.println(event.getSampleDataStr() + " " +Thread.currentThread().getName())。 } } ```

演算法核心Sequence序號

  • Sequence:Disruptor使用Sequences作為識別特定元件所在位置的方法。

    • 每個消費者(EventProcessor)都像Disruptor本身一樣維護一個Sequence。大多數併發程式碼依賴於這些Sequence值的變化或者叫移動,因此Sequence支援AtomicLong的許多當前功能。

    • 事實上,唯一真正的區別是Sequence包含額外的功能,以防止序列和其他值之間的錯誤共享。

  • Sequence Barrier:序列屏障由Sequencer產生,包含對Sequencer中主要釋出的sequence和任何依賴性消費者的序列的引用。它包含確定是否有任何可供消費者處理的事件的邏輯。

Disruptor的優點:

  1. 多執行緒之間沒有競爭即沒有鎖。

  2. 所有訪問者都記錄自己的序號的實現方式,允許多個生產者與多個消費者共享相同的資料結構。

  3. 每個物件中都能跟蹤序列號(ring buffer, claim strategy,生產者和消費者),加上神奇的快取行填充,就意味著沒有偽共享和非預期的競爭。

下面再簡單介紹下RingBuffer核心實現,來看看佇列的實現細節。

其為環形佇列,有點像一致性Hash演算法中的閉環,但完全不一樣。

底層的話是一個固定大小的陣列結構,相比於佇列來說,其只有一個下標指標cursor,如果槽的個數是2的N次方更有利於基於二進位制的計算機進行計算。如果看過HashMap原始碼應該知道,HashMap定位元素槽時使用了一種巧妙的方式,hash&(length-1)。

RingBuffer同樣是相同的計算方式,sequence&(length-1),當然你可以進行取模操作。

  • 取模操作在暫存器中的計算,需要多次的迭代加操作進行的,所以相對於計算速度來說,對於計算機進行位運算效率絕對是高於取模操作的,尤其是對於高併發狀況下的計算,能夠節省很多單位cpu開銷

一般實現線性儲存有兩種實現方式:

  • 一種是基於連續記憶體分配的HashTable
  • 一種是基於隨機記憶體分配的迭代指標。

為什麼RingBuffer選用陣列作為儲存結構,而不選用連結串列儲存?

快取或者程式的區域性性原理

  • (Good)陣列記憶體屬是連續分配記憶體的預讀策略,也就是記憶體載入時,會將部分連續記憶體地址預先載入到快取記憶體中,即認為你可能會使用,上面我們分析了作業系統中的cpu操作資料的流程,可以看出這種設計是為了不用反覆從記憶體中載入。

  • (Bad)連結串列的記憶體分配是碎片化的所以其儲存地址不是連續的,導致每次都會cpu都會重新計算下一個連結串列位置的地址,並從記憶體中載入相關的資料,資料量小的情況下並不能看出效能的優劣,但是當資料量大的情況下,這種極小的消耗,會對整體的執行效率產生影響。

因為RingBuffer不會涉及到儲存地址的修改和維護,就因為選用陣列就對效能產生了有利和積極的影響。

偽共享

記憶體以快取記憶體行的形式儲存在快取記憶體系統中。快取記憶體行是2的N次方個連續位元組,其大小通常為32-256,最常見的快取行大小為64位元組

偽共享是一個術語,適用於執行緒在修改共享同一快取行的獨立變數時無意中影響彼此的效能。在快取記憶體行上寫入爭用是實現SMP系統中並行執行執行緒的可伸縮性的最大限制因素。(出自百度定義!)

  • 首先我們知道對於鎖來說是關中斷實現,鎖定bus訊息匯流排實現,而對於共享記憶體,計算機使用的是快取行,共享變數的多個執行緒,共享相同的快取行。

  • 實現執行緒數量的線性可伸縮性,我們必須確保沒有兩個執行緒寫入同一個變數或快取行。而當使用volatile的時候,我們讀取直接共享變數從主記憶體或者叫共享記憶體中讀取變數的值,其本質是使計算機快取行失效。

在CPU核心A執行的執行緒想要更新變數X,而CPU核心B上的執行緒想要更新變數Y。

這兩個熱變數位於同一快取行中。每個執行緒都將競爭快取行的所有權,以便他們可以更新它。如果核心A獲得所有權,那麼MESI/MOSI快取子系統將需要使核心B的相應快取行無效。反之也是一樣,極大地影響效能。如果競爭核心在不同的套接字上並且還必須跨越套接字互連,則快取行問題將進一步加劇。

總結一下:如果多個執行緒操作不同的成員變數, 但是這些變數儲存在同一個快取行,如果有處理器更新了快取行的資料並重新整理到主存之後,根據快取一致性原則,其他處理器將失效該快取行(I狀態)導致快取未命中,需要重新去記憶體中讀取最新資料,這就是偽共享問題。

  • 特別是不同的執行緒操作同一個快取行,需要發出RFO(Request for Owner)訊號鎖定快取行,保證寫操作的原子性,此時其他執行緒不能操作這個快取行,這將對效率有極大的影響。

為了避免避免經常執行寫操作的變數因為在同一個快取行而導致的偽共享問題,常用的解決方式就是快取行填充,或者稱為快取行對齊。

快取行填充的概念

當多個執行緒同時對共享的快取行進行寫操作的時候,因為快取系統自身的快取一致性原則,會引發偽共享問題,解決的常用辦法是將共享變數根據快取行大小進行補充對齊,使其載入到快取時能夠獨享快取行,避免與其他共享變數儲存在同一個快取行。

下面是快取行實現,另外快取行填充有一個前提同時分配的物件往往位於同一位置。

java public long p1, p2, p3, p4, p5, p6, p7; // cache line padding private volatile long cursor = INITIAL_CURSOR_VALUE; public long p8, p9, p10, p11, p12, p13, p14; // cache line padding

如果有不同的消費者往不同的欄位寫入,你需要確保各個欄位間不會出現偽共享。

java /** * 陣列儲存了VolatileLongPadding,其中陣列中一個long型別儲存陣列長度,算上 * 自身long型別value,需要再填充6個long型別,就能將陣列中的物件填充滿一個快取行。 * 注意:這裡使用繼承的方式實現快取行對齊,因為Java編譯器會優化無效的欄位。 */ class CacheLinePadding { // 如果不需要填充,只需要註釋掉這段程式碼即可 public volatile long p1, p2, p3, p4, p5, p6; } class CacheLinePaddingObject extends CacheLinePadding { //實際操作的值 public volatile long value = 0L; }

RingBuffer實現上同樣也是使用了快取行填充,保證了陣列中的資料沒有偽共享的存在,RingBuffer除了一個long型別的cursor索引指標,p1->p7都為快取行填充,一般來講8個Long型別的欄位,正好是64Byte,會填充一個快取行,當然前提是你需要,因為畢竟還有你自己的資料資訊欄位。

「其他文章」