DolphinDB 函數化編程案例教程

語言: CN / TW / HK

DolphinDB支持函數化編程:函數對象可以作為高階函數的參數。這提高了代碼表達能力,可以簡化代碼,複雜的任務可以通過一行或幾行代碼完成。

本教程介紹了一些常見場景下的函數化編程案例,重點介紹 DolphinDB 的高階函數及其使用場景。

內容主要包括:

  1. 數據導入
  2. Lambda表達式
  3. 高階函數使用案例
  4. 部分應用案例
  5. 金融場景相關案例
  6. 機器學習相關案例

 

1. 數據導入

1.1 整型時間轉化為 TIME 格式並導入

CSV 數據文件中常用整數表示時間,如 “93100000” 表示 “9:31:00.000”。為了便於查詢分析,建議將這類數據轉換為時間類型,再存儲到 DolphinDB 數據庫中。

針對這種場景,可通過 loadTextEx 函數的 transform 參數將文本文件中待轉化的時間列指定為相應的數據類型。

本例中會用到 CSV 文件 candle_201801.csv,數據樣本如下:

symbol,exchange,cycle,tradingDay,date,time,open,high,low,close,volume,turnover,unixTime
000001,SZSE,1,20180102,20180102,93100000,13.35,13.39,13.35,13.38,2003635,26785576.72,1514856660000
000001,SZSE,1,20180102,20180102,93200000,13.37,13.38,13.33,13.33,867181
......

(1)建庫

用腳本創建如下分佈式數據庫(按天進行值分區):

login(`admin,`123456)
dataFilePath="/home/data/candle_201801.csv"
dbPath="dfs://DolphinDBdatabase"
db=database(dbPath,VALUE,2018.01.02..2018.01.30)

(2)建表

下面先通過 extractTextSchema 函數獲取數據文件的表結構。csv 文件中的 time 字段被識別為整型。若要將其存為 TIME 類型,可以通過 update 語句更新表結構將其轉換為 TIME 類型,然後用更新後的表結構來創建分佈式表。該分佈式表的分區列是 date 列。

schemaTB=extractTextSchema(dataFilePath)
update schemaTB set type="TIME" where name="time"
tb=table(1:0, schemaTB.name, schemaTB.type)
tb=db.createPartitionedTable(tb, `tb1, `date);
這裏通過   extractTextSchema  獲取表結構。用户也可以自定義表結構。

(3)導入數據

可以通過自定義函數 i2t 對時間列 time 進行預處理,將其轉換為 TIME 類型,並返回處理後的數據表。

