詳解 Apache Pulsar 訊息生命週期

語言: CN / TW / HK

文章摘要

本文整理自 Pulsar Summit Asia 2022 騰訊雲高階研發工程師冉小龍的演講《Deep Dive into Apache Pulsar Lifecycle》。Apache Pulsar 中抽象了 Topic 來承載使用者傳送的訊息,一條訊息傳送到 Topic 中之後會經過 Broker 的計算儲存到 Bookie 中。本文將詳細闡述訊息是如何傳送到 Broker 並經過 Broker 的計算以及元資料處理最終儲存到 Bookie 中,然後會進一步闡述 Bookie 如何利用垃圾回收機制回收 Topic 中的資料,以及 Broker 中的 TTL 和 Retention 策略如何作用到 Bookie Client 來觸發垃圾回收的機制。

作者簡介

冉小龍,騰訊雲高階研發工程師,Apache Pulsar Committer,RoP maintainer,Apache Pulsar Go Client、Pulsarctl 與 Go Functions 作者與主要維護者。

導讀

本文分為以下幾個部分:

  1. 1. 從使用者的視角看訊息收發流程
  2. 2. TTL 與 Retention 策略(與訊息生命週期息息相關)
  3. 3. 從 Topic 的角度看訊息儲存模型
  4. 4. Bookie GC 回收機制
  5. 5. 髒資料如孤兒 Ledger 的產生
  6. 6. 如何清理髒資料
  7. 1、2、3 主要在 Broker 層面分析原理,5 和 6 根據生產環境中遇到的問題來分析髒資料的產生與清理。

使用者視角下的訊息收發流程

在使用者視角下,MQ 可以理解為 Pub-Sub 模型,在 Broker 抽象一個 Topic,訊息經由生產者傳送到 Topic 中然後進入消費者進行消費。

圖片

首先需要了解兩個概念,Pending Queue 和 Receive Queue。

  • Pending Queue:傳送過程中的概念。訊息傳送時並不是每次直接投遞給 Broker,而是在本地抽象 Pending Queue,所有資料先進入 Pending Queue 再被髮送到 Broker。

  • Receive Queue:接收過程中的概念。同 Pending Queue 原理相同,訊息接收時並不是每次直接從 Broker 要資料,而是在本地抽象 Receive Queue,資料按批次進入 Receive Queue,再結合 Pulsar 訊息推拉機制不斷地填充 Receive Queue 來調動整體流程。

在 Pulsar 中,Broker 不解析批訊息,因此 Broker 無法知道訊息是否是批訊息,這裡抽象了一個 Entry 的概念,Entry 內可能包含批訊息或者非批訊息。

下圖是使用者視角下更深入的架構圖。生產者和消費者可以理解為 Client 模型,Client 把訊息傳送給 Broker。Broker 可以理解為 BookKeeper Client,BookKeeper Client 通過增刪改查的操作將資料傳遞給 Bookie。BookKeeper 和 Broker 都有元資料管理中心,目前使用較多的是 ZooKeeper,其內包含所有節點資訊,如節點排程資訊。

圖片

下面解析一下資料從 Client 到 Broker 再到 BookKeeper 是怎樣的整體流程。首先,BookKeeper 儲存層功能比較單一且純粹。作為一個分散式日誌檔案系統,它暴露給上層系統的、能夠供上層系統呼叫的僅僅是增刪改查的操作,伴隨這些操作可以觀察從 Client 到 BookKeeper 的操作鏈路:

  • Send -> Broker -> add Entry -> Bookie:傳送 Send 命令到 Broker,Broker 向 BookKeeper addEntry
  • Receive -> Broker -> read Entry -> Bookie:傳送 Receive 命令到 Broker,Broker 呼叫 BookKeeper readEntry 介面從 Bookie 中讀取訊息
  • Ack -> Broker (TTL) -> move cursor (markDeletePosition) -> Bookie:傳送 Ack 命令到 Broker,Broker 會執行 move cursor 操作。Broker 抽象的 Topic 裡面有一條條的訊息,Ack 相當於操作 cursor 的行為,指標隨著 Ack 行為移動,此處抽象了 markDeletePosition 的指標。在 markDeletePosition 之前,所有的訊息都已被正確消費。
  • Retention -> delete Entry -> Bookie:接收到 Retention 策略後,Broker 觸發 Retention 閾值後會呼叫 Bookie delete Entry 介面,來刪除 BookKeeper 中資料。delete Entry 是本文重點討論的話題,後文將具體介紹觸發 Retention 策略後,Entry 如何被從 BookKeeper 中刪除。

