Introduce c->is_mqtt5 for keeping mqtt version. Make mqtt over ws work again

This commit is contained in:
Sergey Lyubka 2022-07-04 17:47:17 +01:00
parent 6c796b8552
commit e4a5296fbf
5 changed files with 28 additions and 26 deletions

View File

@ -33,10 +33,11 @@ static void fn(struct mg_connection *c, int ev, void *ev_data, void *fn_data) {
size_t len = c->send.len;
mg_mqtt_login(c, &opts);
mg_ws_wrap(c, c->send.len - len, WEBSOCKET_OP_BINARY);
c->is_hexdumping = 1;
} else if (ev == MG_EV_WS_MSG) {
struct mg_mqtt_message mm;
struct mg_ws_message *wm = (struct mg_ws_message *) ev_data;
uint8_t version = (uint8_t) (size_t) c->pfn_data;
uint8_t version = c->is_mqtt5 ? 5 : 4;
MG_INFO(("GOT %d bytes WS msg", (int) wm->data.len));
while ((mg_mqtt_parse((uint8_t *) wm->data.ptr, wm->data.len, version,
&mm)) == 0) {
@ -82,6 +83,7 @@ int main(void) {
struct mg_mgr mgr; // Event manager
bool done = false; // Event handler flips it to true when done
mg_mgr_init(&mgr); // Initialise event manager
mg_log_set("4"); // Set debug log level
mg_ws_connect(&mgr, s_url, fn, &done, NULL); // Create client connection
while (done == false) mg_mgr_poll(&mgr, 1000); // Event loop
mg_mgr_free(&mgr); // Finished, cleanup

View File

@ -3035,7 +3035,7 @@ void mg_mqtt_login(struct mg_connection *c, const struct mg_mqtt_opts *opts) {
}
if (hdr[6] == 0) hdr[6] = 4; // If version is not set, use 4 (3.1.1)
c->pfn_data = (void *) (size_t) hdr[6]; // Store version
c->is_mqtt5 = hdr[6] == 5; // Set version 5 flag
hdr[7] = (uint8_t) ((opts->will_qos & 3) << 3); // Connection flags
if (opts->user.len > 0) {
total_len += 2 + (uint32_t) opts->user.len;
@ -3053,19 +3053,19 @@ void mg_mqtt_login(struct mg_connection *c, const struct mg_mqtt_opts *opts) {
if (opts->clean || cid.len == 0) hdr[7] |= MQTT_CLEAN_SESSION;
if (opts->will_retain) hdr[7] |= MQTT_WILL_RETAIN;
total_len += (uint32_t) cid.len;
if (opts->version == 5) total_len += 1U + (hdr[7] & MQTT_HAS_WILL ? 1U : 0);
if (c->is_mqtt5) total_len += 1U + (hdr[7] & MQTT_HAS_WILL ? 1U : 0);
mg_mqtt_send_header(c, MQTT_CMD_CONNECT, 0, total_len);
mg_send(c, hdr, sizeof(hdr));
// keepalive == 0 means "do not disconnect us!"
mg_send_u16(c, mg_htons((uint16_t) opts->keepalive));
if (opts->version == 5) mg_send(c, &zero, sizeof(zero)); // V5 properties
if (c->is_mqtt5) mg_send(c, &zero, sizeof(zero)); // V5 properties
mg_send_u16(c, mg_htons((uint16_t) cid.len));
mg_send(c, cid.ptr, cid.len);
if (hdr[7] & MQTT_HAS_WILL) {
if (opts->version == 5) mg_send(c, &zero, sizeof(zero)); // will props
if (c->is_mqtt5) mg_send(c, &zero, sizeof(zero)); // will props
mg_send_u16(c, mg_htons((uint16_t) opts->will_topic.len));
mg_send(c, opts->will_topic.ptr, opts->will_topic.len);
mg_send_u16(c, mg_htons((uint16_t) opts->will_message.len));
@ -3083,13 +3083,12 @@ void mg_mqtt_login(struct mg_connection *c, const struct mg_mqtt_opts *opts) {
void mg_mqtt_pub(struct mg_connection *c, struct mg_str topic,
struct mg_str data, int qos, bool retain) {
uint8_t flags = (uint8_t) (((qos & 3) << 1) | (retain ? 1 : 0));
uint8_t version = (uint8_t) (size_t) c->pfn_data, zero = 0;
uint8_t flags = (uint8_t) (((qos & 3) << 1) | (retain ? 1 : 0)), zero = 0;
uint32_t len = 2 + (uint32_t) topic.len + (uint32_t) data.len;
MG_DEBUG(("%lu [%.*s] -> [%.*s]", c->id, (int) topic.len, (char *) topic.ptr,
(int) data.len, (char *) data.ptr));
if (qos > 0) len += 2;
if (version == 5) len++;
if (c->is_mqtt5) len++;
mg_mqtt_send_header(c, MQTT_CMD_PUBLISH, flags, len);
mg_send_u16(c, mg_htons((uint16_t) topic.len));
mg_send(c, topic.ptr, topic.len);
@ -3097,17 +3096,17 @@ void mg_mqtt_pub(struct mg_connection *c, struct mg_str topic,
if (++c->mgr->mqtt_id == 0) ++c->mgr->mqtt_id;
mg_send_u16(c, mg_htons(c->mgr->mqtt_id));
}
if (version == 5) mg_send(c, &zero, sizeof(zero));
if (c->is_mqtt5) mg_send(c, &zero, sizeof(zero));
mg_send(c, data.ptr, data.len);
}
void mg_mqtt_sub(struct mg_connection *c, struct mg_str topic, int qos) {
uint8_t qos_ = qos & 3, version = (uint8_t) (size_t) c->pfn_data, zero = 0;
uint32_t len = 2 + (uint32_t) topic.len + 2 + 1 + (version == 5 ? 1 : 0);
uint8_t qos_ = qos & 3, zero = 0;
uint32_t len = 2 + (uint32_t) topic.len + 2 + 1 + (c->is_mqtt5 ? 1 : 0);
mg_mqtt_send_header(c, MQTT_CMD_SUBSCRIBE, 2, len);
if (++c->mgr->mqtt_id == 0) ++c->mgr->mqtt_id;
mg_send_u16(c, mg_htons(c->mgr->mqtt_id));
if (version == 5) mg_send(c, &zero, sizeof(zero));
if (c->is_mqtt5) mg_send(c, &zero, sizeof(zero));
mg_send_u16(c, mg_htons((uint16_t) topic.len));
mg_send(c, topic.ptr, topic.len);
mg_send(c, &qos_, sizeof(qos_));
@ -3208,7 +3207,7 @@ static void mqtt_cb(struct mg_connection *c, int ev, void *ev_data,
void *fn_data) {
if (ev == MG_EV_READ) {
for (;;) {
uint8_t version = (uint8_t) (size_t) c->pfn_data;
uint8_t version = c->is_mqtt5 ? 5 : 4;
struct mg_mqtt_message mm;
int rc = mg_mqtt_parse(c->recv.buf, c->recv.len, version, &mm);
if (rc == MQTT_MALFORMED) {

View File

@ -1036,6 +1036,7 @@ struct mg_connection {
unsigned is_tls_hs : 1; // TLS handshake is in progress
unsigned is_udp : 1; // UDP connection
unsigned is_websocket : 1; // WebSocket connection
unsigned is_mqtt5 : 1; // For MQTT connection, v5 indicator
unsigned is_hexdumping : 1; // Hexdump in/out traffic
unsigned is_draining : 1; // Send remaining data, then close and free
unsigned is_closing : 1; // Close and free the connection immediately

View File

@ -43,7 +43,7 @@ void mg_mqtt_login(struct mg_connection *c, const struct mg_mqtt_opts *opts) {
}
if (hdr[6] == 0) hdr[6] = 4; // If version is not set, use 4 (3.1.1)
c->pfn_data = (void *) (size_t) hdr[6]; // Store version
c->is_mqtt5 = hdr[6] == 5; // Set version 5 flag
hdr[7] = (uint8_t) ((opts->will_qos & 3) << 3); // Connection flags
if (opts->user.len > 0) {
total_len += 2 + (uint32_t) opts->user.len;
@ -61,19 +61,19 @@ void mg_mqtt_login(struct mg_connection *c, const struct mg_mqtt_opts *opts) {
if (opts->clean || cid.len == 0) hdr[7] |= MQTT_CLEAN_SESSION;
if (opts->will_retain) hdr[7] |= MQTT_WILL_RETAIN;
total_len += (uint32_t) cid.len;
if (opts->version == 5) total_len += 1U + (hdr[7] & MQTT_HAS_WILL ? 1U : 0);
if (c->is_mqtt5) total_len += 1U + (hdr[7] & MQTT_HAS_WILL ? 1U : 0);
mg_mqtt_send_header(c, MQTT_CMD_CONNECT, 0, total_len);
mg_send(c, hdr, sizeof(hdr));
// keepalive == 0 means "do not disconnect us!"
mg_send_u16(c, mg_htons((uint16_t) opts->keepalive));
if (opts->version == 5) mg_send(c, &zero, sizeof(zero)); // V5 properties
if (c->is_mqtt5) mg_send(c, &zero, sizeof(zero)); // V5 properties
mg_send_u16(c, mg_htons((uint16_t) cid.len));
mg_send(c, cid.ptr, cid.len);
if (hdr[7] & MQTT_HAS_WILL) {
if (opts->version == 5) mg_send(c, &zero, sizeof(zero)); // will props
if (c->is_mqtt5) mg_send(c, &zero, sizeof(zero)); // will props
mg_send_u16(c, mg_htons((uint16_t) opts->will_topic.len));
mg_send(c, opts->will_topic.ptr, opts->will_topic.len);
mg_send_u16(c, mg_htons((uint16_t) opts->will_message.len));
@ -91,13 +91,12 @@ void mg_mqtt_login(struct mg_connection *c, const struct mg_mqtt_opts *opts) {
void mg_mqtt_pub(struct mg_connection *c, struct mg_str topic,
struct mg_str data, int qos, bool retain) {
uint8_t flags = (uint8_t) (((qos & 3) << 1) | (retain ? 1 : 0));
uint8_t version = (uint8_t) (size_t) c->pfn_data, zero = 0;
uint8_t flags = (uint8_t) (((qos & 3) << 1) | (retain ? 1 : 0)), zero = 0;
uint32_t len = 2 + (uint32_t) topic.len + (uint32_t) data.len;
MG_DEBUG(("%lu [%.*s] -> [%.*s]", c->id, (int) topic.len, (char *) topic.ptr,
(int) data.len, (char *) data.ptr));
if (qos > 0) len += 2;
if (version == 5) len++;
if (c->is_mqtt5) len++;
mg_mqtt_send_header(c, MQTT_CMD_PUBLISH, flags, len);
mg_send_u16(c, mg_htons((uint16_t) topic.len));
mg_send(c, topic.ptr, topic.len);
@ -105,17 +104,17 @@ void mg_mqtt_pub(struct mg_connection *c, struct mg_str topic,
if (++c->mgr->mqtt_id == 0) ++c->mgr->mqtt_id;
mg_send_u16(c, mg_htons(c->mgr->mqtt_id));
}
if (version == 5) mg_send(c, &zero, sizeof(zero));
if (c->is_mqtt5) mg_send(c, &zero, sizeof(zero));
mg_send(c, data.ptr, data.len);
}
void mg_mqtt_sub(struct mg_connection *c, struct mg_str topic, int qos) {
uint8_t qos_ = qos & 3, version = (uint8_t) (size_t) c->pfn_data, zero = 0;
uint32_t len = 2 + (uint32_t) topic.len + 2 + 1 + (version == 5 ? 1 : 0);
uint8_t qos_ = qos & 3, zero = 0;
uint32_t len = 2 + (uint32_t) topic.len + 2 + 1 + (c->is_mqtt5 ? 1 : 0);
mg_mqtt_send_header(c, MQTT_CMD_SUBSCRIBE, 2, len);
if (++c->mgr->mqtt_id == 0) ++c->mgr->mqtt_id;
mg_send_u16(c, mg_htons(c->mgr->mqtt_id));
if (version == 5) mg_send(c, &zero, sizeof(zero));
if (c->is_mqtt5) mg_send(c, &zero, sizeof(zero));
mg_send_u16(c, mg_htons((uint16_t) topic.len));
mg_send(c, topic.ptr, topic.len);
mg_send(c, &qos_, sizeof(qos_));
@ -216,7 +215,7 @@ static void mqtt_cb(struct mg_connection *c, int ev, void *ev_data,
void *fn_data) {
if (ev == MG_EV_READ) {
for (;;) {
uint8_t version = (uint8_t) (size_t) c->pfn_data;
uint8_t version = c->is_mqtt5 ? 5 : 4;
struct mg_mqtt_message mm;
int rc = mg_mqtt_parse(c->recv.buf, c->recv.len, version, &mm);
if (rc == MQTT_MALFORMED) {

View File

@ -60,6 +60,7 @@ struct mg_connection {
unsigned is_tls_hs : 1; // TLS handshake is in progress
unsigned is_udp : 1; // UDP connection
unsigned is_websocket : 1; // WebSocket connection
unsigned is_mqtt5 : 1; // For MQTT connection, v5 indicator
unsigned is_hexdumping : 1; // Hexdump in/out traffic
unsigned is_draining : 1; // Send remaining data, then close and free
unsigned is_closing : 1; // Close and free the connection immediately