Why not RocksDB in Streaming State?

語言: CN / TW / HK

本文以 Apache Flink 為例,聊聊為什麼 RocksDB 不是流計算引擎中理想的狀態存儲。Flink 中使用 RocksDB 作為大狀態的存儲後端,但在實際線上大規模的生產應用中,我們發現 RocksDB 和流計算場景的組合,即使在參數調優及技術優化後,也始終達不到預期的理想狀態。

背景

RocksDB 是一個非常優秀的 Key-Value 存儲,並且在經過 Facebook 多年的迭代和優化後,穩定性和功能的豐富性都能滿足於各種主流場景的需要。我相信當開發者在進行 Embeded KV 存儲選型時,RocksDB 仍然是一個很主流的選擇,這可能也是為什麼 Apache Flink 最初設計大狀態 KeyedStateBackend 時,選擇了 RocksDB 作為底層存儲的原因吧(當然近年來也有不少優秀的存儲如 Titan、TerarkDB 被設計出來解決新硬件和新場景下 RocksDB 不太給力的問題,在此不作討論)。RocksDB 作為流計算引擎的狀態存儲,從使用者的角度來説,一般情況下並不會有很大的缺陷,但從分佈式計算引擎角度來看,在實際線上大規模的生產應用中,我們會發現 RocksDB 和流計算放在一起,始終無法成為一個完美的組合。

RocksDB

RocksDB [1] 使用 LSM-Tree 的結構,數據以類似 Log 的方式追加寫入,不斷產生新文件,並通過 Compaction 合併來去除不同文件中重複、過期、已刪除的 Key-Value 數據。底層文件使用 SSTable 格式,SST 文件中的 Key-Value 數據按 Key 進行排序,並以一定規則劃分為多個 Data Block,並基於 Data Block 的元信息來構建 Index Block,以保證較好的讀取性能。

SST 文件在 RocksDB 中,以不同 Level(層級)的形式來組織。數據從內存中持久化成 SST 文件後,會先存在於 L0 層,當 L0 層數據到達 Compaction 觸發條件時,數據會被 compact 到 L1 層,以此類推,存活時間越長的數據,最終到達的 Level 層級會越高。

數據寫入

如上圖所示,數據寫入會經過如下的過程:序列化、API 調用、寫入 Memtable、持久化到 SST 文件。RocksDB 默認(可配置)在內存中維護了 2 個 Memtable,當用户調用 RocksDB 的寫入 API 時,數據會同步寫入到 Memtable 中,如果 Memtable 已經寫滿或達到其他 flush 條件,則會轉換為 Immutable Memtable,並調度 RocksDB 的 flush 線程異步對其按 Key 整理、去重等操作,並持久化成 L0 層的一個新的 SST 文件。(如果用户開啟 WAL,數據會同步寫入到 WAL 中)

這裏要注意,數據的寫入一定是 append-only 的,這裏的寫入同樣包括了更新。因為 RocksDB 採用 LSM-Tree 的數據結構,所以多次寫入相同 Key 的數據,會在觸發 Compaction 或者 Flush 操作時進行合併,而不是直接在 Memtable 中原地更新(update in-place)。這同樣適用於 Delete 操作,對於單個 Key 的刪除操作,在 SST 文件中以 (Key -> DeleteType) 的形式存在。(Memtable 使用的數據結構默認以 Skip-List 形式存在)

數據讀取

這裏先介紹 RocksDB SST 文件在不同 Level 的特性:

L0 層:SST 文件資身是按 Key 排序,但 L0 層的 SST 文件之間是無序的,每個 L0 層的 SST 文件之間會發生 Key Range 的重合,也就是説相同 Key 的數據可能存在於在 L0 層的每一個 SST 文件中。 L1 ~ Ln 層:多個 L0 層的 SST 文件達到 Compaction 條件後,與若干個 L1 層文件進行 Compaction 後形成新的 L1 層 SST 文件,L1 層 SST 文件之間不會出現 Key Range 的重合,也就是説相同 Key 的數據最多隻會存在於 L1 層的一個 SST 文件中(L2 ~ Ln 層同理)。

讀取數據時,數據可能存在於 Memtable、Block Cache、SST 文件中。讀取操作分為兩種類型:

