基於 TiSpark 的海量資料批量處理技術

語言: CN / TW / HK

作者介紹:楊哲軒,PingCAP 資深解決方案架構師。

熟悉 TiSpark 的人都知道,TiSpark 是 Spark 的一個外掛,它其實就是給予了 Spark 能夠去訪問 TiDB 底層分散式儲存引擎 TiKV 或者 TiFlash 的能力。之前我們一直在解決讀的問題,寫問題並沒有付出太多的時間去解決。今天就給大家揭祕,我們是怎樣使用 TiSpark 去實現海量資料批處理,然後寫入到 TiDB 裡面去的。

傳統批處理 vs TiSpark 的批處理

在介紹 TiSpark 之前,我們首先來回顧一下傳統批處理架構。

對於傳統批處理架構而言,首先要有一個數據,這個資料可以是使用者的 CSV 檔案,也可以是使用者從 TiDB 或 MySQL,或者是其它異構資料庫裡面讀出來的資料。在拿到這些資料之後,它首先需要做的是任務切分,對於每一個批次的任務,每一個小批的任務,分別去進行資料處理,然後再進行分批提交,最後再去寫入到 TiDB 裡面。

熟悉資料庫的人可能都知道,這一套架構有一個致命的問題:就是它沒有辦法通過資料庫來保證事務的 ACID 特性。

傳統的批處理架構,都需要引入一些任務表的機制,來追蹤每一個子任務的成功狀態。如果說子任務表中有一個狀態是失敗的,那可能就需要把整個任務全部回滾。甚至在一些情況下,都需要人工去介入。

對於 TiSpark 來說,則不需要這樣。

TiSpark 拿到讀取完畢的資料以後,首先把它當做一個整體去進行資料處理,無須分片分批處理。資料處理之後形成的新資料,是直接通過兩階段協議,併發的寫入到 TiKV 裡,不經過 TiDB Server。如果拿 TiSpark 的批處理技術和傳統批處理架構來進行對比,會發現傳統批處理架構有著兩個致命的缺陷。第一點是慢,在一些商業銀行,它的日中批處理任務,都是有一定的時效性的。如果說你的處理速度特別慢,是會影響到第二天的開業。第二,傳統批處理架構也是沒有辦法能夠保證事務。為了解決事務的問題需要引入很多機制,業務側去做這種事務的保證,會特別的複雜,難用,也會影響到整體的處理和寫讀的速度。

TiSpark 解讀

架構

接下來大家可能對 TiSpark 的整體架構會比較感興趣,下圖應該就說的比較清楚了。

首先,左側藍色部分是 TiDB 的分散式儲存引擎,包括 TiKV、TiFlash;粉色部分是 PD;右側綠色部分是 TiDB Server;上方黃色部分是 Spark 叢集。

在一個任務提交到 TiSpark,TiSpark 在處理完資料之後,開始寫入資料之前,會先進行一個鎖表的處理。鎖表的意義是防止其它的事務與 TiSpark 正在寫入的事務發生衝突,導致 TiSpark 的事務進行回滾。大家知道,TiSpark 批處理它所涉及到的資料量都會特別大,可能是成千萬,甚至上億的資料量,如果因為這樣而回滾是我們不想看到的事情,所以我們需要預先做一個鎖表。

這裡需要強調一下,這個鎖表只針對於 3.0.14 以上的版本 。在 4.0 版本中 TiDB 已經原生支援了 10GB 的大事務,它對事務的協議做了一定的修改,這也意味著如果 TiSpark 能夠相容這種協議上的修改,是可以不需要去鎖表的。

第二步就是 TiSpark 會對它將要寫入的資料去定型、統計、抽樣、計算,算出來它這一次批量的寫入,大概會生成多少個新的 Region,然後把這些資訊傳遞給 TiDB,由 TiDB 跟其它的元件去進行互動,把新生成的 Region 預先切分出來。Region 預切分的好處:

  • 第一,是規避了熱點問題。
  • 第二,如果說在 TiSpark 寫入過程中,因為 Region 發生分裂,可能會導致一些寫入效能的降級,通過這種方式,就能夠有效的去規避。

