深入解析Apache Pulsar系列(一):客戶端訊息確認

語言: CN / TW / HK

解析 Apache Pulsar —— 客戶端訊息確認

作者介紹:

騰訊雲中間件專家工程師

Apache Pulsar PMC,《深入解析Apache Pulsar》作者。

目前專注於中介軟體領域,在訊息佇列和微服務方向具有豐富的經驗。

負責 CKafka、TDMQ的設計與開發工作,目前致力於打造穩定、高效和可擴充套件的基礎元件與服務。

導語

在 Apache Pulsar 中,為了避免訊息的重複投遞,消費者進行訊息確認是非常重要的一步。當一條訊息被消費者消費後,需要消費者傳送一個Ack請求給Broker,Broker才會認為這條訊息被真正消費掉。被標記為已經消費的訊息,後續不會再次重複投遞給消費者。在這篇文章中,我們會介紹Pulsar中訊息確認的模式,以及正常訊息確認在Broker側是如何實現的。

1 確認訊息的模式

在瞭解Pulsar訊息確認模式之前,我們需要先了解一些前置知識 —— Pulsar中的訂閱以及遊標(Cursor)。Pulsar中有多種消費模式,如:Share、Key_share、Failover等等,無論使用者使用哪種消費模式都會建立一個訂閱。訂閱分為持久訂閱和非持久訂閱,對於持久訂閱,Broker上會有一個持久化的Cursor,即Cursor的元資料被記錄在ZooKeeper。Cursor以訂閱(或稱為消費組)為單位,儲存了當前訂閱已經消費到哪個位置了。因為不同消費者使用的訂閱模式不同,可以進行的ack行為也不一樣。總體來說可以分為以下幾種Ack場景:

(1)單條訊息確認(Acknowledge)

和其他的一些訊息系統不同,Pulsar支援一個Partition被多個消費者消費。假設訊息1、2、3傳送給了Consumer-A,訊息4、5、6傳送給了Consumer-B,而Consumer-B又消費的比較快,先Ack了訊息4,此時Cursor中會單獨記錄訊息4為已Ack狀態。如果其他訊息都被消費,但沒有被Ack,並且兩個消費者都下線或Ack超時,則Broker會只推送訊息1、2、3、5、6,已經被Ack的訊息4不會被再次推送。

(2)累積訊息確認(AcknowledgeCumulative)

假設Consumer接受到了訊息1、2、3、4、5,為了提升Ack的效能,Consumer可以不分別Ack 5條訊息,只需要呼叫AcknowledgeCumulative,然後把訊息5傳入,Broker會把訊息5以及之前的訊息全部標記為已Ack。

(3)批訊息中的單個訊息確認(Acknowledge)

這種訊息確認模式,呼叫的介面和單條訊息的確認一樣,但是這個能力需要Broker開啟配置項AcknowledgmentAtBatchIndexLevelEnabled。當開啟後,Pulsar可以支援只Ack一個Batch裡面的某些訊息。假設Consumer拿到了一個批訊息,裡面有訊息1、2、3,如果不開啟這個選項,我們只能消費整個Batch再Ack,否則Broker會以批為單位重新全部投遞一次。前面介紹的選項開啟之後,我們可以通過Acknowledge方法來確認批訊息中的單條訊息。

(4)否定應答(NegativeAcknowledge)

客戶端傳送一個RedeliverUnacknowledgedMessages命令給Broker,明確告知Broker,當前Consumer無法消費這條訊息,訊息將會被重新投遞。

並不是所有的訂閱模式下都能用上述這些ack行為,例如:Shared或者Key_shared模式下就不支援累積訊息確認(AcknowledgeCumulative)。因為在Shared或者Key_Shared模式下,前面的訊息不一定是被當前Consumer消費的,如果使用AcknowledgeCumulative,會把別人的訊息也一起確認掉。訂閱模式與訊息確認之間的關係如下所示:

訂閱模式 單條Ack 累積Ack 批量訊息中單個Ack 否定Ack
Exclusive 支援 支援 支援 不支援
Shared 支援 不支援 支援 支援
Failover 支援 支援 支援 不支援
Key_Shared 支援 不支援 支援 支援

2 Acknowledge與AcknowledgeCumulative的實現

Acknowledge與AcknowledgeCumulative介面不會直接傳送訊息確認請求給Broker,而是把請求轉交給AcknowledgmentsGroupingTracker處理。這是我們要介紹的Consumer裡的第一個Tracker,它只是一個介面,介面下有兩個實現,一個是持久化訂閱的實現,另一個是非持久化訂閱的實現。由於非持久化訂閱的Tracker實現都是空,即不做任何操作,因此我們只介紹持久化訂閱的實現——PersistentAcknowledgmentsGroupingTracker。

在Pulsar中,為了保證訊息確認的效能,並避免Broker接收到非常高併發的Ack請求,Tracker中預設支援批量確認,即使是單條訊息的確認,也會先進入佇列,然後再一批發往Broker。我們在建立Consumer時可以設定引數AcknowledgementGroupTimeMicros,如果設定為0,則Consumer每次都會立即傳送確認請求。所有的單條確認(individualAck)請求會先放入一個名為PendingIndividualAcks的Set,預設是每100ms或者堆積的確認請求超過1000,則傳送一批確認請求。