TTL 與 Retention 策略

首先需要明確 TTL 策略和 Retention 策略的概念。

TTL 策略

TTL 策略指訊息在指定時間內沒有被使用者 Ack 時會在 Broker 主動 Ack 掉。

Client 在消費者側暴露兩個介面 Receive 和 Ack。當用戶消費者接收到訊息時,Broker 並不知道此時使用者已經正確接收到訊息,需要使用者手動呼叫 Ack 告訴 Broker 自己成功接收到了當前訊息,所以 Client 要發起 Oneway 的 Ack 請求通知 Broker 進行下一步處理。不論訊息是否被推送到 Broker,生產者傳送到 Topic 的訊息都會產生 TTL(生命週期)。所有訊息都在 TTL 內受管控,超出這個時間後 Broker 會代替使用者把訊息 Ack 掉。

此處需注意,在上述過程中沒有任何與刪除相關的操作,因為 TTL 不涉及與刪除相關的操作。TTL 的作用僅僅是用於 Ack 掉在 TTL 範圍內應被 Ack 的訊息,真正刪除的操作與 Pulsar 中抽象出來的 Retention 策略相關。

Retention 策略

Retention 策略指訊息被 Ack 之後(消費者 Ack 或者 TTL Ack)繼續在 Bookie 側保留的時間,以 Ledger 為最小操作單元。

訊息被 Ack 之後(消費者 Ack 或者 TTL Ack)就歸屬於 Retention 策略,即在 BookKeeper 保留一定時間,比如在離線訊息場景下會將資料保留一段時間來進行回查等操作。Retention 以 Ledger 為最小操作單元,刪除即是刪除整個 Ledger。

下面是在 TTL 內 Ack 訊息的示意圖。在 T1 時間段有 10 條訊息,m1 - m5 是被 Ack 的訊息,m6 - m10 是未被 Ack 的訊息。在 T2 時間段,假設到達 TTL 的 3 分鐘閾值後訊息還沒有被 Ack,m6 - m8 就會被 TTL 策略檢查到,Broker 主動將其 Ack。在 T3 時間段,m6 - m8 已被 Broker Ack。這就是 TTL 策略操作行為與作用範圍。

圖片

Pulsar 內的所有策略都在 Broker 抽象了執行緒池,週期性地執行執行緒,比如 TTL 策略或者 Retention 策略預設 5 分鐘檢查一輪。TTL 策略就是根據設定的時間,定期檢查,不斷更新 Cursor 的位置(等價於 Consumer 側暴露的 Ack 介面),將訊息過期掉;Retention 策略是檢查 Ledger 的建立時間以及 Entry 的大小來決定是否要刪除某一個 Ledger。

TTL 策略和 Retention 策略的生命週期在時限上有如下規則:

  • TTL 時間 < Retention 時間,訊息的生命週期等於 TTL 時間 + Retention 時間。

  • TTL 時間 ≥ Retention 時間,訊息的生命週期等於 TTL 時間。在 TTL 檢查時,有一個判斷標準是 Ledger 是否進行切換,如發生切換且達到 TTL 時間,Ledger 會進入 Retention 策略刪除動作。所以如果 TTL 時間 ≥ Retention 時間,訊息生命週期就是 TTL 時間。

從 Topic 的角度看訊息儲存模型

圖片

講到訊息儲存模型,首先接觸到的是 Topic,生產者向這個 Topic 傳送訊息、消費者從 Topic 消費訊息。Topic 內部抽象了 Partition 的概念,一個 Topic 內可以建立多個 Partition,作用是增加併發處理的能力,即一個 Topic 中的訊息可以分發到多個 Partition,由多個 Partition 承載 Topic 的服務。

