應用實踐 | Apache Doris 整合 Iceberg + Flink CDC 構建實時湖倉一體的聯邦查詢分析架構

語言: CN / TW / HK

應用實踐 | Apache Doris 整合 Iceberg + Flink CDC 構建實時湖倉一體的聯邦查詢分析架構

導讀:這是一篇非常完整全面的應用技術乾貨,手把手教你如何使用 Doris+Iceberg+Flink CDC 構建實時湖倉一體的聯邦查詢分析架構。按照本文中步驟一步步完成,完整體驗搭建操作的完整過程。

作者Apache Doris PMC 成員 張家鋒

1.概覽

這篇教程將展示如何使用 Doris+Iceberg+Flink CDC 構建實時湖倉一體的聯邦查詢分析,Doris 1.1版本提供了Iceberg的支援,本文主要展示Doris和Iceberg怎麼使用,同時本教程整個環境是都基於偽分散式環境搭建,大家按照步驟可以一步步完成。完整體驗整個搭建操作的過程。

1.1 軟體環境

本教程的演示環境如下:

  1. Centos7
  2. Apahce doris 1.1
  3. Hadoop 3.3.3
  4. hive 3.1.3
  5. Fink 1.14.4
  6. flink-sql-connector-mysql-cdc-2.2.1
  7. Apache Iceberg 0.13.2
  8. JDK 1.8.0_311
  9. MySQL 8.0.29
wget http://archive.apache.org/dist/hadoop/core/hadoop-3.3.3/hadoop-3.3.3.tar.gz
wget http://archive.apache.org/dist/hive/hive-3.1.3/apache-hive-3.1.3-bin.tar.gz
wget http://dlcdn.apache.org/flink/flink-1.14.4/flink-1.14.4-bin-scala_2.12.tgz
wget http://search.maven.org/remotecontent?filepath=org/apache/iceberg/iceberg-flink-runtime-1.14/0.13.2/iceberg-flink-runtime-1.14-0.13.2.jar
wget http://repository.cloudera.com/artifactory/cloudera-repos/org/apache/flink/flink-shaded-hadoop-3-uber/3.1.1.7.2.9.0-173-9.0/flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar

1.2 系統架構

我們整理架構圖如下

圖片

  1. 首先我們從Mysql資料中使用Flink 通過 Binlog完成資料的實時採集
  2. 然後再Flink 中建立 Iceberg 表,Iceberg的元資料儲存在hive裡
  3. 最後我們在Doris中建立Iceberg外表
  4. 在通過Doris 統一查詢入口完成對Iceberg裡的資料進行查詢分析,供前端應用呼叫,這裡iceberg外表的資料可以和Doris內部資料或者Doris其他外部資料來源的資料進行關聯查詢分析

Doris湖倉一體的聯邦查詢架構如下:

圖片

  1. Doris 通過 ODBC 方式支援:MySQL,Postgresql,Oracle ,SQLServer
  2. 同時支援 Elasticsearch 外表
  3. 1.0版本支援Hive外表
  4. 1.1版本支援Iceberg外表
  5. 1.2版本支援Hudi 外表

2.環境安裝部署

2.1 安裝Hadoop、Hive

tar zxvf hadoop-3.3.3.tar.gz
tar zxvf apache-hive-3.1.3-bin.tar.gz

配置系統環境變數

export HADOOP_HOME=/data/hadoop-3.3.3
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export HADOOP_HDFS_HOME=$HADOOP_HOME
export HIVE_HOME=/data/hive-3.1.3
export PATH=$PATH:$HADOOP_HOME/bin:$HIVE_HOME/bin:$HIVE_HOME/conf

2.2 配置hdfs

2.2.1 core-site.xml

vi etc/hadoop/core-site.xml

<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://localhost:9000</value>
    </property>
</configuration>

2.2.2 hdfs-site.xml

vi etc/hadoop/hdfs-site.xml

  <configuration>
    <property>
      <name>dfs.replication</name>
      <value>1</value>
    </property>
    <property>
      <name>dfs.namenode.name.dir</name>
      <value>/data/hdfs/namenode</value>
    </property>
    <property>
      <name>dfs.datanode.data.dir</name>
      <value>/data/hdfs/datanode</value>
    </property>
  </configuration>

2.2.3 修改Hadoop啟動指令碼

sbin/start-dfs.sh

sbin/stop-dfs.sh

在檔案開始加上下面的內容

HDFS_DATANODE_USER=root
HADOOP_SECURE_DN_USER=hdfs
HDFS_NAMENODE_USER=root
HDFS_SECONDARYNAMENODE_USER=root

sbin/start-yarn.sh

