ESP32學習筆記(46)——MQTT客戶端

語言: CN / TW / HK

highlight: vs2015

攜手創作,共同成長!這是我參與「掘金日新計劃 · 8 月更文挑戰」的第15天,點選檢視活動詳情

一、MQTT簡介

1.1 實現方式

實現MQTT協議需要客戶端和伺服器端通訊完成,在通訊過程中,MQTT協議中有三種身份:釋出者(Publish)、代理(Broker)(伺服器)、訂閱者(Subscribe)。其中,訊息的釋出者和訂閱者都是客戶端,訊息代理是伺服器,訊息釋出者可以同時是訂閱者。

MQTT傳輸的訊息分為:主題(Topic)和負載(payload)兩部分:

Topic,可以理解為訊息的型別,訂閱者訂閱(Subscribe)後,就會收到該主題的訊息內容(payload);

payload,可以理解為訊息的內容,是指訂閱者具體要使用的內容。

  • MQTT伺服器的主要工作是資料分發,沒有資料儲存功能。
  • 可以訂閱自己釋出的主題,伺服器就是回發測試。
  • MQTT讓邏輯變得更清晰,需要什麼訂閱什麼。
  • 走標準化流程,解放了私有協議制定、實現、除錯、測試一整套複雜的流程。

1.2 ESP-MQTT

ESP-MQTT 是 MQTT 協議客戶端的實現(MQTT 是輕量級的釋出/訂閱訊息協議)。

  • 支援 MQTT over TCP、SSL with mbedtls、MQTT over Websocket、MQTT over Websocket Secure。
  • 使用 URI 輕鬆設定
  • 多個例項(一個應用程式中有多個客戶端)
  • 支援訂閱、釋出、身份驗證、最後遺囑訊息、保持活動 ping 和所有 3 個 QoS 級別(它應該是一個功能齊全的客戶端)。

ESP-IDF 程式設計指南——ESP-MQTT

二、API說明

以下 MQTT 客戶端介面位於 components/mqtt/esp-mqtt/include/mqtt_client.h

2.1 esp_mqtt_client_init

2.2 esp_mqtt_client_register_event

2.3 esp_mqtt_client_start

2.4 esp_mqtt_client_publish

2.5 esp_mqtt_client_subscribe

2.6 esp_mqtt_client_unsubscribe

三、MQTT客戶端

3.1 主要流程

3.2 配置MQTT引數

首先,要定義一個 MQTT 客戶端配置結構體,最小配置即填入 MQTT 伺服器的 URL 即可。 cpp esp_mqtt_client_config_t mqtt_cfg = { .uri = CONFIG_BROKER_URL, }; esp_mqtt_client_config_t 結構體如下: cpp typedef struct { mqtt_event_callback_t event_handle; /*回撥*/ const char *host; /*!< MQTT 伺服器域名(ipv4 as string)*/ const char *uri; /*!< MQTT 伺服器域名 */ uint32_t port; /*!< MQTT伺服器埠*/ const char *client_id; /*MQTT Client的名字預設是ESP32_加上MAC後3hex*/ const char *username; /*MQTT使用者名稱*/ const char *password; /*MQTT密碼*/ const char *lwt_topic; /*!< LWT主題,預設為空*/ const char *lwt_msg; /*!< LWT資訊,預設為空*/ int lwt_qos; /*!< LWT訊息質量*/ int lwt_retain; /*!< LWT保留訊息標誌*/ int lwt_msg_len; /*!< LWT訊息長度*/ int disable_clean_session; /*!< mqtt clean session,預設為真*/ int keepalive; /*MQTT心跳,預設120秒 */ bool disable_auto_reconnect; /*錯誤,斷開後重連,true不連*/ void *user_context; /*使用者資訊 */ int task_prio; /*!< MQTT任務優先順序,預設為5,可以在make menuconfig中修改*/ int task_stack; /*!< MQTT 任務堆疊大小,預設6144 bytes,可以在make menuconfig中修改*/ int buffer_size; /*!< MQTT收發快取,預設1024 */ const char *cert_pem; /*指向用於伺服器驗證(使用SSL)的PEM格式的證書資料的指標,預設值為空,不需要驗證伺服器 */ const char *client_cert_pem; /*指向用於SSL相互身份驗證的PEM格式的證書資料的指標,預設值為空,如果不需要相互身份驗證,則不需要。如果不為空,還必須提供“客戶機金鑰”。*/ const char *client_key_pem; /*指向用於SSL相互身份驗證的PEM格式的私鑰資料的指標,預設值為空,如果不需要相互身份驗證,則不需要。如果不為空,還必須提供“client-cert-pem”。*/ esp_mqtt_transport_t transport; /*覆蓋URI傳輸*/ } esp_mqtt_client_config_t;

