位元組跳動 Flink 單點恢復功能及 Regional CheckPoint 優化實踐

語言: CN / TW / HK

一、單點恢復機制

在位元組跳動的實時推薦場景中,我們使用 Flink 將使用者特徵與使用者行為進行實時拼接,拼接樣本作為實時模型的輸入。拼接服務的時延和穩定性直接影響了線上產品對使用者的推薦效果,而這種拼接服務在 Flink 中是一個類似雙流 Join 的實現,Job 中的任何一個 Task 或節點出現故障,都會導致整個 Job 發生 Failover,影響對應業務的實時推薦效果。

在介紹單點恢復之前,先來回顧一下 Flink 的 Failover 策略。

  • Individual-Failover:

只重啟出錯的 Task,適用於 Task 間無連線的情況,應用場景有限。

  • Region-Failover:

該策略會根據 Task 之前的連通性將所有 Task 劃分為數個 Region。當有 Task 發生故障時,它會嘗試找出進行故障恢復需要重啟的最小 Region 集合。相比於全域性重啟故障恢復策略,這種策略在一些場景下的故障恢復需要重啟的 Task 會更少。

如果使用 Region-Failover 策略,但因為 Job 是一個全連線的拓撲,本身就是一個大 Region。重啟 Region 相當於重啟整個 Job,所以我們考慮是否可以用 Flink Individual-Task-Failover 策略去替代 Region-Failover 策略,而 Individual-task-failover 的策略在這種拓撲下是完全不適用的。所以我們對於以下特徵的場景,需要設計開發一個新的 Failover 策略:

  • 多流 Join
  • 流量大(30M QPS)、高併發度(16K*16K)
  • 允許短時間內小部分資料丟失
  • 對資料輸出的持續性要求高

在講述技術方案之前,有必要先來了解 Flink 現有的資料傳輸機制。

從左往右看(SubTaskA):

  1. 當資料流入時會先被 RecordWriter 接收
  1. RecordWriter 根據資料的資訊,例如 key,將資料進行 Shuffle 選擇對應的 Channel
  1. 將資料裝載到 Buffer 中,並放到 Channel 對應的 Buffer 佇列裡
  1. 通過 Netty Server 向下遊傳送
  1. 下游 Netty Client 接收資料
  1. 根據 Buffer 中的分割槽資訊,轉發發到下游對應的 Channel 中
  1. 由 InputProcessor 將資料從 Buffer 中取出,執行 Operator 邏輯

根據上面提出的思路我們要解決以下幾個問題:

  • 如何讓上游 Task 感知下游 Failure
  • 下游 Task 失敗後,如何讓上游 Task 向正常的 Task 傳送資料
  • 上游 Task 失敗後,如何讓下游 Task 繼續消費 buffer 中的資料
  • 上下游中不完整的資料如何處理
  • 如何建立新的連線

針對上述問題提出解決方案。

■ 如何讓上游 Task 感知下游 Failure

下游 SubTask 主動將失敗資訊傳遞給上游,或者 TM 被關閉上游 Netty Server 也可以感知到。上圖中用 X 表示不可用的 SubPartition。

首先將 SubPartition1 和對應的 View (Netty Server 用來取 SubPartition 資料的一個結構)置為不可用。

之後當 Record Writer 接收到新資料需要向 SubPartition1 傳送資料,此時需要進行一個可用性判斷,當 SubPartition 狀態可用則正常傳送,不可用直接丟棄資料。

■ 上游 Task 接收到下游 Task 新的連線

下游 SubTask 被重新排程啟動後,向上遊傳送 Partition Request,上游 Netty Server 收到 Partition Request 後重新給下游 SubTask 建立對用的 View, 此時上游 Record Writer 就可以正常寫資料。

■ 下游 Task 感知上游 Task 失敗

同樣的下游 Netty Client 能感知到上游有 SubTask 失敗了,這時找出對應的 Channel ,在末尾插入一個不可用的事件(這裡用感嘆號來表示事件)。我們的目的是想要儘可能的少丟資料,此時 Channel 中的 Buffer 任可以被 InputProcessor 正常消費,直到讀取到“不可用事件”。再進行 Channel 不可用標記和對應的 Buffer 佇列清理。

■ Buffer 中有不完整的資料

首先要知道不完整的資料存放在哪裡,它存在於 Input Process 的內部,Input Process 會給每一個 Channel 維護一個小的 Buffer 佇列。當收到一個 Buffer ,它是不完整的資料,那麼等到接收到下一個 Buffer 後再拼接成一條完整的資料發往 Operator。

■ 下游 Task 和上游 Task 重新連線

