KubeSphere 後端原始碼深度解析

語言: CN / TW / HK

這篇文章我們將學習在 vscode 上的 ssh remote 外掛基礎上,嘗試 debug 和學習 KubeSphere 後端模組架構。

前提

  • 安裝好 vscode 以及 ssh remote container 外掛;

  • 在遠端主機上安裝好 kubenertes 容器 " 作業系統 " 和 KubeSphere >= v3.1.0 雲“控制面板”;

  • 安裝 go >=1.16;

  • 在 KubeSphere 上安裝了需要 debug 的 ks 元件,如 devops、kubeedge 或者 whatever, 如果是預設啟用的元件,像 monitoring,不需要去啟用。

配置 launch 檔案

$ cat .vscode/launch.json
{
// 使用 IntelliSense 瞭解相關屬性。
// 懸停以檢視現有屬性的描述。
// 欲瞭解更多資訊,請訪問: http://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "ks-apiserver",
"type": "go",
"request": "launch",
"mode": "auto",
"program": "${workspaceFolder}/cmd/ks-apiserver/apiserver.go"
}

]
}

ks-apiserver 除錯依賴檔案

在相對路徑 cmd/ks-apiserver/ 下配置 kubesphere.yaml。

首先,檢視叢集之中的 cm 配置檔案 :

$ kubectl -n kubesphere-system get cm kubesphere-config -oyaml

因為上述 configmap 中差少 kubeconfig 相關配置,所以需要將上述 yaml 檔案拷貝出來整合一下。

為啥要用新增 kubeconfig 檔案?

主要是因為 k8s 在建立 client 時需要這麼一個檔案 , 而容器中會用到 inclusterconfig 就不需要添加了。

感興趣可以看下 client-go 的例子:

http://github.com/kubernetes/client-go/blob/master/examples/in-cluster-client-configuration/main.go#L41
http://github.com/kubernetes/client-go/blob/master/examples/out-of-cluster-client-configuration/main.go#L53

所以完整的配置啟動檔案如下:

$ cat ./cmd/ks-apiserver/kubesphere.yaml
kubernetes:
kubeconfig: "/root/.kube/config"
master: http://192.168.88.6:6443
$qps: 1e+06
burst: 1000000
authentication:
authenticateRateLimiterMaxTries: 10
authenticateRateLimiterDuration: 10m0s
loginHistoryRetentionPeriod: 168h
maximumClockSkew: 10s
multipleLogin: True
kubectlImage: kubesphere/kubectl:v1.20.0
jwtSecret: "Xtc8ZWUf9f3cJN89bglrTJhfUPMZR87d"
oauthOptions:
clients:
- name: kubesphere
secret: kubesphere
redirectURIs:
- '*'
network:
ippoolType: none
monitoring:
endpoint: http://prometheus-operated.kubesphere-monitoring-system.svc:9090
enableGPUMonitoring: false
gpu:
kinds:
- resourceName: nvidia.com/gpu
resourceType: GPU
default: True
notification:
endpoint: http://notification-manager-svc.kubesphere-monitoring-system.svc:19093

kubeedge:
endpoint: http://edge-watcher.kubeedge.svc/api/

gateway:
watchesPath: /var/helm-charts/watches.yaml
namespace: kubesphere-controls-system

除了 kubernetes, 第一層的 key 表示我們叢集中已經按照或者預設啟用的 ks 元件,現在就可以通過 F5 來啟動 debug 了。

在 debug 之前,你可能會問,這個配置檔案為啥要放在 /cmd/ks-apiserver/kubesphere.yaml?

我們先來探索一波 ks-apiserver 的執行邏輯。

啟動 ks-apiserver

檢視 cmd/ks-apiserver/app/server.go 的邏輯 :

// Load configuration from file
conf, err := apiserverconfig.TryLoadFromDisk()

TryLoadFromDisk 的邏輯如下:

viper.SetConfigName(defaultConfigurationName) // kubesphere
viper.AddConfigPath(defaultConfigurationPath) // /etc/kubesphere

// Load from current working directory, only used for debugging
viper.AddConfigPath(".")

// Load from Environment variables
viper.SetEnvPrefix("kubesphere")
viper.AutomaticEnv()
viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))

