MapReduce原始碼分析二:ReduceTask執行流程
概述
ReduceTask是MapReduce程式reduce階段執行的具體任務,同MapTask一樣上層受MRAppMaster協調排程;內部基於ReduceContext管理各個元件。其主要功能包括:
- 拉取Map階段的輸入 合併
- 建立一個最終檔案的迭代器
- 執行自定義reduce邏輯
- 輸出
呼叫關係
程式入口:
org.apache.hadoop.mapred.YarnChild#main()
呼叫流程:
核心類說明
YarnChild
入口類,通過其main()方法啟動獨立程序。內部主要是建立了reduce階段的核心類ReduceTask,並呼叫起來其run(),真正開啟reduce階段的執行。
ReduceTask
reduce階段的核心類,初始化基本元件與ReduceContextImpl環境上下文,並呼叫Reduce主邏輯,即run()。
Reduce
核心方法為run(),封裝迴圈處理每條資料的完整流程,並基於模板方法讓用於定義reduce()環節,擴充套件程式。
Shuffle操作的上下文:ShuffleConsumerPlugin.Context
Context封裝Shuffle操作所依賴的各種基礎元件,基礎元件持有Context的引用,並通過Context來獲取其他基礎元件。這種方式,將元件之間的複雜依賴關係轉化為對單一Context的依賴,從而實現元件之間的解耦。
總體來講,使用Context封裝元件的方式,一般有兩大優點:
- 外部與具體元件的通過Context解耦
- 元件與元件之間通過Context解耦
ShuffleConsumerPlugin(具體實現類為Shuffle)
核心功能是通過一組Fetcher執行緒從Map端拉取資料,這個過程會根據資料量來決定將輸出封裝成InMemoryMapOutput還是OnDiskMapOutput,再通過commit()方法將封裝的物件交給MergeManagerImpl來進行排程。
最終通過MergeManagerImpl#close()獲取最終檔案的迭代器。
ShuffleSchedulerImpl
Fetcher拉取完成,通過ShuffleSchedulerImpl#copySucceed()來提交給MergeManagerImpl管理。
ShuffleSchedulerImpl還有一個重要功能,就是接收map階段任務完成的事件,進而解析事件為對應的資料結構並存儲,供Fetcher執行緒操作。
Fetcher執行緒
功能為拉取map階段的結果輸出。預設會啟動5個執行緒,並從ShuffleSchedulerImpl獲取map的位置資訊,然後進行拉取操作,拉取完成再呼叫ShuffleSchedulerImpl#copySucceed()通知ShuffleSchedulerImpl,來進行後續操作。
MergeManagerImpl
預設情況下,儲存了Map階段的兩種輸出即InMemoryMapOutput和OnDiskMapOutput的集合,並對應有兩個執行緒來處理合並過程,分別是InMemoryMerger和OnDiskMerger。
InMemoryMerger合併會將結果寫入檔案,並再次交給MergeManagerImpl排程。 OnDiskMerger執行檔案的合併,合併為新的檔案。
close()方法執行最終的合併,即finalMerge(),會將記憶體、磁碟檔案進行最終的合併,並返回迭代器。
Reducer
核心方法為run(),封裝reduce階段主要處理流程,並基於模板方法模式讓使用者定義reduce()環節,擴充套件程式。
核心程式碼: ``` public void run(Context context) throws IOException, InterruptedException { setup(context); try { while (context.nextKey()) { reduce(context.getCurrentKey(), context.getValues(), context);
// If a back up store is used, reset it
Iterator<VALUEIN> iter = context.getValues().iterator();
if (iter instanceof ReduceContext.ValueIterator) {
((ReduceContext.ValueIterator<VALUEIN>) iter).resetBackupStore();
}
}
} finally {
cleanup(context);
}
} ```
ReduceContextImpl
Reduce任務的上下文物件,封裝其他元件的呼叫,提供統一入口。
LineRecordWriter
輸出元件。