當上遊有問題的 Task 被重新排程後,通過呼叫 TaskManager API 來通知下游。下游 Shuffle Environment 收到通知後判斷對應的 Channel 狀態,如果是不可,用直接生成新的 Channel 並釋放掉老的。如果是可用狀態,說明 Channel 的 Buffer 沒有消費完,需要等待 Buffer 消費完再進行替換操作。

業務收益

上圖是以 4000 並行度的作業為例做了對比測試。業務是將一個使用者展現流和一個使用者行為流的進行 Join,整個作業共有 12000個 Task。

上圖中 單點恢復(預留資源)是使用排程組做的一個 Feature,在申請資源的時,選擇額外多申請一些資源,當發生 Failover 時省去了從 YARN 去申請資源的時間開銷。

最後做到了作業的輸出減少千分之一,恢復時間約 5 秒。因為整個恢復過程時間較短,可以基本做到下游無感知。

二、Regional Checkpoint

一個比較經典的資料整合場景,資料匯入匯出,比如從 Kafka 匯入到 Hive,滿足下面幾個特徵。

  • 拓撲中沒有 All-to-All 的連線
  • 強依賴 Checkpoint 來實現 Exactly-Once 語義下的資料輸出
  • Checkpoint 間隔長,對成功率要求高

在這種情況下,資料沒有任何的 Shuffle 。

在資料整合的場景中遇到哪些問題?

  • 單個 Task Checkpoint 失敗會影響全域性的 Checkpoint 輸出
  • 網路抖動、寫入超時/失敗、儲存環境抖動對作業的影響過於明顯
  • 2000並行以上的作業成功率明顯下降,低於業務預期

在這裡,我們想到作業會根據 Region-Failover 策略將作業的拓撲劃分為多個 Rregion。那麼 Checkpoint 是否可以採取類似的思路,將 Checkpoint 以 Region 的單位來管理?答案是肯定的。

在這種情況下不需要等到所有 Task checkpoint 完成後才去做分割槽歸檔操作(例如 HDFS 檔案 Rename)。而是當某個 Region 完成後即可進行 Region 級別的 Checkpoint 歸檔操作。

介紹方案之前先簡單回顧 Flink 現有的 checkpoint 機制。

現有 ckp1

上圖中是一個 Kafka source 和 Hive sink 運算元的拓撲,並行度為 4 的例子。

首先, Checkpoint Coordinator 觸發 triggerCheckpoint 的操作,傳送到各個 Source Task。在 Task 收到請求之後,觸發 Task 內的 Operator 進行 Snapshot 操作。例子中有 8 個 Operator 狀態。

現有 ckp1

在各 Operator 完成 Snapshot 後,Task 傳送 ACK 訊息給 Checkpoint Coordinator 表示當前 Task 已經完成了 Checkpoint。

之後當 Coordinator 收到所有 Task 成功的 ACK 訊息,那麼 Checkpont 可以認為是成功了。最後觸發 Finalize 操作,儲存對應的 Mmetadata。通知所有 Task Checkpoint 完成。

當我們使用 Region 方式去管理 Checkpoint 時會遇到什麼問題?

  • 如何劃分 Checkpoint Region

把彼此沒有連線的 Task 集合,劃分為 1 個 Region。顯而易見例子中有四個 Region。

  • 失敗 Region 的 Checkpoint 結果如何處理

假設第一次 Checkpoint 能正常完成,每個 Operator 對應的狀態都成功寫入 HDFS Ccheckpoint1 目錄中,並通過邏輯對映,將 8 個 Operator 對映到 4 個 Ccheckpoint Region。注意僅僅是邏輯對映,並沒有對物理檔案做出任何移動和修改。

現有 ckp1

第二次進行 Checkpoint 時 region-4-data(Kafka-4,Hive-4)Checkpoint 失敗。Checkpoint2 (job/chk_2)目錄中沒有對應 Kafka-4-state 和 Hive-4-state 檔案,當前 Checkpoint2 是不完整的。為了保證完整,從上一次或之前成功的 Checkpoint 檔案中尋找 region-4-data 成功的 state 檔案,並進行邏輯對映。這樣當前 Checkpoint 每個 Region 狀態檔案就完整了,可以認為 Checkpoint 完成。

此時如果發生大部分或所有 Region 都失敗,如果都引用前一次 Checkpoint 那麼當前這個 Checkpoint 和上一個 Checkpoint 相同也就沒有意義了。

通過配置 Region 最大失敗比例, 比如 50%,例子中 4 個 Region ,最多能接受兩個 Region 失敗。

  • 如何避免在檔案系統上儲存過多的 Checkpoint 歷史資料

如果有某個 Region 一直失敗(遇到髒資料或程式碼邏輯問題),當前的機制會導致把所有歷史 Checkpoint 檔案都保留下來,顯然這是不合理的。

通過配置支援 Region 最大連續失敗次數。例如2表示 Region 最多能引用前兩次的 C checkpoint 成功的 Region 結果。