// 上面一頓配置之後,單步除錯,ReadInConfig這一步讀取的檔案路徑是
// v.configPaths:["/etc/kubesphere","/root/go/src/kubesphere.io/kubesphere/cmd/ks-apiserver"]
if err := viper.ReadInConfig(); err != nil {
if _, ok := err.(viper.ConfigFileNotFoundError); ok {
return nil, err
} else {
return nil, fmt.Errorf("error parsing configuration file %s", err)
}
}

conf := New() // 初始化各元件配置

// 從讀取的實際路徑配置檔案來反序列化到conf這個struct
if err := viper.Unmarshal(conf); err != nil {
return nil, err
}

return conf, n

上面的註釋,解釋了需要在指定路徑下新增 kubesphere.yaml 啟動 ks-apiserver 命令列。

我們接著往下擼,這裡使用 cobra.Command 這個 package 來做命令列的整合:

func Run(s *options.ServerRunOptions, ctx context.Context) error {
// NewAPIServer 通過給定的配置啟動apiserver例項,繫結例項化的各元件的client
// 這一步還通過AddToScheme來註冊一些自定義的GVK到k8s,最終暴露為apis API
// 藉助rest.Config和scheme 初始化runtimecache和runtimeClient
apiserver, err := s.NewAPIServer(ctx.Done())
if err != nil {
return err
}

// PrepareRun 主要是使用resful-go整合kapis API
// 上一步綁定了各元件的client,這一步就可以呼叫各元件的client來訪問對應元件的server端了
// 猜猜4.0後端可插拔架構會是什麼樣子的?
err = apiserver.PrepareRun(ctx.Done())
if err != nil {
return nil
}

// 執行各種informers同步資源,並開始ks-apiserver監聽請求
return apiserver.Run(ctx)
}

s.NewAPIServer(ctx.Done()) 主要是建立一個 apiserver 例項。建立 apiserver 例項這一步,還通過 scheme 註冊 ks 自定義的 GVK 到 k8s,  暴露為 apis 請求路徑的 API。

PrepareRun 主要是使用 resful-go 框架集成了各子模組代理請求或整合服務, 暴露為 kapis 請求路徑的 API 功能 。

apiserver.Run(ctx) 則是做了資源同步,並啟動 server 監聽。

下面分開闡述說明。

NewAPIServer

首先是繫結各種 client 和 informers:

// 呼叫各元件的NewForConfig方法整合clientset
kubernetesClient, err := k8s.NewKubernetesClient(s.KubernetesOptions)
if err != nil {
return nil, err
}
apiServer.KubernetesClient = kubernetesClient
informerFactory := informers.NewInformerFactories(kubernetesClient.Kubernetes(), kubernetesClient.KubeSphere(),kubernetesClient.Istio(), kubernetesClient.Snapshot(), kubernetesClient.ApiExtensions(), kubernetesClient.Prometheus())
apiServer.InformerFactory = informerFactory
...
// 根據kubesphere.yaml或者kubesphere-config configmap的配置來繫結ks元件的client
...

初始化繫結完畢後 , 會啟動一個 server 來響應請求 , 所以這裡會做一個 addr 繫結 :

...
server := &http.Server{
Addr: fmt.Sprintf(":%d", s.GenericServerRunOptions.InsecurePort),
}

if s.GenericServerRunOptions.SecurePort != 0 {
certificate, err := tls.LoadX509KeyPair(s.GenericServerRunOptions.TlsCertFile, s.GenericServerRunOptions.TlsPrivateKey)
if err != nil {
return nil, err
}

server.TLSConfig = &tls.Config{
Certificates: []tls.Certificate{certificate},
}
server.Addr = fmt.Sprintf(":%d", s.GenericServerRunOptions.SecurePort)
}

sch := scheme.Scheme
if err := apis.AddToScheme(sch); err != nil {
klog.Fatalf("unable add APIs to scheme: %v", err)
}
...

注意這一步 apis.AddToScheme(sch), 將我們定義的 GVK 註冊到 k8s 中。

順帶一提,GVK 指的是 Group,Version, Kind, 舉個栗子:

{Group: "", Version: "v1", Resource: "namespaces"}
{Group: "", Version: "v1", Resource: "nodes"}
{Group: "", Version: "v1", Resource: "resourcequotas"}
...
{Group: "tenant.kubesphere.io", Version: "v1alpha1", Resource: "workspaces"}
{Group: "cluster.kubesphere.io", Version: "v1alpha1", Resource: "clusters"}
...

Scheme 管理 GVK 和 Type 的關係 ,  一個 GVK 只能對應一個 reflect.Type, 一個 reflect.Type 可能對應多個 GVK;此外,Scheme 還聚合了 converter 及 cloner, 用來轉換不同版本的結構體和獲取結構體值的拷貝;限於篇幅有限,感興趣的童鞋可以深入探索下。

迴歸正文,下面我們看下怎麼注入 scheme 的:

// AddToSchemes may be used to add all resources defined in the project to a Schemevar AddToSchemes runtime.SchemeBuilder
// AddToScheme adds all Resources to the Schemefunc
AddToScheme(s *runtime.Scheme) error { return AddToSchemes.AddToScheme(s)}

而 AddToSchemes 這個型別的是 []func(*Scheme) error 的別名,只需要在 package apis 下的介面檔案中實現相應的 init() 方法來匯入實現的版本 API,就可以注入 Scheme 中。

舉個例子:

$ cat pkg/apis/addtoscheme_dashboard_v1alpha2.go
package apis
import monitoringdashboardv1alpha2 "kubesphere.io/monitoring-dashboard/api/v1alpha2"
func init() {
AddToSchemes = append(AddToSchemes, monitoringdashboardv1alpha2.SchemeBuilder.AddToScheme)
}

也就是,我們開發的外掛整合的版本化資源,必須實現 xxx.SchemeBuilder.AddToScheme 功能,才能註冊到 scheme 中,最終暴露為 apis 訪問 API 服務。

至此,所有子模組對應的 client 已經與這個 apiserver 繫結。

PrepareRun

下面,我們探討下 PrepareRun 是怎麼註冊 kapis 以及繫結 handler 的。

主要是通過 restful-go 框架來實現的。

restful-go 框架使用 container 來 hold 住擁有特定 GVR 的 webservice, 一個 webserver 可以繫結多個 router,允許 container 或者 webserver 新增自定義攔截器,也就是呼叫 filter 方法。

func (s *APIServer) PrepareRun(stopCh <-chan struct{}) error {
// container來hold住擁有特定GVR的webservice
s.container = restful.NewContainer()
// 新增請求Request日誌攔截器
s.container.Filter(logRequestAndResponse)
s.container.Router(restful.CurlyRouter{})

// 發生Recover時,繫結一個日誌handler
s.container.RecoverHandler(func(panicReason interface{}, httpWriter http.ResponseWriter) {
logStackOnRecover(panicReason, httpWriter)
})

// 每個API組都構建一個webservice,然後根據路由規則來並繫結回撥函式
// 通過AddToContainer來完成繫結
s.installKubeSphereAPIs()

// 註冊metrics指標: ks_server_request_total、ks_server_request_duration_seconds
// 繫結metrics handler
s.installMetricsAPI()

// 為有效請求增加監控計數
s.container.Filter(monitorRequest)

for _, ws := range s.container.RegisteredWebServices() {
klog.V(2).Infof("%s", ws.RootPath())
}

s.Server.Handler = s.container

// 新增各個呼叫鏈的攔截器, 用於驗證和路由分發
s.buildHandlerChain(stopCh)

return nil
}

上面主要使用 restful-go 框架給 s.Server.handler 綁定了一個 container, 添加了各種攔截器。

在 s.installKubeSphereAPIS() 這一步安裝 GVR 綁定了 kapis 代理,具體是這樣實現的:

// 呼叫各api組的AddToContainer方法來向container註冊kapi:
urlruntime.Must(monitoringv1alpha3.AddToContainer(s.container, s.KubernetesClient.Kubernetes(), s.MonitoringClient, s.MetricsClient, s.InformerFactory, s.KubernetesClient.KubeSphere(), s.Config.OpenPitrixOptions))