在 Bookie 儲存層,一個 Partition 由多個 Ledger 構成。如圖,Partition 3 下面有 5 個 Ledger。Ledger 裡面儲存的是多條 Entry。如前文所說的 Entry 概念,根據訊息是否是批訊息,Entry 就可以分為批和非批兩種。如果訊息是批訊息,那麼 Entry 裡面有多條 Message;如果訊息是非批的,那麼一條 Entry 等於一條 Message。這就是 Topic 視角下的儲存模型。

Bookie GC 回收機制

前面三個部分都圍繞 Broker 層,Broker 作為計算層,本質是 Bookie Client,呼叫 Bookie 側暴露的增刪查的介面來進行相關的操作,操作邏輯簡單。下面將重點介紹 BookKeeper 層如何將資料進行壓縮和回收。

Bookie 壓縮型別

壓縮型別分為兩種:

  • 自動壓縮:Bookie 有周期性執行的 GC Compaction 執行緒,GC 分為 Minor GC 和 Major GC,後文會詳細介紹兩種 GC 的區別。

  • 手動壓縮:通過 BookKeeper 暴露的 Http 呼叫 Admin Rest API 介面來觸發 GC 請求。這個操作在日常急救運維中很常見,比如 Bookie 磁碟記憶體突然大幅度上漲,使用者想要緊急回收資料,那麼就可以跳過 Minor GC 和 Major GC 檢查週期,手動觸發 GC 來釋放磁碟空間。

Bookie 壓縮方式

Bookie 的壓縮方式分為兩種:

  • 按照 Entry 大小

    • compactionRateByEntries
    • isThrottleByBytes
  • 按照 Entry 數量(預設)

    • compactionRateByEntries
  • 生產環境中推薦按照 Entry 大小壓縮,從實際生產環境的經驗來看,每次壓縮 100MB,曲線相對平穩。為什麼不推薦按照 Entry 數量壓縮呢?首先如前文提到的 Entry 的概念,一個 Entry 可能是單條訊息,也可能是批訊息(包含很多 Message),因此如果按照數量壓縮的話,每次壓縮的 Message 數量是不一定的。另外,每一個 Message 的 Payload 不同,訊息大小不一致會導致每次壓縮大小不同,GC 壓縮回收的曲線不平穩。Bookie GC 佔用磁碟 IO,每一臺機器的磁碟 IO 恆定,極端情況下,不平穩的壓縮會對映到 Bookie 主鏈路讀寫流程,影響穩定性。按照 Entry 大小壓縮,壓縮曲線平穩,對穩定性影響較小。

Minor GC 和 Major GC

從程式碼實現邏輯上來看,Minor GC 和 Major GC 完全相同,二者區別在於觸發時機和觸發閾值。

Minor GC Major GC
壓縮時間 1h 24h
壓縮閾值比例 20% (minorCompactionThreshold) 80% (majorCompactionThreshold)
GC 執行最大耗時 minorCompactionMaxTimeMillis majorCompactionMaxTimeMillis
  • Minor GC 壓縮時間是 1h,Major GC 壓縮時間是 24h。
  • 壓縮閾值比例的含義是 Bookie 裡面有用資料的佔比。在 Minor GC 內,Bookie 有用資料佔比為 20%;在 Major GC 內,Bookie 有用資料佔比為 80%。當有用資料佔比超過 20% 和 80% 時,不對資料進行回收。Entrylog 裡檔案大小固定為 1.1 GB,假設 Major GC 有用資料超過 80%,那麼可以理解為大部分資料都是有用的且不可被刪除,Entrylog 全部保留。剩下的 20% 資料沒必要耗費磁碟 IO 進行回收,通過多佔用一定空間的方式降低磁碟 IO 的損耗。
  • 為了避免一次 GC 執行時間過長,因此設定了 GC 執行最大耗時。超過規定的耗時就會強行中止 GC。

注意:

  • 壓縮閾值比例不可以超過 100%。
  • Minor GC 的閾值必須小於 Major GC。
  • 壓縮時,必須要保證磁碟還有一定的可用空間。

Bookie 壓縮