工程實現難點

  1. 如何處理 Task Fail 和 Checkpoint Timeout
  1. 同一 Region 內已經 Snapshot 成功的 SubTask 狀態如何處理
  1. 如何保證和 Ccheckpoint Coordinator 的相容性

來看 Flink 目前是如何做的。

現有 coordinator

當發生 Task Failure ,先會通知到 JobMaster FailoverStrategy,通過 FailoverStrategy 來通知 Checkpoint Coordinator 進行 Checkpoint Cancel 操作。

那麼 Checkpoint Timeout 情況如何處理?當 Coordinator 觸發 Checkpoint 時,會開啟 Checkpoint Canceller。Canceller 內有一個定時器,當超過預設時間並且 Coordinator 還未完成 Checkpoint,說明出現 Timeout,通知 Coordinator Cancel 本次 Checkpoint。

無論是 Task fail 還是 Timeout 最終都會指向 Pendding Checkpoint,並且當前指向的 Checkpoint 就會被丟棄。

在做出相應修改前先梳理 Checkpoint 相關的 Message,和 Checkpoint Coordinator 會做出的反應。

Global checkpoint 為 Flink 現有機制。

為了保持和 Checkpoint Coordinator 相容性,新增一個 CheckpointHandle 介面,並添加了兩個實現分別是 GlobalCheckpointHandle 和 RegionalCheckpointHandle 通過過濾訊息的方式實現 Global Checkpoint 和 Region Checkpoint 相關操作。

Region Ccheckpoint 提一點。如果 Handler 接收到失敗訊息,將這個 Region 置為失敗,並嘗試從之前的 Ssuccessful Checkpoint 進行 Region 邏輯對映。同樣 Ccoordinator 傳送 NofityComplate 訊息也會先經過 Handler 的過濾,過濾掉髮送給失敗 Task 的訊息。

業務收益

測試在 5000 並行度下,假設單個 Task Snapshot 的成功率為 99.99%。使用 Global Checkpoint 的成功率為 60.65%, 而使用 Region Checkpoint 仍然能保持 99.99%。

三、Checkpoint 上的其它優化

■ 並行化恢復 operator 狀態

Union State 是一種比較特殊的狀態,在恢復時需要找到 Job 所有的 Task State 再進行 Union 恢復到單個 Task 中。如果 Job 並行度非常大,如 10000, 那麼每個 task 的 Union State 進行恢復時至少需要讀取 10000 個檔案。如果序列恢復這 10000 個檔案裡的狀態,那麼恢復的耗時可想而知是非常漫長的。

雖然 OperatorState 對應的資料結構是無法進行並行操作的,但是我們讀取檔案的過程是可以並行化的,在 OperatorStateBackend 的恢復過程中,我們將讀取 HDFS 檔案的過程並行化,等到所有狀態檔案解析到記憶體後,再用單執行緒去處理,這樣我們可以將幾十分鐘的狀態恢復時間減少到幾分鐘。

■ 增強 CheckpointScheduler 並支援 Checkpoint 整點觸發

Flink Checkpoint 的 Interval,Timeout 在任務提交之後是無法修改的。但剛上線時只能根據經驗值進行設定。而往往在作業高峰期時會發現 Interval,Timeout 等引數設定不合理。這時通常一個方法是修改引數重啟任務,對業務影響比較大,顯然這種方式是不合理的。

在這裡,我們對 CheckpointCoordinator 內部的 Checkpoint 觸發機制做了重構,將已有的 Checkpoint 觸發流程給抽象出來,使得我們可以很快地基於抽象類對 Checkpoint 觸發機制進行定製化。比如在支援資料匯入的場景中,為了更快地形成 Hive 分割槽,我們實現了整點觸發的機制,方便下游儘快地看到資料。

還有很多優化點就不再一一列舉。

四、挑戰 & 未來規劃

目前位元組內部的作業狀態最大能達到 200TB 左右的水平,而對於這種大流量和大狀態的作業,直接使用 RocksDB StateBackend 是無法支撐的。所以未來,我們會之後繼續會在 state 和 checkpoint 效能優化和穩定性上做更多的工作,比如強化已有的 StateBackend、解決傾斜和反壓下 Checkpoint 的速率問題、增強除錯能力等。

位元組跳動實時計算團隊負責公司內部實時計算場景, 支撐了數倉/機器學習/推薦/搜尋/廣告/流媒體/安全和風控等眾多核心業務,我們面臨的挑戰是超大單體作業(千萬級別 QPS),超大叢集規模(上萬臺機器)的應用場景,在 SQL, State, Runtime 上都有深度優化。公司仍處於高速發展階段,歡迎有能力、有想法的同學來這裡一起建設實時計算引擎。感興趣的同學歡迎投遞:http://job.toutiao.com/s/YfA8XnA