// 詳細來說,各個元件實現的AddToContainer方法
// 為帶有GroupVersion資訊的webserver新增route,不同路由路徑繫結不同的handler
ws := runtime.NewWebService(GroupVersion)
// 給子路由繫結回撥函式
ws.Route(ws.GET("/kubesphere").
To(h.handleKubeSphereMetricsQuery).
Doc("Get platform-level metric data.").
Metadata(restfulspec.KeyOpenAPITags, []string{constants.KubeSphereMetricsTag}).
Writes(model.Metrics{}).
Returns(http.StatusOK, respOK, model.Metrics{})).
Produces(restful.MIME_JSON)

我們知道 apis 對應 k8s 的請求,而在 ks 中 kapis 對應子元件的代理請求,由 ks-apiserver 自身或者轉發目標元件 server 來提供響應,那麼 ks-apiserver 是怎麼區分這些請求的?

答案是通過 buildHandlerChain 來進行分發的。

buildHandlerChain

上面說到 buildHandlerChain 構建了各種服務的攔截器,按序排列如下。

handler = filters.WithKubeAPIServer(handler, s.KubernetesClient.Config(), &errorResponder{})

if s.Config.AuditingOptions.Enable {
handler = filters.WithAuditing(handler,
audit.NewAuditing(s.InformerFactory, s.Config.AuditingOptions, stopCh))
}

handler = filters.WithAuthorization(handler, authorizers)
if s.Config.MultiClusterOptions.Enable {
clusterDispatcher := dispatch.NewClusterDispatch(s.InformerFactory.KubeSphereSharedInformerFactory().Cluster().V1alpha1().Clusters())
handler = filters.WithMultipleClusterDispatcher(handler, clusterDispatcher)
}

handler = filters.WithAuthentication(handler, authn)
handler = filters.WithRequestInfo(handler, requestInfoResolver)

WithRequestInfo 這個 filter 定義瞭如下邏輯:

info, err := resolver.NewRequestInfo(req)
---
func (r *RequestInfoFactory) NewRequestInfo(req *http.Request) (*RequestInfo, error) {
...
defer func() {
prefix := requestInfo.APIPrefix
if prefix == "" {
currentParts := splitPath(requestInfo.Path)
//Proxy discovery API
if len(currentParts) > 0 && len(currentParts) < 3 {
prefix = currentParts[0]
}
}
// 通過api路由路徑中的攜帶apis還是kapis就可以區分
if kubernetesAPIPrefixes.Has(prefix) {
requestInfo.IsKubernetesRequest = true
}
}()

...
// URL forms: /clusters/{cluster}/*
if currentParts[0] == "clusters" {
if len(currentParts) > 1 {
requestInfo.Cluster = currentParts[1]
}
if len(currentParts) > 2 {
currentParts = currentParts[2:]
}
}
...
}

程式碼很多,我就不一一截圖了,大概意思可以從註釋看到:

// NewRequestInfo returns the information from the http request.  If error is not nil, RequestInfo holds the information as best it is known before the failure
// It handles both resource and non-resource requests and fills in all the pertinent information for each.
// Valid Inputs:
//
// /apis/{api-group}/{version}/namespaces
// /api/{version}/namespaces
// /api/{version}/namespaces/{namespace}
// /api/{version}/namespaces/{namespace}/{resource}
// /api/{version}/namespaces/{namespace}/{resource}/{resourceName}
// /api/{version}/{resource}
// /api/{version}/{resource}/{resourceName}
//
// Special verbs without subresources:
// /api/{version}/proxy/{resource}/{resourceName}
// /api/{version}/proxy/namespaces/{namespace}/{resource}/{resourceName}
//
// Special verbs with subresources:
// /api/{version}/watch/{resource}
// /api/{version}/watch/namespaces/{namespace}/{resource}
//
// /kapis/{api-group}/{version}/workspaces/{workspace}/{resource}/{resourceName}
// /
// /kapis/{api-group}/{version}/namespaces/{namespace}/{resource}
// /kapis/{api-group}/{version}/namespaces/{namespace}/{resource}/{resourceName}
// With workspaces:
// /kapis/clusters/{cluster}/{api-group}/{version}/namespaces/{namespace}/{resource}
// /kapis/clusters/{cluster}/{api-group}/{version}/namespaces/{namespace}/{resource}/{resourceName}

通過路由定義的資訊,就可以區分這個請求是什麼級別的,以及這個請求要分發到哪個 server 了。

