以 Serverless 的方式實現 Kubernetes 日誌告警

語言: CN / TW / HK

當我們將容器的日誌收集到訊息伺服器之後,我們該如何處理這些日誌?部署一個專用的日誌處理工作負載可能會耗費多餘的成本,而當日志體量驟增、驟降時亦難以評估日誌處理工作負載的待機數量。本文提供了一種基於 Serverless 的日誌處理思路,可以在降低該任務鏈路成本的同時提高其靈活性。

我們的大體設計是使用 Kafka 伺服器作為日誌的接收器,之後以輸入 Kafka 伺服器的日誌作為事件,驅動 Serverless 工作負載對日誌進行處理。據此的大致步驟為:

  1. 搭建 Kafka 伺服器作為 Kubernetes 叢集的日誌接收器

  2. 部署 OpenFunction 為日誌處理工作負載提供 Serverless 能力

  3. 編寫日誌處理函式,抓取特定的日誌生成告警訊息

  4. 配置 Notification Manager[1] 將告警傳送至 Slack

file

在這個場景中,我們會利用到 OpenFunction[2] 帶來的 Serverless 能力。

OpenFunction[3] 是 KubeSphere 社群開源的一個 FaaS(Serverless)專案,旨在讓使用者專注於他們的業務邏輯,而不必關心底層執行環境和基礎設施。該專案當前具備以下關鍵能力:

  • 支援通過 dockerfile 或 buildpacks 方式構建 OCI 映象
  • 支援使用 Knative Serving 或 OpenFunctionAsync ( KEDA + Dapr ) 作為 runtime 執行 Serverless 工作負載
  • 自帶事件驅動框架

使用 Kafka 作為日誌接收器

首先,我們為 KubeSphere 平臺開啟 logging 元件(可以參考 啟用可插拔元件[3] 獲取更多資訊)。然後我們使用 strimzi-kafka-operator[5] 搭建一個最小化的 Kafka 伺服器。

  1. 在 default 名稱空間中安裝 strimzi-kafka-operator[6] :

     helm repo add strimzi http://strimzi.io/charts/
     helm install kafka-operator -n default strimzi/strimzi-kafka-operator
    
  2. 執行以下命令在 default 名稱空間中建立 Kafka 叢集和 Kafka Topic,該命令所建立的 Kafka 和 Zookeeper 叢集的儲存型別為 ephemeral,使用 emptyDir 進行演示。

注意,我們此時建立了一個名為 “logs” 的 topic,後續會用到它

	cat <<EOF | kubectl apply -f -
	apiVersion: kafka.strimzi.io/v1beta2
	kind: Kafka
	metadata:
		name: kafka-logs-receiver
		namespace: default
	spec:
		kafka:
			version: 2.8.0
			replicas: 1
			listeners:
				- name: plain
					port: 9092
					type: internal
					tls: false
				- name: tls
					port: 9093
					type: internal
					tls: true
			config:
				offsets.topic.replication.factor: 1
				transaction.state.log.replication.factor: 1
				transaction.state.log.min.isr: 1
				log.message.format.version: '2.8'
				inter.broker.protocol.version: "2.8"
			storage:
				type: ephemeral
		zookeeper:
			replicas: 1
			storage:
				type: ephemeral
		entityOperator:
			topicOperator: {}
			userOperator: {}
	---
	apiVersion: kafka.strimzi.io/v1beta1
	kind: KafkaTopic
	metadata:
		name: logs
		namespace: default
		labels:
			strimzi.io/cluster: kafka-logs-receiver
	spec:
		partitions: 10
		replicas: 3
		config:
			retention.ms: 7200000
			segment.bytes: 1073741824
	EOF
  1. 執行以下命令檢視 Pod 狀態,並等待 Kafka 和 Zookeeper 執行並啟動。

     $ kubectl get po
     NAME                                                   READY   STATUS        RESTARTS   AGE
     kafka-logs-receiver-entity-operator-568957ff84-nmtlw   3/3     Running       0          8m42s
     kafka-logs-receiver-kafka-0                            1/1     Running       0          9m13s
     kafka-logs-receiver-zookeeper-0                        1/1     Running       0          9m46s
     strimzi-cluster-operator-687fdd6f77-cwmgm              1/1     Running       0          11m
    

