Go 語言實現 WebSocket 推送

語言: CN / TW / HK

寫在前面

系統開發的過程中,我們經常需要實現訊息推送的需求。單端單例項的情況下很好處理(網上有許多教程這裡不做展開),但在分散式系統及多端需要推送的情況下該如何處理呢?

在分散式系統中,訊息推送服務端是多例項的。某系統中一個服務生成一條訊息,這條訊息需要實時推送到多個終端,此時該如何進行有效的 WebSocket 推送呢?首先一起看看如下場景:

假設推送訊息由訊息例項 2 產生,但是終端真正連線的訊息例項是例項 1 和例項 3,並沒有連線到生產訊息的例項 2,系統是如何將例項 2 的訊息同步推送到終端 1 和終端 2 的呢?下文將詳細描述。

file

基本原理

為了滿足需求,我們採用 Redis 做協同中介軟體,用於儲存使用者資訊、生成使用者連線的唯一性標識以及 pod address,訊息的生產者例項通過訂閱 Redis 獲取終端連線的唯一性標識和 pod address,並通知到對應的訊息例項,最終由相應連線終端的訊息例項通過 WebSocket 將訊息發推送到使用者終端。具體流程如下圖:

file

為了實現構想,我們構造了兩個元件:Client、ClientManager,實現邏輯如下。

服務端實現

Client

Client 元件的作用,是當用戶與訊息服務中某個例項建立連線後,管理這個連線的資訊,這裡通過一個 Golang 結構體來定義:

	type Client struct {
			UUID   string 
			UserID string
			Socket *websocket.Conn
			Send   chan []byte
	}

結構體中的資料型別說明如下:

  • UUID:對連線進行唯一性的標識,通過此標識可以查詢到連線資訊。

  • UserID:使用者 ID。

  • Socket:連線物件。

  • Send:訊息資料 channel。

我們為 Client 結構體實現了兩個方法:Read、Write 來處理訊息的接受和傳送。

Read 方法

Read 方法比較簡單,從終端接收請求訊息後,訊息例項通過 WebSocket 迴應接收訊息狀態,並不返回請求結果。結果通過 Write 方法返回。

	func (c *Client) Read(close, renewal chan *Client) {
			defer func() {
					close <- c
			}()
	for {
			_, message, err := c.Socket.ReadMessage()
			if err != nil {
					break
			}
		// ...
			 // message logic
	}
	}

Write 方法

Write 方法將請求結果返回給終端。Client 會監聽 send channel,當 channel 有資料時,通過 socket 連線將訊息傳送給終端。

	func (c *Client) Write(close chan *Client) {
			for {
					select {
					case message, ok := <-c.Send:
							if !ok {
									return
							}
							c.Socket.WriteMessage(websocket.TextMessage, message)
					case <-c.Ctx.Done():
				return
					}
			}
	}

ClientManger

ClientManager 元件相當於連線池,可以管理所有的終端連線,並提供註冊、登出、續期功能。

	type ClientManager struct {
			sync.RWMutex
			Clients    map[string]*Client 
			Register   chan *Client
			Unregister chan *Client
			Renewal    chan *Client
	}

結構體的資料型別說明如下:

  • Clients:是一個集合,用於儲存建立的 Client 物件。

  • Register:註冊的 channel。

    • 把連線註冊到 Clients 中,並通過 key-value 加入 Client 集合中,key 是連線的唯一性標識 ,value 是連線本身。

    • 把連線的唯一性標識和使用者的 ID 以及建立連線的 pod address 資訊,儲存到 Redis 中。

  • Unregister:登出的 channel。

    • 從 ClientManager 元件的 Clients 集合中移除連線物件。

    • 刪除 Redis 對應的快取資訊。

  • Renewal:續期的 channel,用於對 Redis 的鍵續期。

ClientManager 提供了一個 Start 方法,Start 方法提供監聽註冊、登出以及續期的 channel,通過監聽這些 channel 來管理建立的連線物件。當這些 channel 有資料時,執行對應的操作。

	func (manager *ClientManager) Start(ctx context.Context) {
			for {
					select {
					case conn := <-manager.Register:
							manager.Lock()
							manager.Clients[conn.UUID] = conn
							manager.Unlock()
							_, err := manager.affair.Register(ctx, &RegisterReq{
									UserID: conn.UserID,
									UUID:   conn.UUID,
									IP:     manager.IP,
							})
					case conn := <-manager.Unregister:
							_, err := manager.affair.Unregister(ctx, &UnregisterReq{
									UserID: conn.UserID,
									UUID:   conn.UUID,
							})
							conn.Socket.Close()
							close(conn.Send)
							delete(manager.Clients, conn.UUID)
					case conn := <-manager.Renewal:
											//...
							// Key renewal to Redis
					}
			}
	}

訊息推送

當一個訊息服務例項生產使用者的訊息,需要推送訊息給終端時,推送步驟如下:

  1. 根據 UserID 從 Redis 讀取資料,得到連線唯一性標識和 pod address 地址,這些資訊是在終端第一次與訊息例項建立連線的時候寫入 Redis 的。

  2. 此時根據 pod address,向對應的訊息服務例項傳送請求。

  3. 相應的訊息服務例項接收到請求。

服務端接收請求的處理邏輯如下:

  1. 根據傳遞過來連線唯一性標識的引數,找到標識對應的連線。我們為 ClientManager 提供了一個 Write 方法。

此方法用到 ClientManager 元件的 Clients 集合,根據唯一性標識找到對應的 Client。再利用 Client 的 SendOut 方法,寫出資料到終端。

	func (manager *ClientManager) Write(message *Message) error {
		manager.RLock()
		client, ok := manager.Clients[message.Recipient]
		manager.RUnlock()
		if !ok {
			 return errors.New("client miss [" + message.Recipient + "]")
		}
		return client.SendOut(message)
	}
  1. 定義 Client 的 SendOut 方法。

此方法只負責:把接收到的訊息轉換為位元組陣列後,傳送 Client 的 Send Channel 中。

	func (c *Client) SendOut(message *Message) error {
			content, err := json.Marshal(message.Content)
			if err != nil {
					return err
			}
			c.Send <- content
			return nil
	}
  1. 傳送資料到終端。

前文 Client 元件的 Write 方法中,已說明 send channel 中有資料時,便會讀取channel 產生的資料,並通過連線物件返回資料給對應的終端。

總結

以上是 Web Socket 推送訊息給終端的主要思路:通過 Redis 把使用者的資訊以及連線的標識和 pod address 儲存起來,當某個訊息服務例項產生訊息,從 Redis 讀取資訊,通知連線著終端的訊息服務例項,再由這些服務例項通過 WebSocket 物件給終端傳送訊息。全象雲低程式碼平臺也集成了訊息的實時推送,使用者使用平臺時能及時獲取最新訊息狀態。 下期我們將為大家帶來 Knative Serving 自定義彈性伸縮,請大家持續關注。

作者

周慧婷 青雲全象雲軟體開發工程師