我們給各個 filter 的回撥函式加上斷點, 然後做個小實驗看下攔截器的攔截順序是怎樣的。

假設遠端雲主機的服務已經啟動,服務埠在 9090,以及你為 anonymous 這個 globalrole 設定了 monitoring.kubesphere.io 這個組下資源型別為 ClusterDashboard 的訪問許可權。當然了,你也可以用有訪問許可權的賬號來直接測試。

接下來,我們來發送一個 kapis 請求,看這個鏈路怎麼跳躍的:

curl -d '{"grafanaDashboardUrl":"http://grafana.com/api/dashboards/7362/revisions/5/download", "description":"this is a test dashboard."}' -H "Content-Type: application/json" localhost:9090/kapis/monitoring.kubesphere.io/v1alpha3/clusterdashboards/test1/template

測試結果如下:

WithRequestInfo -> WithAuthentication -> WithAuthorization -> WithKubeAPIServer

Run

這個方法主要乾了兩件事,一是啟動 informers 同步資源 , 二是啟動 ks apiserver。

func (s *APIServer) Run(ctx context.Context) (err error) {
// 啟動informer工廠,包括k8s和ks的informers
// 同步資源,包括k8s和ks的GVR
// 檢查GVR是否存在,不存在報錯警告,存在就同步
err = s.waitForResourceSync(ctx)
if err != nil {
return err
}

shutdownCtx, cancel := context.WithCancel(context.Background())
defer cancel()

go func() {
<-ctx.Done()
_ = s.Server.Shutdown(shutdownCtx)
}()

// 啟動server
klog.V(0).Infof("Start listening on %s", s.Server.Addr)
if s.Server.TLSConfig != nil {
err = s.Server.ListenAndServeTLS("", "")
} else {
err = s.Server.ListenAndServe()
}

return err
}

至此,呼叫完 Run 方法後,ks-apiserver 就啟動了。

現在我們做一下簡單總結:

  • 根據配置檔案建立 ks-apiserver 例項 , 該例項呼叫了三個關鍵方法,分別是 NewAPIServer、PrepareRun 以及 Run 方法;

  • NewAPIServer 通過給定的配置,繫結各個模組的 client,將自定義的 GVK 註冊到 Scheme,暴露 apis 路由服務;

  • PrepareRun 通過 restful-go 框架來註冊、繫結 kapi 路由和回撥函式,用來自身響應或者下發元件 server 查詢合併資料返回給客戶端 ;

  • 最後 , 呼叫 Run 方法,同步資源並啟動 ks-apiserver 服務;

GVK 探索實戰

顯然,我們只需要關注各模組的 AddToContainer 方法就行了。

iam.kubesphere.io

pkg/kapis/iam/v1alpha2/register.go

從程式碼註釋來看,這個模組管理著 users、clustermembers、globalroles、clusterroles、workspaceroles、roles、workspaces groups 、workspace members、devops members 等賬號角色的 CRUD。

現在我們可以在 handler 中打上斷點,去請求這些 api。

$ curl "localhost:9090/kapis/iam.kubesphere.io/v1alpha2/users"
$ curl "localhost:9090/kapis/iam.kubesphere.io/v1alpha2/clustermembers"
$ curl "localhost:9090/kapis/iam.kubesphere.io/v1alpha2/users/admin/globalroles"
...

kubeedge.kubesphere.io

pkg/kapis/kubeedge/v1alpha1/register.go

程式碼裡面使用的代理轉發請求:

func AddToContainer(container *restful.Container, endpoint string) error {
proxy, err := generic.NewGenericProxy(endpoint, GroupVersion.Group, GroupVersion.Version)
if err != nil {
return nil
}

return proxy.AddToContainer(container)
}

也就是 kapis/kubeedge.kubesphere.io 的請求會轉發到 http://edge-watcher.kubeedge.svc/api/,也就是 kubeedge 這個 namespace 下的 service,相關的介面整合在那裡。

關於整合邊緣計算平臺的整合,除了需要做一個主流邊緣框架的快速安裝和整合外,還可以整合一個類似 edge-shim 的介面卡,大概需要從一下幾個方面考慮:

  • 代理 endpoint: 現在的 kubeedge 就是使用代理模式轉發;

  • 健康檢查介面:至少要確保雲端的元件已經成功部署;

  • 事件、長期日誌、審計等可觀測元件的支援;

  • 其他邊緣輔助功能,如檔案或者配置下發等;