另外,TiSpark 在寫入過程中,也會跟 PD 去進行互動,這個互動主要是兩個方面。第一個方面,是一些元資訊。要知道 TiKV 底層資料是一個鍵值對,TiSpark 在寫入之前也會把所有的行資料,轉換成為鍵值對。既然是一個鍵值對,就需要知道我這個鍵值對需要去哪一個 Region,這就是 Region 具體地址的獲取。另外一方面,TiSpark 在寫入也是保證事務的,它需要向 PD 申請一個時間戳。不熟悉 TiDB 的同學,可以簡單把這個時間戳理解為事務的 ID 號,接下來就非常簡單了,準備工作都已經做完,TiSpark 會直接把它生成的鍵值對,通過 Spark Worker 去併發的多對多的寫入到 TiKV 裡面。

原理

接下來講一講 TiSpark 的原理。原理可以具體分為兩大塊。

第一塊是 TiSpark 實現了一個 Java 版本的 TiKV 客戶端 。這個客戶端的功能是比較多,也比較豐富,完全可以單獨剝離,然後拿去給用 Java 實現的業務去使用,就是跟 TiKV 去進行互動。首先,它實現了 Coprocessor 的介面。這也意味著它可以跟 TiKV 或者是 TiFlash 去進行互動,可以把一些算計進行下推,比如說 Limit、Order,或者是聚合等等。它也會做一些謂詞、索引、鍵值域的處理。比如我有一個查詢,它用了索引以後,或者說用了主鍵以後,它的查詢範圍可能是 10 到 100,如果我還繼續用全表查的話,速度會特別慢。所以,這時全表查會被優化成為 10 到 100 的範圍查。

另外,Java 版本的 TiKV 客戶端也實現了兩階段的協議 。這也是 TiSpark 能夠保證寫入符合 ACID 的核心功能。簡單來說,這個協議在 TiDB 那邊也有一樣的實現。只不過它是用 Golang 實現的。我們所做的工作,就是把這個協議用 Java 重新實現了一遍。此外,這個客戶端也會去維護一些統計資訊,索引資訊。這樣的好處就是在 Spark 做執行計劃的時候,能夠有效的利用到這些資訊,去選擇一條更優的執行路徑。

剛才提到的 Java 版本的 TiKV 的客戶端,只是告訴你,你可以通過這個去跟 TiKV、TiFlash 去進行互動,但是,並沒有解決另外一個很關鍵的問題,就是我怎麼樣把這個東西告訴 Spark。那這個問題的答案就是 TiSpark。

TiSpark 利用了 Spark 的 Extensions Point。我們之所以選擇 Spark Extensions Point 作為一個入口是因為這樣做可以減少維護成本,我們沒有必要單獨去維護一套 Spark 的程式碼。假設說我們現在選擇的是維護 Spark 程式碼,現在去實現訪問 TiKV 或是 TiFlash 的邏輯,那這意味著我們勢必要跟主幹進行分叉。熟悉開發的同學可能都知道,如果你跟主幹離的太遠,主幹的後續更新你是很難再合回來的。所以基於這種考量,我們當時是採用了 Spark Extensions Point 作為方案。這個方案除了上面說的這個好處以外,還有別的好處嗎?答案是有的。它可以劫持 Spark Catalyst 的優化器,能夠將怎麼樣去訪問 TiKV 或者是 TiFlash 的邏輯注入到 Spark 的執行計劃,或者是去進行一些相應的改寫。

並且 TiSpark 無論是單表寫入,還是多表寫入,它都是能夠保證事務的 ACID 特性的。只不過單表寫入,它是完全相容 Spark DataSource API。因為 Spark 裡面,DataFrame 就是一個單表的概念,如果說你想要去做多表的寫入的話,你需要使用的是 TiSpark 額外維護的介面(後文會舉例介紹)。我們能夠保證不管是 TiSpark 以後更新多少個版本,這個介面是不會變的。

融合

剛才是講了一些原理,可能大家會有一個疑問,你這個東西很好,那它怎麼樣能夠跟現有的分散式業務系統去融合呢?答案是,它是可以融合的。

舉一個簡單的例子。我們現在有一個分散式業務系統,它分為三個部分。第一部分是有一個服務應用框架;第二部分是有一個非同步任務應用框架;然後接下來就是批量應用框架。

