位元組跳動 Flink 狀態查詢實踐與優化

語言: CN / TW / HK

背景

眾所周知,Flink 中的 State 儲存了運算元計算過程的中間結果。當任務出現異常時,可以通過查詢任務快照中的 State 獲取有效線索。

但目前對於 Flink SQL 任務來說,當我們想要查詢作業 State 時,通常會因為無法獲知 State 的定義方式和具體型別等資訊,而導致查詢 State 的成本過高。

為了解決這個問題,位元組跳動流式計算團隊在內部提出了 State Query on Flink SQL 的解決方案——使用者通過寫 SQL 的方式就可以簡單地查詢 State。本文將主要介紹位元組跳動在 Flink 狀態查詢這方面所進行的相關工作。

State Processor API 介紹

01.png

提到狀態查詢,我們自然會聯想到 Flink 在 1.9 版本提出的特性 -- State Processor API。使用 State Processor API,我們可以將作業產生的 Savepoint 轉換成 DataSet,然後使用 DataSet API 完成對 State 的查詢、修改和初始化等操作。

02.png

下面簡單介紹一下如何使用 State Processor API 來完成 State 的查詢:

  • 首先建立 ExistingSavepoint 用來表示一個 Savepoint。初始化 ExistingSavepoint 時需要提供 Savepoint 路徑和 StateBackend 等資訊;
  • 然後實現 ReaderFunction 用於重新註冊所需要查詢的 State 以及定義處理 State 的方式。查詢狀態的過程中會遍歷所有的 Key 並按照我們定義的方式去操作 State;
  • 最後,呼叫 Savepoint.readKeyedState 並傳入運算元的 uid 和 ReaderFunction,就可以完成 State的查詢。

03.png

接下來為大家簡述一下 State 查詢背後的原理

在 Savepoint 目錄中包含兩種檔案,一種是狀態資料檔案,比如上圖中的 opA-1-state ,這個檔案裡面儲存著運算元 A 在第一個 SubTask 狀態的明細資料;還有一種元資料檔案,對應上圖中的 _metadata,元資料檔案中儲存了每個運算元和狀態檔案的對映關係。

當我們在進行狀態查詢的時候。首先在 Client 端會根據 Savepoint 路徑去解析 metadata 檔案。通過運算元ID,可以獲取需要查詢的狀態所對應的檔案的控制代碼。當狀態查詢真正執行時,負責讀取狀態的 Task 會建立一個新的 StateBackend ,然後將狀態檔案中的資料恢復到 Statebackend 中。等到狀態恢復完成之後就會遍歷全部的 Key 並把對應的狀態交給 ReaderFunction 處理。

04.png

有些同學可能會問,既然社群已經提供了查詢 State 的功能,我們為什麼還要去做同樣的工作呢?主要是因為我們在使用 State Processor API 的過程中發現一些問題:

  1. 每次查詢 State 我們都需要獨立開發一個 Flink Batch 任務,對使用者來說具有一定的開發成本;
  1. 實現 ReaderFunction 的時候需要比較清晰地瞭解任務狀態的定義方式,包括 State 的名稱、型別以及 State Descriptor 等資訊,對使用者來說使用門檻高較高;
  1. 使用 State Processor API 時,只能查詢單個運算元狀態,無法同時查詢多個運算元的狀態;
  1. 無法直接查詢任務狀態的元資訊,比如查詢任務使用了哪些狀態,或者查詢某個狀態的型別。

05.png

總體來說,我們的目標有兩個,一是降低使用者的使用成本;二是增強狀態查詢的功能。我們希望使用者在查詢 State 時能用最簡單的方式;同時也不需要知道任何資訊。

此外,我們還希望使用者能同時查詢多個運算元的 State ,也可以直接查詢作業使用了哪些 State,每個 State 的型別是什麼。

因此,我們提出了 State Query on Flink SQL 的解決方案。 簡單來說是把 State 當成資料庫一樣,讓使用者通過寫 SQL 的方式就可以很簡單地查詢 State。

06.png