notification.kubesphere.io

pkg/kapis/notification/v2beta1/register.go

這個組下的 api 主要實現了 notification 的全域性或租戶級別的 config 和 receivers 資源的 CRUD。

config 資源

用於配置對接通知渠道相關引數的一些配置,分為全域性的和租戶級別的 config 資源;

reciever 資源

用於配置接收者的一些配置資訊,區分全域性的和租戶級別的接收者;

我們挑選一個回撥函式進行剖析:

ws.Route(ws.GET("/{resources}").
To(h.ListResource).
Doc("list the notification configs or receivers").
Metadata(KeyOpenAPITags, []string{constants.NotificationTag}).
Param(ws.PathParameter("resources", "known values include configs, receivers, secrets")).
Param(ws.QueryParameter(query.ParameterName, "name used for filtering").Required(false)).
Param(ws.QueryParameter(query.ParameterLabelSelector, "label selector used for filtering").Required(false)).
Param(ws.QueryParameter("type", "config or receiver type, known values include dingtalk, email, slack, webhook, wechat").Required(false)).
Param(ws.QueryParameter(query.ParameterPage, "page").Required(false).DataFormat("page=%d").DefaultValue("page=1")).
Param(ws.QueryParameter(query.ParameterLimit, "limit").Required(false)).
Param(ws.QueryParameter(query.ParameterAscending, "sort parameters, e.g. ascending=false").Required(false).DefaultValue("ascending=false")).
Param(ws.QueryParameter(query.ParameterOrderBy, "sort parameters, e.g. orderBy=createTime")).
Returns(http.StatusOK, api.StatusOK, api.ListResult{Items: []interface{}{}}))

func (h *handler) ListResource(req *restful.Request, resp *restful.Response) {
// 租戶或使用者的名稱
user := req.PathParameter("user")
// 資源型別,configs/recievers/secrets
resource := req.PathParameter("resources")
// 通知渠道 dingtalk/slack/email/webhook/wechat
subresource := req.QueryParameter("type")
q := query.ParseQueryParameter(req)

if !h.operator.IsKnownResource(resource, subresource) {
api.HandleBadRequest(resp, req, servererr.New("unknown resource type %s/%s", resource, subresource))
return
}

objs, err := h.operator.List(user, resource, subresource, q)
handleResponse(req, resp, objs, err)
}

我們看下 list object 的邏輯:

// List objects.
func (o *operator) List(user, resource, subresource string, q *query.Query) (*api.ListResult, error) {
if len(q.LabelSelector) > 0 {
q.LabelSelector = q.LabelSelector + ","
}

filter := ""
// 如果沒有給定租戶的名稱,則獲取全域性的物件
if user == "" {
if isConfig(o.GetObject(resource)) {
// type=default對config資源來說是全域性的
filter = "type=default"
} else {
// type=global對receiever資源來說是全域性的
filter = "type=global"
}
} else {
// 否則就給過濾器繫結租戶名稱
filter = "type=tenant,user=" + user
}
// 組裝過濾標籤
q.LabelSelector = q.LabelSelector + filter
...
// 通過過濾標籤獲取cluster或者namespace下的指定資源
res, err := o.resourceGetter.List(resource, ns, q)
if err != nil {
return nil, err
}

if subresource == "" || resource == Secret {
return res, nil
}

results := &api.ListResult{}
...
}

這樣一來,就實現了租戶級別的通知告警 CR 配置的 CRUD,這些 CR 是這麼分類的:

  • config 分為全域性 type = default, 租戶 type = tenant 兩種級別;

  • reciever 分為全域性 type = global, 租戶 type = tenant 兩種級別;

那麼 config 和 reciever 怎麼相互繫結、告警是如何通過渠道給租戶發訊息的?

http://github.com/kubesphere/notification-manager/blob/master/pkg/webhook/v1/handler.go#L45
http://github.com/kubesphere/notification-manager/blob/master/pkg/notify/notify.go#L66

notification-manager 簡稱 nm,我這裡斷章取義地簡要回答一下。