Point Lookup(點查):先從 Memtable 和 Block Cache 中嘗試獲取結果,如果沒有找到則會按照層級查找 SST 文件。對於 L0 層 SST 文件,先通過 KeyRange 過濾出可能包含此 Key 的 SST 文件再進行查找;再對於 L1~Ln 層的文件進行二分查找定位對應的 SST 文件並進行讀取。 Range Scan:多路歸併的思想,返回給用户的 Iterator 由多個 Iterator 組成:每個 Memtable、Immutable Memtable、L0 層 SST 文件、以及多個 L1 ~ Ln 層 SST 文件中構建 Iterator,並以多路歸併的方式返回給用户具體的值。

上述的操作有很多默認的優化策略在此不一一列舉了,比如點查操作中每個 SST 文件可以構建 bloom filter 來快速判斷 Key 是否存在,遍歷操作中每個 Iterator 會對底層的數據進行預讀取以獲得更少的 IO 次數。對於單個 SST 文件而言,它的 文件結構 [2] 如下所示,單個 SST 文件查詢會通過對 index block 進行二分查找來定位到具體的 data block :

<beginning_of_file>

[data block 1] // 具體的 KV 數據

[data block 2] // 具體的 KV 數據

...

[data block N] // 具體的 KV 數據

[meta block 1: filter block] // Filter 信息,比如 bloom filter

[meta block 2: index block] // data block 對應的 index,查詢中通過對 index block 進行二分查找來定位到具體的 data block

... (compression/range deletion/stats block)

[meta block K: future extended block]

[metaindex block]

[Footer]

<end_of_file>

Compaction 策略

為什麼要進行 Compaction?:或者説 Compaction 有什麼作用,我們都知道 Compaction 是將多個文件合併成一個文件的過程,在合併過程中會進行相同 Key 的去重,過期 Key 的刪除等操作。一次 Compaction 可以簡單看作將 N 個文件數據讀取後,經過整理再重新寫一遍的過程。在這裏舉兩個極端的例子:

完全不發生 Compaction:SST 文件只存在於 L0 層,由於 L0 層不保證 SST 之間的 Key Range 不發生重合,所以數據讀取需要訪問很多 L0 層 SST 文件,在讀取性能上會非常差。 持續發生 Compaction:假如每生成一個 SST 文件,我們就將它和其他 SST 文件進行 Compaction,那麼數據寫入的開銷則會非常大。

可以看出,Compaction 策略的不同決定了讀寫放大,也決定了讀寫的性能,所以一個合理的 Compaction 策略其實是對讀寫性能的平衡,針對不同場景的需求,我們應該認真考慮其場景所適合的 Compaction 策略。RocksDB 默認提供三種 Compaction 策略,每個策略的觸發條件都比較複雜,原理可看對應鏈接,這裏僅描述一下它們的特點:

Leveled Compaction(默認策略) [3] :Compaction 觸發頻率相對高,讀放大低,寫放大高 Universal Compaction [4] :Compaction 觸發頻率相對低,讀放大高,寫放大低 FIFO Compaction [5] :幾乎不發生 Compaction,讀放大高,寫放大幾乎沒有

流式場景和狀態訪問

這裏以 Apache Flink 為例來看看 Streaming State 在流式場景中是如何使用的。

場景一:WordCount,統計每 60s 內,每個 word 出現的次數:

對於每條數據:

1. Window 算子根據 word 和時間戳,找到該數據所屬的窗口 2. 將 word、窗口標識符(即窗口起始時間和結束時間)和其他信息(如 KeyGroup)拼接成 RocksDB 的 Key 並序列化成 byte[] 3. 調用 RocksDB API 讀取窗口的中間結果數據並反序列化 4. 使用新 word 更新中間結果 5. 序列化中間結果並調用 RocksDB API 寫入

場景二:A 流 Join B 流,邏輯如下:

SELECT *

FROM a, b

WHERE a.id = b.id

AND a.time BETWEEN b.time - INTERVAL '4' HOUR AND b.time

對於 A 流的每條數據(B 流同理):

1. Join 算子收到 A 流數據後,遍歷 B 流的狀態數據列表並逐一反序列化 2. 從 B 流的狀態中找到符合 Join 條件的數據並拼接起來發送給下游 3. 取出 A 流狀態數據列表,反序列化後將新數據 append 到列表末尾 4. A 流狀態數據列表重新序列化並寫入