Bookie 壓縮時,首先需要了解以下幾個概念。(生產環境中配置 DBLedgerStorage,社群目前使用居多。後文所有 GC 回收流程和 BookKeeper 相關內容都在預設此配置的前提下展開。)

  • Metadata Store:元資料儲存中心預設使用 ZooKeeper。我使用的是社群提供的工具 ZK-Web,可以看到 Ledger 路徑下儲存了很多 Ledger。

    圖片

  • LedgerIndex:RocksDB 中儲存的 Ledger 集合。使用 DBLedgerStorage 即相當於用 RocksDB 做 Entrylog 的索引儲存,讀取資料時先讀取 RocksDB 來找到索引資料,然後去 Entrylog 讀 Value。這是一個拿 Key 取 “V” 的操作。

    圖片

  • LedgersMap:當前的單個 EntryLog 中儲存的 Ledger 集合。

  • EntryLogMetaMap:當前 Bookie 下所有 EntryLog,Key 是 Entrylog ID,Value 是 Entrylog Metadata。EntryLogMetaMap 是 EntryLogMeta 的集合,EntryLogMetaMap 中包含 LedgersMap 集合。

有了上面的抽象後,我們就可以進行判斷。EntryLogMetaMap 的 Key 是 Entrylog ID,對映到 LedgersMap 集合。

圖片

在整個壓縮過程中,有三個核心的處理邏輯與函式:

  1. doGcLedgers():處理 LedgerIndex 的集合(RocksDB),通過集合判斷資料是否可以刪除。

  2. doGcEntryLogs():處理 LedgersMap 和 EntryLogMetaMap 的集合,以 doGcLedgers() 得出的集合為基準來判斷當前 LedgersMap 中哪些 Ledger 可以刪除,以及當前 EntryLogMetaMap 中哪些 Entrylog 可以刪除。

  3. doCompactionEntryLogs():在進行完上面兩個步驟後就可以進行具體的刪除操作。 doCompactionEntryLogs() 處理 EntryLog 檔案本身是否可以被刪除,對於一個 Key Value 庫來說如何進行刪除也是一門學問。刪除操作不能直接從 Key Value 集合刪除,這樣會造成很多訊息空洞(訊息不連續)。BookKeeper 中刪除操作是從舊的 EntryLog 檔案讀取不可刪除的資料寫入到新的 EntryLog 檔案中,相當於在新的 EntryLog 檔案中進行備份,因此舊的 EntryLog 檔案可以一次性刪除。

前文多次提到了 EntryLog,下面將介紹 BookKeeper 中 EntryLog 如何儲存、儲存了什麼。Entrylog 的構成從上至下核心資料分為三部分。下圖可以幫助大家瞭解 Entrylog 的大致結構,如需精確瞭解,可以閱讀相關原始碼。

  • Header:包含指紋資訊(BKLO,標識 Entrylog 檔案,用於校驗)、BookKeeper 版本、Ledgers Map Offset(Offset 偏移量、如何讀取等)與 Ledgers Count(一個 Entrylog 內 Ledger 的數量)。

  • LedgerEntry List:LedgerEntry 物件,包含 Entry Size、Ledger ID、Entry ID 和 Count。

  • Ledgers Map:包含 Ledgers Map Size、Ledgers Count 和 Ledgers Map Entries。每一個 Ledgers Map Entries 是 Key Value 結構,由 Ledger 對映到 Size。

    圖片

資料回收全流程

有了上面介紹的基礎概念,我們就可以把資料從 Broker 到 BookKeeper 的回收流程串聯起來。

圖片

首先 Client 觸發流程。建立 Topic 建議設定 Retention 策略,不設定的話預設策略是消費完成即刪除該訊息。設定 Retention 策略後,Broker 有定期檢查的執行緒,週期性針對 Topic 執行 Retention 策略。到期可刪除的 Ledger 呼叫暴露的 Delete Ledger 介面,如圖 Ledger 0 可刪除,即呼叫 Delete Ledger 刪除 Ledger 0。刪除 Ledger 0 後 ZooKeeper 中移除 Ledger 0 的 ZooKeeper 路徑。這就是完整的刪除流程,上圖不包含返回邏輯。

Delete Ledger 從呼叫到返回成功的過程中沒有使用 BookKeeper 磁碟上的資料。使用者可能會困惑呼叫介面刪除 Ledger 為何沒有釋放磁碟空間,原因在此,因為刪除操作和 BookKeeper 回收磁碟的操作是完全非同步化的。BookKeeper 回收磁碟的操作由 GC Compaction 執行緒固定進行處理。