在這個方案中,我們需要解決兩個問題:

  • 如何對使用者遮蔽 State 的資訊:參考 State Processor API 我們可以知道,查詢 State 需要提供非常多的資訊,比如 Savepoint 路徑、 StateBacked 型別、運算元 id 、State Descriptor 等等。通過 SQL 語句顯然難以完整地表述這些複雜的資訊,那麼查詢狀態到底需要哪些內容,我們又如何對使用者遮蔽 State 裡複雜的細節呢?這是我們面對的第一個難點。
  • 如何用 SQL 表達 State:State 在 Flink 中的儲存方式並不像 Database 一樣,我們如何去用 SQL 來表達狀態的查詢過程呢?這是我們要解決的另一個難點。

StateMeta Snapshot 機制

07.png

首先我們來回答第一個問題,查詢一個 State 需要哪些資訊呢

可以參考上文中 State Processor API 的示例,當我們建立 ExistingSavepoint 和 ReaderFunction 的時候,我們需要提供的資訊有 Savepoint 路徑、Backend 型別、OperatorID、運算元 key 的型別、State 名稱以及 Serializer 等等,我們可以將這些統一稱為狀態的元資訊。

對於 Flink SQL 任務來說,要清楚地瞭解這些資訊,對使用者來說門檻是非常高的。我們的想法是讓使用者只需要提供最簡單的資訊,即 Savepoint ID ,然後由 Flink 框架把其他的元資訊都存在 Savepoint 中,這樣就可以對使用者遮蔽 State 那些複雜的細節,完成狀態的查詢。因此,我們引入了 StateMeta Snapshot 機制。

08.png

StateMeta Snapshot 簡單來說就是把狀態的元資訊新增到 Savepoint Metadata 的過程,具體步驟如下:

  • 首先在 State 註冊的時候,Task 會把 operatorName\ID\KeySerializer\StateDescriptors 等元資訊都儲存在 Task 的記憶體中;
  • 觸發 Savepoint 時,Task 會在製作快照的同時,對狀態的元資訊也同樣進行快照。快照完成之後將狀態的元資訊 (StateMeta) 和狀態檔案的控制代碼 (StateHandle) 一起上報給 JobManager;
  • JobManager 在收到所有 Task 上報的 StateMeta 資訊之後 ,將這些狀態元資訊進行合併,最後會把合併之後的狀態元資訊儲存到 Savepoint 目錄里名為 stateInfo 的檔案中。

之後在狀態查詢時就只需解析 Savepoint 中的 stateInfo 檔案,而不再需要使用者通過程式碼去輸入這些 State 的元資訊。通過這樣的方式可以很大程度地降低使用者查詢狀態的成本。

State as Database

接下來我們來回答第二個問題,我們如何用 SQL 來表達 State。其實社群在設計 State Processor API 的時候就提出了一些解決思路,也就是 State As Database。

09.png

在傳統的資料庫中,通常用 Catalog、Database、Table 這個三個元素來表示一個 Table,其實我們也可以將用樣的邏輯到對映到 Flink State 上。我們可以把 Flink 的 State 當作一種特殊的資料來源,作業每次產生的 Savepoint 都當作一個獨立 DB 。在這個 DB 中,我們將 State 元資訊、State 的明細資料,都抽象成不同的 Table 暴露給使用者,使用者直接查詢這些 Table 就可以獲取任務的狀態資訊。

10.png

首先我們來看如何把 State 表示為 Table。我們都知道 Flink 中,常用的 State 兩種型別 ,分別是 KeyedState 和 OperatorState

  • 對於 OperatorState 來說,它只有 Value 這一個屬性,用來表示這個 State 具體的值。因此我們可以把 OperatorState 表示為只包含一個 Value 欄位的表結構。
  • 對於 KeyedState 來說,每個 State 在不同的 Key 和 Namespace 下的值可能都不一樣, 因此我們可以將 KeyedState 表示為一個包含 Key、Namespace、Value 這三個欄位的表結構。

11.png

當我們抽象出了單個 State 之後,想要表示多個 State 就比較容易了。可以看到在上圖的例子中,這個運算元包含 3 個 State,分別是兩個 KeyedState 和一個 OperatorState,我們只需要將這些 Table 簡單的 union 起來,再通過 state_name 欄位去區分不同的 State,就可以表示這個運算元中所有的 State。

