京東自適應資料傾斜處理演算法

語言: CN / TW / HK

本文討論了京東Spark計算引擎研發團隊關於自主研發並落地資料傾斜解決方案,助力京東大規模離線計算場景的探索和實踐。近年來,大資料技術在各行各業的應用越來越廣泛,Spark自UCBerkeley的AMP實驗室誕生到如今3.0版本的釋出,已過十年,儼然已經成為大資料計算領域名副其實的老專案。雖經過不斷的迭代和優化,Spark功能日趨成熟與完善,但在效能及穩定性方面,仍然還有很多可以提升的地方。資料傾斜問題作為大資料計算領域中的一個頑疾,就是其中重點之一。我們希望在京東超大規模資料體量及複雜業務場景的背景下,通過自研資料傾斜優化演算法,解決現有問題,打造穩定高效的JDSpark計算引擎,助力京東大促過程中的一些應用實踐,能夠給大家提供一些思路和啟發,同時也歡迎大家多多交流,給我們提出寶貴建議。

什麼是資料傾斜

資料傾斜是分散式計算領域中最為棘手的問題之一,它是指在分散式的資料並行處理過程中,由於資料分片不均勻,導致大量資料集中到一個或少數計算節點上,導致少數任務的處理速度遠低於平均速度,甚至直接導致OOM記憶體溢位,進而拖慢整個計算環節。

資料傾斜的發生往往與資料Shuffle緊密相連,在Join(連線)、Aggregate(聚合)、Window(視窗)等計算中,必須從各個前序計算節點中將相同key相關的資料拉取到某個節點上進行計算,當資料分佈不均存在hot key時,其對應計算節點上的資料量便很可能超過其他節點。

社群版實現

通過對線上任務的分析,我們發現約80%的資料傾斜是由Join計算導致,傾斜Stage嚴重影響了任務的時效性,解決SkewedJoin迫在眉睫。所幸的是,從3.0開始,Spark引入了Adaptive Query Execution(AQE)機制,可以通過分析已完成Stage的統計資訊,來動態的調整後續Stage的執行,以達到更佳的優化效果。圖1展示了AQE的基本架構。而SkewedJoin的優化處理,正是AQE中最重要的應用之一。

圖1 Adaptive Query Execution

圖2、3來自於Databricks介紹AQE的部落格。其中圖2展示了兩表Join情況下的資料傾斜,其中左側Table A發生了嚴重的資料傾斜,其分割槽A0的資料量遠大於其他三個分割槽A1~A3;在計算過程中,A0-B0對應的任務Task0的計算壓力將大於其他三個Task,進而拖慢整個Stage。

圖2 資料傾斜

基於AQE框架,社群版本是這樣處理2表Join場景下的資料傾斜:在前序Stage完成後,AQE可以獲取其各個分割槽大小;其中的OptimizeSkewedJoin優化規則,將首先分析JoinType(如Inner、Left、Right、LeftSemi等)來判斷左右兩側是否可以進行Split分割處理,判斷的依據是Split之後再執行Join是否會破壞資料正確性;若可處理,則進一步分析分割槽大小的分佈,檢測是否存在傾斜分割槽(預設邏輯為分割槽大小超過中位數的5倍);分析結束後,若存在可處理的傾斜分割槽(圖2中的A0),則對其進行分割(分割為A0-0,A0-1),並將對應分割槽(B0)進行Duplicate複製處理,這樣Task0(A0 - B0)將被分割成為兩個Task(A0-0 - B0;A0-1 - B0)。處理後的Stage將由4個Task變為5個Task,但各個Task的計算壓力得到了平滑,整個Stage的執行效率得到了提升。

在實際中,我們發現這裡有三點值得注意的技術細節:

1、 JoinType約束:目前的Split+Duplicate處理機制,會對JoinType產生約束;具體見表1,如果圖3中的JoinType為RightOuter,則不能對A0進行分割,否則會導致資料正確性問題。

2、 處理開銷:對應非傾斜分割槽會被多次讀取,如圖3中的B0被讀取兩次。

3、 組合爆炸:以Table A Inner Join Table B為例,若左右兩側的分割槽A0和B0均發生了傾斜,且分別被分割成M份和N份,則Task0最終會被拆分成M X N個Task,當M X N數值過大時,可能會導致一定的效能迴歸。

圖3 基於AQE處理資料傾斜

表1 JoinType約束

業界的改進

通過觀察線上任務,社群版資料傾斜處理能力可以明顯提升命中任務的時效性,但其依然存在一些不足:

1、模式限制:目前社群版本僅支援2表Join的場景,且匹配模式非常嚴格(見圖4,左側為SortMergeJoin,右側為ShuffledHashJoin);當前支援的模式中,兩側的輸入不能存在聚合、對映等其他節點。

2、語義限制:即上文所提的JoinType約束,目前在Split+Duplicate處理機制下,類似Table A leftJoin skewed Table B的情況是不能被處理的。

圖4 目前社群版所支援的資料傾斜模式

目前業界主要的優化方向在於放寬模式限制,即在AQE框架下支援更多的模式,以如圖5為例,假設Shuffle2中發生資料傾斜,但由於Shuffle1端存在Aggregate運算元,導致匹配不上圖4中的模式。目前業界主要有兩種實踐方向:

方案1:主動引入額外的Shuffle將複雜Stage轉化為社群版可以處理的簡單模式。在Aggregate節點後,主動插入Shuffle,導致Shuffle3與Shuffle2之間的Join計算滿足圖4中的模式;。

方案2:擴充套件並維護傾斜模式集,需要不斷將線上遇到的傾斜模式新增進集合,如圖5這種單側存在聚合的模式。

圖5 左側存在聚合節點的資料傾斜

京東自適應資料傾斜演算法

