Go 語言實現 WebSocket 推送
寫在前面
系統開發的過程中,我們經常需要實現訊息推送的需求。單端單例項的情況下很好處理(網上有許多教程這裡不做展開),但在分散式系統及多端需要推送的情況下該如何處理呢?
在分散式系統中,訊息推送服務端是多例項的。某系統中一個服務生成一條訊息,這條訊息需要實時推送到多個終端,此時該如何進行有效的 WebSocket 推送呢?首先一起看看如下場景:
假設推送訊息由訊息例項 2 產生,但是終端真正連線的訊息例項是例項 1 和例項 3,並沒有連線到生產訊息的例項 2,系統是如何將例項 2 的訊息同步推送到終端 1 和終端 2 的呢?下文將詳細描述。
基本原理
為了滿足需求,我們採用 Redis 做協同中介軟體,用於儲存使用者資訊、生成使用者連線的唯一性標識以及 pod address,訊息的生產者例項通過訂閱 Redis 獲取終端連線的唯一性標識和 pod address,並通知到對應的訊息例項,最終由相應連線終端的訊息例項通過 WebSocket 將訊息發推送到使用者終端。具體流程如下圖:
為了實現構想,我們構造了兩個元件:Client、ClientManager,實現邏輯如下。
服務端實現
Client
Client 元件的作用,是當用戶與訊息服務中某個例項建立連線後,管理這個連線的資訊,這裡通過一個 Golang 結構體來定義:
type Client struct {
UUID string
UserID string
Socket *websocket.Conn
Send chan []byte
}
結構體中的資料型別說明如下:
-
UUID:對連線進行唯一性的標識,通過此標識可以查詢到連線資訊。
-
UserID:使用者 ID。
-
Socket:連線物件。
-
Send:訊息資料 channel。
我們為 Client 結構體實現了兩個方法:Read、Write 來處理訊息的接受和傳送。
Read 方法
Read 方法比較簡單,從終端接收請求訊息後,訊息例項通過 WebSocket 迴應接收訊息狀態,並不返回請求結果。結果通過 Write 方法返回。
func (c *Client) Read(close, renewal chan *Client) {
defer func() {
close <- c
}()
for {
_, message, err := c.Socket.ReadMessage()
if err != nil {
break
}
// ...
// message logic
}
}
Write 方法
Write 方法將請求結果返回給終端。Client 會監聽 send channel,當 channel 有資料時,通過 socket 連線將訊息傳送給終端。
func (c *Client) Write(close chan *Client) {
for {
select {
case message, ok := <-c.Send:
if !ok {
return
}
c.Socket.WriteMessage(websocket.TextMessage, message)
case <-c.Ctx.Done():
return
}
}
}
ClientManger
ClientManager 元件相當於連線池,可以管理所有的終端連線,並提供註冊、登出、續期功能。
type ClientManager struct {
sync.RWMutex
Clients map[string]*Client
Register chan *Client
Unregister chan *Client
Renewal chan *Client
}
結構體的資料型別說明如下:
-
Clients:是一個集合,用於儲存建立的 Client 物件。
-
Register:註冊的 channel。
-
把連線註冊到 Clients 中,並通過 key-value 加入 Client 集合中,key 是連線的唯一性標識 ,value 是連線本身。
-
把連線的唯一性標識和使用者的 ID 以及建立連線的 pod address 資訊,儲存到 Redis 中。
-
-
Unregister:登出的 channel。
-
從 ClientManager 元件的 Clients 集合中移除連線物件。
-
刪除 Redis 對應的快取資訊。
-
-
Renewal:續期的 channel,用於對 Redis 的鍵續期。
ClientManager 提供了一個 Start 方法,Start 方法提供監聽註冊、登出以及續期的 channel,通過監聽這些 channel 來管理建立的連線物件。當這些 channel 有資料時,執行對應的操作。
func (manager *ClientManager) Start(ctx context.Context) {
for {
select {
case conn := <-manager.Register:
manager.Lock()
manager.Clients[conn.UUID] = conn
manager.Unlock()
_, err := manager.affair.Register(ctx, &RegisterReq{
UserID: conn.UserID,
UUID: conn.UUID,
IP: manager.IP,
})
case conn := <-manager.Unregister:
_, err := manager.affair.Unregister(ctx, &UnregisterReq{
UserID: conn.UserID,
UUID: conn.UUID,
})
conn.Socket.Close()
close(conn.Send)
delete(manager.Clients, conn.UUID)
case conn := <-manager.Renewal:
//...
// Key renewal to Redis
}
}
}
訊息推送
當一個訊息服務例項生產使用者的訊息,需要推送訊息給終端時,推送步驟如下:
-
根據 UserID 從 Redis 讀取資料,得到連線唯一性標識和 pod address 地址,這些資訊是在終端第一次與訊息例項建立連線的時候寫入 Redis 的。
-
此時根據 pod address,向對應的訊息服務例項傳送請求。
-
相應的訊息服務例項接收到請求。
服務端接收請求的處理邏輯如下:
- 根據傳遞過來連線唯一性標識的引數,找到標識對應的連線。我們為 ClientManager 提供了一個 Write 方法。
此方法用到 ClientManager 元件的 Clients 集合,根據唯一性標識找到對應的 Client。再利用 Client 的 SendOut 方法,寫出資料到終端。
func (manager *ClientManager) Write(message *Message) error {
manager.RLock()
client, ok := manager.Clients[message.Recipient]
manager.RUnlock()
if !ok {
return errors.New("client miss [" + message.Recipient + "]")
}
return client.SendOut(message)
}
- 定義 Client 的 SendOut 方法。
此方法只負責:把接收到的訊息轉換為位元組陣列後,傳送 Client 的 Send Channel 中。
func (c *Client) SendOut(message *Message) error {
content, err := json.Marshal(message.Content)
if err != nil {
return err
}
c.Send <- content
return nil
}
- 傳送資料到終端。
前文 Client 元件的 Write 方法中,已說明 send channel 中有資料時,便會讀取channel 產生的資料,並通過連線物件返回資料給對應的終端。
總結
以上是 Web Socket 推送訊息給終端的主要思路:通過 Redis 把使用者的資訊以及連線的標識和 pod address 儲存起來,當某個訊息服務例項產生訊息,從 Redis 讀取資訊,通知連線著終端的訊息服務例項,再由這些服務例項通過 WebSocket 物件給終端傳送訊息。全象雲低程式碼平臺也集成了訊息的實時推送,使用者使用平臺時能及時獲取最新訊息狀態。 下期我們將為大家帶來 Knative Serving 自定義彈性伸縮,請大家持續關注。
作者
周慧婷 青雲全象雲軟體開發工程師
- Kubernetes CRI 分析 - kubelet 建立 Pod 分析
- 終於有人把 ZFS 檔案系統講明白了
- KVSSD: 結合 LSM 與 FTL 以實現寫入優化的 KV 儲存
- 雲戰略現狀調查: 歡迎來到多雲時代!
- 雲戰略現狀調查: 歡迎來到多雲時代!
- 以 Serverless 的方式實現 Kubernetes 日誌告警
- Knative Autoscaler 自定義彈性伸縮
- 科技熱點週刊|Zoom 1 億美元、Docker 收費、380 億美元 Databricks
- 科技熱點週刊|Linux 30 週年、Horizon Workroom 釋出、Humanoid Robot、元宇宙
- KubeSphere 核心架構淺析
- Go 語言實現 WebSocket 推送
- 基於 SDN 編排的雲安全服務
- 複雜應用開發測試的 ChatOps 實踐
- 基於 Formily 的表單設計器實現原理分析
- SegmentFault 基於 Kubernetes 的容器化與持續交付實踐
- 基於 Kubernetes 的雲原生 AI 平臺建設
- 雲原生|新東方在有狀態服務 In K8s 的實踐
- 線上教育平臺青椒課堂:使用 KubeSphere QKE 輕鬆實現容器多叢集管理
- 人均雲原生2.0,容器的圈子內卷嗎?
- 儲存大師班:NFS 的誕生與成長