Fix #1394 - mg_mqtt_sub(). Add MG_EV_OPEN, too.

This commit is contained in:
Sergey Lyubka 2021-11-01 16:20:00 +00:00
parent 2016586475
commit 423aaa492c
7 changed files with 64 additions and 47 deletions

View File

@ -17,7 +17,9 @@ static const char *s_topic = "mg/mq-clnt-test";
static int s_qos = 1;
static void fn(struct mg_connection *c, int ev, void *ev_data, void *fn_data) {
if (ev == MG_EV_ERROR) {
if (ev == MG_EV_OPEN) {
// c->is_hexdumping = 1;
} else if (ev == MG_EV_ERROR) {
// On error, log error message
LOG(LL_ERROR, ("%p %s", c->fd, (char *) ev_data));
} else if (ev == MG_EV_CONNECT) {
@ -30,9 +32,9 @@ static void fn(struct mg_connection *c, int ev, void *ev_data, void *fn_data) {
// MQTT connect is successful
struct mg_str topic = mg_str(s_topic), data = mg_str("hello");
LOG(LL_INFO, ("CONNECTED to %s", s_url));
mg_mqtt_sub(c, &topic, 1);
mg_mqtt_sub(c, &topic, s_qos);
LOG(LL_INFO, ("SUBSCRIBED to %.*s", (int) topic.len, topic.ptr));
mg_mqtt_pub(c, &topic, &data, 1, false);
mg_mqtt_pub(c, &topic, &data, s_qos, false);
LOG(LL_INFO, ("PUBSLISHED %.*s -> %.*s", (int) data.len, data.ptr,
(int) topic.len, topic.ptr));
} else if (ev == MG_EV_MQTT_MSG) {
@ -43,6 +45,7 @@ static void fn(struct mg_connection *c, int ev, void *ev_data, void *fn_data) {
}
if (ev == MG_EV_ERROR || ev == MG_EV_CLOSE || ev == MG_EV_MQTT_MSG) {
LOG(LL_INFO, ("Got event %d, stopping...", ev));
*(bool *) fn_data = true; // Signal that we're done
}
}

View File

@ -2094,8 +2094,6 @@ void mg_md5_final(mg_md5_ctx *ctx, unsigned char digest[16]) {
#define MQTT_WILL_RETAIN 0x20
#define MQTT_HAS_PASSWORD 0x40
#define MQTT_HAS_USER_NAME 0x80
#define MQTT_GET_WILL_QOS(flags) (((flags) &0x18) >> 3)
#define MQTT_SET_WILL_QOS(flags, qos) (flags) = ((flags) & ~0x18) | ((qos) << 3)
enum { MQTT_OK, MQTT_INCOMPLETE, MQTT_MALFORMED };
@ -2117,34 +2115,43 @@ static void mg_send_u16(struct mg_connection *c, uint16_t value) {
}
void mg_mqtt_login(struct mg_connection *c, struct mg_mqtt_opts *opts) {
char rnd[9], client_id[16];
struct mg_str cid = opts->client_id;
uint32_t total_len = 7 + 1 + 2 + 2;
uint16_t flags = (uint16_t) (((uint16_t) opts->qos & 3) << 3);
uint8_t connflag = (uint8_t) ((opts->qos & 3) << 1);
if (cid.len == 0) {
mg_random(rnd, sizeof(rnd));
mg_base64_encode((unsigned char *) rnd, sizeof(rnd), client_id);
client_id[sizeof(client_id) - 1] = '\0';
cid = mg_str(client_id);
}
if (opts->user.len > 0) {
total_len += 2 + (uint32_t) opts->user.len;
flags |= MQTT_HAS_USER_NAME;
connflag |= MQTT_HAS_USER_NAME;
}
if (opts->pass.len > 0) {
total_len += 2 + (uint32_t) opts->pass.len;
flags |= MQTT_HAS_PASSWORD;
connflag |= MQTT_HAS_PASSWORD;
}
if (opts->will_topic.len > 0 && opts->will_message.len > 0) {
total_len +=
4 + (uint32_t) opts->will_topic.len + (uint32_t) opts->will_message.len;
flags |= MQTT_HAS_WILL;
connflag |= MQTT_HAS_WILL;
}
if (opts->clean || opts->client_id.len == 0) flags |= MQTT_CLEAN_SESSION;
if (opts->will_retain) flags |= MQTT_WILL_RETAIN;
total_len += (uint32_t) opts->client_id.len;
if (opts->clean || cid.len == 0) connflag |= MQTT_CLEAN_SESSION;
if (opts->will_retain) connflag |= MQTT_WILL_RETAIN;
total_len += (uint32_t) cid.len;
mg_mqtt_send_header(c, MQTT_CMD_CONNECT, 0, total_len);
mg_send(c, "\00\04MQTT\04", 7);
mg_send(c, &flags, 1);
mg_send(c, &connflag, sizeof(connflag));
// keepalive == 0 means "do not disconnect us!"
mg_send_u16(c, mg_htons((uint16_t) opts->keepalive));
mg_send_u16(c, mg_htons((uint16_t) opts->client_id.len));
mg_send(c, opts->client_id.ptr, opts->client_id.len);
if (flags & MQTT_HAS_WILL) {
mg_send_u16(c, mg_htons((uint16_t) cid.len));
mg_send(c, cid.ptr, cid.len);
if (connflag & MQTT_HAS_WILL) {
mg_send_u16(c, mg_htons((uint16_t) opts->will_topic.len));
mg_send(c, opts->will_topic.ptr, opts->will_topic.len);
mg_send_u16(c, mg_htons((uint16_t) opts->will_message.len));
@ -2166,11 +2173,11 @@ void mg_mqtt_pub(struct mg_connection *c, struct mg_str *topic,
uint32_t total_len = 2 + (uint32_t) topic->len + (uint32_t) data->len;
LOG(LL_DEBUG, ("%lu [%.*s] -> [%.*s]", c->id, (int) topic->len,
(char *) topic->ptr, (int) data->len, (char *) data->ptr));
if (MQTT_GET_QOS(flags) > 0) total_len += 2;
if (qos > 0) total_len += 2;
mg_mqtt_send_header(c, MQTT_CMD_PUBLISH, flags, total_len);
mg_send_u16(c, mg_htons((uint16_t) topic->len));
mg_send(c, topic->ptr, topic->len);
if (MQTT_GET_QOS(flags) > 0) {
if (qos > 0) {
static uint16_t s_id;
if (++s_id == 0) s_id++;
mg_send_u16(c, mg_htons(s_id));
@ -2182,8 +2189,7 @@ void mg_mqtt_sub(struct mg_connection *c, struct mg_str *topic, int qos) {
static uint16_t s_id;
uint8_t qos_ = qos & 3;
uint32_t total_len = 2 + (uint32_t) topic->len + 2 + 1;
mg_mqtt_send_header(c, MQTT_CMD_SUBSCRIBE, (uint8_t) MQTT_QOS(qos_),
total_len);
mg_mqtt_send_header(c, MQTT_CMD_SUBSCRIBE, 2, total_len);
if (++s_id == 0) ++s_id;
mg_send_u16(c, mg_htons(s_id));
mg_send_u16(c, mg_htons((uint16_t) topic->len));
@ -3222,6 +3228,7 @@ struct mg_connection *mg_connect(struct mg_mgr *mgr, const char *url,
c->fn = fn;
c->fn_data = fn_data;
LOG(LL_DEBUG, ("%lu -> %s", c->id, url));
mg_call(c, MG_EV_OPEN, NULL);
mg_resolve(c, &host, mgr->dnstimeout);
}
return c;
@ -3262,6 +3269,7 @@ static void accept_conn(struct mg_mgr *mgr, struct mg_connection *lsn) {
c->pfn_data = lsn->pfn_data;
c->fn = lsn->fn;
c->fn_data = lsn->fn_data;
mg_call(c, MG_EV_OPEN, NULL);
mg_call(c, MG_EV_ACCEPT, NULL);
}
}
@ -3322,6 +3330,7 @@ struct mg_connection *mg_mkpipe(struct mg_mgr *mgr, mg_event_handler_t fn,
c->pfn_data = (void *) (size_t) sp[0];
c->fn = fn;
c->fn_data = fn_data;
mg_call(c, MG_EV_OPEN, NULL);
LIST_ADD_HEAD(struct mg_connection, &mgr->conns, c);
}
return c;
@ -3346,7 +3355,8 @@ struct mg_connection *mg_listen(struct mg_mgr *mgr, const char *url,
LIST_ADD_HEAD(struct mg_connection, &mgr->conns, c);
c->fn = fn;
c->fn_data = fn_data;
LOG(LL_INFO,
mg_call(c, MG_EV_OPEN, NULL);
LOG(LL_DEBUG,
("%lu accepting on %s (port %u)", c->id, url, mg_ntohs(c->peer.port)));
}
return c;

View File

@ -737,6 +737,7 @@ void mg_error(struct mg_connection *c, const char *fmt, ...);
enum {
MG_EV_ERROR, // Error char *error_message
MG_EV_OPEN, // Connection created NULL
MG_EV_POLL, // mg_mgr_poll iteration unsigned long *millis
MG_EV_RESOLVE, // Host name is resolved NULL
MG_EV_CONNECT, // Connection established NULL
@ -969,10 +970,6 @@ int mg_sntp_parse(const unsigned char *buf, size_t len, struct timeval *tv);
#define MQTT_CMD_PINGRESP 13
#define MQTT_CMD_DISCONNECT 14
#define MQTT_QOS(qos) ((qos) << 1)
#define MQTT_GET_QOS(flags) (((flags) &0x6) >> 1)
#define MQTT_SET_QOS(flags, qos) (flags) = ((flags) & ~0x6) | ((qos) << 1)
struct mg_mqtt_opts {
struct mg_str user; // Username, can be empty
struct mg_str pass; // Password, can be empty

View File

@ -8,6 +8,7 @@ void mg_error(struct mg_connection *c, const char *fmt, ...);
enum {
MG_EV_ERROR, // Error char *error_message
MG_EV_OPEN, // Connection created NULL
MG_EV_POLL, // mg_mgr_poll iteration unsigned long *millis
MG_EV_RESOLVE, // Host name is resolved NULL
MG_EV_CONNECT, // Connection established NULL

View File

@ -11,8 +11,6 @@
#define MQTT_WILL_RETAIN 0x20
#define MQTT_HAS_PASSWORD 0x40
#define MQTT_HAS_USER_NAME 0x80
#define MQTT_GET_WILL_QOS(flags) (((flags) &0x18) >> 3)
#define MQTT_SET_WILL_QOS(flags, qos) (flags) = ((flags) & ~0x18) | ((qos) << 3)
enum { MQTT_OK, MQTT_INCOMPLETE, MQTT_MALFORMED };
@ -34,34 +32,43 @@ static void mg_send_u16(struct mg_connection *c, uint16_t value) {
}
void mg_mqtt_login(struct mg_connection *c, struct mg_mqtt_opts *opts) {
char rnd[9], client_id[16];
struct mg_str cid = opts->client_id;
uint32_t total_len = 7 + 1 + 2 + 2;
uint16_t flags = (uint16_t) (((uint16_t) opts->qos & 3) << 3);
uint8_t connflag = (uint8_t) ((opts->qos & 3) << 1);
if (cid.len == 0) {
mg_random(rnd, sizeof(rnd));
mg_base64_encode((unsigned char *) rnd, sizeof(rnd), client_id);
client_id[sizeof(client_id) - 1] = '\0';
cid = mg_str(client_id);
}
if (opts->user.len > 0) {
total_len += 2 + (uint32_t) opts->user.len;
flags |= MQTT_HAS_USER_NAME;
connflag |= MQTT_HAS_USER_NAME;
}
if (opts->pass.len > 0) {
total_len += 2 + (uint32_t) opts->pass.len;
flags |= MQTT_HAS_PASSWORD;
connflag |= MQTT_HAS_PASSWORD;
}
if (opts->will_topic.len > 0 && opts->will_message.len > 0) {
total_len +=
4 + (uint32_t) opts->will_topic.len + (uint32_t) opts->will_message.len;
flags |= MQTT_HAS_WILL;
connflag |= MQTT_HAS_WILL;
}
if (opts->clean || opts->client_id.len == 0) flags |= MQTT_CLEAN_SESSION;
if (opts->will_retain) flags |= MQTT_WILL_RETAIN;
total_len += (uint32_t) opts->client_id.len;
if (opts->clean || cid.len == 0) connflag |= MQTT_CLEAN_SESSION;
if (opts->will_retain) connflag |= MQTT_WILL_RETAIN;
total_len += (uint32_t) cid.len;
mg_mqtt_send_header(c, MQTT_CMD_CONNECT, 0, total_len);
mg_send(c, "\00\04MQTT\04", 7);
mg_send(c, &flags, 1);
mg_send(c, &connflag, sizeof(connflag));
// keepalive == 0 means "do not disconnect us!"
mg_send_u16(c, mg_htons((uint16_t) opts->keepalive));
mg_send_u16(c, mg_htons((uint16_t) opts->client_id.len));
mg_send(c, opts->client_id.ptr, opts->client_id.len);
if (flags & MQTT_HAS_WILL) {
mg_send_u16(c, mg_htons((uint16_t) cid.len));
mg_send(c, cid.ptr, cid.len);
if (connflag & MQTT_HAS_WILL) {
mg_send_u16(c, mg_htons((uint16_t) opts->will_topic.len));
mg_send(c, opts->will_topic.ptr, opts->will_topic.len);
mg_send_u16(c, mg_htons((uint16_t) opts->will_message.len));
@ -83,11 +90,11 @@ void mg_mqtt_pub(struct mg_connection *c, struct mg_str *topic,
uint32_t total_len = 2 + (uint32_t) topic->len + (uint32_t) data->len;
LOG(LL_DEBUG, ("%lu [%.*s] -> [%.*s]", c->id, (int) topic->len,
(char *) topic->ptr, (int) data->len, (char *) data->ptr));
if (MQTT_GET_QOS(flags) > 0) total_len += 2;
if (qos > 0) total_len += 2;
mg_mqtt_send_header(c, MQTT_CMD_PUBLISH, flags, total_len);
mg_send_u16(c, mg_htons((uint16_t) topic->len));
mg_send(c, topic->ptr, topic->len);
if (MQTT_GET_QOS(flags) > 0) {
if (qos > 0) {
static uint16_t s_id;
if (++s_id == 0) s_id++;
mg_send_u16(c, mg_htons(s_id));
@ -99,8 +106,7 @@ void mg_mqtt_sub(struct mg_connection *c, struct mg_str *topic, int qos) {
static uint16_t s_id;
uint8_t qos_ = qos & 3;
uint32_t total_len = 2 + (uint32_t) topic->len + 2 + 1;
mg_mqtt_send_header(c, MQTT_CMD_SUBSCRIBE, (uint8_t) MQTT_QOS(qos_),
total_len);
mg_mqtt_send_header(c, MQTT_CMD_SUBSCRIBE, 2, total_len);
if (++s_id == 0) ++s_id;
mg_send_u16(c, mg_htons(s_id));
mg_send_u16(c, mg_htons((uint16_t) topic->len));

View File

@ -18,10 +18,6 @@
#define MQTT_CMD_PINGRESP 13
#define MQTT_CMD_DISCONNECT 14
#define MQTT_QOS(qos) ((qos) << 1)
#define MQTT_GET_QOS(flags) (((flags) &0x6) >> 1)
#define MQTT_SET_QOS(flags, qos) (flags) = ((flags) & ~0x6) | ((qos) << 1)
struct mg_mqtt_opts {
struct mg_str user; // Username, can be empty
struct mg_str pass; // Password, can be empty

View File

@ -369,6 +369,7 @@ struct mg_connection *mg_connect(struct mg_mgr *mgr, const char *url,
c->fn = fn;
c->fn_data = fn_data;
LOG(LL_DEBUG, ("%lu -> %s", c->id, url));
mg_call(c, MG_EV_OPEN, NULL);
mg_resolve(c, &host, mgr->dnstimeout);
}
return c;
@ -409,6 +410,7 @@ static void accept_conn(struct mg_mgr *mgr, struct mg_connection *lsn) {
c->pfn_data = lsn->pfn_data;
c->fn = lsn->fn;
c->fn_data = lsn->fn_data;
mg_call(c, MG_EV_OPEN, NULL);
mg_call(c, MG_EV_ACCEPT, NULL);
}
}
@ -469,6 +471,7 @@ struct mg_connection *mg_mkpipe(struct mg_mgr *mgr, mg_event_handler_t fn,
c->pfn_data = (void *) (size_t) sp[0];
c->fn = fn;
c->fn_data = fn_data;
mg_call(c, MG_EV_OPEN, NULL);
LIST_ADD_HEAD(struct mg_connection, &mgr->conns, c);
}
return c;
@ -493,7 +496,8 @@ struct mg_connection *mg_listen(struct mg_mgr *mgr, const char *url,
LIST_ADD_HEAD(struct mg_connection, &mgr->conns, c);
c->fn = fn;
c->fn_data = fn_data;
LOG(LL_INFO,
mg_call(c, MG_EV_OPEN, NULL);
LOG(LL_DEBUG,
("%lu accepting on %s (port %u)", c->id, url, mg_ntohs(c->peer.port)));
}
return c;