解讀分散式排程平臺Airflow在華為雲MRS中的實踐

語言: CN / TW / HK

本文分享自華為雲社群《分散式排程平臺Airflow在華為雲MRS中的實踐》,作者: 啊喔YeYe 。

介紹

Airflow是一個使用Python語言編寫的分散式排程平臺,通過DAG(Directed acyclic graph 有向無環圖)來管理任務,不需要知道業務資料的具體內容,設定任務的依賴關係即可實現任務排程。其平臺擁有和Hive、Presto、MySQL、HDFS、Postgres等資料來源之間互動的能力,並且提供了hook,使其擁有很好地擴充套件性。

MapReduce服務提供租戶完全可控的企業級大資料叢集雲服務,可輕鬆執行Hadoop、Spark、HBase、Kafka、Storm等大資料元件。Airflow對接MapReduce服務後,可依靠Airflow平臺提供的命令列介面和一個基於Web的使用者介面,可以視覺化管理依賴關係、監控進度、觸發任務等

環境準備

  • 在華為雲購買彈性雲伺服器ECS,用於安裝執行Airflow,並繫結彈性公網IP,用於訪問公網安裝Airflow服務
  • 已開通MRS 3.x普通叢集
  • 彈性雲伺服器ECS的虛擬私有云和安全組需與MRS普通叢集一致,其公共映象建議選擇CentOS 8.2 64bit

安裝Airflow

1. 登入已購買的Linux彈性雲伺服器,執行以下命令升級pip版本

pip3 install --upgrade pip==20.2.4

2. 安裝Airflow以及建立登入Airflow的admin使用者

使用vim 指令碼名.sh新建指令碼,寫入如下內容並儲存,使用sh 指令碼名.sh執行指令碼,執行完成後會建立登入Airflow的admin使用者,並輸入密碼完成建立。本指令碼會完成Airflow的安裝以及建立登入Airflow的admin使用者。指令碼含義見註釋。

```

airflow needs a home, ~/airflow is the default,

but you can lay foundation somewhere else if you prefer

(optional)

export AIRFLOW_HOME=~/airflow

AIRFLOW_VERSION=2.0.1 PYTHON_VERSION="$(python3 --version | cut -d " " -f 2 | cut -d "." -f 1-2)"

For example: 3.6

CONSTRAINT_URL="http://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"

For example: http://raw.githubusercontent.com/apache/airflow/constraints-2.0.1/constraints-3.6.txt

pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

initialize the database

airflow db init

airflow users create \ --username admin \ --firstname Peter \ --lastname Parker \ --role Admin \ --email [email protected] ```

3.執行以下命令啟動Airflow WebServer

airflow webserver --port 8080 -D

image.png

4. 執行以下命令啟動Airflow Scheduler

airflow scheduler -D

image.png

5. 訪問Airflow WebUI

在本地瀏覽器輸入“http://ECS彈性IP:8080”網址,進入Airflow WebUI登入介面

image.png

登入之後:

image.png

提交spark作業至MRS

1. 參考安裝客戶端在執行Airflow的彈性雲伺服器ECS上安裝MRS客戶端

例如安裝客戶端到/opt/client目錄下,安裝命令:

sh ./install.sh /opt/client/ -o chrony

2. 在安裝Airflow的目錄下新建目錄"dags"

如Airflow安裝目錄是“/root/airflow”,新建目錄“/root/airflow/dags”

3. 新建提交Spark作業的Python指令碼

在新建目錄下使用vim 指令碼名.py新建python指令碼並儲存,指令碼內容如下:

``` from datetime import timedelta from airflow import DAG from airflow.operators.bash import BashOperator from airflow.operators.dummy import DummyOperator from airflow.utils.dates import days_ago

args = { 'owner': 'airflow', }

dag = DAG( dag_id='spark-pi', default_args=args, start_date=days_ago(200), schedule_interval='@once', dagrun_timeout=timedelta(minutes=300), tags=['spark'], )

run_this = BashOperator( task_id='run_on_yarn',

其他元件命令參考MRS元件開發指南,將任務提交或執行命令替換到bash_command變數

bash_command='source /opt/client/bigdata_env;spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode cluster /opt/client/Spark2x/spark/examples/jars/spark-examples_2.11-2.4.5-hw-ei-302023.jar 10',

bash_command='echo "run"; echo 0 >> /tmp/test',

dag=dag, )

run_this ```

4. 登入Airflow WebUI,單擊“spark-pi”左側的image.png,然後單擊右側的image.png執行

image.png

5. 登入Manager頁面,選擇“叢集 > 服務 > Yarn > 概覽”

6. 步單擊“ResourceManager WebUI”後面對應的連結,進入Yarn的WebUI頁面,檢視Spark任務是否執行

image.png

點選關注,第一時間瞭解華為雲新鮮技術~