MapReduce原始碼分析二:ReduceTask執行流程

語言: CN / TW / HK

概述

ReduceTask是MapReduce程式reduce階段執行的具體任務,同MapTask一樣上層受MRAppMaster協調排程;內部基於ReduceContext管理各個元件。其主要功能包括:

  • 拉取Map階段的輸入 合併
  • 建立一個最終檔案的迭代器
  • 執行自定義reduce邏輯
  • 輸出

呼叫關係

程式入口: org.apache.hadoop.mapred.YarnChild#main() 呼叫流程:

mr.png

核心類說明

YarnChild

入口類,通過其main()方法啟動獨立程序。內部主要是建立了reduce階段的核心類ReduceTask,並呼叫起來其run(),真正開啟reduce階段的執行。

ReduceTask

reduce階段的核心類,初始化基本元件與ReduceContextImpl環境上下文,並呼叫Reduce主邏輯,即run()。

Reduce

核心方法為run(),封裝迴圈處理每條資料的完整流程,並基於模板方法讓用於定義reduce()環節,擴充套件程式。

Shuffle操作的上下文:ShuffleConsumerPlugin.Context

Context封裝Shuffle操作所依賴的各種基礎元件,基礎元件持有Context的引用,並通過Context來獲取其他基礎元件。這種方式,將元件之間的複雜依賴關係轉化為對單一Context的依賴,從而實現元件之間的解耦。

總體來講,使用Context封裝元件的方式,一般有兩大優點:

  • 外部與具體元件的通過Context解耦
  • 元件與元件之間通過Context解耦

ShuffleConsumerPlugin(具體實現類為Shuffle)

核心功能是通過一組Fetcher執行緒從Map端拉取資料,這個過程會根據資料量來決定將輸出封裝成InMemoryMapOutput還是OnDiskMapOutput,再通過commit()方法將封裝的物件交給MergeManagerImpl來進行排程。

最終通過MergeManagerImpl#close()獲取最終檔案的迭代器。

ShuffleSchedulerImpl

Fetcher拉取完成,通過ShuffleSchedulerImpl#copySucceed()來提交給MergeManagerImpl管理。

ShuffleSchedulerImpl還有一個重要功能,就是接收map階段任務完成的事件,進而解析事件為對應的資料結構並存儲,供Fetcher執行緒操作。

Fetcher執行緒

功能為拉取map階段的結果輸出。預設會啟動5個執行緒,並從ShuffleSchedulerImpl獲取map的位置資訊,然後進行拉取操作,拉取完成再呼叫ShuffleSchedulerImpl#copySucceed()通知ShuffleSchedulerImpl,來進行後續操作。

MergeManagerImpl

預設情況下,儲存了Map階段的兩種輸出即InMemoryMapOutput和OnDiskMapOutput的集合,並對應有兩個執行緒來處理合並過程,分別是InMemoryMerger和OnDiskMerger。

InMemoryMerger合併會將結果寫入檔案,並再次交給MergeManagerImpl排程。 OnDiskMerger執行檔案的合併,合併為新的檔案。

close()方法執行最終的合併,即finalMerge(),會將記憶體、磁碟檔案進行最終的合併,並返回迭代器。

Reducer

核心方法為run(),封裝reduce階段主要處理流程,並基於模板方法模式讓使用者定義reduce()環節,擴充套件程式。

核心程式碼: ``` public void run(Context context) throws IOException, InterruptedException { setup(context); try { while (context.nextKey()) { reduce(context.getCurrentKey(), context.getValues(), context);

        // If a back up store is used, reset it
        Iterator<VALUEIN> iter = context.getValues().iterator();
        if (iter instanceof ReduceContext.ValueIterator) {
            ((ReduceContext.ValueIterator<VALUEIN>) iter).resetBackupStore();
        }
    }
} finally {
    cleanup(context);
}

} ```

ReduceContextImpl

Reduce任務的上下文物件,封裝其他元件的呼叫,提供統一入口。

LineRecordWriter

輸出元件。