那麼,GC Compaction 週期性執行執行緒如何執行?GC Compaction 週期性執行執行緒就是 Minor GC 和 Major GC。在操作流程上,首先會獲取 ZooKeeper 內所有 Ledger 列表。因為建立 Ledger 需要向 ZooKeeper 註冊對應的 ZooKeeper 路徑,刪除 Ledger 也需要從 ZooKeeper 上刪除路徑。ZooKeeper 上的 Ledger 路徑最全面也最準確,因此以 Metadata Store (zk) 為基準來獲取所有 Ledger 列表的集合。然後進行 doGcLedgers() 操作,把 RocksDB 中所有 Ledger 列表集合與 ZooKeeper 上獲取的 Ledger 列表集合做比較,找出可以刪除的 Ledger。刪除後進行 doGcEntryLogs() 操作,處理 LedgersMap 和 EntryLogMetaMap 的集合,判斷 EntryLog 中哪些 Ledger 可以刪除。進一步刪除後進行 doCompactionEntryLogs() 操作,最理想的情況下,Entrylog 裡面所有的 Ledger 都可以被刪除,那麼就可以直接清除這個 Entrylog。大部分情況是 Entrylog 裡部分資料可刪、另一部分不可刪,那麼如何判斷是否保留 Entrylog 呢?由 Minor GC 和 Major GC 的壓縮閾值比例決定。

我們結合下圖瞭解如何通過 doGcEntryLogs() 來 doCompactionEntryLogs()。假設 doCompactionEntryLogs() 時通過 Major GC 的閾值判定一部分未達標的資料可以進行回收,那麼 GC Compaction 執行緒首先從舊的 Entrylog 中檢查 Ledger 是否可以刪除。假定 Ledger 0 和 Ledger 2 可以刪除,Ledger 1 和 Ledger 3 不可以刪除,檢查到可用性佔比後根據閾值判斷 Entrylog 可以刪除,那麼就把 Ledger 1 和 Ledger 3 的有用資料寫入新的 Entrylog 檔案,有用資料有備份後就可以刪除舊的 Entrylog 檔案。

圖片

此處需要補充一點,建立新的 Entrylog 檔案時還有一個動作叫做 Flush。舊的 Entrylog 檔案在建立時會產生索引資訊,Bookie 裡 Entrylog 在讀取 Entry 時,比如讀取 Entry 0、Ledger 1 的資料,會根據索引資訊來追溯對應的 Entrylog。在刪除舊的 Entrylog 檔案並建立新的 Entrylog 檔案操作完成之後,新的 Entrylog 檔案索引資訊需要更新到 RocksDB,通知上層的讀請求去尋找新的 Entrylog 檔案中生成的十六進位制的 ID 來讀取 Entry 0、Ledger 1 的資料。

以上是訊息完整的生命週期,包含從 TTL 與 Retention 策略到 Bookie GC 回收機制的全流程。

髒資料的產生

下面介紹在實際生產中遇到的問題。在下圖中,我們監控了每個 Bookie 上的 Entrylog 檔案發現,假設設定的 Retention 策略週期為 1 天或 5 天,但是這些 Entrylog 檔案已經存在超出 200 天還沒有被刪除。這是異常情況,檔案不刪除會一直佔用磁碟空間。經過分析,以下三個情況可能導致髒資料的產生:

圖片

  • Ledger 刪除邏輯出錯,導致孤兒 Ledger 產生:回顧資料回收全流程,Ledger 刪除操作分為兩個部分:從 ZooKeeper 中清理路徑和 GC Compaction 執行緒清理 Entrylog。社群發起了 PIP[1] 進行雙階段刪除,來保證刪除過程中不會產生孤兒 Ledger。
  • Broker 不會載入不活躍的 Topic,導致 Retention 策略沒有生效:目前社群正在改進該邏輯。BookKeeper 唯一暴露的 Delete Ledger 操作只有在設定 Retention 策略後才能掉入行為。因此如果 Retention 策略沒有生效,Broker 不活躍 Topic 產生的 Ledger 就無法被刪除。
  • GC 回收閾值設定不合理,導致一部分資料無法從 EntryLog 移除:這是上圖中產生存在 200 多天的 Entrylog 的主要原因。根據對使用者資料的調配發現,系統沒有按照 80% 的有用資料佔比來設定回收閾值,而是調整為 50%,導致一半的資料一直存在於 Entrylog 中,無法刪除 Entrylog。
  • 存在不活躍的 Cursor(不活躍即是 Sub 下沒有對應的消費者),這些 Cursor 對應的 Ledger 無法被刪除:目前提出的方案是增加校驗邏輯,如果 Cursor 一段時間內不更新則刪除,此方案還有待商榷與驗證。無論以上哪一種情況,都會導致 Ledger 髒資料無法刪除。因此下面我們展開講解如何刪除髒資料。在瞭解刪除髒資料前,需要了解一個概念叫 Custom Metadata。在 Broker 生成或者建立 Ledger 時,可以給 Ledger 設定一部分元資料,即自定義 Ledger 的元資料屬性。下圖是 Pulsar 預設提供的 Custom Metadata,通過 BookKeeper Admin ctl 獲取到的 Pulsar Managed Ledger Base64 資訊。這一串屬性反寫出來就是一個 Topic 的資訊,只有擁有 Topic 資訊才能進行後面的操作。

圖片

通過 Ledger Metadata 可以獲取 Topic 資訊,即 Ledger 的 Owner Topic。然後我們就可以開始清除這些髒資料。

清除孤兒 Ledger

清除孤兒 Ledger 使用 Clear Tool 清除工具。過程如下:

  • 從 ZooKeeper Snapshot 中獲取所有的 Ledger 列表(如果線上環境壓力不大,也可以直接連線 ZooKeeper 讀取,不需要使用 Snapshot。)從 ZooKeeper Snapshot 中獲取所有的 Ledger 列表後,通過 BookKeeper Admin 工具獲取 Ledger 的 Custom Metadata。

  • 通過 Custom Metadata 找到該 Ledger 的 Owner Topic,並在 Broker 內檢視是否存在該 Topic。

    • 如果 Broker 內 Topic 不存在,Client 首先訪問 Broker 就無法成功。BookKeeper 儲存資料沒有意義,可以直接刪除。

    • 如果 Broker 內 Topic 存在, 就會進一步檢查 Ledger 是否存在,Topic Stats Internal 列表展示了 Topic 內所有 Ledger 的情況,用來確認該 Ledger 是否包含在該 Topic 中。注意,Topic Stats Internal 命令有時候可以可以獲取到 Ledger 列表,有時無法獲取,解決方法是重複獲取,如果仍獲取不到,那麼將判定為列表不存在。

      ![圖片](http://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/889dc444f8c2467bb8f07aa5f0accd29~tplv-k3u1fbpfcp-zoom-1.image "null")
      

Topic 所有的屬性以及 Topic Stats Internal 等指標資訊都是 Broker 向 ZooKeeper 獲取的。以上檢查都過後就可以從 BookKeeper 中刪除 Ledger。Ledger 刪除邏輯和前文回收流程相同,首先刪除 Ledger 的 ZooKeeper 路徑,Ledger 佔用的磁碟空間通過 GC Compaction 執行緒走非同步流程進行刪除。

此外,Schema 和 Cursor 資訊也會使用 Ledger 來儲存。下圖中有一個資訊是 Pulsar Schema ID,如果使用者指定了 Schema 是 String、Json,那麼就會產生也對應 Ledger 的 Schema 屬性,ZooKeeper 下面也會儲存 Schema 資訊。檢查 Stats Internal 時可以獲取到 Schema Ledger 和 Cursor Ledger,需要仔細檢視。

圖片

注意:清理髒資料時一定要備份。ZooKeeper Snapshot 備份可以在錯誤刪除後恢復資料。

總結

文章從使用者視角出發,講述了訊息儲存到 Bookie 中的流程,並闡述 Bookie 的垃圾回收機制,以及 TTL 和 Retention 策略如何作用到 Bookie Client 觸發垃圾回收機制。希望可以為使用者在生產環境中的操作提供參考。

引用連結

[1] PIP: http://github.com/apache/pulsar/issues/16569