業界的改進均可以拓寬資料傾斜處理的覆蓋面,但我們在引入京東平臺時,發現了其中的一些缺陷:

方案1:主動引入的Shuffle有時是可以避免的,如圖5中的Shuffle3可以通過調整資料傾斜演算法來避免的。

方案2:除了圖5中的單邊聚合模式,還需要支援多表Join模式、含Union的模式等;特別的,傾斜模式之間可能是關聯的,比如Union節點的每個子節點分屬不同模式,或多表Join中的某些節點又包含Aggregate節點;這導致很難列舉所有可能的模式,且難以維護,很容易陷入頭痛醫頭腳痛醫腳的困境。

通過仔細分析平臺任務的傾斜模式,並深入調研業界已有的改進工作,我們歸納出了一種較為通用的優化演算法。不再使用模式匹配的方法,而是分析節點語義,搜尋出可以分割的傾斜葉子節點,並進行分割和組合處理。其處理邏輯如下:

OptimizeSkewedJoinV2 :

Check operator types

{ Non-Skewed SMJ/SHJ/Aggregrate/Window, BHJ/Sort/Project/Filter/… }

Find Splittable Shuffles:

Split Splittable Skewed partitions & Duplicate other partitions

Check Combinatorial Explosion & Re-split if necessary

Mark all SMJ/SHJ/Aggregate/Window Skewed

步驟一:出於保證資料正確性的考慮,利用白名單(包含Sort、Project、BHJ、Non-Skewed SMJ/SHJ/Aggregate/Window等)對全部子孫節點進行型別檢查,若滿足要求,則可以進行後續優化;

步驟二:語義分析,搜尋Splittable葉子節點:

  • 遇到SMJ/SHJ (Inner/Cross)節點,搜尋左右子節點;

  • 遇到SMJ/SHJ (LeftOuter/LeftSemi/LeftAnti)節點,搜尋左子節點;

  • 遇到SMJ/SHJ (RightOuter)節點,搜尋右子節點;

  • 遇到SMJ/SHJ (FullOuter)節點,停止當前路徑搜尋;

  • 遇到Aggregate/Window節點,停止當前路徑搜尋;

  • 遇到其他節點,則搜尋其子節點;

  • 所有最終到達的Shuffle節點均為Splittable;

步驟三:利用分割槽資訊,分析Splittable葉子節點是否發生傾斜;若發生傾斜,則對其進行分割,並複製其他分割槽資料;

步驟四:對組合數進行檢測,當超過閾值(預設1000)時,會對分割數進行壓縮;

步驟五:對分割後的資料進行組合遍歷,生成最終的Task。並將樹結構中的SMJ/SHJ/Aggregate/Window節點標記為Skewed狀態,以避免EnsureRequirements引入新的Shuffle;

圖6 在複雜Stage中搜索Splittable葉子結點

以圖6為例,步驟二中篩選出Shuffle 0/2/4具備可分割性,當其中發生資料傾斜時,則可以被處理;步驟三中,對於每個分割槽,會通過遍歷所有的Split組合生成優化後的任務;

假設對於分割槽0,Shuffle 0/2/4均發生了資料傾斜,並均被分割成了100份,則通過組合遍歷生成一百萬(100X100X100)個新Task,但這會導致Shuffle 1/3/5中分割槽0的資料均被讀取了一百萬次;步驟四會對組合數進行壓縮,將分割方案由100X100X100壓縮至10X10X10。

線上案例

目前我們設計的新演算法已經上線一段時間,應用於保障關鍵鏈路中的計算任務。

圖7 四表Join傾斜

圖7為一個典型的線上案例,在計算過程中發生了4表Join傾斜(2SMJ+1BHJ),其中最左側的Shuffle資料發生了傾斜;

圖8 四表Join傾斜優化相關日誌

圖8中的日誌記錄了整個處理過程,從Stage的Project節點開始,首先發現兩個SMJ(#2002,#1996)和三個Shuffle輸入(#12,#9,#11);分析中發現JoinType均為LeftOuter,故最左側的Shuffle(#12)滿足Splittable條件;接著對Shuffle(#12)進行分析,發現多個傾斜分割槽,於是進行分割處理;

圖9 四表Join傾斜優化時效性優化

優化前Stage 20共包含3000個Task,耗時長達1~2小時;優化後共生成3051個Task,耗時優化至11分鐘,顯著提高了時效性。

總結與展望

京東Spark平臺承擔了海量的計算任務,常有幾千行的大SQL出沒,計算邏輯千差萬別。我們對線上傾斜模式進行分析,並對業界處理方法進行借鑑,歸納總結出一種較為通用的處理演算法。不過目前仍然存在不少的待優化項需要繼續進行攻克:

1、目前的演算法需要所有葉子節點均為Shuffle/Broadcast型別,尚不支援Bucket Join。後續需要支援DataScan型別的葉子節點,以支援Bucket Join的推廣。

2、突破JoinType限制:通過對線上任務的分析,目前仍然有一定量的類似A leftJoin skewed B、A leftJoin skewed B leftJoin C的傾斜模式。目前Split+Duplicate處理機制不能處理這種傾斜模式,需要突破當前限制,進一步拓展覆蓋度。

3、目前我們的工作主要圍繞Join開展,未來需要進一步支援Window和Aggregate運算元引發的資料傾斜。

在不斷提升平臺整體效能的同時,我們會繼續深度鑽研資料傾斜、資料膨脹等異常場景,保障長尾任務的穩定性,為京東大資料平臺的湖倉一體升級工程打造堅實可依賴的高效引擎基礎。

我們還會同開源社群和業界同行保持密切的技術交流,在支撐好內部計算任務的同時,也會把相關優化工作回饋給社群,共建Spark生態。