def i2t(mutable t){
    return t.replaceColumn!(`time, t.time.format("000000000").temporalParse("HHmmssSSS"))
}
請注意:在自定義函數體內對數據進行處理時,請儘量使用本地的修改(以!結尾的函數)來提升性能。

調用 loadTextEx 函數導入 csv 文件的數據到分佈式表,這裏指定 transform 參數為 i2t 函數,導入時會自動應用 i2t 函數處理數據。

tmpTB=loadTextEx(dbHandle=db, tableName=`tb1, partitionColumns=`date, filename=dataFilePath, transform=i2t);

(4)查詢數據

查看錶內前 2 行數據,可以看到結果符合預期。

select top 2 * from loadTable(dbPath,`tb1);

symbol exchange cycle tradingDay date       time               open  high  low   close volume  turnover   unixTime
------ -------- ----- ---------- ---------- --------------     ----- ----- ----- ----- ------- ---------- -------------
000001 SZSE     1     2018.01.02 2018.01.02 09:31:00.000       13.35 13.39 13.35 13.38 2003635 2.678558E7 1514856660000
000001 SZSE     1     2018.01.02 2018.01.02 09:32:00.000       13.37 13.38 13.33 13.33 867181  1.158757E7 1514856720000

完整代碼如下:

login(`admin,`123456)
dataFilePath="/home/data/candle_201801.csv"
dbPath="dfs://DolphinDBdatabase"
db=database(dbPath,VALUE,2018.01.02..2018.01.30)
schemaTB=extractTextSchema(dataFilePath)
update schemaTB set type="TIME" where name="time"
tb=table(1:0,schemaTB.name,schemaTB.type)
tb=db.createPartitionedTable(tb,`tb1,`date);

def i2t(mutable t){
    return t.replaceColumn!(`time,t.time.format("000000000").temporalParse("HHmmssSSS"))
}

tmpTB=loadTextEx(dbHandle=db,tableName=`tb1,partitionColumns=`date,filename=dataFilePath,transform=i2t);
關於文本導入的相關函數和案例,可以參考   DolphinDB數據導入教程

1.2 有納秒時間戳的文本導入

本例將以整數類型存儲的納秒級數據導入為NANOTIMESTAMP類型。本例使用文本文件 nx.txt,數據樣本如下:

SendingTimeInNano#securityID#origSendingTimeInNano#bidSize
1579510735948574000#27522#1575277200049000000#1
1579510735948606000#27522#1575277200049000000#2
...

每一行記錄通過字符'#'來分隔列,SendingTimeInNano 和 origSendingTimeInNano 用於存儲納秒時間戳。

(1)建庫建表

首先定義分佈式數據庫和表,腳本如下:

dbSendingTimeInNano = database(, VALUE, 2020.01.20..2020.02.22);
dbSecurityIDRange = database(, RANGE,  0..10001);
db = database("dfs://testdb", COMPO, [dbSendingTimeInNano, dbSecurityIDRange]);

nameCol = `SendingTimeInNano`securityID`origSendingTimeInNano`bidSize;
typeCol = [`NANOTIMESTAMP,`INT,`NANOTIMESTAMP,`INT];
schemaTb = table(1:0,nameCol,typeCol);

db = database("dfs://testdb");
nx = db.createPartitionedTable(schemaTb, `nx, `SendingTimeInNano`securityID);

上述腳本創建了一個 組合分區 的數據庫,然後根據文本的字段和類型創建了表 nx。

(2)導入數據

導入數據時,使用函數 nanotimestamp,將文本中的整型轉化為 NANOTIMESTAMP 類型:

def dataTransform(mutable t){
  return t.replaceColumn!(`SendingTimeInNano, nanotimestamp(t.SendingTimeInNano)).replaceColumn!(`origSendingTimeInNano, nanotimestamp(t.origSendingTimeInNano))
}

最終通過 loadTextEx 導入數據。

完整代碼如下:

dbSendingTimeInNano = database(, VALUE, 2020.01.20..2020.02.22);
dbSecurityIDRange = database(, RANGE,  0..10001);
db = database("dfs://testdb", COMPO, [dbSendingTimeInNano, dbSecurityIDRange]);

nameCol = `SendingTimeInNano`securityID`origSendingTimeInNano`bidSize;
typeCol = [`NANOTIMESTAMP,`INT,`NANOTIMESTAMP,`INT];
schemaTb = table(1:0,nameCol,typeCol);

db = database("dfs://testdb");
nx = db.createPartitionedTable(schemaTb, `nx, `SendingTimeInNano`securityID);

def dataTransform(mutable t){
  return t.replaceColumn!(`SendingTimeInNano, nanotimestamp(t.SendingTimeInNano)).replaceColumn!(`origSendingTimeInNano, nanotimestamp(t.origSendingTimeInNano))
}

pt=loadTextEx(dbHandle=db,tableName=`nx , partitionColumns=`SendingTimeInNano`securityID,filename="nx.txt",delimiter='#',transform=dataTransform);

2. Lambda 表達式

DolphinDB 中可以創建自定義函數,可以是命名函數或者匿名函數(通常為 lambda 表達式)。

x = 1..10
each(x -> pow(x,2), x)

上例定義了一個 lambda 表達式: x -> pow(x,2),作為高階函數 each 的參數,來計算每一個元素的平方。

接下去的例子中,也會有其它的 lambda 函數案例。

3. 高階函數使用案例

3.1 cross 使用案例

3.1.1 將兩個向量或矩陣,兩兩組合作為參數來調用函數

cross 函數的偽代碼如下:

for(i:0~(size(X)-1)){
   for(j:0~(size(Y)-1)){
       result[i,j]=<function>(X[i], Y[j]);
   }
}
return result;

以計算 協方差矩陣 為例,一般需要使用兩個 for 循環計算。代碼如下:

def matlab_cov(mutable matt){
	nullFill!(matt,0.0)
	rowss,colss=matt.shape()
	msize = min(rowss, colss)
	df=matrix(float,msize,msize)
	for (r in 0..(msize-1)){
		for (c in 0..(msize-1)){
			df[r,c]=covar(matt[:,r],matt[:,c])
		}
	}
	return df
}

以上代碼雖然邏輯簡單,但是宂長,表達能力較差,且易出錯。

在DolphinDB 中可以使用高階函數 cross  pcross 計算協方差矩陣:

cross(covar, matt)

3.1.2 計算股票兩兩之間的相關性

本例中,我們使用金融大數據開放社區 Tushare 的滬深股票 日線行情 數據,來計算股票間的兩兩相關性。

首先我們定義一個數據庫和表,來存儲滬深股票日線行情數據。相關語句如下:

login("admin","123456")
dbPath="dfs://tushare"
yearRange=date(2008.01M + 12*0..22)
if(existsDatabase(dbPath)){
	dropDatabase(dbPath)
}
columns1=`ts_code`trade_date`open`high`low`close`pre_close`change`pct_change`vol`amount
type1=`SYMBOL`NANOTIMESTAMP`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE
db=database(dbPath,RANGE,yearRange)
hushen_daily_line=db.createPartitionedTable(table(100000000:0,columns1,type1),`hushen_daily_line,`trade_date)
上面的表是按照   日線行情  裏的結構説明定義的。

定義好表結構後,如需獲取對應的數據,可前往 Tushare 平台註冊賬户,獲取 TOKEN,然後參考 案例腳本 進行數據導入操作。本案例使用 DolphinDB 的 Python API 獲取數據,用户也可參考 Tushare 的説明文檔的説明文檔使用其它語言或庫。本例使用 2008 年到 2017 年的日線行情進行説明。

在計算兩兩相關性時,首先使用 exec + pivot by 生成股票回報率矩陣:

retMatrix=exec pct_change/100 as ret from daily_line pivot by trade_date, ts_code

exec  pivot by 是 DolphinDB 編程語言的特點之一。exec  select 的用法相同,但 select 語句僅可生成表, exec 語句可以生成向量。pivot by 用於重整維度,與 exec 一起使用時會生成一個矩陣。

調用高階函數 cross 生成股票兩兩相關性矩陣:

corrMatrix=cross(corr,retMatrix)

查詢和每隻股票相關性最高的 10 只股票:

syms=(exec count(*) from daily_line group by ts_code).ts_code
syms="C"+strReplace(syms, ".", "_")
mostCorrelated=select * from table(corrMatrix.columnNames() as ts_code, corrMatrix).rename!([`ts_code].append!(syms)).unpivot(`ts_code, syms).rename!(`ts_code`corr_ts_code`corr) context by ts_code having rank(corr,false) between 1:10

上面代碼中,corrMatrix是一個矩陣,需要轉化為表做進一步處理,同時新增一列表示股票代碼。使用 table 函數轉化成表後,通過 rename! 函數去修改表的列名。由於表的列名不能以數字開頭,故此例中,在 syms 前拼接了字符 "C",並將 syms 中的字符'.'轉化成'_'。

之後,對錶做 unpivot 操作,把多列的數據轉化成一列。

為了説明中間過程,我們將以上代碼拆解出一箇中間步驟:

select * from table(corrMatrix.columnNames() as ts_code, corrMatrix).rename!([`ts_code].append!(syms)).unpivot(`ts_code, syms)

這一步生成結果如下:

ts_code   valueType  value
--------- ---------- -----------------
000001.SZ C600539_SH 1
000002.SZ C600539_SH 0.581235290880416
000004.SZ C600539_SH 0.277978963095669
000005.SZ C600539_SH 0.352580116619933
000006.SZ C600539_SH 0.5056164472398
......

這樣就得到了每隻股票與其它股票的相關係數。之後又使用 rename! 來修改列名,然後通過 context by 來按照 ts_code (股票代碼)分組計算。每組中,查詢相關性最高的 10 只股票。

最終完整代碼為:

login("admin","123456")
daily_line= loadTable("dfs://tushare","hushen_daily_line")

retMatrix=exec pct_change/100 as ret from daily_line pivot by trade_date,ts_code
corrMatrix=cross(corr,retMatrix)

syms=(exec count(*) from daily_line group by ts_code).ts_code
syms="C"+strReplace(syms, ".", "_")
mostCorrelated=select * from table(corrMatrix.columnNames() as ts_code, corrMatrix).rename!([`ts_code].append!(syms)).unpivot(`ts_code, syms).rename!(`ts_code`corr_ts_code`corr) context by ts_code having rank(corr,false) between 1:10

3.2 each 使用案例

某些場景需要把函數應用到指定參數中的每個元素。若不使用函數化編程,需要使用 for 循環。DolphinDB 提供的高階函數,例如 each, peach, loop, ploop 等,可以簡化代碼。

3.2.1 獲取數據表各個列的 NULL 值個數

計算表 t 各列的 NULL 值個數,可以使用高階函數 each 

each(x->x.size() - x.count(), t.values())
在 DolphinDB 中,對於向量或矩陣,size 返回所有元素的個數,而 count 返回的是非 NULL 元素的個數。因此可以通過 size 和 count 的差值獲得 NULL 元素的個數。

其中,t.values() 返回一個tuple,每個元素為表 t 其中的一列。

3.2.2 去除表中存在 NULL 值的行

先通過如下代碼生成表 t:

sym = take(`a`b`c, 110)
id = 1..100 join take(int(),10)
id2 =  take(int(),10) join 1..100
t = table(sym, id,id2)

可用以下兩種方法實現。

第一種是直接按行處理,檢查每一行是否存在 NULL 值,若存在就去除該行。解決方案如下:

t[each(x -> !(x.id == NULL || x.id2 == NULL), t)]

需要注意的是,按行處理表時,表的每一行是一個字典對象。這裏定義了一個 lambda 表達式來檢查空值。

若列數較多,不便枚舉時,可以採用以下寫法:

t[each(x -> all(isValid(x.values())), t)]

上面代碼中,x.values 獲取了該字典所有的值,然後通過 isValid 檢查 NULL 值,最後通過 all 將結果彙總,判斷該行是否包含 NULL 值。

當數據量較大時,上述腳本運行效率較低。

DolphinDB 採用列式存儲,列操作較行操作具有更佳的性能。我們可以調用高階函數 each 對錶的每一列分別應用 isValid 函數,返回一個結果矩陣。通過 rowAnd 判斷矩陣的每一行是否存在 0 值。

代碼如下:

t[each(isValid, t.values()).rowAnd()]

當數據量很大時,可能會產生如下報錯:

The number of cells in a matrix can't exceed 2 billions.

這是因為 each(isValid, t.values()) 生成的矩陣過大。為解決該問題,可以調用 reduce 進行迭代計算,遍歷檢查每一列是否存在 NULL 值。

t[reduce(def(x,y) -> x and isValid(y), t.values(), true)]

3.2.3 按行處理與按列處理性能比較案例

下例對錶的某個字段進行如下處理:"aaaa_bbbb" 替換為 "bbbb_aaaa"。

先創建一個表 t:

t=table(take("aaaa_bbbb", 1000000) as str);

有兩種處理思路,可以按行處理或按列處理。

按行處理:

可以調用高階函數 each 遍歷每一行數據,切分後拼接。

each(x -> split(x, '_').reverse().concat('_'), t[`str])