3.3 初始化MQTT客戶端

然後通過 esp_mqtt_client_init() 獲取一個 MQTT 客戶端結構體指標,引數是 MQTT 客戶端配置結構體。 cpp esp_mqtt_client_handle_t client = esp_mqtt_client_init(&mqtt_cfg);

3.4 註冊MQTT事件

預設情況下,MQTT 客戶端使用事件迴圈庫來發布相關的 MQTT 事件(已連線,已訂閱,已釋出等)。

所以我們要註冊一個 MQTT 事件,填入 MQTT 事件處理函式 mqtt_event_handler()cpp static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data) { ESP_LOGD(TAG, "Event dispatched from event loop base=%s, event_id=%d", base, event_id); mqtt_event_handler_cb(event_data); } * 第一個引數為MQTT客戶端結構體, * 第二個是事件ID對應的事件型別, * 第三個引數即事件處理函式, * 第四個引數為事件處理函式的引數。

3.5 開啟MQTT客戶端

cpp esp_mqtt_client_start(client);

3.6 MQTT事件處理

```cpp static esp_err_t mqtt_event_handler_cb(esp_mqtt_event_handle_t event) { esp_mqtt_client_handle_t client = event->client; int msg_id; // your_context_t *context = event->context; switch (event->event_id) { case MQTT_EVENT_CONNECTED: ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED"); msg_id = esp_mqtt_client_publish(client, "/topic/qos1", "data_3", 0, 1, 0); ESP_LOGI(TAG, "sent publish successful, msg_id=%d", msg_id);

        msg_id = esp_mqtt_client_subscribe(client, "/topic/qos0", 0);
        ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id);

        msg_id = esp_mqtt_client_subscribe(client, "/topic/qos1", 1);
        ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id);

        msg_id = esp_mqtt_client_unsubscribe(client, "/topic/qos1");
        ESP_LOGI(TAG, "sent unsubscribe successful, msg_id=%d", msg_id);
        break;
    case MQTT_EVENT_DISCONNECTED:
        ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED");
        break;

    case MQTT_EVENT_SUBSCRIBED:
        ESP_LOGI(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id);
        msg_id = esp_mqtt_client_publish(client, "/topic/qos0", "data", 0, 0, 0);
        ESP_LOGI(TAG, "sent publish successful, msg_id=%d", msg_id);
        break;
    case MQTT_EVENT_UNSUBSCRIBED:
        ESP_LOGI(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id);
        break;
    case MQTT_EVENT_PUBLISHED:
        ESP_LOGI(TAG, "MQTT_EVENT_PUBLISHED, msg_id=%d", event->msg_id);
        break;
    case MQTT_EVENT_DATA:
        ESP_LOGI(TAG, "MQTT_EVENT_DATA");
        printf("TOPIC=%.*s\r\n", event->topic_len, event->topic);
        printf("DATA=%.*s\r\n", event->data_len, event->data);
        break;
    case MQTT_EVENT_ERROR:
        ESP_LOGI(TAG, "MQTT_EVENT_ERROR");
        break;
    default:
        ESP_LOGI(TAG, "Other event id:%d", event->event_id);
        break;
}
return ESP_OK;

} ```

