Disruptor測試結果運算1億次,耗時5503ms,吞吐量18171000/s,於是我扒開了Disruptor高性能的外衣

語言: CN / TW / HK

hello 小夥伴兒們,昨天搞了一篇Disruptor的入門文章,看大家反饋不錯,在大家一再催更下,昨天熬夜至下班,終於續寫了第二篇Disruptor的高性能原理剖析的文章,為大家揭開Disruptor高性能的神祕外衣。 如果小夥伴,錯過了入門Disruptor的入門篇的文章,在這裏自行查看:

如此狂妄,自稱高性能隊列的 Disruptor 有啥來頭?

能對比測試

為了直觀地感受 Disruptor **有多快,設計了一個性能對比測試:Producer 發佈 1 億次事件,從發佈第一個事件開始計時,捕捉 Consumer 處理完所有事件的耗時。

測試用例在 Producer 如何將事件通知到 Consumer 的實現方式上,設計了兩種不同的實現:

  1. Producer 的事件發佈和 Consumer 的事件處理在不同的線程,通過 ArrayBlockingQueue 傳遞給 Consumer 進行處理;
  2. Producer 的事件發佈和 Consumer 的事件處理在不同的線程,通過 Disruptor 傳遞給 Consumer 進行處理;

3.1 代碼實現

3.1.1 計算代碼

進行CAS累加運算

public class CommonUtils {
    private static AtomicLong count = new AtomicLong(0);

    public static void calculation() {
        count.incrementAndGet();
    }

    public static long get() {
        return count.get();
    }
}
3.1.2 抽象類

進行一億次 CAS運算計算耗時

/**
 * 抽象類
 *
 * @param <T>
 */
public abstract class AbstractTask<T> {
    private static final Logger logger = LoggerFactory.getLogger(AbstractTask.class);
    //線程池
    private static final ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1);
    private static final CountDownLatch countDownLatch = new CountDownLatch(1);
    //一億次測試
    public static long tasksize = 100000000;


    /**
     * 開始調用測試
     */
    public void invok() {
        //計算當前事件
        long currentTime = System.currentTimeMillis();
        //獲取到監聽器
        Runnable monitor = monitor();
        if (null != monitor) {
            executor.execute(monitor);
        }
        //啟動
        start();

        //執行任務發佈
        Runnable runnable = getTask();
        for (long i = 0; i < tasksize; i++) {
            runnable.run();
        }

        //停止任務
        stop();
        //等待任務發佈完成
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        executor.shutdown();
        //獲取處理結果
        T result = getResult();
        //計算耗時
        long duration = System.currentTimeMillis() - currentTime;
        //計算吞吐量
        long throughput = (tasksize / duration) * 1000;
        logger.info("每秒吞吐量:[{}];({}/{})", throughput, result, duration);
    }


    /**
     * 獲取監聽器
     *
     * @return
     */
    public Runnable monitor() {
        return null;
    }

    /**
     * 啟動任務
     */
    public void start() {

    }

    /**
     * 完成任務
     */
    public void complete() {
        countDownLatch.countDown();
    }

    /**
     * 停止任務
     */
    public void stop() {

    }

    /**
     * 獲取需要執行的任務
     *
     * @return
     */
    public abstract Runnable getTask();

    /**
     * 獲取運行結果
     *
     * @return
     */
    public abstract T getResult();
}

3.1.3 Disruptor性能測試代碼
public class DisruptorTest extends AbstractTask<Long> {
    //定義隨機數生成器
    private static final Random r = new Random();
    //定義Disruptor對象
    private Disruptor disruptor = null;
    //定義Disruptor事件發佈對象
    private LongEventProducerWithTranslator translator = null;

    /**
     * 啟動
     */
    @Override
    public void start() {
        //定義事件工廠
        EventFactory<LongEvent> eventFactory = new LongEventFactory();
        // RingBuffer 大小,必須是 2 的 N 次方;
        int ringBufferSize = 1024 * 1024;
        //構建disruptor對象
        disruptor = new Disruptor<LongEvent>(eventFactory,
                ringBufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE,
                new YieldingWaitStrategy());
        //定義事件處理類
        EventHandler<LongEvent> eventHandler = new LongEventHandler();
        //配置事件處理類
        disruptor.handleEventsWith(eventHandler);
        //啟動disruptor
        disruptor.start();
        //創建事件發佈對象
        translator = new LongEventProducerWithTranslator(disruptor.getRingBuffer());
    }

