Add mg_mqtt_pubex

This commit is contained in:
cpq 2021-06-10 19:15:50 +01:00
parent 886a29d659
commit a15d518571
6 changed files with 58 additions and 21 deletions

View File

@ -982,6 +982,15 @@ void mg_mqtt_pub(struct mg_connection *, struct mg_str *topic,
Publish message `data` to the topic `topic`.
### mg\_mqtt\_pubex()
```c
void mg_mqtt_pubex(struct mg_connection *, struct mg_str *topic,
struct mg_str *data, int qos, bool retain);
```
Like `mg_mqtt_pub()` but also allows to set QoS and retain flag.
### mg\_mqtt\_sub()

View File

@ -1921,9 +1921,9 @@ static void mqtt_login(struct mg_connection *c, const char *url,
}
}
void mg_mqtt_pub(struct mg_connection *c, struct mg_str *topic,
struct mg_str *data) {
uint8_t flags = MQTT_QOS(1);
void mg_mqtt_pubex(struct mg_connection *c, struct mg_str *topic,
struct mg_str *data, int qos, bool retain) {
uint8_t flags = (uint8_t)((qos & 3) << 1) | (retain ? 1 : 0);
uint32_t total_len = 2 + (uint32_t) topic->len + (uint32_t) data->len;
LOG(LL_DEBUG, ("%lu [%.*s] -> [%.*s]", c->id, (int) topic->len,
(char *) topic->ptr, (int) data->len, (char *) data->ptr));
@ -1939,6 +1939,11 @@ void mg_mqtt_pub(struct mg_connection *c, struct mg_str *topic,
mg_send(c, data->ptr, data->len);
}
void mg_mqtt_pub(struct mg_connection *c, struct mg_str *topic,
struct mg_str *data) {
mg_mqtt_pubex(c, topic, data, 1, false);
}
void mg_mqtt_sub(struct mg_connection *c, struct mg_str *topic) {
static uint16_t s_id;
uint8_t qos = 1;
@ -4364,6 +4369,9 @@ static void mg_ws_cb(struct mg_connection *c, int ev, void *ev_data,
char *s = (char *) c->recv.buf + msg.header_len;
struct mg_ws_message m = {{s, msg.data_len}, msg.flags};
switch (msg.flags & WEBSOCKET_FLAGS_MASK_OP) {
case WEBSOCKET_OP_CONTINUE:
mg_call(c, MG_EV_WS_CTL, &m);
break;
case WEBSOCKET_OP_PING:
LOG(LL_DEBUG, ("%s", "WS PONG"));
mg_ws_send(c, s, msg.data_len, WEBSOCKET_OP_PONG);
@ -4372,15 +4380,19 @@ static void mg_ws_cb(struct mg_connection *c, int ev, void *ev_data,
case WEBSOCKET_OP_PONG:
mg_call(c, MG_EV_WS_CTL, &m);
break;
case WEBSOCKET_OP_TEXT:
case WEBSOCKET_OP_BINARY:
mg_call(c, MG_EV_WS_MSG, &m);
break;
case WEBSOCKET_OP_CLOSE:
LOG(LL_ERROR, ("%lu Got WS CLOSE", c->id));
mg_call(c, MG_EV_WS_CTL, &m);
c->is_closing = 1;
return;
default: {
mg_call(c, MG_EV_WS_MSG, &m);
break;
}
default:
// Per RFC6455, close conn when an unknown op is recvd
mg_error(c, "unknown WS op %d", msg.flags & WEBSOCKET_FLAGS_MASK_OP);
break;
}
mg_iobuf_delete(&c->recv, msg.header_len + msg.data_len);
}

View File

@ -893,9 +893,9 @@ struct mg_mqtt_opts {
};
struct mg_mqtt_message {
struct mg_str topic; // Parsed topic
struct mg_str data; // Parsed message
struct mg_str dgram; // Whole MQTT datagram, including headers
struct mg_str topic; // Parsed topic
struct mg_str data; // Parsed message
struct mg_str dgram; // Whole MQTT datagram, including headers
uint16_t id; // Set for PUBACK, PUBREC, PUBREL, PUBCOMP, SUBACK, PUBLISH
uint8_t cmd; // MQTT command, one of MQTT_CMD_*
uint8_t qos; // Quality of service
@ -909,6 +909,8 @@ struct mg_connection *mg_mqtt_listen(struct mg_mgr *mgr, const char *url,
mg_event_handler_t fn, void *fn_data);
void mg_mqtt_pub(struct mg_connection *, struct mg_str *topic,
struct mg_str *data);
void mg_mqtt_pubex(struct mg_connection *c, struct mg_str *topic,
struct mg_str *data, int qos, bool retain);
void mg_mqtt_sub(struct mg_connection *, struct mg_str *topic);
int mg_mqtt_parse(const uint8_t *buf, size_t len, struct mg_mqtt_message *m);
void mg_mqtt_send_header(struct mg_connection *, uint8_t cmd, uint8_t flags,

View File

@ -1,8 +1,8 @@
#include "mqtt.h"
#include "arch.h"
#include "base64.h"
#include "event.h"
#include "log.h"
#include "mqtt.h"
#include "private.h"
#include "url.h"
#include "util.h"
@ -81,9 +81,9 @@ static void mqtt_login(struct mg_connection *c, const char *url,
}
}
void mg_mqtt_pub(struct mg_connection *c, struct mg_str *topic,
struct mg_str *data) {
uint8_t flags = MQTT_QOS(1);
void mg_mqtt_pubex(struct mg_connection *c, struct mg_str *topic,
struct mg_str *data, int qos, bool retain) {
uint8_t flags = (uint8_t)((qos & 3) << 1) | (retain ? 1 : 0);
uint32_t total_len = 2 + (uint32_t) topic->len + (uint32_t) data->len;
LOG(LL_DEBUG, ("%lu [%.*s] -> [%.*s]", c->id, (int) topic->len,
(char *) topic->ptr, (int) data->len, (char *) data->ptr));
@ -99,6 +99,11 @@ void mg_mqtt_pub(struct mg_connection *c, struct mg_str *topic,
mg_send(c, data->ptr, data->len);
}
void mg_mqtt_pub(struct mg_connection *c, struct mg_str *topic,
struct mg_str *data) {
mg_mqtt_pubex(c, topic, data, 1, false);
}
void mg_mqtt_sub(struct mg_connection *c, struct mg_str *topic) {
static uint16_t s_id;
uint8_t qos = 1;

View File

@ -33,9 +33,9 @@ struct mg_mqtt_opts {
};
struct mg_mqtt_message {
struct mg_str topic; // Parsed topic
struct mg_str data; // Parsed message
struct mg_str dgram; // Whole MQTT datagram, including headers
struct mg_str topic; // Parsed topic
struct mg_str data; // Parsed message
struct mg_str dgram; // Whole MQTT datagram, including headers
uint16_t id; // Set for PUBACK, PUBREC, PUBREL, PUBCOMP, SUBACK, PUBLISH
uint8_t cmd; // MQTT command, one of MQTT_CMD_*
uint8_t qos; // Quality of service
@ -49,6 +49,8 @@ struct mg_connection *mg_mqtt_listen(struct mg_mgr *mgr, const char *url,
mg_event_handler_t fn, void *fn_data);
void mg_mqtt_pub(struct mg_connection *, struct mg_str *topic,
struct mg_str *data);
void mg_mqtt_pubex(struct mg_connection *c, struct mg_str *topic,
struct mg_str *data, int qos, bool retain);
void mg_mqtt_sub(struct mg_connection *, struct mg_str *topic);
int mg_mqtt_parse(const uint8_t *buf, size_t len, struct mg_mqtt_message *m);
void mg_mqtt_send_header(struct mg_connection *, uint8_t cmd, uint8_t flags,

View File

@ -134,6 +134,9 @@ static void mg_ws_cb(struct mg_connection *c, int ev, void *ev_data,
char *s = (char *) c->recv.buf + msg.header_len;
struct mg_ws_message m = {{s, msg.data_len}, msg.flags};
switch (msg.flags & WEBSOCKET_FLAGS_MASK_OP) {
case WEBSOCKET_OP_CONTINUE:
mg_call(c, MG_EV_WS_CTL, &m);
break;
case WEBSOCKET_OP_PING:
LOG(LL_DEBUG, ("%s", "WS PONG"));
mg_ws_send(c, s, msg.data_len, WEBSOCKET_OP_PONG);
@ -142,15 +145,19 @@ static void mg_ws_cb(struct mg_connection *c, int ev, void *ev_data,
case WEBSOCKET_OP_PONG:
mg_call(c, MG_EV_WS_CTL, &m);
break;
case WEBSOCKET_OP_TEXT:
case WEBSOCKET_OP_BINARY:
mg_call(c, MG_EV_WS_MSG, &m);
break;
case WEBSOCKET_OP_CLOSE:
LOG(LL_ERROR, ("%lu Got WS CLOSE", c->id));
mg_call(c, MG_EV_WS_CTL, &m);
c->is_closing = 1;
return;
default: {
mg_call(c, MG_EV_WS_MSG, &m);
break;
}
default:
// Per RFC6455, close conn when an unknown op is recvd
mg_error(c, "unknown WS op %d", msg.flags & WEBSOCKET_FLAGS_MASK_OP);
break;
}
mg_iobuf_delete(&c->recv, msg.header_len + msg.data_len);
}