功能方面:

  • 全域性配置 reciever 通過配置的渠道將所有的 alerts 傳送給其定義好的接收者名單, 配置了租戶資訊的 reciever 只能通過渠道傳送當前 ns 下的 alerts;

  • reciever 中可以通過配置 alertSelector 引數來進一步過濾告警訊息;

  • 通過修改名為 notification-manager-template 的 confimap 來定製傳送訊息模板;

告警到通知的流程:

  • nm 使用埠 19093 和 API 路徑 /api/v2/alerts 接收從 Alertmanager  傳送的告警 ;

  • 回撥函式接受 alerts 轉換為 notification 模板資料,按照 namespace 區分告警資料;

  • 遍歷所有 Recievers,每個 ns 下啟動一個協程來發送訊息, 而這裡每個 ns 對應著多個通知渠道,因此也使用 waitgroup 來併發編排完成任務;

monitoring.kubesphere.io

pkg/kapis/monitoring/v1alpha3/register.go

將監控指標分為平臺級、節點級、workspaces、namespaces、pods 等級別,不僅可以獲取總的統計,還能獲取 nodes/namespaces/workspaces 下的所有 pods/containers 等監控指標。

我們查看回調函式,以 handleNamedMetricsQuery 為例分析:

  • 遍歷給定指標級別下的合法 metric 指標,根據請求引數中 metricFilter 的來過濾指標名;

  • 判斷為範圍查詢還是實時查詢,來調取 monitoring 包中相關方法,通過對應的 client 請求後端獲取結果返回;

程式碼如下:

func (h handler) handleNamedMetricsQuery(resp *restful.Response, q queryOptions) {
var res model.Metrics

var metrics []string
// q.namedMetrics 是一組按照監控指標級別分類好的擁有promsql expr定義的完整指標名陣列
// 監控指標級別分類是根據 monitoring.Levelxxx在上一個棧裡細分的,i.e: monitoring.LevelPod
for _, metric := range q.namedMetrics {
if strings.HasPrefix(metric, model.MetricMeterPrefix) {
// skip meter metric
continue
}
// 根據請求引數中的指標名來過濾
ok, _ := regexp.MatchString(q.metricFilter, metric)
if ok {
metrics = append(metrics, metric)
}
}
if len(metrics) == 0 {
resp.WriteAsJson(res)
return
}

// 判斷是否是範圍查詢還是實時查詢,繼續呼叫相關函式
// 主要還是用prometheus client去查詢promsql, 邊緣節點的指標目前通過metrics server來查詢
if q.isRangeQuery() {
res = h.mo.GetNamedMetricsOverTime(metrics, q.start, q.end, q.step, q.option)
} else {
res = h.mo.GetNamedMetrics(metrics, q.time, q.option)
if q.shouldSort() {
res = *res.Sort(q.target, q.order, q.identifier).Page(q.page, q.limit)
}
}
resp.WriteAsJson(res)
}

現在,我們將視角移植到 :

pkg/models/monitoring/monitoring.go:156

以 GetNamedMetricsOverTime 為例,這裡闡述了會合並 prometheus 和 metrics-server 的查詢結果進行返回:

func (mo monitoringOperator) GetNamedMetricsOverTime(metrics []string, start, end time.Time, step time.Duration, opt monitoring.QueryOption) Metrics {
// 獲取prometheus client查詢結果,主要使用sync.WaitGroup併發查詢,每個指標啟動一個goroutine,最後將結果和並返回
ress := mo.prometheus.GetNamedMetricsOverTime(metrics, start, end, step, opt)
// 如果metrics-server激活了
if mo.metricsserver != nil {

//合併邊緣節點資料
edgeMetrics := make(map[string]monitoring.MetricData)

for i, ressMetric := range ress {
metricName := ressMetric.MetricName
ressMetricValues := ressMetric.MetricData.MetricValues
if len(ressMetricValues) == 0 {
// this metric has no prometheus metrics data
if len(edgeMetrics) == 0 {
// start to request monintoring metricsApi data
mr := mo.metricsserver.GetNamedMetricsOverTime(metrics, start, end, step, opt)
for _, mrMetric := range mr {
edgeMetrics[mrMetric.MetricName] = mrMetric.MetricData
}
}
if val, ok := edgeMetrics[metricName]; ok {
ress[i].MetricData.MetricValues = append(ress[i].MetricData.MetricValues, val.MetricValues...)
}
}
}
}

return Metrics{Results: ress}
}