sbin/stop-yarn.sh

在檔案開始加上下面的內容

YARN_RESOURCEMANAGER_USER=root
HADOOP_SECURE_DN_USER=yarn
YARN_NODEMANAGER_USER=root

2.3 配置yarn

這裡我改變了Yarn的一些埠,因為我是單機環境和Doris 的一些埠衝突。你可以不啟動yarn

vi etc/hadoop/yarn-site.xml

<property>        
    <name>yarn.resourcemanager.address</name>  
    <value>jiafeng-test:50056</value> 
</property>  
<property>  
    <name>yarn.resourcemanager.scheduler.address</name> 
    <value>jiafeng-test:50057</value> 
</property> 
<property> 
    <name>yarn.resourcemanager.resource-tracker.address</name>  
    <value>jiafeng-test:50058</value> 
</property> 
<property>
    <name>yarn.resourcemanager.admin.address</name> 
    <value>jiafeng-test:50059</value> 
</property> 
<property>
    <name>yarn.resourcemanager.webapp.address</name> 
    <value>jiafeng-test:9090</value> 
</property> 
<property> 
    <name>yarn.nodemanager.localizer.address</name>
    <value>0.0.0.0:50060</value> 
</property> 
<property> 
    <name>yarn.nodemanager.webapp.address</name> 
    <value>0.0.0.0:50062</value>  
</property>

vi etc/hadoop/mapred-site.xm

<property>       
    <name>mapreduce.jobhistory.address</name>  
    <value>0.0.0.0:10020</value>  
</property> 
<property> 
    <name>mapreduce.jobhistory.webapp.address</name> 
    <value>0.0.0.0:19888</value> 
</property> 
<property> 
    <name>mapreduce.shuffle.port</name>
    <value>50061</value> 
</property>

2.2.4 啟動hadoop

sbin/start-all.sh

2.4 配置Hive

2.4.1 建立hdfs目錄

hdfs dfs -mkdir -p /user/hive/warehouse
hdfs dfs -mkdir /tmp
hdfs dfs -chmod g+w /user/hive/warehouse
hdfs dfs -chmod g+w /tmp

2.4.2 配置hive-site.xml

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
​
<configuration>
        <property>
            <name>javax.jdo.option.ConnectionURL</name>
            <value>jdbc:mysql://localhost:3306/hive?createDatabaseIfNotExist=true</value>
        </property>
        <property>
            <name>javax.jdo.option.ConnectionDriverName</name>
            <value>com.mysql.jdbc.Driver</value>
        </property>
        <property>
            <name>javax.jdo.option.ConnectionUserName</name>
            <value>root</value>
        </property>
        <property>
            <name>javax.jdo.option.ConnectionPassword</name>
            <value>MyNewPass4!</value>
        </property>
        <property>
                <name>hive.metastore.warehouse.dir</name>
                <value>/user/hive/warehouse</value>
                <description>location of default database for the warehouse</description>
        </property>
        <property>
                <name>hive.metastore.uris</name>
                <value/>
                <description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.</description>
        </property>
        <property>
                <name>javax.jdo.PersistenceManagerFactoryClass</name>
                <value>org.datanucleus.api.jdo.JDOPersistenceManagerFactory</value>
        </property>
        <property>
                <name>hive.metastore.schema.verification</name>
                <value>false</value>
        </property>
        <property>
                <name>datanucleus.schema.autoCreateAll</name>
                <value>true</value>
        </property>
</configuration>

2.4.3 配置 hive-env.sh

加入一下內容

HADOOP_HOME=/data/hadoop-3.3.3

2.4.4 hive元資料初始化

schematool -initSchema -dbType mysql

2.4.5 啟動hive metaservice

後臺執行

nohup bin/hive --service metaservice 1>/dev/null 2>&1 &

驗證

lsof -i:9083
COMMAND   PID USER   FD   TYPE   DEVICE SIZE/OFF NODE NAME
java    20700 root  567u  IPv6 54605348      0t0  TCP *:emc-pp-mgmtsvc (LISTEN)

2.5 安裝MySQL

具體請參照這裡:

使用 Flink CDC 實現 MySQL 資料實時入 Apache Doris

2.5.1 建立MySQL資料庫表並初始化資料

