我有 7種 實現web實時消息推送的方案,7種!

語言: CN / TW / HK

技術交流,公眾號:程序員小富

大家好,我是小富~

我有一個朋友~

做了一個小破站,現在要實現一個站內信web消息推送的功能,對,就是下圖這個小紅點,一個很常用的功能。

不過他還沒想好用什麼方式做,這裏我幫他整理了一下幾種方案,並簡單做了實現。

案例下載,記得Star 哦

什麼是消息推送(push)

推送的場景比較多,比如有人關注我的公眾號,這時我就會收到一條推送消息,以此來吸引我點擊打開應用。

消息推送(push)通常是指網站的運營工作等人員,通過某種工具對用户當前網頁或移動設備APP進行的主動消息推送。

消息推送一般又分為web端消息推送移動端消息推送

上邊的這種屬於移動端消息推送,web端消息推送常見的諸如站內信、未讀郵件數量、監控報警數量等,應用的也非常廣泛。

在具體實現之前,咱們再來分析一下前邊的需求,其實功能很簡單,只要觸發某個事件(主動分享了資源或者後台主動推送消息),web頁面的通知小紅點就會實時的+1就可以了。

通常在服務端會有若干張消息推送表,用來記錄用户觸發不同事件所推送不同類型的消息,前端主動查詢(拉)或者被動接收(推)用户所有未讀的消息數。

消息推送無非是推(push)和拉(pull)兩種形式,下邊我們逐個瞭解下。

短輪詢

輪詢(polling)應該是實現消息推送方案中最簡單的一種,這裏我們暫且將輪詢分為短輪詢長輪詢

短輪詢很好理解,指定的時間間隔,由瀏覽器向服務器發出HTTP請求,服務器實時返回未讀消息數據給客户端,瀏覽器再做渲染顯示。

一個簡單的JS定時器就可以搞定,每秒鐘請求一次未讀消息數接口,返回的數據展示即可。

