Java UDF 的設計與使用介紹,兼容 Hive UDF 實現數據快速遷移

語言: CN / TW / HK

作者介紹: 李仕楊,SelectDB 生態研發工程師,Apache Doris Contributor。

我們在使用各個 SQL 引擎時,會遇到紛繁複雜的查詢需求。一部分可以通過引擎自帶的內置函數去解決,但內置函數往往具有一定通用性,在部分特殊場景下內置函數可能無法滿足需求,所以一般 SQL 引擎會提供 UDF 功能,方便用户通過自己寫邏輯來滿足特定的需求,Apache Doris 也不例外。

在 Java UDF 之前,Apache Doris 提供了原生 UDF 。由於是使用 C++ 來編寫的,執行效率高、速度更快,但是在實際使用中也會存在一些問題:

  • 跟 Doris 代碼耦合度高,需要自己打包編譯 Doris 源碼
  • 只支持 C++ 語言並且 UDF 代碼出錯會影響 Doris 集羣穩定性
  • 對於只熟悉 Hive、Spark 等大數據組件的用户有一定使用門檻

由上可知,原生的 UDF 實現起來門檻較高且存在一定的不穩定性因素。那麼是否有一種實現相對簡單、使用門檻較低且與 Doris 代碼耦合度低的 UDF 呢?