執行以下命令檢視 Kafka 叢集的元資料:

	# 啟動一個工具 pod
	$ kubectl run utils --image=arunvelsriram/utils -i --tty --rm
	# 檢視 Kafka 叢集的元資料
	$ kafkacat -L -b kafka-logs-receiver-kafka-brokers:9092

我們將這個 Kafka 伺服器新增為日誌接收器。

  1. 以 admin 身份登入 KubeSphere 的 Web 控制檯。點選左上角的平臺管理,然後選擇叢集管理。

如果您啟用了多叢集功能[7],您可以選擇一個叢集。

  1. 在叢集管理頁面,選擇叢集設定下的日誌收集。

  2. 點選新增日誌接收器並選擇 Kafka。輸入 Kafka 代理地址和埠資訊,然後點選確定繼續。

file

  1. 執行以下命令驗證 Kafka 叢集是否能從 Fluent Bit 接收日誌: # 啟動一個工具 pod $ kubectl run utils --image=arunvelsriram/utils -i --tty --rm # 檢查 logs topic 中的日誌情況 $ kafkacat -C -b kafka-logs-receiver-kafka-0.kafka-logs-receiver-kafka-brokers.default.svc:9092 -t logs

部署 OpenFunction

按照概述中的設計,我們需要先部署 OpenFunction。OpenFunction 專案引用了很多第三方的專案,如 Knative、Tekton、ShipWright、Dapr、KEDA 等,手動安裝較為繁瑣,推薦使用 Prerequisites 文件[8] 中的方法,一鍵部署 OpenFunction 的依賴元件。

其中 --with-shipwright 表示部署 shipwright 作為函式的構建驅動--with-openFuncAsync 表示部署 OpenFuncAsync Runtime 作為函式的負載驅動 而當你的網路在訪問 Github 及 Google 受限時,可以加上 --poor-network 引數用於下載相關的元件

	$ sh hack/deploy.sh --with-shipwright --with-openFuncAsync --poor-network

部署 OpenFunction:

此處選擇安裝最新的穩定版本,你也可以使用開發版本,參考 Install 文件[9] 為了可以正常使用 ShipWright ,我們提供了預設的構建策略,可以使用以下命令設定該策略: $ kubectl apply -f http://raw.githubusercontent.com/OpenFunction/OpenFunction/main/config/strategy/openfunction.yaml

	$ kubectl apply -f http://github.com/OpenFunction/OpenFunction/releases/download/v0.3.0/bundle.yaml

編寫日誌處理函式

我們以 建立並部署 WordPress[10] 為例,搭建一個 WordPress 應用作為日誌的生產者。該應用的工作負載所在的名稱空間為 “demo-project”,Pod 名稱為 “wordpress-v1-f54f697c5-hdn2z”。

當請求結果為 404 時,我們收到的日誌內容如下:

	{"@timestamp":1629856477.226758,"log":"*.*.*.* - - [25/Aug/2021:01:54:36 +0000] \"GET /notfound HTTP/1.1\" 404 49923 \"-\" \"curl/7.58.0\"\n","time":"2021-08-25T01:54:37.226757612Z","kubernetes":{"pod_name":"wordpress-v1-f54f697c5-hdn2z","namespace_name":"demo-project","container_name":"container-nrdsp1","docker_id":"bb7b48e2883be0c05b22c04b1d1573729dd06223ae0b1676e33a4fac655958a5","container_image":"wordpress:4.8-apache"}}

我們的需求是:當一個請求結果為 404 時,傳送一個告警通知給接收器(可以根據 配置 Slack 通知[11] 配置一個 Slack 告警接收器),並記錄名稱空間、Pod 名稱、請求路徑、請求方法等資訊。按照這個需求,我們編寫一個簡單的處理函式:

你可以從 OpenFunction Context Spec[12] 處瞭解 openfunction-context 的使用方法,這是 OpenFunction 提供給使用者編寫函式的工具庫 你可以通過 OpenFunction Samples[13] 瞭解更多的 OpenFunction 函式案例

package logshandler

import (
 "encoding/json"
 "fmt"
 "log"
 "regexp"
 "time"

 ofctx "github.com/OpenFunction/functions-framework-go/openfunction-context"
 alert "github.com/prometheus/alertmanager/template"
)

const (
 HTTPCodeNotFound = "404"
 Namespace        = "demo-project"
 PodName          = "wordpress-v1-[A-Za-z0-9]{9}-[A-Za-z0-9]{5}"
 AlertName        = "404 Request"
 Severity         = "warning"
)