CREATE DATABASE demo;
USE demo;
CREATE TABLE userinfo (
  id int NOT NULL AUTO_INCREMENT,
  name VARCHAR(255) NOT NULL DEFAULT 'flink',
  address VARCHAR(1024),
  phone_number VARCHAR(512),
  email VARCHAR(255),
  PRIMARY KEY (`id`)
)ENGINE=InnoDB ;
INSERT INTO userinfo VALUES (10001,'user_110','Shanghai','13347420870', NULL);
INSERT INTO userinfo VALUES (10002,'user_111','xian','13347420870', NULL);
INSERT INTO userinfo VALUES (10003,'user_112','beijing','13347420870', NULL);
INSERT INTO userinfo VALUES (10004,'user_113','shenzheng','13347420870', NULL);
INSERT INTO userinfo VALUES (10005,'user_114','hangzhou','13347420870', NULL);
INSERT INTO userinfo VALUES (10006,'user_115','guizhou','13347420870', NULL);
INSERT INTO userinfo VALUES (10007,'user_116','chengdu','13347420870', NULL);
INSERT INTO userinfo VALUES (10008,'user_117','guangzhou','13347420870', NULL);
INSERT INTO userinfo VALUES (10009,'user_118','xian','13347420870', NULL);

2.6 安裝 Flink

tar zxvf flink-1.14.4-bin-scala_2.12.tgz

然後需要將下面的依賴拷貝到Flink安裝目錄下的lib目錄下,具體的依賴的lib檔案如下:

圖片

下面將幾個Hadoop和Flink裡沒有的依賴下載地址放在下面

wget http://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.2.1/flink-sql-connector-mysql-cdc-2.2.1.jar
wget http://repo1.maven.org/maven2/org/apache/thrift/libfb303/0.9.3/libfb303-0.9.3.jar
wget http://search.maven.org/remotecontent?filepath=org/apache/iceberg/iceberg-flink-runtime-1.14/0.13.2/iceberg-flink-runtime-1.14-0.13.2.jar
wget http://repository.cloudera.com/artifactory/cloudera-repos/org/apache/flink/flink-shaded-hadoop-3-uber/3.1.1.7.2.9.0-173-9.0/flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar

其他的:

hadoop-3.3.3/share/hadoop/common/lib/commons-configuration2-2.1.1.jar
hadoop-3.3.3/share/hadoop/common/lib/commons-logging-1.1.3.jar
hadoop-3.3.3/share/hadoop/tools/lib/hadoop-archive-logs-3.3.3.jar
hadoop-3.3.3/share/hadoop/common/lib/hadoop-auth-3.3.3.jar
hadoop-3.3.3/share/hadoop/common/lib/hadoop-annotations-3.3.3.jar
hadoop-3.3.3/share/hadoop/common/hadoop-common-3.3.3.jar
adoop-3.3.3/share/hadoop/hdfs/hadoop-hdfs-3.3.3.jar
hadoop-3.3.3/share/hadoop/client/hadoop-client-api-3.3.3.jar
hive-3.1.3/lib/hive-exec-3.1.3.jar
hive-3.1.3/lib/hive-metastore-3.1.3.jar
hive-3.1.3/lib/hive-hcatalog-core-3.1.3.jar

2.6.1 啟動Flink

bin/start-cluster.sh

啟動後的介面如下:

圖片

2.6.2 進入 Flink SQL Client

 bin/sql-client.sh embedded 

圖片

開啟 checkpoint,每隔3秒做一次 checkpoint

Checkpoint 預設是不開啟的,我們需要開啟 Checkpoint 來讓 Iceberg 可以提交事務。 並且,mysql-cdc 在 binlog 讀取階段開始前,需要等待一個完整的 checkpoint 來避免 binlog 記錄亂序的情況。

注意:

這裡是演示環境,checkpoint的間隔設定比較短,線上使用,建議設定為3-5分鐘一次checkpoint。

Flink SQL> SET execution.checkpointing.interval = 3s;
[INFO] Session property has been set.

2.6.3 建立Iceberg Catalog

CREATE CATALOG hive_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://localhost:9083',
  'clients'='5',
  'property-version'='1',
  'warehouse'='hdfs://localhost:8020/user/hive/warehouse'
);

檢視catalog

Flink SQL> show catalogs;
+-----------------+
|    catalog name |
+-----------------+
| default_catalog |
|    hive_catalog |
+-----------------+
2 rows in set

2.6.4 建立 Mysql CDC 表

 CREATE TABLE user_source (
    database_name STRING METADATA VIRTUAL,
    table_name STRING METADATA VIRTUAL,
    `id` DECIMAL(20, 0) NOT NULL,
    name STRING,
    address STRING,
    phone_number STRING,
    email STRING,
    PRIMARY KEY (`id`) NOT ENFORCED
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'root',
    'password' = 'MyNewPass4!',
    'database-name' = 'demo',
    'table-name' = 'userinfo'
  );

查詢CDC表:

select * from user_source;

img

2.6.5 建立Iceberg表