我們發現,不管是窗口聚合還是雙流 Join 的場景, 我們可以看到狀態存儲的讀寫總是和當前數據所涉及的時間邊界範圍內的狀態有關 ,比如窗口聚合場景中只會對數據所在的窗口進行讀寫,雙流 Join 場景中只會對 Join 條件中的時間範圍內狀態進行讀取和寫入,而並非像 Web 服務中的 ACID 一樣去操作所有時間段的數據。這個特性點恰好也和我們常説的,越接近當前時間的數據價值越高,越久遠的數據價值越低是不謀而合的。

RocksDB 作為狀態存儲

Apache Flink 目前使用 RocksDB 作為狀態存儲,在小狀態場景下,可以使用少量宂餘的資源來掩蓋狀態存儲帶來的問題;在大狀態場景或是數據傾斜的場景下,我們為了流式作業的高性能吞吐需要,往往需要付出非常大的 overhead。

選擇哪一種 Compaction 策略?

上面提到 RocksDB 內置的三種 Compaction 策略,以 Leveled Compaction 為例,會出現以下問題:

寫放大問題:Leveled Compaction 針對的是少寫多讀的場景,而在流式計算中,新數據的處理通常都會產生多次的狀態訪問和狀態更新,大部分場景更接近於讀寫比例 1:1(比如典型的滾動窗口計算場景中間結果的 update)。頻繁的數據寫入會造成 Leveled Compaction 上各個層級頻繁觸發 Full Compaction,尤其是每次 Checkpoint 都會強制產生一個 L0 層文件,很容易就達到 Leveled Compaction 的默認觸發條件。

共振問題:如果用户側有 TaskManager 整體的 CPU 監控,我們很容易看到,每 4 個 Checkpoint 觸發時間點,就會有一次 CPU 陡增的現象,並且作業的吞吐會出現明顯的下降,這是因為 RocksDB 的 Leveled Compaction 默認在 L0 層的 SST 文件數量達到 4 個時就會觸發 L0->L1 層的 Compaction 操作,而各個 Task 的 Checkpoint 操作通常在一兩分鐘內同時觸發,所以此時會導致 Compaction 共振問題,CPU 陡增也會影響 Task 正常的數據處理線程。

潮汐問題:通常來説,數據流量越大,RocksDB 的寫入越多,Compaction 的觸發越頻繁。而恰好流式場景會有非常典型的潮汐現象,高峯和低峯流量往往會差好幾倍,但是實際情況中,我們會發現 Compaction 的資源開銷越在高峯階段,開銷越大,而真正處理作業邏輯的算力更加不夠,只能通過繼續增大資源的方式來緩解問題,也就造成了低峯時期的資源利用率會非常非常低。

對於 Universal Compaction ,會稍微好一些但仍然有類似的問題。這裏我們可以着重看一下功能最少,表面看起來最雞肋的 RocksDB 的  FIFO Compaction ,描述摘自  wiki [6]

FIFO compaction style is the simplest compaction strategy. It is suited for keeping event log

data with very low overhead (query log for example). It periodically deletes the old data, so it's

basically a TTL compaction style.



In FIFO compaction, all files are in level 0. When total size of the data exceeds configured size

(CompactionOptionsFIFO::max_table_files_size), we delete the oldest table file. This means that

write amplification of data is always 1 (in addition to WAL write amplification).

FIFO 的方式,其實是在 L0 層以 SST 文件的形式維護了一個“流”,存活時間越長的數據優先級越低,越有可能被 TTL 刪除,這樣就和流式場景比較相像了。不過使用 FIFO Compaction 策略會導致 L0 層文件過多,數據讀取性能變差,RocksDB 也提供了一些非常簡易的 Compaction 策略來緩解這一問題,我們也可以通過增加 bloom filter + cache 的形式來減少數據查詢的文件 IO 次數。

在我看來 FIFO 是三者中最適合流式場景的 Compaction 策略,但是由於沒有和 Flink 內部機制打通(比如 TTL 兩邊不對齊),可能會出現數據丟失的風險,我們也不推薦用户進行使用。(當然,我們可以通過 RocksDB 的 API 來自定義 Compaction 策略,或者去稍微改改 Flink or RocksDB 的源碼以滿足需求)

Embeded Storage 和分佈式計算

RocksDB 是 Embeded Storage,Embeded 也就意味着在分佈式計算應用中,每個 Task 維護的 DB 實例是相互隔離的,很難拿到一個 全局視角 以做出最優的方案。關於狀態存儲,我在  Hazelcast Jet 論文 [7] 中有提到過 Embeded 和 Distributed Storage 的對比,這裏從另一個角度講講。