服務應用框架是面向業務開發人員的,業務開發人員去實現這個批任務的邏輯,然後提交給服務應用框架,服務應用框架再把這個任務提交給非同步任務框架去進行排程,最終把它放到我們的批量應用框架裡面去真正的執行。有了 TiSpark 以後,TiSpark 可以比較好的融入到我們的批量應用框架裡面,剛才說的那個流程,只不過是非同步任務應用框架在提交給批量應用框架之後,整個的執行路徑是由 TiSpark 來控制。就是 TiSpark 來去控制它的整體的排程和處理,而不是由原來的批量排程框架,或者是批量處理框架去進行排程或者是處理。

應用

剛才一直在說原理和融合,下面就是講一講,TiSpark 到底是怎麼樣可以應用在這個批任務上?我個人認為,批任務最重要的其實是資料處理,在 TiSpark 裡面,資料處理是可以通過 Data Frame 中的介面來實現的。當然如果不那麼熟悉 Data Frame 介面的同學,也可以採用 Spark SQL 的方式來實現。下圖是一個比較簡單的例子:

比如說我有一張使用者表,這個使用者表裡面有它的貸款和利率,然後我想根據這個貸款和利率去計算當月所需要還的利息。那其實很簡單,你可以通過 DataFrame 中的介面,通過列的名字去定位到它的貸款,和它的利率,然後通過一個簡單的算術運算,加減乘除,把它的當月所需還的利息,算出來。然後最終是通過 DF 的另外一個介面 withColumn,把它新重建一個列,這個列的名字就叫做 toBeDeducted。然後就會生成一個新的 DataFrame。這個新的 DataFrame 與原來的 DataFrame 唯一的區別就是它多了一列,叫做 toBeDeducted。

接下來我們可以把這個 toBededucted 跟它原來的餘額去進行減法操作,減法操作完畢之後,這個新的餘額,就是經過批量任務以後真正的餘額。此時可以看到剛才算出來的 toBeDeducted 是一個冗餘列,我們是可以把它作一個丟棄操作。實際上也很簡單,你可以通過一個 job 去處理。另外,剛才這個計算應該是比較簡單,那同時,用這個 DatoFrame 的介面,也可以去實現一些比較複雜的批處理邏輯。

舉個例子,信用卡的積分專案。一個信用卡積分的計算,它可能有三張表,分別是積分資訊、消費資訊、規則資訊。積分資訊是使用者當前的積分;消費資訊就是每個月的消費金額;規則資訊是我在不同的商戶裡面,他的消費的返比是不一樣的。可能在珠寶類的商戶裡面,它的返比是 1:2,也就說,1 塊錢等於 2 積分。那我們可以把這三張表,在 Spark 裡面進行 join,然後生成一個新的 DataFrame。然後 DataFrame 再通過相應的列的名字,去進行一些算術計算,比如把消費金額乘以規則資訊裡面的係數,然後把這個批任務去執行完畢。執行完畢以後,可以去按照不同的表的結構,去對 DataFrame 進行相應的處理和操作。最終,通過我們的寫入介面,能夠又快又好的寫入到 TiDB 裡面。 這裡要強調的就是,TiSpark 的寫入是直接去寫 TiKV,不經過 TiDB 的,它繞過了 TiDB。

然後講一下單表寫入。單表寫入就比較簡單,它是完全相容了 Spark DataSource 的 API,使用上也是非常方便的。如果說你之前沒有 Spark DataSource 的經驗,我覺得學習上也一定會特別的快,因為 Spark 它也相容 Java 語言,然後你也可以用 Java 的介面去實現整個 DF 的寫入。關於寫入我想強調兩點,第一點就是 format 和 options。format 是因為 Spark 裡面的 format 有很多種。比如說,如果你想用 JDBC 寫,那你可能就要用 JDBC。如果你的寫入物件是一個 Parquet,那你可能需要用 parquet。那這邊因為我們的寫入物件是 TiDB 裡面的 TiKV,那我們這邊就要用 TiDB 這個字串。然後這個 options 裡面其實就是一個 TiDB 的 options。這裡面維護的是 TiDB server 的一些相關資訊。包括了它的 IT 地址、埠、使用者名稱、密碼等等。因為前面也提到了,我們的 TiSpark 在寫入的時候,也是需要跟 TiDB 去進行互動的,有了這些資訊,它就可以比較好的去跟 TiDB 進行互動。