---檢視catalog
show catalogs;
---使用catalog
use catalog hive_catalog;
--建立資料庫
CREATE DATABASE iceberg_hive; 
--使用資料庫
use iceberg_hive;
​
2.6.5.1 建立表
CREATE TABLE all_users_info (
    database_name STRING,
    table_name    STRING,
    `id`          DECIMAL(20, 0) NOT NULL,
    name          STRING,
    address       STRING,
    phone_number  STRING,
    email         STRING,
    PRIMARY KEY (database_name, table_name, `id`) NOT ENFORCED
  ) WITH (
    'catalog-type'='hive'
  );

從CDC表裡插入資料到Iceberg表裡

use catalog default_catalog;
​
insert into hive_catalog.iceberg_hive.all_users_info select * from user_source;

在web介面可以看到任務的執行情況

img

然後停掉任務,我們去查詢iceberg表

select * from hive_catalog.iceberg_hive.all_users_info

可以看到下面的結果

img

我們去hdfs上可以看到hive目錄下的資料及對應的元資料

img

我們也可以通過Hive建好Iceberg表,然後通過Flink將資料插入到表裡

下載Iceberg Hive執行依賴

 wget http://repo1.maven.org/maven2/org/apache/iceberg/iceberg-hive-runtime/0.13.2/iceberg-hive-runtime-0.13.2.jar

在hive shell下執行:

SET engine.hive.enabled=true; 
SET iceberg.engine.hive.enabled=true; 
SET iceberg.mr.catalog=hive; 
 add jar /path/to/iiceberg-hive-runtime-0.13.2.jar;

建立表

CREATE EXTERNAL TABLE iceberg_hive( 
  `id` int, 
  `name` string)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' 
LOCATION 'hdfs://localhost:8020/user/hive/warehouse/iceber_db/iceberg_hive'
TBLPROPERTIES (
  'iceberg.mr.catalog'='hadoop', 
'iceberg.mr.catalog.hadoop.warehouse.location'='hdfs://localhost:8020/user/hive/warehouse/iceber_db/iceberg_hive'
  ); 

然後再Flink SQL Client下執行下面語句將資料插入到Iceber表裡

INSERT INTO hive_catalog.iceberg_hive.iceberg_hive values(2, 'c');
INSERT INTO hive_catalog.iceberg_hive.iceberg_hive values(3, 'zhangfeng');

查詢這個表

select * from hive_catalog.iceberg_hive.iceberg_hive

可以看到下面的結果

img

3. Doris 查詢 Iceberg

Apache Doris 提供了 Doris 直接訪問 Iceberg 外部表的能力,外部表省去了繁瑣的資料匯入工作,並藉助 Doris 本身的 OLAP 的能力來解決 Iceberg 表的資料分析問題:

  1. 支援 Iceberg 資料來源接入Doris
  2. 支援 Doris 與 Iceberg 資料來源中的表聯合查詢,進行更加複雜的分析操作

3.1安裝Doris

這裡我們不在詳細講解Doris的安裝,如果你不知道怎麼安裝Doris請參照官方文件:快速入門

3.2 建立Iceberg外表

CREATE TABLE `all_users_info` 
ENGINE = ICEBERG
PROPERTIES (
"iceberg.database" = "iceberg_hive",
"iceberg.table" = "all_users_info",
"iceberg.hive.metastore.uris"  =  "thrift://localhost:9083",
"iceberg.catalog.type"  =  "HIVE_CATALOG"
);

引數說明:

  • ENGINE 需要指定為 ICEBERG

  • PROPERTIES 屬性:

    • iceberg.hive.metastore.uris:Hive Metastore 服務地址
    • iceberg.database:掛載 Iceberg 對應的資料庫名
    • iceberg.table:掛載 Iceberg 對應的表名,掛載 Iceberg database 時無需指定。
    • iceberg.catalog.type:Iceberg 中使用的 catalog 方式,預設為 HIVE_CATALOG,當前僅支援該方式,後續會支援更多的 Iceberg catalog 接入方式。
mysql> CREATE TABLE `all_users_info`
    -> ENGINE = ICEBERG
    -> PROPERTIES (
    -> "iceberg.database" = "iceberg_hive",
    -> "iceberg.table" = "all_users_info",
    -> "iceberg.hive.metastore.uris"  =  "thrift://localhost:9083",
    -> "iceberg.catalog.type"  =  "HIVE_CATALOG"
    -> );