以上面提到的 Compaction 共振問題為例,如果我們像 HBase 一樣,可以通過 jitter(抖動因子參數)將各個 Compaction 操作的時間錯開,那麼我們看到的作業輸出可能是平穩且符合預期的。除此之外,擴縮容問題也是類似的,Apache Flink 中作業的擴縮容,對應着狀態擴縮容示意圖如下:

rocksdb-state-rescale

假設算子最大允許存在 6 個 KeyGroup(任何數據都會被映射到 6 個 KeyGroup 範圍內),縮容前這個算子有 3 個 Task,分別負責 {1,2},{3,4},{5,6} 三個 KeyGroup 的數據處理,在算子並行度從 3 調整為 2 後,新的 2 個 Task 需要處理 6 個 KeyGroup 的數據,則對應負責的 KeyGroup Range 就變成了 {1,2,3} 和 {4,5,6}。這也是流式計算中應對擴縮容的通用做法,每個 Task 會負責一定 KeyGroup Range 內的數據處理,在進行擴縮容時,KeyGroup Range 會根據 Task 的數量進行重新分配,相對應的,每個 Task 負責的 KeyGroup Range 發生變化,也就意味着之前不同 Task 中 RocksDB 實例之間需要進行數據遷移和合並。從單機存儲的設計角度來考慮,在設計之初便是為了服務於單機場景,往往不會為這種擴縮容情況作過多考慮(更別提流式計算這類對擴縮容耗時敏感的場景了)。

資源競爭

資源競爭的問題上面已經提到,RocksDB flush 線程和 compaction 線程所用的 CPU 資源,會和作業處理數據線程的 CPU 資源產生競爭。分佈式計算任務,部署在 Yarn 或者 K8s 上,為了保證其他資源(如 Memory)不到達瓶頸,通常部署的一個實例(container)上 CPU 數不會特別多,也就是個位數的級別。在這種情況下,RocksDB 的異步操作對於作業處理產生的資源競爭影響就會非常大。當然,RocksDB 的 Compaction 線程會被設置為 low-priority,但這在流式場景中數據持續流入的情況下並不起太大的作用,而且當 Compaction 過於滯後時,RocksDB 會出現 Write Stall 等現象,讓 Task 的處理線程在短時間內完全 hang 住。

其它

除了上述問題,篇幅原因,在這裏簡要概括過去在工作中遇到過的其他問題(或是在流式場景中可以改進的問題):

序列化 :在 Flink 的使用下,RocksDB 並不能很好地處理 Read-Modify-Write 的場景,尤其是用户數據結構較為複雜時,現象會非常明顯,一次 Update 即意味着一次讀取時的反序列化和一次寫入時的序列化,很多用户自定義 UDAF 時不注意存儲數據結構的複雜度,這裏很容易出現瓶頸。 壓縮 :數據的壓縮和解壓縮,同上。 小文件 :頻繁 update,造成生成的 SST 文件都是小文件。 時間語義 :缺乏豐富的時間語義(比如事件時間) retract :retract 操作會造成大量 delete 從而降低 scan 和 seek 的性能 ....

總結

本文主要介紹了 RocksDB 的相關原理和與流式場景結合時存在的若干問題。這並不影響用户繼續使用 RocksDBStateBackend 作為大狀態的存儲後端,只是期待未來會有一個跟流式場景更加貼合的 Storage 吧,可能是 Embeded Storage,也可能是 Distributed Storage,隨着流式場景越來越流行,這件事情一定會有人在做或者即將準備做的。

References

[1] RocksDB:  http://github.com/facebook/rocksdb/wiki/RocksDB-Overview

[2] 文件結構:  http://github.com/facebook/rocksdb/wiki/Rocksdb-BlockBasedTable-Format#file-format

[3] Leveled Compaction(默認策略):  http://github.com/facebook/rocksdb/wiki/Leveled-Compaction

[4] Universal Compaction:  http://github.com/facebook/rocksdb/wiki/Universal-Compaction

[5] FIFO Compaction:  http://github.com/facebook/rocksdb/wiki/FIFO-compaction-style

[6] wiki:  http://github.com/facebook/rocksdb/wiki/FIFO-compaction-style

[7] Hazelcast Jet 論文:  http://www.liaojiayi.com/Hazelcast-jet/