按列處理:

pos = strpos(t[`str], "_")
substr(t[`str], pos+1)+"_"+t[`str].left(pos)

對比兩種方式的性能,可以看到使用高階函數 each 按行遍歷的時間在 2s300ms 左右,而按列處理的時間在 100ms 左右。因此按列處理性能更高。

完整代碼和測試結果如下:

t=table(take("aaaa_bbbb", 1000000) as str);

timer r = each(x -> split(x, '_').reverse().concat('_'), t[`str])

timer {
	pos = strpos(t[`str], "_")
	r = substr(t[`str], pos+1)+"_"+t[`str].left(pos)
}

3.2.4 判斷兩張表內容是否相同

判斷兩張表 t1 和 t2 的數據是否完全相同,可以使用 each 高階函數,對錶的每列進行比較。

all(each(eqObj, t1.values(), t2.values()))

3.3 loop 使用案例

3.3.1 loop 與 each 的區別

高階函數 loop  each 相似, 區別在於結果的格式和類型。

each  func 的第一個返回值數據格式和類型決定了所有返回值數據格式和類型。而 loop 沒有這樣的限制,適用於函數返回值類型不同的情況。

def parse_signals(mutable tbl_value, value){
    kvs = split(value, ',');
    d = dict(STRING, STRING);
    for(kv in kvs) {
        sp = split(kv, ':');
        d[sp[0]] = sp[1];
    }
    insert into tbl_value values(date(d[`tradingday]), d[`signal_id], d[`index], d[`underlying], d[`symbol], int(d[`volume]), int(d[`buysell]), int(d[`openclose]), temporalParse(d[`signal_time], "HHmmssSSS"));
}
tbl_value=table(100:0, [`tradingday,`signal_id,`index,`underlying,`symbol,`volume,`buysell,`openclose,`signal_time],[DATE,SYMBOL,SYMBOL,SYMBOL,SYMBOL,INT,INT,INT,TIME]);

v1="tradingday:2020.06.03,signal_id:1,index:000300,underlying:510300,symbol:10002985,volume:2,buysell:0,openclose:0,signal_time:093000120";
v2="tradingday:2020.06.04,signal_id:2,index:000500,underlying:510050,symbol:10002986,volume:3,buysell:1,openclose:1,signal_time:093100120"
parse_signals(tbl_value, v1);
each(parse_signals{tbl_value}, [v1, v2]);

上面案例中,若使用 each 函數會報錯:

Not allowed to create void vector

each 作為高階函數,會並行執行多個計算任務。第一個任務的結果類型將決定整個函數的運行結果的類型。若單個任務返回一個 scalar,那麼 each 返回一個 vector;若單個任務返回 vector,那麼 each 返回一個 matrix;若單個任務返回字典 each,那麼 each 返回一個 table。

該問題中的 parse_signals 函數沒有任何返回值(也就是返回一個 NOTHING 標量),所以 each 試圖去創建一個類型為 void 的 vector,這在 DolphinDB 中是不被允許的。

 each 替換為 looploop 返回一個 tuple,每個單獨任務的返回值作為 tuple 的一個元素。

loop(parse_signals{tbl_value}, [v1, v2]);

3.3.2 導入多個文件

假設在一個目錄下,有多個結構相同的 csv 文件,需將其導入到同一個 DolphinDB 內存表中。可以調用高階函數 loop 來實現:

loop(loadText, fileDir + "/" + files(fileDir).filename).unionAll(false)

3.4 moving/rolling 使用案例

3.4.1 moving 案例

以當前記錄的 UpAvgPrice 和 DownAvgPrice 字段值確定一個區間,取 close 字段的前 20 個數計算其是否在區間 [DownAvgPrice, UpAvgPrice] 範圍內,並統計範圍內數據的百分比。

數據如下:

以 trade_date 為 2019.06.17 的記錄的 UpAvgPrice 和 DownAvgPrice 字段確定一個區間 [11.5886533, 12.8061868],檢查該記錄的前 20 行 close 數據(即圖中標 1 這列)是否在對應區間中,若其中有 75% 的數據落在區間內,則 signal(圖中標 4)的值設為 true,否則為 false。

解決方案:

使用高階函數 moving。下例編寫自定義函數 rangeTest 對每個窗口的數據進行上述區間判斷,返回 true 或 false。

defg rangeTest(close, downlimit, uplimit){
  size = close.size() - 1
  return between(close.subarray(0:size), downlimit.last():uplimit.last()).sum() >= size*0.75
}

update t set signal = moving(rangeTest, [close, downAvgPrice, upAvgPrice], 21)
本例中,因為是計算前 20 行作為當期行的列數值,因而窗口需要包含前 20 條記錄和本條記錄,故窗口大小為 21 行。

上例調用   between  函數,來檢查每個元素是否在 a 和 b 之間(邊界包含在內)。

下例模擬行情數據,創建一個測試表 t:

t=table(rand("d"+string(1..n),n) as ts_code, nanotimestamp(2008.01.10+1..n) as trade_date, rand(n,n) as open, rand(n,n) as high, rand(n,n) as low, rand(n,n) as close, rand(n,n) as pre_close, rand(n,n) as change, rand(n,n) as pct_change, rand(n,n) as vol, rand(n,n) as amount, rand(n,n) as downAvgPrice, rand(n,n) as upAvgPrice, rand(1 0,n) as singna)

rolling 和 moving 類似,都將函數運算符應用到滑動窗口,進行窗口計算。兩者也有細微區別: rolling 可以指定步長 step,moving 的步長為 1;且兩者對空值的處理也不相同。詳情可參考 rolling 的空值處理

3.4.2 moving(sum) 和 msum 性能差距

雖然DolphinDB提供了高階函數moving,但是如果所要進行的計算可以用m系列函數(例如msum, mcount, mavg等)實現,請避免使用moving實現,這是因為m系列函數進行了優化,性能遠超moving。下面以 moving(sum) 和 msum為例:

x=1..1000000
timer moving(sum, x, 10)
timer msum(x, 10)

根據數據量的不同,msum 比 moving(sum) 計算耗時縮短 50 至 200 倍。

性能差距的主要原因如下:

  • 取數方式不同: msum 是一次性將數據讀入內存,無需為每次計算任務單獨分配內存; moving(sum) 每次計算都會生成一個子對象,每次計算都需要為子對象申請內存,計算完成後還需要進行內存回收。
  • msum 為增量計算,每次窗口計算都使用上一個窗口計算的結果。即直接加上當前窗口新合入的數據,並減去上一個窗口的第一條數據;而 moving(sum) 為全量計算,即每次計算都會累加窗口內的所有數據。

3.5 eachPre 使用案例

創建一個表 t,包含 sym 和 BidPrice 兩列:

t = table(take(`a`b`c`d`e ,100) as sym, rand(100.0,100) as bidPrice)

