以 Serverless 的方式實現 Kubernetes 日誌吿警
當我們將容器的日誌收集到消息服務器之後,我們該如何處理這些日誌?部署一個專用的日誌處理工作負載可能會耗費多餘的成本,而當日志體量驟增、驟降時亦難以評估日誌處理工作負載的待機數量。本文提供了一種基於 Serverless 的日誌處理思路,可以在降低該任務鏈路成本的同時提高其靈活性。
我們的大體設計是使用 Kafka 服務器作為日誌的接收器,之後以輸入 Kafka 服務器的日誌作為事件,驅動 Serverless 工作負載對日誌進行處理。據此的大致步驟為:
-
搭建 Kafka 服務器作為 Kubernetes 集羣的日誌接收器
-
部署 OpenFunction 為日誌處理工作負載提供 Serverless 能力
-
編寫日誌處理函數,抓取特定的日誌生成吿警消息
-
配置 Notification Manager[1] 將吿警發送至 Slack
在這個場景中,我們會利用到 OpenFunction[2] 帶來的 Serverless 能力。
OpenFunction[3] 是 KubeSphere 社區開源的一個 FaaS(Serverless)項目,旨在讓用户專注於他們的業務邏輯,而不必關心底層運行環境和基礎設施。該項目當前具備以下關鍵能力:
- 支持通過 dockerfile 或 buildpacks 方式構建 OCI 鏡像
- 支持使用 Knative Serving 或 OpenFunctionAsync ( KEDA + Dapr ) 作為 runtime 運行 Serverless 工作負載
- 自帶事件驅動框架
使用 Kafka 作為日誌接收器
首先,我們為 KubeSphere 平台開啟 logging 組件(可以參考 啟用可插拔組件[3] 獲取更多信息)。然後我們使用 strimzi-kafka-operator[5] 搭建一個最小化的 Kafka 服務器。
-
在 default 命名空間中安裝 strimzi-kafka-operator[6] :
helm repo add strimzi http://strimzi.io/charts/ helm install kafka-operator -n default strimzi/strimzi-kafka-operator
-
運行以下命令在 default 命名空間中創建 Kafka 集羣和 Kafka Topic,該命令所創建的 Kafka 和 Zookeeper 集羣的存儲類型為 ephemeral,使用 emptyDir 進行演示。
注意,我們此時創建了一個名為 “logs” 的 topic,後續會用到它
cat <<EOF | kubectl apply -f -
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: kafka-logs-receiver
namespace: default
spec:
kafka:
version: 2.8.0
replicas: 1
listeners:
- name: plain
port: 9092
type: internal
tls: false
- name: tls
port: 9093
type: internal
tls: true
config:
offsets.topic.replication.factor: 1
transaction.state.log.replication.factor: 1
transaction.state.log.min.isr: 1
log.message.format.version: '2.8'
inter.broker.protocol.version: "2.8"
storage:
type: ephemeral
zookeeper:
replicas: 1
storage:
type: ephemeral
entityOperator:
topicOperator: {}
userOperator: {}
---
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic
metadata:
name: logs
namespace: default
labels:
strimzi.io/cluster: kafka-logs-receiver
spec:
partitions: 10
replicas: 3
config:
retention.ms: 7200000
segment.bytes: 1073741824
EOF
-
運行以下命令查看 Pod 狀態,並等待 Kafka 和 Zookeeper 運行並啟動。
$ kubectl get po NAME READY STATUS RESTARTS AGE kafka-logs-receiver-entity-operator-568957ff84-nmtlw 3/3 Running 0 8m42s kafka-logs-receiver-kafka-0 1/1 Running 0 9m13s kafka-logs-receiver-zookeeper-0 1/1 Running 0 9m46s strimzi-cluster-operator-687fdd6f77-cwmgm 1/1 Running 0 11m
運行以下命令查看 Kafka 集羣的元數據:
# 啟動一個工具 pod
$ kubectl run utils --image=arunvelsriram/utils -i --tty --rm
# 查看 Kafka 集羣的元數據
$ kafkacat -L -b kafka-logs-receiver-kafka-brokers:9092
我們將這個 Kafka 服務器添加為日誌接收器。
- 以 admin 身份登錄 KubeSphere 的 Web 控制枱。點擊左上角的平台管理,然後選擇集羣管理。
如果您啟用了多集羣功能[7],您可以選擇一個集羣。
-
在集羣管理頁面,選擇集羣設置下的日誌收集。
-
點擊添加日誌接收器並選擇 Kafka。輸入 Kafka 代理地址和端口信息,然後點擊確定繼續。
- 運行以下命令驗證 Kafka 集羣是否能從 Fluent Bit 接收日誌: # 啟動一個工具 pod $ kubectl run utils --image=arunvelsriram/utils -i --tty --rm # 檢查 logs topic 中的日誌情況 $ kafkacat -C -b kafka-logs-receiver-kafka-0.kafka-logs-receiver-kafka-brokers.default.svc:9092 -t logs
部署 OpenFunction
按照概述中的設計,我們需要先部署 OpenFunction。OpenFunction 項目引用了很多第三方的項目,如 Knative、Tekton、ShipWright、Dapr、KEDA 等,手動安裝較為繁瑣,推薦使用 Prerequisites 文檔[8] 中的方法,一鍵部署 OpenFunction 的依賴組件。
其中 --with-shipwright 表示部署 shipwright 作為函數的構建驅動--with-openFuncAsync 表示部署 OpenFuncAsync Runtime 作為函數的負載驅動 而當你的網絡在訪問 Github 及 Google 受限時,可以加上 --poor-network 參數用於下載相關的組件
$ sh hack/deploy.sh --with-shipwright --with-openFuncAsync --poor-network
部署 OpenFunction:
此處選擇安裝最新的穩定版本,你也可以使用開發版本,參考 Install 文檔[9] 為了可以正常使用 ShipWright ,我們提供了默認的構建策略,可以使用以下命令設置該策略: $ kubectl apply -f http://raw.githubusercontent.com/OpenFunction/OpenFunction/main/config/strategy/openfunction.yaml
$ kubectl apply -f http://github.com/OpenFunction/OpenFunction/releases/download/v0.3.0/bundle.yaml
編寫日誌處理函數
我們以 創建並部署 WordPress[10] 為例,搭建一個 WordPress 應用作為日誌的生產者。該應用的工作負載所在的命名空間為 “demo-project”,Pod 名稱為 “wordpress-v1-f54f697c5-hdn2z”。
當請求結果為 404 時,我們收到的日誌內容如下:
{"@timestamp":1629856477.226758,"log":"*.*.*.* - - [25/Aug/2021:01:54:36 +0000] \"GET /notfound HTTP/1.1\" 404 49923 \"-\" \"curl/7.58.0\"\n","time":"2021-08-25T01:54:37.226757612Z","kubernetes":{"pod_name":"wordpress-v1-f54f697c5-hdn2z","namespace_name":"demo-project","container_name":"container-nrdsp1","docker_id":"bb7b48e2883be0c05b22c04b1d1573729dd06223ae0b1676e33a4fac655958a5","container_image":"wordpress:4.8-apache"}}
我們的需求是:當一個請求結果為 404 時,發送一個吿警通知給接收器(可以根據 配置 Slack 通知[11] 配置一個 Slack 吿警接收器),並記錄命名空間、Pod 名稱、請求路徑、請求方法等信息。按照這個需求,我們編寫一個簡單的處理函數:
你可以從 OpenFunction Context Spec[12] 處瞭解 openfunction-context 的使用方法,這是 OpenFunction 提供給用户編寫函數的工具庫 你可以通過 OpenFunction Samples[13] 瞭解更多的 OpenFunction 函數案例
package logshandler
import (
"encoding/json"
"fmt"
"log"
"regexp"
"time"
ofctx "github.com/OpenFunction/functions-framework-go/openfunction-context"
alert "github.com/prometheus/alertmanager/template"
)
const (
HTTPCodeNotFound = "404"
Namespace = "demo-project"
PodName = "wordpress-v1-[A-Za-z0-9]{9}-[A-Za-z0-9]{5}"
AlertName = "404 Request"
Severity = "warning"
)
// LogsHandler ctx 參數提供了用户函數在集羣語境中的上下文句柄,如 ctx.SendTo 用於將數據發送至指定的目的地
// LogsHandler in 參數用於將輸入源中的數據(如有)以 bytes 的方式傳遞給函數
func LogsHandler(ctx *ofctx.OpenFunctionContext, in []byte) int {
content := string(in)
// 這裏我們設置了三個正則表達式,分別用於匹配 HTTP 返回碼、資源命名空間、資源 Pod 名稱
matchHTTPCode, _ := regexp.MatchString(fmt.Sprintf(" %s ", HTTPCodeNotFound), content)
matchNamespace, _ := regexp.MatchString(fmt.Sprintf("namespace_name\":\"%s", Namespace), content)
matchPodName := regexp.MustCompile(fmt.Sprintf(`(%s)`, PodName)).FindStringSubmatch(content)
if matchHTTPCode && matchNamespace && matchPodName != nil {
log.Printf("Match log - Content: %s", content)
// 如果上述三個正則表達式同時命中,那麼我們需要提取日誌內容中的一些信息,用於填充至吿警信息中
// 這些信息為:404 請求的請求方式(HTTP Method)、請求路徑(HTTP Path)以及 Pod 名稱
match := regexp.MustCompile(`([A-Z]+) (/\S*) HTTP`).FindStringSubmatch(content)
if match == nil {
return 500
}
path := match[len(match)-1]
method := match[len(match)-2]
podName := matchPodName[len(matchPodName)-1]
// 收集到關鍵信息後,我們使用 altermanager 的 Data 結構體組裝吿警信息
notify := &alert.Data{
Receiver: "notification_manager",
Status: "firing",
Alerts: alert.Alerts{},
GroupLabels: alert.KV{"alertname": AlertName, "namespace": Namespace},
CommonLabels: alert.KV{"alertname": AlertName, "namespace": Namespace, "severity": Severity},
CommonAnnotations: alert.KV{},
ExternalURL: "",
}
alt := alert.Alert{
Status: "firing",
Labels: alert.KV{
"alertname": AlertName,
"namespace": Namespace,
"severity": Severity,
"pod": podName,
"path": path,
"method": method,
},
Annotations: alert.KV{},
StartsAt: time.Now(),
EndsAt: time.Time{},
GeneratorURL: "",
Fingerprint: "",
}
notify.Alerts = append(notify.Alerts, alt)
notifyBytes, _ := json.Marshal(notify)
// 使用 ctx.SendTo 將內容發送給名為 "notification-manager" 的輸出端(你可以在之後的函數配置 logs-handler-function.yaml 中找到它的定義)
if err := ctx.SendTo(notifyBytes, "notification-manager"); err != nil {
panic(err)
}
log.Printf("Send log to notification manager.")
}
return 200
}
我們將這個函數上傳到代碼倉庫中,記錄代碼倉庫的地址以及代碼在倉庫中的目錄路徑,在下面的創建函數步驟中我們將使用到這兩個值。
你可以在 OpenFunction Samples[14] 中找到這個案例。
創建函數
接下來我們將使用 OpenFunction 構建上述的函數。首先設置一個用於訪問鏡像倉庫的祕鑰文件 push-secret(在使用代碼構建出 OCI 鏡像後,OpenFunction 會將該鏡像上傳到用户的鏡像倉庫中,用於後續的負載啟動):
$ REGISTRY_SERVER=http://index.docker.io/v1/ REGISTRY_USER=<your username> REGISTRY_PASSWORD=<your password>
$ kubectl create secret docker-registry push-secret \
--docker-server=$REGISTRY_SERVER \
--docker-username=$REGISTRY_USER \
--docker-password=$REGISTRY_PASSWORD
應用函數 logs-handler-function.yaml:
函數定義中包含了對兩個關鍵組件的使用: Dapr[15] 對應用程序屏蔽了複雜的中間件,使得 logs-handler 可以非常容易地處理 Kafka 中的事件 KEDA[16] 通過監控消息服務器中的事件流量來驅動 logs-handler 函數的啟動,並且根據 Kafka 中消息的消費延時動態擴展 logs-handler 實例
apiVersion: core.openfunction.io/v1alpha1
kind: Function
metadata:
name: logs-handler
spec:
version: "v1.0.0"
# 這裏定義了構建後的鏡像的上傳路徑
image: openfunctiondev/logs-async-handler:v1
imageCredentials:
name: push-secret
build:
builder: openfunctiondev/go115-builder:v0.2.0
env:
FUNC_NAME: "LogsHandler"
# 這裏定義了源代碼的路徑
# url 為上面提到的代碼倉庫地址
# sourceSubPath 為代碼在倉庫中的目錄路徑
srcRepo:
url: "http://github.com/OpenFunction/samples.git"
sourceSubPath: "functions/OpenFuncAsync/logs-handler-function/"
serving:
# OpenFuncAsync 是 OpenFunction 通過 KEDA+Dapr 實現的一種由事件驅動的異步函數運行時
runtime: "OpenFuncAsync"
openFuncAsync:
# 此處定義了函數的輸入(kafka-receiver)和輸出(notification-manager),與下面 components 中的定義對應關聯
dapr:
inputs:
- name: kafka-receiver
type: bindings
outputs:
- name: notification-manager
type: bindings
params:
operation: "post"
type: "bindings"
annotations:
dapr.io/log-level: "debug"
# 這裏完成了上述輸入端和輸出端的具體定義(即 Dapr Components)
components:
- name: kafka-receiver
type: bindings.kafka
version: v1
metadata:
- name: brokers
value: "kafka-logs-receiver-kafka-brokers:9092"
- name: authRequired
value: "false"
- name: publishTopic
value: "logs"
- name: topics
value: "logs"
- name: consumerGroup
value: "logs-handler"
# 此處為 KubeSphere 的 notification-manager 地址
- name: notification-manager
type: bindings.http
version: v1
metadata:
- name: url
value: http://notification-manager-svc.kubesphere-monitoring-system.svc.cluster.local:19093/api/v2/alerts
keda:
scaledObject:
pollingInterval: 15
minReplicaCount: 0
maxReplicaCount: 10
cooldownPeriod: 30
# 這裏定義了函數的觸發器,即 Kafka 服務器的 “logs” topic
# 同時定義了消息堆積閾值(此處為 10),即當消息堆積量超過 10,logs-handler 實例個數就會自動擴展
triggers:
- type: kafka
metadata:
topic: logs
bootstrapServers: kafka-logs-receiver-kafka-brokers.default.svc.cluster.local:9092
consumerGroup: logs-handler
lagThreshold: "10"
結果演示
我們先關閉 Kafka 日誌接收器:在日誌收集頁面,點擊進入 Kafka 日誌接收器詳情頁面,然後點擊更多操作並選擇更改狀態,將其設置為關閉。
停用後一段時間,我們可以觀察到 logs-handler 函數實例已經收縮到 0 了。
再將 Kafka 日誌接收器激活,logs-handler 隨之啟動。
$ kubectl get po --watch
NAME READY STATUS RESTARTS AGE
kafka-logs-receiver-entity-operator-568957ff84-tdrrx 3/3 Running 0 7m27s
kafka-logs-receiver-kafka-0 1/1 Running 0 7m48s
kafka-logs-receiver-zookeeper-0 1/1 Running 0 8m12s
logs-handler-serving-kpngc-v100-zcj4q-5f46996f8c-b9d6f 2/2 Terminating 0 34s
strimzi-cluster-operator-687fdd6f77-kc8cv 1/1 Running 0 10m
logs-handler-serving-kpngc-v100-zcj4q-5f46996f8c-b9d6f 2/2 Terminating 0 36s
logs-handler-serving-kpngc-v100-zcj4q-5f46996f8c-b9d6f 0/2 Terminating 0 37s
logs-handler-serving-kpngc-v100-zcj4q-5f46996f8c-b9d6f 0/2 Terminating 0 38s
logs-handler-serving-kpngc-v100-zcj4q-5f46996f8c-b9d6f 0/2 Terminating 0 38s
logs-handler-serving-kpngc-v100-zcj4q-5f46996f8c-9kj2c 0/2 Pending 0 0s
logs-handler-serving-kpngc-v100-zcj4q-5f46996f8c-9kj2c 0/2 Pending 0 0s
logs-handler-serving-kpngc-v100-zcj4q-5f46996f8c-9kj2c 0/2 ContainerCreating 0 0s
logs-handler-serving-kpngc-v100-zcj4q-5f46996f8c-9kj2c 0/2 ContainerCreating 0 2s
logs-handler-serving-kpngc-v100-zcj4q-5f46996f8c-9kj2c 1/2 Running 0 4s
logs-handler-serving-kpngc-v100-zcj4q-5f46996f8c-9kj2c 2/2 Running 0 11s
接着我們向 WordPress 應用一個不存在的路徑發起請求:
$ curl http://<wp-svc-address>/notfound
可以看到 Slack 中已經收到了這條消息(與之對比的是,當我們正常訪問該 WordPress 站點時, Slack 中並不會收到吿警消息):
進一步探索
- 同步函數的解決方案
為了可以正常使用 Knative Serving ,我們需要設置其網關的負載均衡器地址。(你可以使用本機地址作為 workaround) 將下面的 "1.2.3.4" 替換為實際場景中的地址。
除了直接由 Kafka 服務器驅動函數運作(異步方式),OpenFunction 還支持使用自帶的事件框架對接 Kafka 服務器,之後以 Sink 的方式驅動 Knative 函數運作。可以參考 OpenFunction Samples[17] 中的案例。
在該方案中,同步函數的處理速度較之異步函數有所降低,當然我們同樣可以藉助 KEDA 來觸發 Knative Serving 的 concurrency 機制,但總體而言缺乏異步函數的便捷性。(後續的階段中我們會優化 OpenFunction 的事件框架來解決同步函數這方面的缺陷)
由此可見,不同類型的 Serverless 函數有其擅長的任務場景,如一個有序的控制流函數就需要由同步函數而非異步函數來處理。
綜述
Serverless 帶來了我們所期望的對業務場景快速拆解重構的能力。
如本案例所示,OpenFunction 不但以 Serverless 的方式提升了日誌處理、吿警通知鏈路的靈活度,還通過函數框架將通常對接 Kafka 時複雜的配置步驟簡化為語義明確的代碼邏輯。同時,我們也在不斷演進 OpenFunction,將在之後版本中實現由自身的 Serverless 能力驅動自身的組件運作。
引用鏈接 [1]Notification Manager: http://github.com/kubesphere/notification-manager/
[2]OpenFunction: http://github.com/OpenFunction/OpenFunction
[3]OpenFunction: http://github.com/OpenFunction/OpenFunction
[4]啟用可插拔組件: http://kubesphere.io/zh/docs/pluggable-components/
[5]strimzi-kafka-operator: http://github.com/strimzi/strimzi-kafka-operator
[6]strimzi-kafka-operator: http://github.com/strimzi/strimzi-kafka-operator
[7]多集羣功能: http://kubesphere.io/zh/docs/multicluster-management/
[8]Prerequisites 文檔: http://github.com/OpenFunction/OpenFunction#prerequisites
[9]Install 文檔: http://github.com/OpenFunction/OpenFunction#install
[10]創建並部署 WordPress: http://kubesphere.io/zh/docs/quick-start/wordpress-deployment/
[11]配置 Slack 通知: http://kubesphere.io/zh/docs/cluster-administration/platform-settings/notification-management/configure-slack/
[12]OpenFunction Context Spec: http://github.com/OpenFunction/functions-framework/blob/main/docs/OpenFunction-context-specs.md
[13]OpenFunction Samples: http://github.com/OpenFunction/samples
[14]OpenFunction Samples: http://github.com/OpenFunction/samples/tree/main/functions/OpenFuncAsync/logs-handler-function
[15]Dapr: http://dapr.io/
[16]KEDA: http://keda.sh/
[17]OpenFunction Samples: http://github.com/OpenFunction/samples/tree/main/functions/Knative/logs-handler-function
作者
方闐 OpenFunction 開源社區 Maintainer
- Kubernetes CRI 分析 - kubelet 創建 Pod 分析
- 終於有人把 ZFS 文件系統講明白了
- KVSSD: 結合 LSM 與 FTL 以實現寫入優化的 KV 存儲
- 雲戰略現狀調查: 歡迎來到多雲時代!
- 雲戰略現狀調查: 歡迎來到多雲時代!
- 以 Serverless 的方式實現 Kubernetes 日誌吿警
- Knative Autoscaler 自定義彈性伸縮
- 科技熱點週刊|Zoom 1 億美元、Docker 收費、380 億美元 Databricks
- 科技熱點週刊|Linux 30 週年、Horizon Workroom 發佈、Humanoid Robot、元宇宙
- KubeSphere 核心架構淺析
- Go 語言實現 WebSocket 推送
- 基於 SDN 編排的雲安全服務
- 複雜應用開發測試的 ChatOps 實踐
- 基於 Formily 的表單設計器實現原理分析
- SegmentFault 基於 Kubernetes 的容器化與持續交付實踐
- 基於 Kubernetes 的雲原生 AI 平台建設
- 雲原生|新東方在有狀態服務 In K8s 的實踐
- 在線教育平台青椒課堂:使用 KubeSphere QKE 輕鬆實現容器多集羣管理
- 人均雲原生2.0,容器的圈子內卷嗎?
- 存儲大師班:NFS 的誕生與成長