如何優雅的升級 Flink Job?

語言: CN / TW / HK

theme: nico

Flink 作為有狀態計算的流批一體分散式計算引擎,會在執行過程中儲存很多的「狀態」資料,並依賴這些資料完成任務的 Failover 以及任務的重啟恢復。

那麼,請思考一個問題:如果程序升級迭代調整了這些「狀態」的資料結構以及型別,Flink 能不能從舊的「狀態」檔案(一般就是 Savepoint 檔案)中恢復?

資料型別

上一篇我們介紹過 Flink 內建的一些用於狀態儲存的集合工具,如 ValueState、ListState、MapState 等。這些只是裝資料的容器,具體能儲存哪些型別的資料或許你還不清楚。

實際上,Flink 支援以下一些資料型別:

內建型別狀態資料結構更新

Flink 中預設提供對一些特定條件下的狀態資料結構升級的自動相容,無需使用者介入。

POJO 型別

Flink 基於下面的規則來支援 POJO 型別結構的升級: - 可以刪除欄位。一旦刪除,被刪除欄位的前值將會在將來的 checkpoints 以及 savepoints 中刪除。 - 可以新增欄位。新欄位會使用型別對應的預設值進行初始化,比如 Java 型別。 - 不可以修改欄位的宣告型別。 - 不可以改變 POJO 型別的類名,包括類的名稱空間。

其中,比較重要的是,對於一個 POJO 物件的某些欄位的型別修改是不被支援的,因為 Savepoint 檔案是按照二進位制位緊湊儲存的,不同型別佔用的 bit 位長度是不一樣的。

按照目前的 Flink 內建支援能力,最多對於 POJO 型別增加或者刪除欄位等基本操作。

Avro 型別

Avro 的 Schema 用 JSON 表示。Schema 定義了簡單資料型別和複雜資料型別。Flink 完全支援 Avro 的 Schema 升級。

因為 Avro 本身就是一個高效能的資料序列化框架,它使用JSON 來定義資料型別和通訊協議,使用壓縮二進位制格式來序列化資料。

Flink 中相當於藉助它完成資料的序列化和反序列化,那麼理論上只要使用者的 Schema 升級是 Avro 支援的,那麼 Flink 也是完全支援的。

非內建型別狀態資料結構更新

除了上述兩種 Flink 內建支援的兩種型別外,其餘所有型別均不支援 Schema 升級。那麼我們就只有通過自定義狀態序列化器來完成對狀態 Schema 升級的相容。

序列化反序列化的流程

HashMapStateBackend 這種基於記憶體的狀態後端和 EmbeddedRocksDBStateBackend 這種基於 RocksDb 的狀態後端的序列化與反序列化流程稍有不同。

基於記憶體狀態後端的序列化反序列化流程:

  • Job的相關狀態的資料是以Object的形式儲存在JVM記憶體堆中
  • 通過Checkpoint/Savepoint機制將記憶體中的狀態資料序列化到外部儲存介質
  • 新序列化器反序列化的時候會通過舊的序列化器反序列化資料到記憶體
  • 基於記憶體中狀態更新後再通過新序列化器序列化資料到外部儲存介質

基於RocksDb狀態後端的序列化反序列化流程:

  • Job的相關狀態資料直接經過序列化器序列化好儲存在JVM堆外記憶體中
  • 通過Checkpoint/Savepoint機制將記憶體中序列化好的資料原樣傳輸到外部儲存介質
  • 新序列化器反序列化的時候會從外部介質直接讀取狀態資料到記憶體(不做反序列化操作)
  • 對於使用到的狀態資料會使用舊序列化器先反序列化,再修改,再使用新序列化器序列化

其中,對於後面兩個步驟與記憶體狀態後端是有區別的,相當於是一種 lazy 的模式,只有用到才會去反序列化。

舉個例子:狀態中有個 KeyedState(我們知道每個Key會對應一個狀態),那麼如果某些 Key 的狀態資料恢復到記憶體後沒有被程式使用或者更新,那麼下一次序列化的時候就不會使用新序列化器操作。

那麼,結果就是:對於一個 Job 的 Checkpoint/Savepoint 檔案裡是存在多個版本的。這也是待會兒要提到的,對於每一次序列化都會把序列化器的相關配置以快照的形式和資料一起儲存,這樣才保證了多個版本狀態資料存在的可能。

演示一個 Schema 升級狀態恢復失敗的 Demo

模擬一個訂單系統上報資料的場景,計算每十秒系統的訂單量以及下單數最多的使用者