// LogsHandler ctx 引數提供了使用者函式在叢集語境中的上下文控制代碼,如 ctx.SendTo 用於將資料傳送至指定的目的地
// LogsHandler in 引數用於將輸入源中的資料(如有)以 bytes 的方式傳遞給函式
func LogsHandler(ctx *ofctx.OpenFunctionContext, in []byte) int {
 content := string(in)
 // 這裡我們設定了三個正則表示式,分別用於匹配 HTTP 返回碼、資源名稱空間、資源 Pod 名稱
 matchHTTPCode, _ := regexp.MatchString(fmt.Sprintf(" %s ", HTTPCodeNotFound), content)
 matchNamespace, _ := regexp.MatchString(fmt.Sprintf("namespace_name\":\"%s", Namespace), content)
 matchPodName := regexp.MustCompile(fmt.Sprintf(`(%s)`, PodName)).FindStringSubmatch(content)

 if matchHTTPCode && matchNamespace && matchPodName != nil {
	log.Printf("Match log - Content: %s", content)

	// 如果上述三個正則表示式同時命中,那麼我們需要提取日誌內容中的一些資訊,用於填充至告警資訊中
	// 這些資訊為:404 請求的請求方式(HTTP Method)、請求路徑(HTTP Path)以及 Pod 名稱
	match := regexp.MustCompile(`([A-Z]+) (/\S*) HTTP`).FindStringSubmatch(content)
	if match == nil {
	 return 500
	}
	path := match[len(match)-1]
	method := match[len(match)-2]
	podName := matchPodName[len(matchPodName)-1]

	// 收集到關鍵資訊後,我們使用 altermanager 的 Data 結構體組裝告警資訊
	notify := &alert.Data{
	 Receiver:          "notification_manager",
	 Status:            "firing",
	 Alerts:            alert.Alerts{},
	 GroupLabels:       alert.KV{"alertname": AlertName, "namespace": Namespace},
	 CommonLabels:      alert.KV{"alertname": AlertName, "namespace": Namespace, "severity": Severity},
	 CommonAnnotations: alert.KV{},
	 ExternalURL:       "",
	}
	alt := alert.Alert{
	 Status: "firing",
	 Labels: alert.KV{
		"alertname": AlertName,
		"namespace": Namespace,
		"severity":  Severity,
		"pod":       podName,
		"path":      path,
		"method":    method,
	 },
	 Annotations:  alert.KV{},
	 StartsAt:     time.Now(),
	 EndsAt:       time.Time{},
	 GeneratorURL: "",
	 Fingerprint:  "",
	}
	notify.Alerts = append(notify.Alerts, alt)
	notifyBytes, _ := json.Marshal(notify)

	// 使用 ctx.SendTo 將內容傳送給名為 "notification-manager" 的輸出端(你可以在之後的函式配置 logs-handler-function.yaml 中找到它的定義)
	if err := ctx.SendTo(notifyBytes, "notification-manager"); err != nil {
	 panic(err)
	}
	log.Printf("Send log to notification manager.")
 }
 return 200
}

我們將這個函式上傳到程式碼倉庫中,記錄程式碼倉庫的地址以及程式碼在倉庫中的目錄路徑,在下面的建立函式步驟中我們將使用到這兩個值。

你可以在 OpenFunction Samples[14] 中找到這個案例。

建立函式

接下來我們將使用 OpenFunction 構建上述的函式。首先設定一個用於訪問映象倉庫的祕鑰檔案 push-secret(在使用程式碼構建出 OCI 映象後,OpenFunction 會將該映象上傳到使用者的映象倉庫中,用於後續的負載啟動):

$ REGISTRY_SERVER=http://index.docker.io/v1/ REGISTRY_USER=<your username> REGISTRY_PASSWORD=<your password>
$ kubectl create secret docker-registry push-secret \
		--docker-server=$REGISTRY_SERVER \
		--docker-username=$REGISTRY_USER \
		--docker-password=$REGISTRY_PASSWORD

應用函式 logs-handler-function.yaml:

函式定義中包含了對兩個關鍵元件的使用: Dapr[15] 對應用程式遮蔽了複雜的中介軟體,使得 logs-handler 可以非常容易地處理 Kafka 中的事件 KEDA[16] 通過監控訊息伺服器中的事件流量來驅動 logs-handler 函式的啟動,並且根據 Kafka 中訊息的消費延時動態擴充套件 logs-handler 例項

apiVersion: core.openfunction.io/v1alpha1
kind: Function
metadata:
	name: logs-handler
spec:
	version: "v1.0.0"
	# 這裡定義了構建後的映象的上傳路徑
	image: openfunctiondev/logs-async-handler:v1
	imageCredentials:
		name: push-secret
	build:
		builder: openfunctiondev/go115-builder:v0.2.0
		env:
			FUNC_NAME: "LogsHandler"
		# 這裡定義了原始碼的路徑
		# url 為上面提到的程式碼倉庫地址
		# sourceSubPath 為程式碼在倉庫中的目錄路徑
		srcRepo:
			url: "http://github.com/OpenFunction/samples.git"
			sourceSubPath: "functions/OpenFuncAsync/logs-handler-function/"
	serving:
		# OpenFuncAsync 是 OpenFunction 通過 KEDA+Dapr 實現的一種由事件驅動的非同步函式執行時
		runtime: "OpenFuncAsync"
		openFuncAsync:
			# 此處定義了函式的輸入(kafka-receiver)和輸出(notification-manager),與下面 components 中的定義對應關聯
			dapr:
				inputs:
					- name: kafka-receiver
						type: bindings
				outputs:
					- name: notification-manager
						type: bindings
						params:
							operation: "post"
							type: "bindings"
				annotations:
					dapr.io/log-level: "debug"
				# 這裡完成了上述輸入端和輸出端的具體定義(即 Dapr Components)
				components:
					- name: kafka-receiver
						type: bindings.kafka
						version: v1
						metadata:
							- name: brokers
								value: "kafka-logs-receiver-kafka-brokers:9092"
							- name: authRequired
								value: "false"
							- name: publishTopic
								value: "logs"
							- name: topics
								value: "logs"
							- name: consumerGroup
								value: "logs-handler"
					# 此處為 KubeSphere 的 notification-manager 地址
					- name: notification-manager
						type: bindings.http
						version: v1
						metadata:
							- name: url
								value: http://notification-manager-svc.kubesphere-monitoring-system.svc.cluster.local:19093/api/v2/alerts
			keda:
				scaledObject:
					pollingInterval: 15
					minReplicaCount: 0
					maxReplicaCount: 10
					cooldownPeriod: 30
					# 這裡定義了函式的觸發器,即 Kafka 伺服器的 “logs” topic
					# 同時定義了訊息堆積閾值(此處為 10),即當訊息堆積量超過 10,logs-handler 例項個數就會自動擴充套件
					triggers:
						- type: kafka
							metadata:
								topic: logs
								bootstrapServers: kafka-logs-receiver-kafka-brokers.default.svc.cluster.local:9092
								consumerGroup: logs-handler
								lagThreshold: "10"

結果演示

我們先關閉 Kafka 日誌接收器:在日誌收集頁面,點選進入 Kafka 日誌接收器詳情頁面,然後點選更多操作並選擇更改狀態,將其設定為關閉。

停用後一段時間,我們可以觀察到 logs-handler 函式例項已經收縮到 0 了。

再將 Kafka 日誌接收器啟用,logs-handler 隨之啟動。

	$ kubectl get po --watch
	NAME                                                     READY   STATUS        RESTARTS   AGE
	kafka-logs-receiver-entity-operator-568957ff84-tdrrx     3/3     Running       0          7m27s
	kafka-logs-receiver-kafka-0                              1/1     Running       0          7m48s
	kafka-logs-receiver-zookeeper-0                          1/1     Running       0          8m12s
	logs-handler-serving-kpngc-v100-zcj4q-5f46996f8c-b9d6f   2/2     Terminating   0          34s
	strimzi-cluster-operator-687fdd6f77-kc8cv                1/1     Running       0          10m
	logs-handler-serving-kpngc-v100-zcj4q-5f46996f8c-b9d6f   2/2     Terminating   0          36s
	logs-handler-serving-kpngc-v100-zcj4q-5f46996f8c-b9d6f   0/2     Terminating   0          37s
	logs-handler-serving-kpngc-v100-zcj4q-5f46996f8c-b9d6f   0/2     Terminating   0          38s
	logs-handler-serving-kpngc-v100-zcj4q-5f46996f8c-b9d6f   0/2     Terminating   0          38s
	logs-handler-serving-kpngc-v100-zcj4q-5f46996f8c-9kj2c   0/2     Pending       0          0s
	logs-handler-serving-kpngc-v100-zcj4q-5f46996f8c-9kj2c   0/2     Pending       0          0s
	logs-handler-serving-kpngc-v100-zcj4q-5f46996f8c-9kj2c   0/2     ContainerCreating   0          0s
	logs-handler-serving-kpngc-v100-zcj4q-5f46996f8c-9kj2c   0/2     ContainerCreating   0          2s
	logs-handler-serving-kpngc-v100-zcj4q-5f46996f8c-9kj2c   1/2     Running             0          4s
	logs-handler-serving-kpngc-v100-zcj4q-5f46996f8c-9kj2c   2/2     Running             0          11s