答案是有的。在 2022 年 12 月正式發佈的 Apache Doris 1.2.0 版本(http://github.com/apache/doris/releases)中,我們推出了全新的 Java UDF 和 Remote UDF 功能,其中 Java UDF 不僅能滿足以上要求,並且在方便和安全角度為用户帶來了全新體驗:

  • 不熟悉 C++?Java 代碼一樣可以實現自己的 UDF
  • 使用條件苛刻?只要有 Jar 包就能使用
  • 擔心穩定性?Java UDF 出錯隻影響自身,對 Doris 的穩定性幾乎無影響
  • 遷移舊大數據平台的數據和 UDF 費時費力?Java UDF 完全兼容 Hive UDF,輕鬆實現快速遷移
  • .......

設計思路

大體步驟

Apache Doris 的 BE 是由 C++ 代碼編寫,如果想在 Doris 中實現 Java UDF,不可避免需要調用 JNI,而不正確的 JNI 調用將導致嚴重的性能問題。那麼該如何設計 Java UDF 以解決這個問題呢?

Doris Java UDF 針對向量化引擎,其設計思路大體如下:

  • 首先,制定用户在創建 UDF 時必須遵循的一些規則。 例如,UDF 類必須具有 Evaluate 方法,並且必須是 Public 和 Non-Static 的。 這些規則確保我們可以正確調用 UDF。
  • 其次,Doris 查詢引擎會執行一個新的 Java 函數調用,BE 會創建或重用一個 JVM 來調用真正的 Java UDF。 為了隔離不同的 UDF 實例,選擇使用不同的類加載器來加載 UDF。
  • 最後,由於執行時是向量化的,因此可以實現一次執行多行數據只調用一次 JNI,原因是 JNI 開銷被輸入列中所有行分攤了[1], 這將給用户帶來更好的性能體驗。

詳細步驟

熟悉 Java 的朋友應該都知道,JVM 在直接內存即非堆區的 IO 操作比堆區更高效,因此 Doris Java UDF 一般是在直接內存中對數據進行 IO 操作。通常 UDF 有以下幾種情況:

  • 一般 UDF (定長UDF)

    • 此處的基本思想是傳遞直接指向輸入緩衝區和輸出緩衝區的地址,Doris 可以直接從所給地址中讀取和寫回數據,這可以幫助 Doris 避免不必要的數據拷貝。 Input Buffer 和 Output Buffer 都是 JVM 的堆外內存,可以直接通過J ava 的 API 來操作這部分內存。
    • 整體執行模式如下圖:

img

  • UDAF(變長輸出)

    • 對於一般 UDF 來説,輸出大小和類型是不變,因此所需 Buffer 大小也是確定的,而對於 UDAF(變長輸出),一般 UDF(定長 UDF)的步驟將不再適用。
    • 因此* *需要做出如下改變:**在第 1 步分配一個初始緩衝區,當結果大於分配的初始緩衝區時跳到第 3 步,在第 3 步中進行一次擴容。當這種情況持續發生時,我們再次重複上述步驟分配新緩衝區並繼續為剩餘的行執行 UDF,直到所有數據都執行完成。
    • 執行過程如下圖所示:

img

通過上文介紹,我們基本瞭解了 Doris Java UDF 的執行情況,那麼在實際生產中應該如何來使用Java UDF呢?

Java UDF 的使用

Java UDF 使用起來非常簡單。Java UDF 在 Doris 內註冊完成後,Doris 執行時通過調用 jar 包來實現 UDF 邏輯。順序結構如下圖:

img

具體步驟:

  1. 參考``doris/samples/doris-demo/java-udf-demo/src/main/java/org/apache/doris/udf/AddOne.java 文件,編寫 UDF 邏輯,你可以像 Hive UDF 一樣在任何地方進行編寫和打包,不必跟 Doris 環境相關聯。
  • AddOne.java文件內容如下:
  • // Licensed to the Apache Software Foundation (ASF) under one
    // or more contributor license agreements.  See the NOTICE file
    // distributed with this work for additional information
    // regarding copyright ownership.  The ASF licenses this file
    // to you under the Apache License, Version 2.0 (the
    // "License"); you may not use this file except in compliance
    // with the License.  You may obtain a copy of the License at
    //
    //   http://www.apache.org/licenses/LICENSE-2.0
    //
    // Unless required by applicable law or agreed to in writing,
    // software distributed under the License is distributed on an
    // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    // KIND, either express or implied.  See the License for the
    // specific language governing permissions and limitations
    // under the License.
    ​
    package org.apache.doris.udf;
    ​
    import org.apache.hadoop.hive.ql.exec.UDF;
    ​
    public class AddOne extends UDF {
        public Integer evaluate(Integer value) {
            return value == null? null: value + 1;
        }
    }
    
  1. 執行 mvn 打包命令

       mvn clean package
    
  2. 創建 UDF

      CREATE FUNCTION java_udf_name(int) RETURNS int PROPERTIES (
          "file"="file:///path/to/your_jar_name.jar",
          "symbol"="org.apache.doris.udf.AddOne",
          "always_nullable"="true",
          "type"="JAVA_UDF"
      );
  1. 使用已創建的 UDF
           建表:
           CREATE TABLE IF NOT EXISTS test.t1 (`col_1` int NOT NULL)
           DISTRIBUTED BY HASH(col_1) PROPERTIES("replication_num" = "1");
           
           
           插入數據:
           insert into test.t1 values(1),(2);
           
           
           使用udf:
           MySQL [(none)]> select col_1, java_udf_name(col_1) as col_2 from test.t1;
           +------------+------------+
           | col_1      | col_2      |
           +------------+------------+
           | 1          | 2          |
           | 2          | 3          |
           +------------+------------+

至此,Doris Java UDF 的創建和使用就完成了,十分簡單易用。

注意事項

  • 最開始需要確定 BE 節點是否配置了JAVA_HOME,如果環境變量沒有配置,則可以在be/bin/start_be.sh文件第一行加上
export JAVA_HOME=/xxx/xxx
  • UDF 代碼中必須要帶有以下信息(UDAF 則替換成對應的)
import org.apache.hadoop.hive.ql.exec.UDF;
  • 創建 Doris java UDF 的語句,其格式如下
CREATE FUNCTION name ([,...])
[RETURNS] rettype
PROPERTIES (["key"="value"][,...])  
  • 例子中完整的 SQL 如下
 CREATE FUNCTION java_udf_name(int) RETURNS int PROPERTIES (
"file"="file:///path/to/your_jar_name.jar",
"always_nullable"="true",
"type"="JAVA_UDF"
        );

java_udf_name 是創建 UDF 的名稱,可以進行更改,UDF 名稱不能與 Doris 其他函數重命。

名稱後的``(int)``表示函數輸入參數是 int 類型,RETURNS``後的``int``表示函數輸出也是 int 類型;輸入輸出類型跟 Java 代碼中 Evaluate 函數的輸入輸出類型要保持一致。

  • PROPERTIES

file表示 jar 包在本機的路徑,應該修改"/path/to/your_jar_name.jar"``作為 jar 包的絕對路徑。如果是多機環境,也可以使用 http 形式表示的路徑,例如"file"="http://${host}:${http_port}/${your_jar_file}"

可以使用python命令來簡單啟動一個http server:
nohup python -m SimpleHTTPServer 12345 > /dev/null 2>&1
(啟動python的目錄需要跟你的jar包保持一致,比如你的jar放在A機器的/usr/lib下,那麼python命令最好也在該機器的該目錄下啟動)

Symbol 可以參考 Java 代碼中的 Package always_nullable表示 UDF 返回結果中是否可能出現 NULL 值,如果想要在計算中對出現的NULL值有特殊處理,以確定結果中不會返回 NULL,可以設為 false,有利於提升整個查詢計算過程的性能。

收益總結

通過本文的介紹,瞭解了Doris Java UDF 的設計與使用方法,那麼在實際的應用中,Doris Java UDF 能為使用者帶來什麼收益呢?

  • 熟悉 Java 的同學也可以快速上手開發 Doris,使用簡單便捷,較大提升開發效率。

  • 兼容 Hive UDF,有效降低從 Hadoop 遷移數據的成本。

  • UDF 代碼出錯並不會影響 Doris,某種程度上保證了 Doris 的更好穩定運行。

  • 與 Doris 代碼解耦,真正做到了"Write Once Run Anywhere"

  • 執行效率方面,Java UDF 是完全向量化執行的,一次執行多行數據只調用一次 JNI,結合堆外內存、Zero Copy 等優化技術,用户在使用 Java UDF 時,也能得到與之前的 C++ UDF 一致甚至更佳的查詢性能體驗。

社區貢獻

如果你的 UDF 已被許多場景應用,可以將 UDF 貢獻到 Apache Doris 社區。貢獻步驟可參考:http://doris.incubator.apache.org/zh-CN/docs/dev/ecosystem/udf/contribute-udf

需要注意的是,Doris BE 端是由 C++ 代碼實現的,因此你所貢獻的內置 UDF 也需要由C++代碼實現。Apache Doris 社區期待你的加入!

本文引用

[1] Viktor Rosenfeld, René Müller, Pinar Tözün, etc. Processing Java UDFs in a C++ environment. SoCC 2017: 419-431.

[2] Marcel Kornacker, Alexander Behm, Victor Bittorf, etc. Impala: A Modern, Open-Source SQL Engine for Hadoop. CIDR 2015.

[3]DSIP-001: Java UDF: http://cwiki.apache.org/confluence/display/DORIS/DSIP-001%3A+Java+UDF

[4]Apache Doris GitHub: http://github.com/apache/doris

END

「其他文章」