TiDB x Flink 資料整合實踐

語言: CN / TW / HK

作者簡介:

胡夢宇,知乎大資料基礎架構開發工程師

張允禹,知乎核心架構開發工程師

盛亮,知乎核心架構開發工程師

1 背景

1.1 TiDB 簡介

TiDB 是 PingCAP 公司自主設計、研發的開源分散式關係型資料庫,是一款同時支援線上事務處理與線上分析處理 (Hybrid Transactional and Analytical Processing, HTAP)的融合型分散式資料庫產品,具備水平擴容或者縮容、金融級高可用、實時 HTAP、雲原生的分散式資料庫、相容 MySQL 5.7 協議和 MySQL 生態等重要特性。目標是為使用者提供一站式 OLTP (Online Transactional Processing)、OLAP (Online Analytical Processing)、HTAP 解決方案。TiDB 適合高可用、強一致要求較高、資料規模較大等各種應用場景。

TiDB 由於其高可用,高效能,易擴充套件的特性,在知乎被大規模使用,並應用到一些核心業務場景。

1.2 資料整合平臺簡介

在大資料場景經常會有這樣的需求,將儲存在不同資料來源(如 MySQL, TiDB, Redis, Kafka, Pulsar 等)內的資料進行聯合查詢分析,產出一系列重要的報表,業務方用於衡量收益或者調整戰略。

為滿足上述場景,知乎基於 Flink 構建了資料整合平臺,利用 Flink 豐富的 connector 為使用者提供了實時和離線的資料同步及清洗功能,這樣使用者可以選擇將不同資料來源的資料匯入到同一種資料來源(通常是 Hive 和 HDFS)中儲存, 再借助一系列 OLAP 計算引擎如 MapReduce, Spark, Presto, ClickHouse, Doris 等進行進一步分析處理。

2 TiDB 給資料整合平臺帶來的挑戰

TiDB 在知乎內部主要是用於替代 MySQL, 以解決 MySQL 擴充套件能力不足的問題。在功能方面,因為 TiDB 相容 MySQL 協議,所以在絕大多數場景 TiDB 都能完美替代 MySQL, 甚至在某些特定場景(如超大表儲存, 高併發寫入等)更有優勢。因此在很長一段時間內,資料整合平臺都沒有將 TiDB 作為一個單獨的資料來源,而是將其作為 MySQL, 使用 MySQL 的相關工具來進行相容。

在資料整合場景將 TiDB 作為 MySQL 確實很方便省事,但是同時也為我們帶來一些問題,下面將分幾個小節來說明。

2.1 資料傾斜與資料版本問題

在將 TiDB 作為 MySQL 時,我們使用的是 Flink 社群提供的 flink-jdbc-connector. flink-jdbc-connector 在讀取資料時,會使用如下 SQL:

SELECT
  ${columns}
FROM
  ${tableName}
WHERE
  ${partitionColumn} BETWEEN ${value1} AND ${value2}

在 Flink 任務真正執行的時候,每個子任務會對分割槽欄位選取不同的範圍值做資料分片,從而達到併發抽數的效果。這樣進行資料分片會有兩個問題:首先是不同的資料分片之間資料版本不一致,因為每一條 SQL 都會發放到不同的 task 去執行,不同的 task 執行的時間是不同的,讀到的資料版本也是不同的;其次是需要保證每張表有一個適合分割槽的欄位,一般分割槽欄位會選擇有唯一鍵約束的數值型欄位,這樣可以保證每一個分割槽內的資料在分割槽欄位上不會重複,每一個分割槽內的資料條數可控。以下是我們使用 flink-jdbc-connector 時,遇到的痛點:

  1. 資料分割槽需要自己根據分割槽欄位的上下界設定,比較依賴經驗,不夠智慧;
  2. 表記憶體在唯一鍵約束的數值型欄位,但是不連續(比如雪花演算法生成的 id 以及 auto_random 生成的 id 等),這種情況在 TiDB 的超大表上出現的尤為頻繁。TiDB 的超大表一般是由原來 MySQL 的分庫分表遷移過來的,讀寫量十分巨大,業務方為了提高吞吐,一般會給主鍵加上 auto_random 的關鍵字,導致主鍵的值跨度非常大而且不連續,幾乎無法按照範圍進行資料分片;
  3. 表內有沒有唯一鍵約束的數值型欄位,強行按照非唯一鍵約束的數值型欄位分割槽,當該欄位重複資料較多時,會造成資料分片不均勻,導致 Flink job 執行時間較長或者 OOM 失敗;
  4. 表內既沒有唯一鍵約束的數值型欄位,也沒有非唯一鍵約束的數值型欄位,只能一張表一個數據分片,無法做到併發掃表,也會導致 Flink job 執行時間較長或者 OOM 失敗;