接著我們向 WordPress 應用一個不存在的路徑發起請求:

$ curl http://<wp-svc-address>/notfound

可以看到 Slack 中已經收到了這條訊息(與之對比的是,當我們正常訪問該 WordPress 站點時, Slack 中並不會收到告警訊息):

file

進一步探索

  • 同步函式的解決方案

為了可以正常使用 Knative Serving ,我們需要設定其閘道器的負載均衡器地址。(你可以使用本機地址作為 workaround) 將下面的 "1.2.3.4" 替換為實際場景中的地址。 file

除了直接由 Kafka 伺服器驅動函式運作(非同步方式),OpenFunction 還支援使用自帶的事件框架對接 Kafka 伺服器,之後以 Sink 的方式驅動 Knative 函式運作。可以參考 OpenFunction Samples[17] 中的案例。

在該方案中,同步函式的處理速度較之非同步函式有所降低,當然我們同樣可以藉助 KEDA 來觸發 Knative Serving 的 concurrency 機制,但總體而言缺乏非同步函式的便捷性。(後續的階段中我們會優化 OpenFunction 的事件框架來解決同步函式這方面的缺陷)

由此可見,不同型別的 Serverless 函式有其擅長的任務場景,如一個有序的控制流函式就需要由同步函式而非非同步函式來處理。

綜述

Serverless 帶來了我們所期望的對業務場景快速拆解重構的能力。

如本案例所示,OpenFunction 不但以 Serverless 的方式提升了日誌處理、告警通知鏈路的靈活度,還通過函式框架將通常對接 Kafka 時複雜的配置步驟簡化為語義明確的程式碼邏輯。同時,我們也在不斷演進 OpenFunction,將在之後版本中實現由自身的 Serverless 能力驅動自身的元件運作。

引用連結 [1]Notification Manager: http://github.com/kubesphere/notification-manager/

[2]OpenFunction: http://github.com/OpenFunction/OpenFunction

[3]OpenFunction: http://github.com/OpenFunction/OpenFunction

[4]啟用可插拔元件: http://kubesphere.io/zh/docs/pluggable-components/

[5]strimzi-kafka-operator: http://github.com/strimzi/strimzi-kafka-operator

[6]strimzi-kafka-operator: http://github.com/strimzi/strimzi-kafka-operator

[7]多叢集功能: http://kubesphere.io/zh/docs/multicluster-management/

[8]Prerequisites 文件: http://github.com/OpenFunction/OpenFunction#prerequisites

[9]Install 文件: http://github.com/OpenFunction/OpenFunction#install

[10]建立並部署 WordPress: http://kubesphere.io/zh/docs/quick-start/wordpress-deployment/

[11]配置 Slack 通知: http://kubesphere.io/zh/docs/cluster-administration/platform-settings/notification-management/configure-slack/

[12]OpenFunction Context Spec: http://github.com/OpenFunction/functions-framework/blob/main/docs/OpenFunction-context-specs.md

[13]OpenFunction Samples: http://github.com/OpenFunction/samples

[14]OpenFunction Samples: http://github.com/OpenFunction/samples/tree/main/functions/OpenFuncAsync/logs-handler-function

[15]Dapr: http://dapr.io/

[16]KEDA: http://keda.sh/

[17]OpenFunction Samples: http://github.com/OpenFunction/samples/tree/main/functions/Knative/logs-handler-function

作者

方闐 OpenFunction 開源社群 Maintainer