訊息確認的請求最終都是非同步傳送出去,如果Consumer設定了需要回執(Receipt),則會返回一個CompletableFuture,成功或失敗都能通過Future感知到。預設都是不需要回執的,此時直接返回一個已經完成的CompletableFuture。

對於Batch訊息中的單條確認(IndividualBatchAck),用一個名為PendingIndividualBatchIndexAcks的Map進行儲存,而不是普通單條訊息的Set。這個Map的Key是Batch訊息的MessageId,Value是一個BitSet,記錄這批訊息裡哪些需要Ack。使用BitSet能大幅降低儲存訊息Id的能存佔用,1KB能記錄8192個訊息是否被確認。由於BitSet儲存的內容都是0和1,因此可以很方便地儲存在堆外,BitSet物件也做了池化,可以迴圈使用,不需要每次都建立新的,對記憶體非常友好。

如下圖所示,只用了8位,就表示了Batch裡面8條訊息的Ack情況,下圖表示EntryId為0、2、5、6、7的Entry都被確認了,確認的位置會被置為1:

對於累計確認(CumulativeAck)實現方式就更簡單了,Tracker中只儲存最新的確認位置點即可。例如,現在Tracker中儲存的CumulativeAck位置為5:10,代表該訂閱已經消費到LedgerId=5,EntryId=10的這條訊息上了。後續又ack了一個5:20,則直接替換前面的5:10為5:20即可。

最後就是Tracker的Flush,所有的確認最終都需要通過觸發flush方法傳送到Broker,無論是哪種確認,Flush時建立的都是同一個命令併發送給Broker,不過傳參中帶的AckType會不一樣。

3 NegativeAcknowledge的實現

否定應答和其他訊息確認一樣,不會立即請求Broker,而是把請求轉交給NegativeAcksTracker進行處理。Tracker中記錄著每條訊息以及需要延遲的時間。Tracker複用了PulsarClient的時間輪,預設是33ms左右一個時間刻度進行檢查,預設延遲時間是1分鐘,抽取出已經到期的訊息並觸發重新投遞。Tracker主要存在的意義是為了合併請求。另外如果延遲時間還沒到,訊息會暫存在記憶體,如果業務側有大量的訊息需要延遲消費,還是建議使用ReconsumeLater介面。NegativeAck唯一的好處是,不需要每條訊息都指定時間,可以全域性設定延遲時間。

4 未確認訊息的處理

如果消費者獲取到訊息後一直不Ack會怎麼樣?這要分兩種情況,第一種是業務側已經呼叫了Receive方法,或者已經回調了正在非同步等待的消費者,此時訊息的引用會被儲存進UnAckedMessageTracker,這是Consumer裡的第三個Tracker。UnAckedMessageTracker中維護了一個時間輪,時間輪的刻度根據AckTimeoutTickDurationInMs這兩個引數生成,每個刻度時間=AckTimeout / TickDurationInMs。新追蹤的訊息會放入最後一個刻度,每次排程都會移除佇列頭第一個刻度,並新增一個刻度放入佇列尾,保證刻度總數不變。每次排程,佇列頭刻度裡的訊息將會被清理,UnAckedMessageTracker會自動把這些訊息做重投遞。

重投遞就是客戶端傳送一個RedeliverUnacknowledgedMessages命令給Broker。每一條推送給消費者但是未Ack的訊息,在Broker側都會有一個集合來記錄(PengdingAck),這是用來避免重複投遞的。觸發重投遞後,Broker會把對應的訊息從這個集合裡移除,然後這些訊息就可以再次被消費了。注意,當重投遞時,如果消費者不是Share模式是無法重投遞單條訊息的,只能把這個消費者所有已經接收但是未Ack的訊息全部重新投遞。下圖是一個時間輪的簡單示例:

另外一種情況就是消費者做了預拉取,但是還沒呼叫過任何Receive方法,此時訊息會一直堆積在本地佇列。預拉取是客戶端SDK的預設行為,會預先拉取訊息到本地,我們可以在建立消費者時通過ReceiveQueueSize引數來控制預拉取訊息的數量。Broker側會把這些已經推送到Consumer本地的訊息記錄為PendingAck,並且這些訊息也不會再投遞給別的消費者,且不會Ack超時,除非當前Consumer被關閉,訊息才會被重新投遞。Broker側有一個RedeliveryTracker介面,暫時的實現是記憶體追蹤(InMemoryRedeliveryTracker)。這個Tracker會記錄訊息到底被重新投遞了多少次,每條訊息推送給消費者時,會先從Tracker的雜湊表中查詢一下重投遞的次數,和訊息一併推送給消費者。

由上面的邏輯我們可以知道,建立消費者時設定的ReceiveQueueSize真的要慎重,避免大量的訊息堆積在某一個Consumer的本地預拉取佇列,而其他Consumer又沒有訊息可消費。PulsarClient上可以設定啟用ConsumerStatsRecorder,啟用後,消費者會在固定間隔會打印出當前消費者的metrics資訊,例如:本地訊息堆積量、接受的訊息數等,方便業務排查效能問題。

尾聲

Pulsar中的設計細節非常多,由於篇幅有限,作者會整理一系列的文章進行技術分享,敬請期待。如果各位希望系統性地學習Pulsar,可以購買作者出版的新書《深入解析Apache Pulsar》。