2.2 從庫與多資料中心問題

為了不將所有雞蛋放在同一個籃子裡,知乎目前是採取多資料中心的方案,資料中心按業務場景分為兩部分,第一部分是線上資料中心,主要是知乎主站的一些服務(如評論,已讀等),儲存(如 MySQL, TiDB, Redis 等)和訊息系統(如 Kafka, Pulsar 等);第二部分是離線資料中心,主要是離線分析場景的一些大資料計算引擎和儲存(如 HDFS, Yarn, Hive 等)。每個資料中心包含多個機房,做備災與容錯。

資料整合平臺在讀 MySQL 內的資料時,一般是讀從庫的資料,保證讀寫分離。其實在很多時候,使用者需要從線上資料中心的 MySQL 抽數,匯入到離線資料中心的 HDFS, 供 Hive 表查詢分析,這樣會存在著跨資料中心同步資料的情況,如果同時執行多個同步任務,專線很快就會打滿,影響其他跨專線的服務。

跨資料中心資料同步方案常見的有兩種,第一種是藉助 CDC(Change Data Capture) 的同步方案,將資料打散到每個時間點,避免出現峰值,這種方案通常需要藉助訊息系統和流式計算引擎等額外的元件,鏈路較長容易出錯,好處是可以直接落異構資料來源;第二種是同構資料庫的主從複製方案,將從庫跨資料中心部署,向從庫取數,本質上也是藉助 CDC, 好處是大部分資料庫原生支援,不需要做額外處理,簡單好維護,壞處是從庫比較浪費資源。這兩種方案不能說誰好誰壞,根據業務場景靈活選取即可,知乎內部這兩種方案都有,我們本次討論的重點是第二種同步方式。

我們將 MySQL 的從庫跨資料中心部署,資料整合平臺在進行資料同步時,會根據目標資料來源所在的資料中心來選擇 MySQL 在對應資料中心的從庫,架構圖大致如下:

同樣,資料整合平臺在讀取 TiDB 內資料時,我們也是從 TiDB 的從庫內抽數,只不過這裡的從庫是由 TiCDC 同步而來。架構圖大致如下:t

這個方案雖然解決了跨資料中心抽數的問題,但是也引入了一些新的問題。

首先是利用從庫同步的方案必定會存在成本問題,不管是 MySQL 從庫還是 TiDB 從庫,都需要在兩個資料中心分別搭建兩個同等規模的資料庫叢集,伺服器和人力成本消耗都是非常驚人的,尤其是被當作從庫的叢集, 除了在抽數的時候被用到,其他時候根本不會被業務所使用,屬於資源的極大浪費。

其次是 TiCDC 的問題,在某些極端的場景下,舊版本的 TiCDC 存在著以下問題:

  1. 早期版本 TiCDC 在執行 DDL 的時候會阻塞當前叢集所有 CDC 任務,而某些大庫的索引類 DDL 可能會執行多天,具體可參考 issue , 這是不可接受的。這個問題已經在 v4.0.15 版本被修復,現在的情況為「非同步執行 DDL 語句,不阻塞其他 changefeed」;
  2. TiCDC 在同步的過程中會和每一個 region 建立連線,單獨啟動一個 goroutine, 去監聽掃描出來的變更資料,region 多了之後 goroutine 也會變多。當 goroutine 過多時, 一些用於心跳探活的 goroutine 就難以被排程到,這時 TiCDC Owner 會認為 changefeed 掛掉了,然後重新排程任務和檢測資料,這個故障可能會重複發生,導致資料延遲。此問題預期在 v4.0.16 會得到修復。

TiCDC 修復、測試以及發版需要一定的時間,而業務方對資料同步又有比較強烈的需求,因此,我們需要重新擬定資料同步的方案。

3 資料整合平臺對 TiDB 的優化

資料整合平臺對 TiDB 的優化主要分為兩部分,首先,我們不再使用 Flink 社群提供的 flink-jdbc-connector, 而是自研了更符合 TiDB 分散式架構的 flink-tidb-connector — TiBigData ; 其次,我們對 TiDB 的部署做了一些優化,解決了從庫資源浪費的問題。

3.1 TiDB 原生 Connector - TiBigData

資料整合平臺對 TiDB 資料同步的第一個優化體現在引擎方面。

3.1.1 基於 TiKV Region 的資料分片