此外,monitoring 包還定義了各監控查詢 client 的介面方法,可以按需探索:

  • GetMetric(expr string, time time.Time) Metric

  • GetMetricOverTime(expr string, start, end time.Time, step time.Duration) Metric

  • GetNamedMetrics(metrics []string, time time.Time, opt QueryOption) []Metric

  • GetNamedMetricsOverTime(metrics []string, start, end time.Time, step time.Duration, opt QueryOption) []Metric

  • GetMetadata(namespace string) []Metadata

  • GetMetricLabelSet(expr string, start, end time.Time) []map[string]string

tenant.kubesphere.io

在聊 api 之前,順帶一提多租戶在隔離的安全程度上,我們可以將其分為軟隔離 (Soft Multi-tenancy) 和硬隔離 (Hard Multi-tenancy) 兩種。

  • 軟隔離更多的是面向企業內部的多租需求;

  • 硬隔離面向的更多是對外提供服務的服務供應商,需要更嚴格的隔離作為安全保障。

這個 group 下比較重要的部分是實現租戶查詢 logs/audits/events:

以查詢日誌為例:

func (h *tenantHandler) QueryLogs(req *restful.Request, resp *restful.Response) {
// 查詢上下文中攜帶的租戶資訊
user, ok := request.UserFrom(req.Request.Context())
if !ok {
err := fmt.Errorf("cannot obtain user info")
klog.Errorln(err)
api.HandleForbidden(resp, req, err)
return
}
// 解析查詢的引數,比如確定屬於哪個ns/workload/pod/container的查詢、時間段,是否為柱狀查詢等
queryParam, err := loggingv1alpha2.ParseQueryParameter(req)
if err != nil {
klog.Errorln(err)
api.HandleInternalError(resp, req, err)
return
}
// 匯出資料
if queryParam.Operation == loggingv1alpha2.OperationExport {
resp.Header().Set(restful.HEADER_ContentType, "text/plain")
resp.Header().Set("Content-Disposition", "attachment")
// 驗證賬號是否有許可權
// admin賬號可以匯出所有ns的日誌,租戶只能匯出本ns的日誌
// 組裝loggingclient進行日誌匯出
err := h.tenant.ExportLogs(user, queryParam, resp)
if err != nil {
klog.Errorln(err)
api.HandleInternalError(resp, req, err)
return
}
} else {
// 驗證賬號是否有許可權
// admin賬號可以檢視所有ns的日誌,租戶只能檢視本ns的日誌
// 組裝loggingclient進行日誌返回
result, err := h.tenant.QueryLogs(user, queryParam)
if err != nil {
klog.Errorln(err)
api.HandleInternalError(resp, req, err)
return
}
resp.WriteAsJson(result)
}
}

由於篇幅有限,只對以上 GVR 進行了除錯,感興趣可以深入瞭解~

關於 KubeSphere

KubeSphere (http://kubesphere.io)是在 Kubernetes 之上構建的 開源容器混合雲 ,提供全棧的 IT 自動化運維的能力,簡化企業的 DevOps 工作流。

KubeSphere 已被  Aqara 智慧家居、愛立信、本來生活、東軟、華雲、新浪、三一重工、華夏銀行、四川航空、國藥集團、微眾銀行、 杭州數跑科技、 紫金保險、去哪兒網、中通、中國人民銀行、中國銀行、 中國人保壽險、中國太平保險、中國移動、中國電信、天翼雲、 中移金科、Radore、ZaloPay  等海內外數千家企業採用。KubeSphere 提供了開發者友好的嚮導式操作介面和豐富的企業級功能,包括  Kubernetes   多雲與多叢集管理、DevOps (CI/CD)、應用生命週期管理、邊緣計算、微服務治理 (Service Mesh)、多租戶管理、可觀測性、儲存與網路管理、GPU support  等功能,幫助企業快速構建一個強大和功能豐富的容器雲平臺。

 :sparkles: GitHub :http://github.com/kubesphere

 :computer: 官網(中國站) :http://kubesphere.com.cn

:man:‍:computer:‍  微信群: 請搜尋新增群助手微訊號  kubesphere