Query OK, 0 rows affected (0.23 sec)
​
mysql> select * from all_users_info;
+---------------+------------+-------+----------+-----------+--------------+-------+
| database_name | table_name | id    | name     | address   | phone_number | email |
+---------------+------------+-------+----------+-----------+--------------+-------+
| demo          | userinfo   | 10004 | user_113 | shenzheng | 13347420870  | NULL  |
| demo          | userinfo   | 10005 | user_114 | hangzhou  | 13347420870  | NULL  |
| demo          | userinfo   | 10002 | user_111 | xian      | 13347420870  | NULL  |
| demo          | userinfo   | 10003 | user_112 | beijing   | 13347420870  | NULL  |
| demo          | userinfo   | 10001 | user_110 | Shanghai  | 13347420870  | NULL  |
| demo          | userinfo   | 10008 | user_117 | guangzhou | 13347420870  | NULL  |
| demo          | userinfo   | 10009 | user_118 | xian      | 13347420870  | NULL  |
| demo          | userinfo   | 10006 | user_115 | guizhou   | 13347420870  | NULL  |
| demo          | userinfo   | 10007 | user_116 | chengdu   | 13347420870  | NULL  |
+---------------+------------+-------+----------+-----------+--------------+-------+
9 rows in set (0.18 sec)

3.3 同步掛載

當 Iceberg 表 Schema 發生變更時,可以通過 REFRESH 命令手動同步,該命令會將 Doris 中的 Iceberg 外表刪除重建。

-- 同步 Iceberg 表
REFRESH TABLE t_iceberg;
​
-- 同步 Iceberg 資料庫
REFRESH DATABASE iceberg_test_db;

3.4 Doris 和 Iceberg 資料型別對應關係

支援的 Iceberg 列型別與 Doris 對應關係如下表:

ICEBERG DORIS 描述
BOOLEAN BOOLEAN
INTEGER INT
LONG BIGINT
FLOAT FLOAT
DOUBLE DOUBLE
DATE DATE
TIMESTAMP DATETIME Timestamp 轉成 Datetime 會損失精度
STRING STRING
UUID VARCHAR 使用 VARCHAR 來代替
DECIMAL DECIMAL
TIME - 不支援
FIXED - 不支援
BINARY - 不支援
STRUCT - 不支援
LIST - 不支援
MAP - 不支援

3.5 注意事項

  • Iceberg 表 Schema 變更不會自動同步,需要在 Doris 中通過 REFRESH 命令同步 Iceberg 外表或資料庫。
  • 當前預設支援的 Iceberg 版本為 0.12.0,0.13.x,未在其他版本進行測試。後續後支援更多版本。

3.6 Doris FE 配置

下面幾個配置屬於 Iceberg 外表系統級別的配置,可以通過修改 fe.conf 來配置,也可以通過 ADMIN SET CONFIG 來配置。

  • iceberg_table_creation_strict_mode

    建立 Iceberg 表預設開啟 strict mode。 strict mode 是指對 Iceberg 表的列型別進行嚴格過濾,如果有 Doris 目前不支援的資料型別,則建立外表失敗。

  • iceberg_table_creation_interval_second

    自動建立 Iceberg 表的後臺任務執行間隔,預設為 10s。

  • max_iceberg_table_creation_record_size

    Iceberg 表建立記錄保留的最大值,預設為 2000. 僅針對建立 Iceberg 資料庫記錄。

4. 總結

這裡Doris On Iceberg我們只演示了Iceberg單表的查詢,你還可以聯合Doris的表,或者其他的ODBC外表,Hive外表,ES外表等進行聯合查詢分析,通過Doris對外提供統一的查詢分析入口。

自此我們完整從搭建Hadoop,hive、flink 、Mysql、Doris 及Doris On Iceberg的使用全部介紹完了,Doris朝著資料倉庫和資料融合的架構演進,支援湖倉一體的聯邦查詢,給我們的開發帶來更多的便利,更高效的開發,省去了很多資料同步的繁瑣工作,快快來體驗吧。 最後,歡迎更多的開源技術愛好者加入 Apache Doris 社群,攜手成長,共建社群生態。

圖片

圖片

圖片

SelectDB 是一家開源技術公司,致力於為 Apache Doris 社群提供一個由全職工程師、產品經理和支援工程師組成的團隊,繁榮開源社群生態,打造實時分析型資料庫領域的國際工業界標準。基於 Apache Doris 研發的新一代雲原生實時數倉 SelectDB,運行於多家雲上,為使用者和客戶提供開箱即用的能力。

相關連結:

SelectDB 官方網站:

http://selectdb.com (We Are Coming Soon)

Apache Doris 官方網站:

http://doris.apache.org

Apache Doris Github:

http://github.com/apache/doris

Apache Doris 開發者郵件組:

[email protected]

二維碼.png