setInterval(() => { // 方法請求 messageCount().then((res) => { if (res.code === 200) { this.messageCount = res.data } }) }, 1000);

效果還是可以的,短輪詢實現固然簡單,缺點也是顯而易見,由於推送數據並不會頻繁變更,無論後端此時是否有新的消息產生,客户端都會進行請求,勢必會對服務端造成很大壓力,浪費帶寬和服務器資源。

長輪詢

長輪詢是對上邊短輪詢的一種改進版本,在儘可能減少對服務器資源浪費的同時,保證消息的相對實時性。長輪詢在中間件中應用的很廣泛,比如Nacosapollo配置中心,消息隊列kafkaRocketMQ中都有用到長輪詢。

Nacos配置中心交互模型是push還是pull?一文中我詳細介紹過Nacos長輪詢的實現原理,感興趣的小夥伴可以瞅瞅。

這次我使用apollo配置中心實現長輪詢的方式,應用了一個類DeferredResult,它是在servelet3.0後經過Spring封裝提供的一種異步請求機制,直意就是延遲結果。

DeferredResult可以允許容器線程快速釋放佔用的資源,不阻塞請求線程,以此接受更多的請求提升系統的吞吐量,然後啟動異步工作線程處理真正的業務邏輯,處理完成調用DeferredResult.setResult(200)提交響應結果。

下邊我們用長輪詢來實現消息推送。

因為一個ID可能會被多個長輪詢請求監聽,所以我採用了guava包提供的Multimap結構存放長輪詢,一個key可以對應多個value。一旦監聽到key發生變化,對應的所有長輪詢都會響應。前端得到非請求超時的狀態碼,知曉數據變更,主動查詢未讀消息數接口,更新頁面數據。

```java @Controller @RequestMapping("/polling") public class PollingController {

// 存放監聽某個Id的長輪詢集合
// 線程同步結構
public static Multimap<String, DeferredResult<String>> watchRequests = Multimaps.synchronizedMultimap(HashMultimap.create());

/**
 * 公眾號:程序員小富
 * 設置監聽
 */
@GetMapping(path = "watch/{id}")
@ResponseBody
public DeferredResult<String> watch(@PathVariable String id) {
    // 延遲對象設置超時時間
    DeferredResult<String> deferredResult = new DeferredResult<>(TIME_OUT);
    // 異步請求完成時移除 key,防止內存溢出
    deferredResult.onCompletion(() -> {
        watchRequests.remove(id, deferredResult);
    });
    // 註冊長輪詢請求
    watchRequests.put(id, deferredResult);
    return deferredResult;
}

/**
 * 公眾號:程序員小富
 * 變更數據
 */
@GetMapping(path = "publish/{id}")
@ResponseBody
public String publish(@PathVariable String id) {
    // 數據變更 取出監聽ID的所有長輪詢請求,並一一響應處理
    if (watchRequests.containsKey(id)) {
        Collection<DeferredResult<String>> deferredResults = watchRequests.get(id);
        for (DeferredResult<String> deferredResult : deferredResults) {
            deferredResult.setResult("我更新了" + new Date());
        }
    }
    return "success";
}

```

當請求超過設置的超時時間,會拋出AsyncRequestTimeoutException異常,這裏直接用@ControllerAdvice全局捕獲統一返回即可,前端獲取約定好的狀態碼後再次發起長輪詢請求,如此往復調用。

``` @ControllerAdvice public class AsyncRequestTimeoutHandler {

@ResponseStatus(HttpStatus.NOT_MODIFIED)
@ResponseBody
@ExceptionHandler(AsyncRequestTimeoutException.class)
public String asyncRequestTimeoutHandler(AsyncRequestTimeoutException e) {
    System.out.println("異步請求超時");
    return "304";
}

} ```

我們來測試一下,首先頁面發起長輪詢請求/polling/watch/10086監聽消息更變,請求被掛起,不變更數據直至超時,再次發起了長輪詢請求;緊接着手動變更數據/polling/publish/10086,長輪詢得到響應,前端處理業務邏輯完成後再次發起請求,如此循環往復。

長輪詢相比於短輪詢在性能上提升了很多,但依然會產生較多的請求,這是它的一點不完美的地方。

iframe流

iframe流就是在頁面中插入一個隱藏的<iframe>標籤,通過在src中請求消息數量API接口,由此在服務端和客户端之間創建一條長連接,服務端持續向iframe傳輸數據。

傳輸的數據通常是HTML、或是內嵌的javascript腳本,來達到實時更新頁面的效果。

這種方式實現簡單,前端只要一個<iframe>標籤搞定了

```

```

服務端直接組裝html、js腳本數據向response寫入就行了

@Controller @RequestMapping("/iframe") public class IframeController { @GetMapping(path = "message") public void message(HttpServletResponse response) throws IOException, InterruptedException { while (true) { response.setHeader("Pragma", "no-cache"); response.setDateHeader("Expires", 0); response.setHeader("Cache-Control", "no-cache,no-store"); response.setStatus(HttpServletResponse.SC_OK); response.getWriter().print(" <script type=\"text/javascript\">\n" + "parent.document.getElementById('clock').innerHTML = \"" + count.get() + "\";" + "parent.document.getElementById('count').innerHTML = \"" + count.get() + "\";" + "</script>"); } } }

但我個人不推薦,因為它在瀏覽器上會顯示請求未加載完,圖標會不停旋轉,簡直是強迫症殺手。

SSE (我的方式)

很多人可能不知道,服務端向客户端推送消息,其實除了可以用WebSocket這種耳熟能詳的機制外,還有一種服務器發送事件(Server-sent events),簡稱SSE

SSE它是基於HTTP協議的,我們知道一般意義上的HTTP協議是無法做到服務端主動向客户端推送消息的,但SSE是個例外,它變換了一種思路。

SSE在服務器和客户端之間打開一個單向通道,服務端響應的不再是一次性的數據包而是text/event-stream類型的數據流信息,在有數據變更時從服務器流式傳輸到客户端。

整體的實現思路有點類似於在線視頻播放,視頻流會連續不斷的推送到瀏覽器,你也可以理解成,客户端在完成一次用時很長(網絡不暢)的下載。

SSEWebSocket作用相似,都可以建立服務端與瀏覽器之間的通信,實現服務端向客户端推送消息,但還是有些許不同:

  • SSE 是基於HTTP協議的,它們不需要特殊的協議或服務器實現即可工作;WebSocket需單獨服務器來處理協議。
  • SSE 單向通信,只能由服務端向客户端單向通信;webSocket全雙工通信,即通信的雙方可以同時發送和接受信息。
  • SSE 實現簡單開發成本低,無需引入其他組件;WebSocket傳輸數據需做二次解析,開發門檻高一些。
  • SSE 默認支持斷線重連;WebSocket則需要自己實現。
  • SSE 只能傳送文本消息,二進制數據需要經過編碼後傳送;WebSocket默認支持傳送二進制數據。

SSE 與 WebSocket 該如何選擇?

技術並沒有好壞之分,只有哪個更合適

SSE好像一直不被大家所熟知,一部分原因是出現了WebSockets,這個提供了更豐富的協議來執行雙向、全雙工通信。對於遊戲、即時通信以及需要雙向近乎實時更新的場景,擁有雙向通道更具吸引力。

但是,在某些情況下,不需要從客户端發送數據。而你只需要一些服務器操作的更新。比如:站內信、未讀消息數、狀態更新、股票行情、監控數量等場景,SEE不管是從實現的難易和成本上都更加有優勢。此外,SSE 具有WebSockets在設計上缺乏的多種功能,例如:自動重新連接事件ID發送任意事件的能力。

前端只需進行一次HTTP請求,帶上唯一ID,打開事件流,監聽服務端推送的事件就可以了

```javascript

```

服務端的實現更簡單,創建一個SseEmitter對象放入sseEmitterMap進行管理

```java private static Map sseEmitterMap = new ConcurrentHashMap<>();

/* * 創建連接 * * @date: 2022/7/12 14:51 * @auther: 公眾號:程序員小富 / public static SseEmitter connect(String userId) { try { // 設置超時時間,0表示不過期。默認30秒 SseEmitter sseEmitter = new SseEmitter(0L); // 註冊回調 sseEmitter.onCompletion(completionCallBack(userId)); sseEmitter.onError(errorCallBack(userId)); sseEmitter.onTimeout(timeoutCallBack(userId)); sseEmitterMap.put(userId, sseEmitter); count.getAndIncrement(); return sseEmitter; } catch (Exception e) { log.info("創建新的sse連接異常,當前用户:{}", userId); } return null; }

/* * 給指定用户發送消息 * * @date: 2022/7/12 14:51 * @auther: 公眾號:程序員小富 / public static void sendMessage(String userId, String message) {

if (sseEmitterMap.containsKey(userId)) {
    try {
        sseEmitterMap.get(userId).send(message);
    } catch (IOException e) {
        log.error("用户[{}]推送異常:{}", userId, e.getMessage());
        removeUser(userId);
    }
}

} ```

我們模擬服務端推送消息,看下客户端收到了消息,和我們預期的效果一致。

注意: SSE不支持IE瀏覽器,對其他主流瀏覽器兼容性做的還不錯。

MQTT

什麼是 MQTT協議?

MQTT 全稱(Message Queue Telemetry Transport):一種基於發佈/訂閲(publish/subscribe)模式的輕量級通訊協議,通過訂閲相應的主題來獲取消息,是物聯網(Internet of Thing)中的一個標準傳輸協議。

該協議將消息的發佈者(publisher)與訂閲者(subscriber)進行分離,因此可以在不可靠的網絡環境中,為遠程連接的設備提供可靠的消息服務,使用方式與傳統的MQ有點類似。

TCP協議位於傳輸層,MQTT 協議位於應用層,MQTT 協議構建於TCP/IP協議上,也就是説只要支持TCP/IP協議棧的地方,都可以使用MQTT協議。

為什麼要用 MQTT協議?

MQTT協議為什麼在物聯網(IOT)中如此受偏愛?而不是其它協議,比如我們更為熟悉的 HTTP協議呢?

  • 首先HTTP協議它是一種同步協議,客户端請求後需要等待服務器的響應。而在物聯網(IOT)環境中,設備會很受制於環境的影響,比如帶寬低、網絡延遲高、網絡通信不穩定等,顯然異步消息協議更為適合IOT應用程序。

  • HTTP是單向的,如果要獲取消息客户端必須發起連接,而在物聯網(IOT)應用程序中,設備或傳感器往往都是客户端,這意味着它們無法被動地接收來自網絡的命令。

  • 通常需要將一條命令或者消息,發送到網絡上的所有設備上。HTTP要實現這樣的功能不但很困難,而且成本極高。

具體的MQTT協議介紹和實踐,這裏我就不再贅述了,大家可以參考我之前的兩篇文章,裏邊寫的也都很詳細了。

MQTT協議的介紹

我也沒想到 springboot + rabbitmq 做智能家居,會這麼簡單

MQTT實現消息推送

未讀消息(小紅點),前端 與 RabbitMQ 實時消息推送實踐,賊簡單~

Websocket

websocket應該是大家都比較熟悉的一種實現消息推送的方式,上邊我們在講SSE的時候也和websocket進行過比較。

WebSocket是一種在TCP連接上進行全雙工通信的協議,建立客户端和服務器之間的通信渠道。瀏覽器和服務器僅需一次握手,兩者之間就直接可以創建持久性的連接,並進行雙向數據傳輸。

圖片源於網絡

springboot整合websocket,先引入websocket相關的工具包,和SSE相比額外的開發成本。

```java

org.springframework.boot spring-boot-starter-websocket ```

服務端使用@ServerEndpoint註解標註當前類為一個websocket服務器,客户端可以通過ws://localhost:7777/webSocket/10086來連接到WebSocket服務器端。

java @Component @Slf4j @ServerEndpoint("/websocket/{userId}") public class WebSocketServer { //與某個客户端的連接會話,需要通過它來給客户端發送數據 private Session session; private static final CopyOnWriteArraySet<WebSocketServer> webSockets = new CopyOnWriteArraySet<>(); // 用來存在線連接數 private static final Map<String, Session> sessionPool = new HashMap<String, Session>(); /** * 公眾號:程序員小富 * 鏈接成功調用的方法 */ @OnOpen public void onOpen(Session session, @PathParam(value = "userId") String userId) { try { this.session = session; webSockets.add(this); sessionPool.put(userId, session); log.info("websocket消息: 有新的連接,總數為:" + webSockets.size()); } catch (Exception e) { } } /** * 公眾號:程序員小富 * 收到客户端消息後調用的方法 */ @OnMessage public void onMessage(String message) { log.info("websocket消息: 收到客户端消息:" + message); } /** * 公眾號:程序員小富 * 此為單點消息 */ public void sendOneMessage(String userId, String message) { Session session = sessionPool.get(userId); if (session != null && session.isOpen()) { try { log.info("websocket消: 單點消息:" + message); session.getAsyncRemote().sendText(message); } catch (Exception e) { e.printStackTrace(); } } } }

前端初始化打開WebSocket連接,並監聽連接狀態,接收服務端數據或向服務端發送數據。

```js

```

頁面初始化建立websocket連接,之後就可以進行雙向通信了,效果還不錯

自定義推送

上邊我們給我出了6種方案的原理和代碼實現,但在實際業務開發過程中,不能盲目的直接拿過來用,還是要結合自身系統業務的特點和實際場景來選擇合適的方案。

推送最直接的方式就是使用第三推送平台,畢竟錢能解決的需求都不是問題,無需複雜的開發運維,直接可以使用,省時、省力、省心,像goEasy、極光推送都是很不錯的三方服務商。

一般大型公司都有自研的消息推送平台,像我們本次實現的web站內信只是平台上的一個觸點而已,短信、郵件、微信公眾號、小程序凡是可以觸達到用户的渠道都可以接入進來。

圖片來源於網絡

消息推送系統內部是相當複雜的,諸如消息內容的維護審核、圈定推送人羣、觸達過濾攔截(推送的規則頻次、時段、數量、黑白名單、關鍵詞等等)、推送失敗補償非常多的模塊,技術上涉及到大數據量、高併發的場景也很多。所以我們今天的實現方式在這個龐大的系統面前只是小打小鬧。

Github地址

文中所提到的案例我都一一的做了實現,整理放在了Github上,覺得有用就 Star 一下吧!

傳送門:http://github.com/chengxy-nds/Springboot-Notebook/tree/master/springboot-realtime-data