1、自定義一個 SourceFunction 模擬上游源源不斷產生資料: ``` public class MakeDataSource extends RichSourceFunction {

private boolean flag = true;

@Override
public void run(SourceContext<OrderModel> sourceContext) throws Exception {
    List<String> userIdSet = Arrays.asList("joha", "nina", "gru", "andi");
    Random random = new Random();
    while(flag){
        OrderModel order = new OrderModel();
        order.setCreateTs(System.currentTimeMillis());
        order.setOrderId(UUID.randomUUID().toString());
        order.setUserId(userIdSet.get(random.nextInt(4)));
        sourceContext.collect(order);
    }
}
@Override
public void cancel() {
    flag = false;
}

} 2、寫一個 job 每十秒聚合一次視窗,輸出使用者產生的訂單數量: public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(1000 * 60); env.getCheckpointConfig().setCheckpointStorage("file:///data/");

    env.addSource(new MakeDataSource())
            .assignTimestampsAndWatermarks(
                    WatermarkStrategy.<OrderModel>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                            .withTimestampAssigner((SerializableTimestampAssigner<OrderModel>) (orderModel, l) -> orderModel.getCreateTs())
            )
            .keyBy((KeySelector<OrderModel, String>) orderModel -> orderModel.getUserId())
            .window(TumblingEventTimeWindows.of(Time.seconds(10)))
    .process(new ProcessWindowFunction<OrderModel, Tuple2<Long,String>, String, TimeWindow>() {
        @Override
        public void process(String key, Context context, Iterable<OrderModel> elements, Collector<Tuple2<Long, String>> out) throws Exception {
            Iterator<OrderModel> iterator = elements.iterator();
            int count = 0;
            while (iterator.hasNext()){
                iterator.next();
                count++;
            }
            logger.info("userid:{},count:{}",key,count);
        }
    });

    env.execute("test_job");
}

``` WebUi 上看大概就是這個樣子,由於我是本地 docker 起的叢集,所以資源不是特別充足,並行度都是 1。

我們執行以下命令停止任務並生成 Savepoint: ``` root@ae894850e6ae:/opt/flink# flink stop 613c3662a4a4f5affa8eb8fb04bf4592 Suspending job "613c3662a4a4f5affa8eb8fb04bf4592" with a savepoint. Savepoint completed. Path: file:/data/flink-savepoints/savepoint-613c36-3f0f01590c70

``` 然後我們給我們的狀態物件 OrderModel 的 orderId 欄位型別從 String 給他改成 Integer。

再指定 Savepoint 重啟 Job,不出意外的話,你應該也會得到這麼個錯誤:

自定義狀態序列化器

1、需要繼承 TypeSerializer 並實現其中相關方法,其中比較重要的有這麼幾個:

//建立一個待序列化的資料型別例項 public abstract T createInstance(); //序列化操作 public abstract void serialize(T record, DataOutputView target) throws IOException; //反序列化操作 public abstract T deserialize(DataInputView source) throws IOException; //這個比較重要,用於對序列化器進行快照儲存 public abstract TypeSerializerSnapshot<T> snapshotConfiguration(); 2、對於 TypeSerializerSnapshot 來說它實際上就是提供了對序列化器的快照儲存以及版本相容處理,核心方法有這麼幾個: //把當前序列化器以二進位制格式和資料寫到一起 void writeSnapshot(DataOutputView out) throws IOException; //從當前輸入流中,讀出序列化器,一般會有一個類私有變數來儲存 void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader)throws IOException; //檢驗當前序列化器是否能相容之前版本 TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(TypeSerializer<T> newSerializer); //重置當前序列化器為之前的一個序列化器 TypeSerializer<T> restoreSerializer(); 這裡面有兩個比較核心,resolveSchemaCompatibility 傳入一個新的序列化器,然後判斷這個序列化器是否能夠相容反序列化之前版本序列化器序列化的資料,TypeSerializerSchemaCompatibility.type 列舉定義了可返回的型別: enum Type { //相容,並且今後使用使用者新定義的 Serializer COMPATIBLE_AS_IS //不相容,需要重置之前序列化器反序列化後再使用新序列化器序列化 COMPATIBLE_AFTER_MIGRATION //相容,需要返回一個reconfiguredNewSerializer,替換傳入的序列化器 COMPATIBLE_WITH_RECONFIGURED_SERIALIZER //不相容,作業拋異常退出 INCOMPATIBLE }

那麼對於我們上面的案例,把 POJO 中欄位型別從 String 改成 Integer 的情況,其實只要重寫 TypeSerializerSnapshot.resolveSchemaCompatibility 方法,返回 COMPATIBLE_AFTER_MIGRATION 型別,然後再 resolveSchemaCompatibility 中返回上一個版本的序列化器(可以反序列化String)即可。

限於篇幅就不再演示了,歡迎交流!

本文所有測試來自本地 docker 起的 Flink session 叢集,如有需要 docker-compose.yml 檔案的可以公眾號回覆「flink-docker」領取

我正在參與掘金技術社群創作者簽約計劃招募活動,點選連結報名投稿