    /**
     * 停止任務
     */
    @Override
    public void stop() {
        disruptor.shutdown();
        System.out.println("運算結果:" + CommonUtils.get());
        //完成任務
        complete();
    }

    /**
     * 獲取需要執行的任務
     *
     * @return
     */
    @Override
    public Runnable getTask() {
        return () -> {
            publishEvent();
        };
    }

    /**
     * 獲取運行結果
     *
     * @return
     */
    @Override
    public Long getResult() {
        return CommonUtils.get();
    }


    /**
     * 發佈對象
     */
    private void publishEvent() {
        //獲取要通過事件傳遞的業務數據
        Long data = r.nextLong();
        // 發佈事件
        translator.onData(data);
    }


    public static void main(String[] args) {
        DisruptorTest disruptorTest = new DisruptorTest();
        disruptorTest.invok();
    }

}

輸出結果

10:45:22.941 [main] INFO com.heima.task.AbstractTask - 每秒吞吐量:[18171000];(100000000/5503)
ArrayBlockingQueue性能測試代碼
public class ArrayBlockingQueueTest extends AbstractTask {
    private static final Random r = new Random();
    private static final ArrayBlockingQueue<Long> queue = new ArrayBlockingQueue(10000000);


    @Override
    public Runnable monitor() {
        return () -> {
            try {
                for (int i = 0; i < tasksize; i++) {
                    //獲取一個元素
                    queue.take();
                    //執行計算
                    CommonUtils.calculation();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            complete();
        };
    }

    public static void main(String[] args) {
        ArrayBlockingQueueTest test = new ArrayBlockingQueueTest();
        test.invok();
    }

    @Override
    public Runnable getTask() {
        return () -> {
            publish();
        };
    }

    @Override
    public Object getResult() {
        return CommonUtils.get();
    }

    public void publish() {
        Long data = r.nextLong();
        try {
            queue.put(data);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

輸出結果

10:45:46.379 [main] INFO com.heima.task.AbstractTask - 每秒吞吐量:[6192000];(100000000/16148)

3.2 測試對比

測試類 運算次數 耗時(ms) 吞吐量/s
ArrayBlockingQueue 1億次 16148 6192000
Disruptor 1億次 5503 18171000

3.3 Disruptor官方性能測試

Disruptor論文中講述了一個實驗:

  • 這個測試程序調用了一個函數,該函數會對一個64位的計數器循環自增5億次。
  • 機器環境:2.4G 6核
  • 運算: 64位的計數器累加5億次
Method Time (ms)
單線程 300
單線程使用 CAS 5,700
單線程使用鎖 10,000
單線程使用volatile 4,700
多線程使用 CAS 30,000
多線程使用鎖 224,000

4. 高性能原理

  • 引入環形的數組結構:數組元素不會被回收,避免頻繁的GC,
  • 無鎖的設計:採用CAS無鎖方式,保證線程的安全性
  • 屬性填充:通過添加額外的無用信息,避免偽共享問題
  • 元素位置的定位:採用跟一致性哈希一樣的方式,一個索引,進行自增

4.1 偽共享概念

4.1.1 計算機緩存構成

​ 下圖是計算的基本結構。L1、L2、L3分別表示一級緩存、二級緩存、三級緩存,越靠近CPU的緩存,速度越快,容量也越小,所以L1緩存很小但很快,並且緊靠着在使用它的CPU內核;L2大一些,也慢一些,並且仍然只能被一個單獨的CPU核使用;L3更大、更慢,並且被單個插槽上的所有CPU核共享;最後是主存,由全部插槽上的所有CPU核共享。

file

​ 當CPU要讀取一個數據時,首先從一級緩存中查找,如果沒有找到再從二級緩存中查找,如果還是沒有就從三級緩存或內存中查找。一般來説,每級緩存的命中率大概都在80%左右,也就是説全部數據量的80%都可以在一級緩存中找到,只剩下20%的總數據量才需要從二級緩存、三級緩存或內存中讀取,由此可見一級緩存是整個CPU緩存架構中最為重要的部分。

file

下表是一些緩存未命中的消耗數據:

從CPU到 大約需要的CPU週期 大約需要的時間
主存 約60-80ns
QPI總線 約20ns
L3 cache 約40-45cycles 約15ns
L2 cache 約10cycles 約3ns
L1 cache 約3-4cycles 約1ns
寄存器 1cycle

可見CPU讀取主存中的數據會比從L1中讀取慢了近2個數量級。

4.1.2 什麼是緩存行

​ 為了解決計算機系統中主內存與 CPU 之間運行速度差問題,會在 CPU 與主內存之間 添加一級或者多級高速緩衝存儲器( Cache)。這個 Cache 一般是被集成到 CPU 內部的, 所以也叫 CPU Cache,如圖所示是兩級 Cache 結構。

file

​ Cache內部是按行存儲的,其中每一行稱為一個cache line,由很多個 Cache line 組成的,Cache line 是 cache 和 RAM 交換數據的最小單位,cache行的大小一般為2的冪次數字節,通常為 64 Byte。Cache line是Cache與主內存進行數據交換的單位。

file

​ 當 CPU 把內存的數據載入 cache 時,會把臨近的共 64 Byte 的數據一同放入同一個Cache line,因為空間局部性:臨近的數據在將來被訪問的可能性大。

linux 查看緩存行大小

more /sys/devices/system/cpu/cpu1/cache/index0/coherency_line_size
64
4.1.3 什麼是共享

​ CPU緩存是以緩存行(cache line)為單位存儲的。緩存行通常是 64 字節,並且它有效地引用主內存中的一塊地址。一個 Java 的 long 類型是 8 字節,因此在一個緩存行中可以存 8 個 long 類型的變量。所以,如果你訪問一個 long 數組,當數組中的一個值被加載到緩存中,它會額外加載另外 7 個,以致你能非常快地遍歷這個數組。事實上,你可以非常快速的遍歷在連續的內存塊中分配的任意數據結構。而如果你在數據結構中的項在內存中不是彼此相鄰的(如鏈表),你將得不到免費緩存加載所帶來的優勢,並且在這些數據結構中的每一個項都可能會出現緩存未命中。下圖是一個CPU緩存行的示意圖:

file

​ 表面上 X 和 Y 都是被獨立線程操作的,而且兩操作之間也沒有任何關係。只不過它們共享了一個緩存行,但所有競爭衝突都是來源於共享。

4.1.4 什麼是偽共享

​ 當CPU訪問某一個變量時候,首先會去看CPU Cache內是否有該變量,如果有則直接從中獲取,否者就去主內存裏面獲取該變量,然後把該變量所在內存區域的一個Cache行大小的內存拷貝到Cache(cache行是Cache與主內存進行數據交換的單位)。

​ 由於存放到Cache行的的是內存塊而不是單個變量,所以可能會把多個變量存放到了一個cache行。當多個線程同時修改一個緩存行裏面的多個變量時候,由於同時只能有一個線程操作緩存行,所以相比每個變量放到一個緩存行性能會有所下降,這就是偽共享。

file

​ 如上圖變量x,y同時被放到了CPU的一級和二級緩存,當線程1使用CPU1對變量x進行更新時候,首先會修改cpu1的一級緩存變量x所在緩存行,這時候緩存一致性協議會導致cpu2中變量x對應的緩存行失效,那麼線程2寫入變量x的時候就只能去二級緩存去查找,這就破壞了一級緩存,而一級緩存比二級緩存更快。更壞的情況下如果cpu只有一級緩存,那麼會導致頻繁的直接訪問主內存。

​ 我們的緩存都是以緩存行作為一個單位來處理的,所以失效x的緩存的同時,也會把y失效,反之亦然。

4.1.5 為何會出現偽共享

​ 偽共享的產生是因為多個變量被放入了一個緩存行,並且多個線程同時去寫入緩存行中不同變量。那麼為何多個變量會被放入一個緩存行那。其實是因為Cache與內存交換數據的單位就是Cache line,當CPU要訪問的變量沒有在Cache命中時候,根據程序運行的局部性原理會把該變量在內存中大小為Cache行的內存放如緩存行。

long a;
long b;
long c;
long d;

​ 如上代碼,聲明瞭四個long變量,假設cache line的大小為32個字節,那麼當cpu訪問變量a時候發現該變量沒有在cache命中,那麼就會去主內存把變量a以及內存地址附近的b,c,d放入緩存行。也就是地址連續的多個變量才有可能會被放到一個緩存行中,當創建數組時候,數組裏面的多個元素就會被放入到同一個緩存行。那麼單線程下多個變量放入緩存行對性能有影響?其實正常情況下單線程訪問時候由於數組元素被放入到了一個或者多個cache行對代碼執行是有利的,因為數據都在緩存中,代碼執行會更快。

4.1.6 如何解偽共享

​ 解決偽共享最直接的方法就是填充(padding),例如下面的VolatileLong,一個long佔8個字節,Java的對象頭佔用8個字節(32位系統)或者12字節(64位系統,默認開啟對象頭壓縮,不開啟佔16字節)。一個緩存行64字節,那麼我們可以填充6個long(6 * 8 = 48 個字節)。

4.1.6.1 不使用字段填充
public class VolatileData {
    // 佔用 8個字節 +48 + 對象頭 = 64字節

    //需要操作的數據
    volatile long value;

    public VolatileData() {
    }

    public VolatileData(long defValue) {
        value = defValue;
    }

    public long accumulationAdd() {
        //因為單線程操作不需要加鎖
        value++;
        return value;
    }

    public long getValue() {
        return value;
    }
}

內存佈局

file

4.6.1.2 填充字段

因為JDK1.7以後就自動優化代碼會刪除無用的代碼,在JDK1.7以後的版本這些不生效了。

/**
 * 緩存行填充父類
 */
public class DataPadding {
    //填充 5個long類型字段 8*5 = 40 個字節
    private long p1, p2, p3, p4, p5; //jvm 優化 刪除無用代碼
    //需要操作的數據
    volatile long value;
}

內存佈局

file

4.1.6.3 繼承的方式
/**
 * 緩存行填充父類
 */
public class DataPadding {
    //填充 5個long類型字段 8*5 = 40 個字節
    private long p1, p2, p3, p4, p5;
}

繼承緩存填充類

/**
 * 繼承DataPadding
 */
public class VolatileData extends DataPadding {
    // 佔用 8個字節 +48 + 對象頭 = 64字節

    public VolatileData() {
    }

    public VolatileData(long defValue) {
        value = defValue;
    }

    public long accumulationAdd() {
        //因為單線程操作不需要加鎖
        value++;
        return value;
    }

    public long getValue() {
        return value;
    }
}

內存佈局

file

4.1.6.4 Disruptor填充方式
class LhsPadding {
    protected long p1, p2, p3, p4, p5, p6, p7;
}

class Value extends LhsPadding {
    protected volatile long value;
}

class RhsPadding extends Value {
    protected long p9, p10, p11, p12, p13, p14, p15;
}

繼承填充類

public class VolatileData extends RhsPadding {
    // 佔用 8個字節 +48 + 對象頭 = 64字節

    //需要操作的數據
    volatile long value;

    public VolatileData() {
    }

    public VolatileData(long defValue) {
        value = defValue;
    }

    public long accumulationAdd() {
        //因為單線程操作不需要加鎖
        value++;
        return value;
    }

    public long getValue() {
        return value;
    }
}

內存佈局

file

4.1.6.5 @Contended註解
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD, ElementType.TYPE})
public @interface Contended {
    String value() default "";
}

註解填充類

@Contended
public class VolatileData  {
    // 佔用 8個字節 +48 + 對象頭 = 64字節

    //需要操作的數據
    volatile long value;
    
    public VolatileData() {
    }

    public VolatileData(long defValue) {
        value = defValue;
    }

    public long accumulationAdd() {
        //因為單線程操作不需要加鎖
        value++;
        return value;
    }

    public long getValue() {
        return value;
    }
}

內存佈局

file

注意事項

在Java8中提供了**@sun.misc.Contended來避免偽共享時,在運行時需要設置JVM啟動參數-XX:-RestrictContended**否則可能不生效。

4.1.7 性能對比
4.1.7.1 測試代碼

使用和不使用緩存行填充的對比

/**
 * 緩存行測試
 */
public class CacheLineTest {
    /**
     * 通過緩存行填充的變量
     */
    private VolatileData volatileData1 = new VolatileData(0);
    private VolatileData volatileData2 = new VolatileData(0);
    private VolatileData volatileData3 = new VolatileData(0);
    private VolatileData volatileData4 = new VolatileData(0);
    private VolatileData volatileData5 = new VolatileData(0);
    private VolatileData volatileData6 = new VolatileData(0);
    private VolatileData volatileData7 = new VolatileData(0);

    /**
     * 循環次數
     */
    private final long size = 100000000;

    /**
     * 進行累加操作
     */
    public void accumulationX(VolatileData volatileData) {
        //計算耗時
        long currentTime = System.currentTimeMillis();
        long value = 0;
        //循環累加
        for (int i = 0; i < size; i++) {
            //使用緩存行填充的方式
            value = volatileData.accumulationAdd();


        }
        //打印
        System.out.println(value);
        //打印耗時
        System.out.println("耗時:" + (System.currentTimeMillis() - currentTime));
    }


    public static void main(String[] args) {
        //創建對象
        CacheLineTest cacheRowTest = new CacheLineTest();
        //創建線程池
        ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        //啟動三個線程個調用他們各自的方法
        executorService.execute(() -> cacheRowTest.accumulationX(cacheRowTest.volatileData1));
        executorService.execute(() -> cacheRowTest.accumulationX(cacheRowTest.volatileData2));
        executorService.execute(() -> cacheRowTest.accumulationX(cacheRowTest.volatileData3));
        executorService.execute(() -> cacheRowTest.accumulationX(cacheRowTest.volatileData4));
        executorService.execute(() -> cacheRowTest.accumulationX(cacheRowTest.volatileData5));
        executorService.execute(() -> cacheRowTest.accumulationX(cacheRowTest.volatileData6));
        executorService.execute(() -> cacheRowTest.accumulationX(cacheRowTest.volatileData7));
        executorService.shutdown();
    }
}
4.1.7.2 測試數據

同樣的結構他們之間差了 將近 50倍的速度差距

對象 NoPadding(MS) DataPadding(MS) RhsPadding(MS) Contended(MS)
volatileData1 3751 1323 1307 1291
volatileData2 3790 1383 1311 1314
volatileData3 7551 1400 1311 1333
volatileData4 7669 1407 1317 1356
volatileData5 8577 1447 1327 1361
volatileData6 8705 1479 1339 1375
volatileData6 8741 1512 1368 1389
4.1.8 Disruptor解決偽共享

​ 在Disruptor中有一個重要的類Sequence,該類包裝了一個volatile修飾的long類型數據value,無論是Disruptor中的基於數組實現的緩衝區RingBuffer,還是生產者,消費者,都有各自獨立的Sequence,RingBuffer緩衝區中,Sequence標示着寫入進度,例如每次生產者要寫入數據進緩衝區時,都要調用RingBuffer.next()來獲得下一個可使用的相對位置。對於生產者和消費者來説,Sequence標示着它們的事件序號,來看看Sequence類的源碼:

class LhsPadding {
	protected long p1, p2, p3, p4, p5, p6, p7;
}

class Value extends LhsPadding {
	protected volatile long value;
}

class RhsPadding extends Value {
	protected long p9, p10, p11, p12, p13, p14, p15;
}

public class Sequence extends RhsPadding {
	static final long INITIAL_VALUE = -1L;
	private static final Unsafe UNSAFE;
	private static final long VALUE_OFFSET;
	static {
		UNSAFE = Util.getUnsafe();
		try {
			VALUE_OFFSET = UNSAFE.objectFieldOffset(Value.class.getDeclaredField("value"));
		} catch(final Exception e) {
			 throw new RuntimeException(e);
		}
	}
	


    public Sequence() {
        this(INITIAL_VALUE);
    }

    public Sequence(final long initialValue) {
        UNSAFE.putOrderedLong(this, VALUE_OFFSET, initialValue);
    }

}

​ 從第1到11行可以看到,真正使用到的變量value,它的前後空間都由8個long型的變量填補了,對於一個大小為64字節的緩存行,它剛好被填補滿(一個long型變量value,8個字節加上前/後個7long型變量填補,7*8=56,56+8=64字節)。這樣做每次把變量value讀進高速緩存中時,都能把緩存行填充滿(對於大小為64個字節的緩存行來説,如果緩存行大小大於64個字節,那麼還是會出現偽共享問題),保證每次處理數據時都不會與其他變量發生衝突。

4.2 無鎖的設計

4.2.1 鎖機制存在的問題
  • 在多線程競爭下,加鎖、釋放鎖會導致比較多的上下文切換和調度延時,引起性能問題,而且在上下文切換的時候,cpu之前緩存的指令和數據都將失效,對性能有很大的損失,用户態的鎖雖然避免了這些問題,但是其實它們只是在沒有真實的競爭時才有效。

  • 一個線程持有鎖會導致其它所有需要此鎖的線程掛起直至該鎖釋放。

  • 如果一個優先級高的線程等待一個優先級低的線程釋放鎖會導致導致優先級反轉(Priority Inversion),引起性能風險。

4.2.2 CAS無鎖算法

​ 實現無鎖(lock-free)的非阻塞算法有多種實現方法,其中 CAS(比較與交換,Compare and swap) 是一種有名的無鎖算法。CAS的語義是“我認為V的值應該為A,如果是,那麼將V的值更新為B,否則不修改並告訴V的值實際為多少”,CAS是一種 樂觀鎖 技術,當多個線程嘗試使用CAS同時更新同一個變量時,只有其中一個線程能更新變量的值,而其它線程都失敗,失敗的線程並不會被掛起,而是被告知這次競爭中失敗,並可以再次嘗試。CAS有3個操作數,內存值V,舊的預期值A,要修改的新值B。當且僅當預期值A和內存值V相同時,將內存值V修改為B,否則什麼都不做。

​ 這是一個CPU級別的指令,在我的意識中,它的工作方式有點像樂觀鎖——CPU去更新一個值,但如果想改的值不再是原來的值,操作就失敗,因為很明顯,有其它操作先改變了這個值。

file

注意,這可以是CPU的兩個不同的核心,但不會是兩個獨立的CPU。

​ CAS操作比鎖消耗資源少的多,因為它們不牽涉操作系統,它們直接在CPU上操作。但它們並非沒有代價——在上面的試驗中,單線程無鎖耗時300ms,單線程有鎖耗時10000ms,單線程使用CAS耗時5700ms。所以它比使用鎖耗時少,但比不需要考慮競爭的單線程耗時多。

4.2.3 傳統隊列問題

隊列的底層數據結構一般分成三種:數組、鏈表和堆。其中,堆這裏是為了實現帶有優先級特性的隊列,暫且不考慮。

隊列 有界性 數據結構
ArrayBlockingQueue bounded 加鎖 arraylist
LinkedBlockingQueue optionally-bounded 加鎖 linkedlist
ConcurrentLinkedQueue unbounded 無鎖 linkedlist
LinkedTransferQueue unbounded 無鎖 linkedlist
PriorityBlockingQueue unbounded 加鎖 heap
DelayQueue unbounded 加鎖 heap

​ 在穩定性和性能要求特別高的系統中,為了防止生產者速度過快,導致內存溢出,只能選擇有界隊列;

​ 同時,為了減少Java的垃圾回收對系統性能的影響,會盡量選擇array/heap格式的數據結構。這樣篩選下來,符合條件的隊列就只有ArrayBlockingQueue,但是ArrayBlockingQueue是通過加鎖的方式保證線程安全,而且ArrayBlockingQueue還存在偽共享問題,這兩個問題嚴重影響了性能。

4.2.3.1 Disruptor的無鎖設計

​ 多線程環境下,多個生產者通過do/while循環的條件CAS,來判斷每次申請的空間是否已經被其他生產者佔據。假如已經被佔據,該函數會返回失敗,While循環重新執行,申請寫入空間。

do
{
    current = cursor.get();
    next = current + n;

    if (!hasAvailableCapacity(gatingSequences, n, current))
    {
        throw InsufficientCapacityException.INSTANCE;
    }
}
while (!cursor.compareAndSet(current, next));
//next 類比於ArrayBlockQueue的數組索引index
return next;

4.3 環形數組結構

環形數組結構是整個Disruptor的核心所在。

4.3.1 什麼是環形數組

​ RingBuffer 是一個環(首尾相連的環),用做在不同上下文(線程)間傳遞數據的buffer,RingBuffer 擁有一個序號,這個序號指向數組中下一個可用元素。

file

4.3.2 為什麼使用環形數組

為了避免垃圾回收,採用數組而非鏈表。同時,數組對處理器的緩存機制更加友好

​ 首先因為是數組,所以要比鏈表快,而且根據我們對上面緩存行的解釋知道,數組中的一個元素加載,相鄰的數組元素也是會被預加載的,因此在這樣的結構中,cpu無需時不時去主存加載數組中的下一個元素。

​ 而且,你可以為數組預先分配內存,使得數組對象一直存在(除非程序終止)。這就意味着不需要花大量的時間用於垃圾回收。

​ 此外,不像鏈表那樣,需要為每一個添加到其上面的對象創造節點對象—對應的,當刪除節點時,需要執行相應的內存清理操作。環形數組中的元素採用覆蓋方式,避免了jvm的GC。

​ 其次結構作為環形,數組的大小為2的n次方,這樣元素定位可以通過位運算效率會更高,這個跟一致性哈希中的環形策略有點像。在disruptor中,這個牛逼的環形結構就是RingBuffer,既然是數組,那麼就有大小,而且這個大小必須是2的n次方,結構如下:

file

​ 其實質只是一個普通的數組,只是當放置數據填充滿隊列(即到達2^n-1位置)之後,再填充數據,就會從0開始,覆蓋之前的數據,於是就相當於一個環。

4.4 元素位置定位

​ 數組長度2^n,通過位運算,加快定位的速度。下標採取遞增的形式。不用擔心index溢出的問題。index是long類型,即使100萬QPS的處理速度,也需要30萬年才能用完。

4.5 等待策略

​ 定義 Consumer 如何進行等待下一個事件的策略。 (注:Disruptor 定義了多種不同的策略,針對不同的場景,提供了不一樣的性能表現)根據實際運行環境的 CPU 的硬件特點選擇恰當的策略,並配合特定的 JVM 的配置參數,能夠實現不同的性能提升。

4.5.1 BlockingWaitStrategy

​ Disruptor的默認策略是BlockingWaitStrategy,在BlockingWaitStrategy內部是使用鎖和condition來控制線程的喚醒

​ BlockingWaitStrategy是最低效的策略,但其對CPU的消耗最小並且在各種不同部署環境中能提供更加一致的性能表現。

4.5.2 SleepingWaitStrategy

​ SleepingWaitStrategy 的性能表現跟 BlockingWaitStrategy 差不多,對 CPU 的消耗也類似,但其對生產者線程的影響最小,通過使用LockSupport.parkNanos(1)來實現循環等待,適合用於異步日誌類似的場景;

4.5.3 YieldingWaitStrategy

​ YieldingWaitStrategy是可以使用在低延遲系統的策略之一,YieldingWaitStrategy將自旋以等待序列增加到適當的值。在循環體內,將調用Thread.yield()以允許其他排隊的線程運行。在要求極高性能且事件處理線數小於 CPU 邏輯核心數的場景中,推薦使用此策略;

4.5.4 BusySpinWaitStrategy

​ 性能最好,適合用於低延遲的系統。在要求極高性能且事件處理線程數小於CPU邏輯核心數的場景中,推薦使用此策略;

4.5.5 PhasedBackoffWaitStrategy

​ 自旋 + yield + 自定義策略,CPU資源緊缺,吞吐量和延遲並不 的場景。

本文由傳智教育博學谷教研團隊發佈。

如果本文對您有幫助,歡迎關注點贊;如果您有任何建議也可留言評論私信,您的支持是我堅持創作的動力。

轉載請註明出處!