TiBigData 首先解決的就是 TiDB 的資料分片問題,這裡需要了解一下 TiKV Region 的概念。

TiKV Region :TiKV 將資料按照 key 的範圍劃分成大致相等的切片(這些切片就稱為 Region),每一個 Region 會有多個副本(通常是 3 個),其中一個副本是 Leader,提供讀寫服務,其他的稱為 Follower,在 3.1 版本後,TiDB 提供了 follower read 功能。TiKV 會確保每個 Region 的大小在一定的範圍內:當某個 Region 的大小超過一定限制(預設是 144MB)後,TiKV 會將它分裂為兩個或者更多個 Region;當某個 Region 因為大量的刪除請求導致 Region 的大小變得更小時,TiKV 會將比較小的兩個相鄰 Region 合併為一個。

由於 TiDB 的資料儲存在 TiKV 上,而 TiKV 具有自己天然的資料分片單位 Region. 一個很自然的想法就是我們在讀取 TiDB 內的資料時,能不能繞過 TiDB, 按照 Region 直接讀取 TiKV 內的資料,答案是肯定的, tikv-java-client 為我們提供了這個可能。最後 TiBigData 實現瞭如下功能:

  1. 利用 Region 對 TiDB 進行資料分片,每個 Flink Task 只處理一個 Region 的資料,TiKV 會保證每一個數據分片的資料量相差不大,因此不會存在資料傾斜的情況;
  2. 分片過程完全自動,無需手動填寫配置,並且支援 TiDB 任意表結構,包括無主鍵表以及 auto_random 主鍵表;
  3. 從 TiKV 內讀取資料後,鍵值對的解碼過程放在 Flink 內,節省 TiDB 寶貴的計算資源;
  4. 具有更高的併發,理論上併發數可以與 Region 數相同。

TiBigData 讀取一個 Region 的時間平均在 6 秒,我們同步的最大表約為 8T, 在 Flink 20 併發,每個 taskmanager 4G 1Core 的情況下,只需要 4 小時。

3.1.2 基於 TiKV MVCC 的 Snapshot Read

TiBigData 解決的第二個問題就是資料版本的問題,在使用 flink-jdbc-connector 時,會存在每個分片的資料版本不同的情況,無法做到全域性一致的事務讀取。但是 TiKV 是有 MVCC 相關概念的,並且 tikv-java-client 為我們提供了訪問不同版本資料的 API, 為我們實現 TiDB 的快照讀取提供了可能。TiBigData 對快照讀取的支援如下:

  1. 支援使用者自己設定版本,使用者可以讀取 TiDB 當前最新版本的資料,也可以讀取指定版本的舊資料,這裡的版本可以理解為特定時間戳;
  2. 在 JobGraph 生成時,如果使用者未設定版本,TiBigData 會將每個資料分片的版本設定為最新版本,讀取最新版本的資料。

這樣能保證所有資料分片的版本相同,可以做到分散式抽數發生在同一個事務裡。

3.2 基於 Follower Read 的 TiDB “從庫”

資料整合平臺對 TiDB 資料同步的另一部分優化體現在部署方面。TiDB 為我們的資料同步帶來了新的更優秀的方案。

TiKV 使用多副本來保證資料的安全性,使用 Raft 演算法實現了分散式環境下面資料的強一致性,一個很自然的想法就是將副本跨資料中心部署,在對 TiDB 抽數時,訪問對應資料中心的副本即可。這樣做的好處有以下幾點:

  1. 便於維護,不需要藉助 TiCDC 與 第二套 TiDB, 完全通過 TiKV 自身的 Raft 協議維護資料的一致性,沒有額外的元件運維負擔;
  2. 節約成本,相比於需要搭建第二套 TiDB 叢集,現在只需要將一個副本跨資料中心部署,無任何額外的成本;
  3. 讀寫分離,跨資料中心放置的副本可以是除 Leader 外的任何角色,只要多數副本在主要的資料中心,就不會對線上業務造成任何影響,完美實現讀寫分離。

我們在 TiBigData 內實現了 TiKV 的 Follower Read, 支援選取任意角色讀取 TiKV 內的資料,包括 Leader, Follower 和 Learner. 架構圖如下:

4 展望

目前 TiBigData 已完成的工作都是 TiDB 讀取相關,包括:

  1. Flink-TiDB-Connector;
  2. Presto-TiDB-Connector;
  3. MapReduce-TiDB-Connector.

後續我們會對 TiDB 寫入做原生的支援,目前已經有一個實驗性的 PR , 也歡迎大家一起參與進來。