四、示例程式碼

根據 examples\protocols\mqtt\tcp 中的例程修改 根據伺服器地址修改.host = "192.168.61.67", ```cpp /* MQTT (over TCP) Example

This example code is in the Public Domain (or CC0 licensed, at your option.)

Unless required by applicable law or agreed to in writing, this software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. */

include

include

include

include

include "esp_wifi.h"

include "esp_system.h"

include "nvs_flash.h"

include "esp_event.h"

include "esp_netif.h"

include "protocol_examples_common.h"

include "freertos/FreeRTOS.h"

include "freertos/task.h"

include "freertos/semphr.h"

include "freertos/queue.h"

include "lwip/sockets.h"

include "lwip/dns.h"

include "lwip/netdb.h"

include "esp_log.h"

include "mqtt_client.h"

static const char *TAG = "MQTT_EXAMPLE";

static esp_err_t mqtt_event_handler_cb(esp_mqtt_event_handle_t event) { // 獲取MQTT客戶端結構體指標 esp_mqtt_client_handle_t client = event->client; int msg_id; // your_context_t *context = event->context; // 通過事件ID來分別處理對應的事件 switch (event->event_id) { case MQTT_EVENT_CONNECTED: // MQTT連上事件 ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED"); // MQTT Client釋出主題函式,主題是/topic/qos1,服務質量qos1,釋出的資料是data-3 msg_id = esp_mqtt_client_publish(client, "/topic/qos1", "data_3", 0, 1, 0); ESP_LOGI(TAG, "sent publish successful, msg_id=%d", msg_id); // MQTT Client訂閱主題函式,主題是/topic/qos0,服務質量qos0 msg_id = esp_mqtt_client_subscribe(client, "/topic/qos0", 0); ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id); // MQTT Client訂閱主題函式,主題是/topic/qos1,服務質量qos1 msg_id = esp_mqtt_client_subscribe(client, "/topic/qos1", 1); ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id); // MQTT Client取消訂閱主題函式 msg_id = esp_mqtt_client_unsubscribe(client, "/topic/qos1"); ESP_LOGI(TAG, "sent unsubscribe successful, msg_id=%d", msg_id); break; case MQTT_EVENT_DISCONNECTED: // MQTT斷開連線事件 ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED"); break;

    case MQTT_EVENT_SUBSCRIBED:    // MQTT傳送訂閱成功事件
        ESP_LOGI(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id);
        msg_id = esp_mqtt_client_publish(client, "/topic/qos0", "data", 0, 0, 0);
        ESP_LOGI(TAG, "sent publish successful, msg_id=%d", msg_id);
        break;
    case MQTT_EVENT_UNSUBSCRIBED:    // MQTT取消訂閱事件
        ESP_LOGI(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id);
        break;
    case MQTT_EVENT_PUBLISHED:    // MQTT釋出成功事件
        ESP_LOGI(TAG, "MQTT_EVENT_PUBLISHED, msg_id=%d", event->msg_id);
        break;
    case MQTT_EVENT_DATA:    // MQTT接收資料事件
        ESP_LOGI(TAG, "MQTT_EVENT_DATA");
        printf("TOPIC=%.*s\r\n", event->topic_len, event->topic);
        printf("DATA=%.*s\r\n", event->data_len, event->data);
        break;
    case MQTT_EVENT_ERROR:
        ESP_LOGI(TAG, "MQTT_EVENT_ERROR");
        break;
    default:
        ESP_LOGI(TAG, "Other event id:%d", event->event_id);
        break;
}
return ESP_OK;

}

static void mqtt_event_handler(void handler_args, esp_event_base_t base, int32_t event_id, void event_data) { ESP_LOGD(TAG, "Event dispatched from event loop base=%s, event_id=%d", base, event_id); mqtt_event_handler_cb(event_data); }

static void mqtt_app_start(void) { // 1、定義一個MQTT客戶端配置結構體,輸入MQTT的url esp_mqtt_client_config_t mqtt_cfg = { .host = "192.168.61.67", // MQTT伺服器地址 .port = 1883, // MQTT伺服器埠 };

if CONFIG_BROKER_URL_FROM_STDIN

char line[128];

if (strcmp(mqtt_cfg.uri, "FROM_STDIN") == 0) {
    int count = 0;
    printf("Please enter url of mqtt broker\n");
    while (count < 128) {
        int c = fgetc(stdin);
        if (c == '\n') {
            line[count] = '\0';
            break;
        } else if (c > 0 && c < 127) {
            line[count] = c;
            ++count;
        }
        vTaskDelay(10 / portTICK_PERIOD_MS);
    }
    mqtt_cfg.uri = line;
    printf("Broker url: %s\n", line);
} else {
    ESP_LOGE(TAG, "Configuration mismatch: wrong broker url");
    abort();
}

endif / CONFIG_BROKER_URL_FROM_STDIN /

// 2、通過esp_mqtt_client_init獲取一個MQTT客戶端結構體指標,引數是MQTT客戶端配置結構體
esp_mqtt_client_handle_t client = esp_mqtt_client_init(&mqtt_cfg);
// 3、註冊MQTT事件
esp_mqtt_client_register_event(client, ESP_EVENT_ANY_ID, mqtt_event_handler, client);
// 4、開啟MQTT功能
esp_mqtt_client_start(client);

}

void app_main(void) { ESP_LOGI(TAG, "[APP] Startup.."); ESP_LOGI(TAG, "[APP] Free memory: %d bytes", esp_get_free_heap_size()); ESP_LOGI(TAG, "[APP] IDF version: %s", esp_get_idf_version());

esp_log_level_set("*", ESP_LOG_INFO);
esp_log_level_set("MQTT_CLIENT", ESP_LOG_VERBOSE);
esp_log_level_set("MQTT_EXAMPLE", ESP_LOG_VERBOSE);
esp_log_level_set("TRANSPORT_TCP", ESP_LOG_VERBOSE);
esp_log_level_set("TRANSPORT_SSL", ESP_LOG_VERBOSE);
esp_log_level_set("TRANSPORT", ESP_LOG_VERBOSE);
esp_log_level_set("OUTBOX", ESP_LOG_VERBOSE);

ESP_ERROR_CHECK(nvs_flash_init());
ESP_ERROR_CHECK(esp_netif_init());
ESP_ERROR_CHECK(esp_event_loop_create_default());

/* This helper function configures Wi-Fi or Ethernet, as selected in menuconfig.
 * Read "Establishing Wi-Fi or Ethernet Connection" section in
 * examples/protocols/README.md for more information about this function.
 */
ESP_ERROR_CHECK(example_connect());

mqtt_app_start();

} ```

五、搭建本地MQTT伺服器

EMQ官網下載:http://www.emqx.com/zh/downloads?product=broker

  • 下載EMQ X開源版
  • 解壓後進入 emqx-windows-4.3.8\emqx\bin 目錄
  • Shift+右鍵在此處開啟 Powershell 視窗,輸入命令 emqx start
  • 開啟瀏覽器,輸入 http://127.0.0.1:18083/,賬號 admin,密碼 public,進入管理介面
  • 工具 - Websocket,選擇連線
  • 訂閱主題和釋出訊息

六、執行測試

配置連線方式:

選擇WIFI連線方式,並修改要連線路由器的SSID和密碼

除錯列印:

伺服器檢視:


• 由 Leung 寫於 2021 年 9 月 8 日

• 參考:第二十一章 ESP32開發MQTT Client ESP-IDF

ESP32學習筆記(6)MQTT應用

ESP32開發之路(9)—ESP32連線到MQTT伺服器