需要進行如下計算:

  • 1.生成新的一列 ln 用於存儲以下因子的計算結果:先計算當前的 bidPrice 值除以前 3 行 bidPrice 均值的結果(不包括當前行),然後取自然對數。
  • 2.基於列 ln,生成新列 clean 用於存儲以下因子的計算結果:計算 ln 的絕對值,若該值大於波動範圍閾值 F,則取上一條記錄的 ln 值,反之則認為當前報價正常,並保留當前的 ln 值。

根據 ln 列的因子計算規則,可以分析出該問題涉及到滑動窗口計算,窗口的大小為 3。參考 3.4.1 的 moving 案例,具體腳本如下:

t2 = select *, log(bidPrice / prev(moving(avg, bidPrice,3))) as ln from t

由於內置函數 msum,mcount  mavg  moving 高階函數有更好的性能,可以將上述腳本改寫如下:

//method 1
t2 = select *, log(bidPrice / prev(mavg(bidPrice,3))) as ln from t

//method 2
t22 = select *, log(bidPrice / mavg(prev(bidPrice),3)) as ln from t

此處調用 prev 函數獲取前一行的數據。

“先計算均值再移動結果” 和 “先移動列再計算均值” 效果等價的。唯一的區別是:表 t22 第三行會產生一個結果。

對於第二個數據處理要求,我們假設波動返回 F 為 0.02, 然後實現一個自定義函數 cleanFun 來實現其取值邏輯,如下:

F = 0.02
def cleanFun(F, x, y): iif(abs(x) > F, y, x)

這裏的參數 x 表示當前值,y 表示前一個值。然後調用高階函數 eachPre 來對相鄰元素兩兩計算,該函數等價於實現: F(X[0], pre), F(X[1], X[0]), ..., F(X[n], X[n-1])。對應腳本如下:

t2[`clean] = eachPre(cleanFun{F}, t2[`ln])

完整代碼如下:

F = 0.02
t = table(take(`a`b`c`d`e ,100) as sym, rand(100.0,100) as bidPrice)
t2 = select *, log(bidPrice / prev(mavg(bidPrice,3))) as ln from t
def cleanFun(F,x,y) : iif(abs(x) > F, y,x)
t2[`clean] = eachPre(cleanFun{F}, t2[`ln])

3.6 byRow 使用案例

計算矩陣每行最大值的下標。下例生成一個矩陣 m:

a1=2 3 4
a2=1 2 3
a3=1 4 5
a4=5 3 2
m = matrix(a1,a2,a3,a4)

一種思路是,對每行分別計算最大值的下標,可以直接調用 imax 函數實現。imax 在矩陣每列單獨計算,返回一個向量。

為求每行的計算結果,可以先對矩陣進行轉置操作,然後調用 imax 函數進行計算。

imax(m.transpose())

此外,DolphinDB 還提供了一個高階函數 byRow,對矩陣的每一行應用指定函數進行計算。使用該函數可以避免轉置操作。

byRow(imax, m)

3.7 segmentby 使用案例

高階函數 segmentby。其語法如下:

segmentby(func, funcArgs, segment)

根據 segment 參數取值確定分組方案,連續的相同值分為一組,進行分組計算。返回的結果與 segment 參數的長度相同。

x=1 2 3 0 3 2 1 4 5
y=1 1 1 -1 -1 -1 1 1 1
segmentby(cumsum,x,y);

上例中,根據 y 確定了 3 個分組:1 1 1, -1 -1 -1 和 1 1 1,由此把 x 也分為 3 組:1 2 3, 0 3 2 和 1 4 5,並將 cumsum 函數應用到 x 的每個分組,計算每個分組的累計和。

DolphinDB 還提供了內置函數 segment 用於在SQL語句中進行分組。與 segmentby 不同,它只返回分組信息,而不對分組進行計算。

下例中,將表的某列數據按照給定閾值進行分組,連續小於或大於該閾值的數據被劃分為一組。連續大於該閾值的分組將保留組內最大值對應的記錄並輸出(若有重複值則輸出第一條)。

表內容如下圖所示,當閾值為 0.3 時,希望結果保留箭頭所指記錄:

表定義如下:

dated = 2021.09.01..2021.09.12
v = 0 0 0.3 0.3 0 0.5 0.3 0.7 0 0 0.3 0
t = table(dated as date, v)

將數據按照是否連續大於 minV 來分組時,可以使用函數 segment

segment(v>= minV)

在 SQL 中配合 context by 語句進行分組計算,通過 having 子句過濾分組的最大值。過濾結果可能存在多行,根據需求只保留第一行滿足結果的數據,此時可以通過指定 limit 子句限定輸出的記錄數。

完整的 SQL 查詢語句如下:

select * from t context by segment(v>= minV) having (v=max(v) and v>=minV) limit 1

3.8 pivot 使用案例

高階函數 pivot 可以在指定的二維維度上重組數據,結果為一個矩陣。

現有包含 4 列數據的表 t1:

syms=`600300`600400`600500$SYMBOL
sym=syms[0 0 0 0 0 0 0 1 1 1 1 1 1 1 2 2 2 2 2 2 2]
time=09:40:00+1 30 65 90 130 185 195 10 40 90 140 160 190 200 5 45 80 140 170 190 210
price=172.12 170.32 172.25 172.55 175.1 174.85 174.5 36.45 36.15 36.3 35.9 36.5 37.15 36.9 40.1 40.2 40.25 40.15 40.1 40.05 39.95
volume=100 * 10 3 7 8 25 6 10 4 5 1 2 8 6 10 2 2 5 5 4 4 3
t1=table(sym, time, price, volume);
t1;

將 t1 的數據依據 time 和 sym 維度進行數據重組,並且計算每分鐘股價的加權平均值,以交易量為權重。

stockprice=pivot(wavg, [t1.price, t1.volume], minute(t1.time), t1.sym)
stockprice.round(2)

3.9 contextby 使用案例

高階函數 contextby 可以將數據根據列字段分組,並在組內調用指定函數進行計算。

sym=`IBM`IBM`IBM`MS`MS`MS
price=172.12 170.32 175.25 26.46 31.45 29.43
qty=5800 700 9000 6300 2100 5300
trade_date=2013.05.08 2013.05.06 2013.05.07 2013.05.08 2013.05.06 2013.05.07;
contextby(avg, price, sym);

contextby 亦可搭配 SQL 語句使用。下例調用 contextby 篩選出價格高於組內平均價的交易記錄:

t1=table(trade_date,sym,qty,price);
select trade_date, sym, qty, price from t1 where price > contextby(avg, price,sym);

3.10 call/unifiedCall 使用案例

對需要批量調用不同函數進行計算的場景,可以通過高階函數 call 或者 unifiedCall 配合高階函數 each/loop 實現。

call    unifiedCall  功能相同,但參數形式不同,詳情可參考用户手冊。

下例中在部分應用中調用了函數 call 函數,該部分應用將向量 [1, 2, 3] 作為固定參數,在高階函數 each 中調用函數 sin  log

each(call{, 1..3},(sin,log));

此外,還可通過元編程方式調用函數。這裏會用到funcByName。上述例子可改寫為:

each(call{, 1..3},(funcByName('sin'),funcByName('log')));

或者,使用 makeCall/makeUnifiedCall 生成元代碼,後續通過 eval 來執行:

each(eval, each(makeCall{,1..3},(sin,log)))

3.11 accumulate 使用案例

已知分鐘線數據如下,將某隻股票每成交約 150 萬股進行一次時間切分,最後得到時間窗口長度不等的若干條數據。具體的切分規則為:若某點的數據合入分組,可以縮小數據量和閾值(150 萬)間的差值,則加入該點,否則當前分組不合入該點的數據。示意圖如下:

構造測試數據如下:

timex = 13:03:00+(0..27)*60
volume = 288658 234804 182714 371986 265882 174778 153657 201388 175937 138388 169086 203013 261230 398871 692212 494300 581400 348160 250354 220064 218116 458865 673619 477386 454563 622870 458177 880992
t = table(timex as time, volume)

這裏自定義一個分組計算函數,將 volume 的累加,按上述切分規則,以 150 萬為閾值進行分組。

先定義一個分組函數,如下:

def caclCumVol(target, preResult, x){
 result = preResult + x
 if(result - target> target - preResult) return x
 else return result
}
accumulate(caclCumVol{1500000}, volume)

上述腳本通過自定義函數 caclCumVol 計算 volume 的累加值,結果最接近 150 萬時劃分分組。新的分組將從下一個 volume 值開始重新累加。對應腳本如下:

iif(accumulate(caclCumVol{1500000}, volume) ==volume, timex, NULL).ffill()

通過和 volume 比較,篩選出了每組的起始記錄。若中間結果存在空值,則調用 ffill 函數進行前值填充。將獲得的結果配合 group by 語句進行分組計算,查詢時,注意替換以上腳本的 timex 為表的 time 字段。

output = select sum(volume) as sum_volume, last(time) as endTime from t group by iif(accumulate(caclCumVol{1500000}, volume) ==volume, time, NULL).ffill() as startTime

完整代碼如下:

timex = 13:03:00+(0..27)*60
volume = 288658 234804 182714 371986 265882 174778 153657 201388 175937 138388 169086 203013 261230 398871 692212 494300 581400 348160 250354 220064 218116 458865 673619 477386 454563 622870 458177 880992
t = table(timex as time, volume)

def caclCumVol(target, preResult, x){
 result = preResult + x
 if(result - target> target - preResult) return x
 else return result
}
output = select sum(volume) as sum_volume, last(time) as endTime from t group by iif(accumulate(caclCumVol{1500000}, volume)==volume, time, NULL).ffill() as startTime

3.12 window 使用案例

對錶中的某列數據進行以下計算,如果當前數值是前 5 個數據的最低值 (包括當前值),也是後 5 個最低值 (包括當前值),那麼標記是 1,否則是 0。

創建測試表 t:

t = table(rand(1..100,20) as id)

可以通過應用窗口函數 window,指定一個前後都為 5 的數據窗口,在該窗口內通過調用 min 函數計算最小值。注意:函數 window 的窗口邊界包含在窗口中。

實現腳本如下:

select *, iif(id==window(min, id, -4:4), 1, 0) as mid from t

3.13 reduce 使用案例

上面的一些案例中,也有用到高階函數 reduce。偽代碼如下:

result=<function>(init,X[0]);
for(i:1~size(X)){
  result=<function>(result, X[i]);
}
return result;

 accumulate 返回中間結果不同,reduce 只返回最後一個結果。

例如下面的計算階乘的例子:

r1 = reduce(mul, 1..10);
r2 = accumulate(mul, 1..10)[9];

最終 r1 和 r2 的結果是一樣的。

4. 部分應用案例

部分應用是指固定一個函數的部分參數,產生一個參數較少的函數。部分應用通常應用在對參數個數有特定要求的高階函數中。

4.1 提交帶有參數的作業

假設需要一個 定時任務,每日 0 點執行,用於計算某設備前一日温度指標的最大值。

假設設備的温度信息存儲在分佈式庫 dfs://dolphindb 下的表 sensor 中,其時間字段為 ts,類型為 DATETIME。下例定義一個 getMaxTemperature 函數來實現計算過程,腳本如下:

def getMaxTemperature(deviceID){
    maxTemp=exec max(temperature) from loadTable("dfs://dolphindb","sensor")
            where ID=deviceID ,date(ts) = today()-1
    return  maxTemp
}

定義計算函數後,可通過函數 scheduleJob 提交定時任務。由於函數 scheduleJob 不提供接口供任務函數進行傳參,而自定義函數 getMaxTemperature 以設備 deviceID 作為參數,這裏可以通過部分應用來固定參數,從而產生一個沒有參數的函數。腳本如下:

scheduleJob(`testJob, "getMaxTemperature", getMaxTemperature{1}, 00:00m, today(), today()+30, 'D');

上例只查詢了設備號為 1 的設備。

最終,完整代碼如下:

def getMaxTemperature(deviceID){
    maxTemp=exec max(temperature) from loadTable("dfs://dolphindb","sensor")
            where ID=deviceID ,date(ts) = today()-1
    return  maxTemp
}

scheduleJob(`testJob, "getMaxTemperature", getMaxTemperature{1}, 00:00m, today(), today()+30, 'D');

4.2 獲取集羣其它節點作業信息

在 DolphinDB 中提交定時作業後,可通過函數 getRecentJobs 來取得本地節點上最近幾個批處理作業的狀態。如查看本地節點最近 3 個批處理作業狀態,可以用如下所示腳本實現:

getRecentJobs(3);

若想獲取集羣上其它節點的作業信息,需通過函數 rpc 來在指定的遠程節點上調用內置函數 getRecentJobs。如獲取節點別名為 P1-node1 的作業信息,可以如下實現:

rpc("P1-node1",getRecentJobs)

如需獲取節點 P1-node1 上最近 3 個作業的信息,通過如下腳本實現會報錯:

rpc("P1-node1",getRecentJobs(3))

因為 rpc 函數第二個參數需要為函數(內置函數或用户自定義函數)。這裏可以通過 DolphinDB 的部分應用,固定函數參數,來生成一個新的函數給 rpc 使用,如下:

rpc("P1-node1",getRecentJobs{3})

4.3 帶 “狀態” 的流計算消息處理函數

在流計算中,用户通常需要給定一個消息處理函數,接受到消息後進行處理。這個處理函數是一元函數或數據表。若為函數,用於處理訂閲數據,其唯一的參數是訂閲的數據,即不能包含狀態信息。

下例通過部分應用定義消息處理函數 cumulativeAverage,用於計算數據的累計均值。

定義流表 trades,對於其 price 字段,每接受一條消息,計算一次 price 的均值,並輸出到結果表 avgTable 中。腳本如下:

share streamTable(10000:0,`time`symbol`price, [TIMESTAMP,SYMBOL,DOUBLE]) as trades
avgT=table(10000:0,[`avg_price],[DOUBLE])

def cumulativeAverage(mutable avgTable, mutable stat, trade){
   newVals = exec price from trade;

   for(val in newVals) {
      stat[0] = (stat[0] * stat[1] + val )/(stat[1] + 1)
      stat[1] += 1
      insert into avgTable values(stat[0])
   }
}

subscribeTable(tableName="trades", actionName="action30", handler=cumulativeAverage{avgT,0.0 0.0}, msgAsTable=true)

自定義函數 cumulativeAverage 的參數 avgTable 為計算結果的存儲表。stat 是一個向量,包含了兩個值:其中,stat[0] 用來表示當前的所有數據的平均值,stat[1] 表示數據個數。函數體的計算實現為:遍歷數據更新 stat 的值,並將新的計算結果插入表。

訂閲流表時,通過在 handler 中固定前兩個參數,實現帶 “狀態” 的消息處理函數。

5. 金融場景相關案例

5.1 使用 map reduce,對 tick 數據降精度

下例中,使用 mr 函數(map reduce)將 tick 數據轉化為分鐘級數據。

在DolphinDB中,可以使用SQL語句基於 tick 數據計算分鐘級數據:

minuteQuotes=select avg(bid) as bid, avg(ofr) as ofr from t group by symbol,date,minute(time) as minute

但在數據量較大時,該實現效率低,耗時長。為提升性能,可以使用 DolphinDB 的分佈式計算。

Map-Reduce 函數 mr 是 DolphinDB 通用分佈式計算框架的核心功能。

完整代碼如下:

login(`admin, `123456)
db = database("dfs://TAQ")
quotes = db.loadTable("quotes")

//create a new table quotes_minute
model=select  top 1 symbol,date, minute(time) as minute,bid,ofr from quotes where date=2007.08.01,symbol=`EBAY
if(existsTable("dfs://TAQ", "quotes_minute"))
db.dropTable("quotes_minute")
db.createPartitionedTable(model, "quotes_minute", `date`symbol)

//populate data for table quotes_minute
def saveMinuteQuote(t){
minuteQuotes=select avg(bid) as bid, avg(ofr) as ofr from t group by symbol,date,minute(time) as minute
loadTable("dfs://TAQ", "quotes_minute").append!(minuteQuotes)
return minuteQuotes.size()
}

ds = sqlDS(<select symbol,date,time,bid,ofr from quotes where date between 2007.08.01 : 2007.08.31>)
timer mr(ds, saveMinuteQuote, +)

5.2 數據回放和高頻因子計算

有狀態的因子,即因子的計算不僅用到當前數據,還會用到歷史數據。實現狀態因子的計算,一般包括這幾個步驟:

  1. 保存本批次的消息數據到歷史記錄;
  2. 根據更新後的歷史記錄,計算因子
  3. 將因子計算結果寫入輸出表中。如有必要,刪除未來不再需要的的歷史記錄。

DolphinDB 的消息處理函數必須是單目函數,其唯一的參數就是當前的消息。要保存歷史狀態並在消息處理函數中計算曆史數據,可以通過部分應用實現:對於多參數的消息處理函數,保留一個參數用於接收消息,固化其它所有的參數,用於保存歷史狀態。這些固化參數只對消息處理函數可見,不受其他應用的影響。

歷史狀態可保存在內存表,字典或分區內存表中。本例將使用 DolphinDB流計算引擎 來處理 報價數據 通過字典保存歷史狀態並計算因子。如需通過內存表或分佈式內存表保存歷史狀態,可以參考 實時計算高頻因子

定義狀態因子:計算當前第一檔賣價 (askPrice1) 與 30 個報價前的第一檔賣價的比值。

對應的因子計算函數 factorAskPriceRatio 實現如下:

defg factorAskPriceRatio(x){
	cnt = x.size()
	if(cnt < 31) return double()
	else return x[cnt - 1]/x[cnt - 31]
}

導入數據創建對應的流表後,可以通過 replay 函數回放數據,模擬實時流計算的場景。

quotesData = loadText("/data/ddb/data/sampleQuotes.csv")

x=quotesData.schema().colDefs
share streamTable(100:0, x.name, x.typeString) as quotes1

由於這裏使用字典保存歷史狀態,可以定義如下字典:

history = dict(STRING, ANY)

該字典的鍵值為 STRING 類型,值為元組(tuple)類型,存儲股票字段,值為元組(tuple)類型,存儲賣價的歷史數據。

下例調用 dictUpdate! 函數更新字典,然後循環計算每隻股票的因子,並通過表存儲因子的計算結果。然後訂閲流表,通過數據回放向流表注入數據,每到來一條新數據都將觸發因子的計算。

消息處理函數定義如下:

def factorHandler(mutable historyDict, mutable factors, msg){
	historyDict.dictUpdate!(function=append!, keys=msg.symbol, parameters=msg.askPrice1, initFunc=x->array(x.type(), 0, 512).append!(x))
	syms = msg.symbol.distinct()
	cnt = syms.size()
	v = array(DOUBLE, cnt)
	for(i in 0:cnt){
	    v[i] = factorAskPriceRatio(historyDict[syms[i]])
	}
	factors.tableInsert([take(now(), cnt), syms, v])
}

參數 historyDict 為保存歷史狀態的字典,factors 是存儲計算結果的表。

完整代碼如下:

quotesData = loadText("/data/ddb/data/sampleQuotes.csv")

defg factorAskPriceRatio(x){
	cnt = x.size()
	if(cnt < 31) return double()
	else return x[cnt - 1]/x[cnt - 31]
}
def factorHandler(mutable historyDict, mutable factors, msg){
	historyDict.dictUpdate!(function=append!, keys=msg.symbol, parameters=msg.askPrice1, initFunc=x->array(x.type(), 0, 512).append!(x))
	syms = msg.symbol.distinct()
	cnt = syms.size()
	v = array(DOUBLE, cnt)
	for(i in 0:cnt){
	    v[i] = factorAskPriceRatio(historyDict[syms[i]])
	}
	factors.tableInsert([take(now(), cnt), syms, v])
}

x=quotesData.schema().colDefs
share streamTable(100:0, x.name, x.typeString) as quotes1
history = dict(STRING, ANY)
share streamTable(100000:0, `timestamp`symbol`factor, [TIMESTAMP,SYMBOL,DOUBLE]) as factors
subscribeTable(tableName = "quotes1", offset=0, handler=factorHandler{history, factors}, msgAsTable=true, batchSize=3000, throttle=0.005)

replay(inputTables=quotesData, outputTables=quotes1, dateColumn=`date, timeColumn=`time)

查看結果

select top 10 * from factors where isValid(factor)

5.3 基於字典的計算

下例創建表 orders,該表包含了一些簡單的股票信息:

orders = table(`IBM`IBM`IBM`GOOG as SecID, 1 2 3 4 as Value, 4 5 6 7 as Vol)

創建一個字典。鍵為股票代碼,值為從 orders 表中篩選出來的只包含該股票信息的子表。

字典定義如下:

historyDict = dict(STRING, ANY)

然後通過函數 dictUpdate!,來更新每個鍵的值,實現如下:

historyDict.dictUpdate!(function=def(x,y){tableInsert(x,y);return x}, keys=orders.SecID, parameters=orders, initFunc=def(x){t = table(100:0, x.keys(), each(type, x.values())); tableInsert(t, x); return t})

可以把 dictUpdate! 的執行過程理解成,針對參數 parameters 遍歷,每個 parameters 作為參數,通過 function 去更新字典 (字典的 key 由 keys 指定的)。當字典中不存在對應的 key 時,會調用 initFunc 去初始化 key 對應的值。

這個例子中,字典的 key 是股票代碼,value 是 orders 的子表。

這裏,我們使用 orders.SecID 作為 keys,在更新的函數參數中,我們定義了一個 lamda 函數將當前記錄插入到表中,如下:

def(x,y){tableInsert(x,y);return x}

注意此處使用 lamda 函數封裝了 tableInsert,而非指定 function=tableInsert。這是因為 tableInsert 的返回值不是一個 table,而是插入的條數,如果直接調用 tableInsert,在寫入第二條 IBM 對應的記錄時,會將字典中的值更新成插入的條數;寫入第三條 IBM 對應的記錄時,系統會拋出異常。

初始條件下,historyDict 未賦值,可以通過指定 initFunc 參數對字典進行初始化賦值:

def(x){
  t = table(100:0, x.keys(), each(type, x.values()));
  tableInsert(t, x);
  return t
}

最終,完整代碼如下:

orders = table(`IBM`IBM`IBM`GOOG as SecID, 1 2 3 4 as Value, 4 5 6 7 as Vol)
historyDict = dict(STRING, ANY)
historyDict.dictUpdate!(function=def(x,y){tableInsert(x,y);return x}, keys=orders.SecID, parameters=orders,
            initFunc=def(x){t = table(100:0, x.keys(), each(type, x.values())); tableInsert(t, x); return t})

執行後 historyDict 結果如下:

GOOG->
Vol Value SecID
--- ----- -----
7   4     GOOG

IBM->
Vol Value SecID
--- ----- -----
4   1     IBM
5   2     IBM
6   3     IBM

6. 機器學習相關案例

6.1 ols 殘差

創建樣本表 t 如下:

t=table(2020.11.01 2020.11.02 as date, `IBM`MSFT as ticker, 1.0 2 as past1, 2.0 2.5 as past3, 3.5 7 as past5, 4.2 2.4 as past10, 5.0 3.7 as past20, 5.5 6.2 as past30, 7.0 8.0 as past60)

計算每行數據和一個向量 benchX 的迴歸殘差,並將結果保存到新列中。

向量 benchX 如下:

benchX = 10 15 7 8 9 1 2.0

DolphinDB 提供了最小二乘迴歸函數 ols

先將表中參與計算的字段轉化成矩陣:

mt = matrix(t[`past1`past3`past5`past10`past20`past30`past60]).transpose()

然後定義殘差計算函數如下:

def(y, x) {
    return ols(y, x, true, 2).ANOVA.SS[1]
}

最後使用高階函數 each 與部分應用,對每行數據應用殘差計算函數:

t[`residual] = each(def(y, x){ return ols(y, x, true, 2).ANOVA.SS[1]}{,benchX}, mt)

完整代碼如下:

t=table(2020.11.01 2020.11.02 as date, `IBM`MSFT as ticker, 1.0 2 as past1, 2.0 2.5 as past3, 3.5 7 as past5, 4.2 2.4 as past10, 5.0 3.7 as past20, 5.5 6.2 as past30, 7.0 8.0 as past60)

mt = matrix(t[`past1`past3`past5`past10`past20`past30`past60]).transpose()
t[`residual] = each(def(y, x){ return ols(y, x, true, 2).ANOVA.SS[1]}{,benchX}, mt)

7. 總結

除了上面提到的一些函數與高階函數。DolphinDB 還提供了豐富的 函數庫,包括數學函數、統計函數、分佈相關函數、假設檢驗函數、機器學習函數、邏輯函數、字符串函數、時間函數、數據操作函數、窗口函數、高階函數、元編程、分佈式計算函數、流計算函數、定時任務函數、性能監控函數、用户權限管理函數等。