12.png

最後還有一個問題,我們如何知道一個任務到底用了哪些 State 或者這些 State 的具體型別 呢?

為了解決這個問題,我們定義了一種特殊表 -- StateMeta ,用來表示一個 Flink 任務中所有 State 的元資訊。StateMeta 中包含一個任務中每個 State 的名稱、State 所在的運算元 ID 、運算元名稱 、Key 的型別和 Value 的型別等等,這樣使用者直接查詢 StateMeta 這個表就能獲取任務中所有狀態的元資訊。

使用 Flink Batch SQL 查詢任務狀態

13.png

以上就是狀態查詢方案的整體介紹。那我們到底如何去查詢一個 State 呢,我們以一個 Word Count 任務 為例來說明

首先,我們需要建立一個 Flink SQL 任務並啟動。通過 web-ui 可以看到這個任務中包含三個運算元,分別是 Source,Aggregate 還有 Sink。然後,我們可以觸發 Savepoint,當 Savepoint 製作成功之後獲取對應的 SavepointID。我們可以通過 SavepointID 去完成作業狀態的查詢。

14.png

假如我們現在對 Flink SQL 任務中狀態的使用一無所知,那麼首先我們需要查詢的就是這個 Flink 任務中包含哪些 State 以及這些 State 的型別。我們可以從 StateMeta 表獲取這些資訊。如上圖中場景一所示,通過查詢 StateMeta 表,可以看到這個任務包含一個 ListState 和一個 ValueState,分別存在於 Source 運算元和 Aggregate 運算元中。

此外,有些對 Flink 比較瞭解的同學知道,KafkaSource 中的 State 是用於記錄當前消費的 Offset 資訊。 如場景二所示,我們可以通過查詢 Source 運算元的狀態,獲取到任務中消費 Kafka Topic 的 Partition 和 Offset 資訊。

還有一種比較常見的場景,比如下游的業務同學發現某個 key(比如 key_662)的結果異常。我們在定位問題的時候可以直接去查詢作業中 aggregate 運算元中的狀態,同時去指定 key 等於 key_662 作為查詢條件。 如上圖場景三所示,通過查詢的結果可以看到,當 key 為 662 時對應的聚合結果是 11290。使用者使用這樣的方式就可以比較方便地驗證狀態是否正確。

未來展望

15.png

未來,我們計劃進一步豐富 State 的功能,目前我們支援了使用 SQL 查詢 State 的功能 ,其實社群還提供了 State 修改和初始化的能力。在一些場景下,這些能力也比較重要。比如,我們已知狀態中的部分 key 計算錯誤,希望將狀態中這部分的資料進行修正;或者任務邏輯發生變更以後和之前的狀態不能完全相容, 這個時候我們希望可以通過狀態修改和初始化的能力去生成一個新的 Savepoint。同樣,在使用方式上我們也希望使用者能直接使用 SQL 中 insert 和 update 語法來完成狀態的修改和初始化操作。

其次,我們會進一步加強 State 的可用性。我們使用 DAG 編輯的方案解決了作業拓撲發生變化時產生的狀態不相容問題,但是當 Flink SQL 任務修改欄位時 State Serializer 可能會變化,同樣導致狀態無法相容。針對這種情況我們設計了完整的 Flink SQL State Schema Evolution 方案,可以極大的增強 Flink SQL 任務發生變化之後狀態的恢復能力,目前方案正在落地中。此外,我們還提供了完善的狀態恢復事前檢查能力,能夠做到在任務上線之前就檢查出狀態是否相容並告知使用者,避免狀態不相容引起的作業啟動失敗對線上造成影響。

加入我們

位元組跳動實時計算團隊負責公司內部實時計算場景, 支撐了數倉/機器學習/推薦/搜尋/廣告/流媒體/安全和風控等眾多核心業務,我們面臨的挑戰是超大單體作業和超大叢集規模的應用場景,在 SQL, State, Runtime 上都有深度優化。公司仍處於高速發展階段,歡迎有能力、有想法的同學來這裡一起建設實時計算引擎。工作地點:北京、杭州,感興趣的同學歡迎投遞:http://job.toutiao.com/s/YfA8XnA