下面講一下多表寫入。剛才的單表寫入,能夠完全相容 DataSource API,是因為在 Spark 裡面,DataFrame 就是一個單表的概念。那如果說你想再寫入多表,如果多表寫入也能夠保證 ACID 的特性的話,是沒有辦法去繼續使用 Spark DataSoucre 的 API,你必須去使用 TiSpark 提供的介面。這邊它有一個 DBTable 和 DataFrame 的對映關係。就是一張表你是需要通過它的庫名,它的表名,來對應到它具體的 DataFrame,然後把它放在一個 Map 裡面。假設你現在有三張表需要去同時寫入的話,那它這個 Map 裡面的元素應該是三個。接下來就是進入到我們 TiBatchWrite.write 這裡面的介面,它這裡面就會有一個 data 和 TiDB options。TiDB options 我在單表寫入的時候,也已經詳細介紹過了,這邊就不再贅述。

熟悉 Spark 的同學,可能會問一個問題,因為 Spark 中的 DataFrame 是一個單表的概念,如果說你做一個合併,很可能會有表結構不相容的問題。我是覺得這個問題非常好。但是,你仔細想一想,在 TiKV 裡面,它這個資料格式是什麼?是一個一個的鍵值對,那其實在我們支援多表寫入的時候,前面的邏輯都是單獨的,只有在 DataFrame 轉換成為了鍵值對以後,我們才會去把它合併。

舉個例子,假設說我現在有一張表它是有 100 行資料,兩外一張表是 200 行資料,轉化成為鍵值對以後,可以因為有索引,元件等,擴張了兩倍,就是 200 行變成 400 個鍵值對,100 行變成了 200 個鍵值對。合併完了以後,它是 600 個鍵值對。在合併完之後,我才去做兩階段協議的提交。因為兩階段協議的提交能夠保證你這 600 個鍵值對的提交,要麼是成功,要麼是失敗的。這也意味著,如果說我的 600 個鍵值對提交成功了,我的兩張表寫入是成功的,如果它沒有成功,那麼我們兩張表的寫入是失敗的。

所以說,通過我們的介面寫入的多表的寫入,也是符合 ACID 特性的。

另外一個大家可能會比較好奇的就是我有一個任務提交到了 TiSpark 裡面,我有沒有辦法去看的到它這個任務的進度?答案是可以的。下圖是我在提交了一個 4 百萬行的資料寫入的一個截圖。

從圖中可以看到,差不多接近 5 分鐘左右應該就能寫完。在 0 到 6 這個 job ID 這邊,其實做的都是準備工作。7 到 10 做的是兩階段提交裡面的選一個主鍵(在兩階段協議提交過程中保證事務的原子性)的步驟。然後 job ID 11 是真正的寫入的工作。通過這個監控,可以比較清楚的看到,目前這個批處理任務當前正在處理什麼。

優點

其實回過頭來看,我們這個 TiSpark 的批處理,有著什麼樣的優點?

第一個優點就是快,快,快。 重要的事情說三遍,因為它真的特別快。快的原因是什麼?是因為 TiSpark 繞過了 TiDB,可以多對多的並行寫入 TiKV。這意味著什麼?意味著它可以水平橫向擴充套件。如果我的資源限制在 TiKV 那邊,我可以簡單加一個 TiKV 節點,就是擴充套件它的磁碟資源。如果我的瓶頸在 Spark 這邊,我可以加一個 Spark 的計算節點。通過這種方式,可以又快又好的把這個資料寫到 TiKV 裡面。

第二個優點,是配置比較簡單。 熟悉 spring batch 的同學可能都知道,spring batch 需要配置一大堆的 Item reader 和 Iitem writer ,特別複雜,特別難用。對於 TiSpark 來說,你可能唯一需要配置的就是告訴 Spark 怎麼樣去用這個 TiSpark。而且 TiSpark 所有的批處理邏輯,基本上 99% 都是相容 Spark 的 DataSource API。只要你熟悉了 DataSource API,包括 DataFrame API,那你的批處理邏輯的書寫、寫入邏輯的書寫,都會非常的方便。

第三個優點是不僅快,它還能保證事務。 就是寫入要麼成功,要麼失敗。小孩子才做選擇,成年人全都要,就是又快又能夠保證事務。目前 TiSpark 的寫入效能可以在 8 分鐘內寫完 6000 萬行 TPCH 的 lineitem 的資料。

如果大家對 TiSpark 批處理方案有興趣的話,也歡迎郵件與我聯絡(郵箱:[email protected]),我們一起看看怎麼樣能夠把它融入到現有的分散式業務系統,更好的為使用者帶來價值、謀福利。