diff --git a/examples/mqtt-client/main.c b/examples/mqtt-client/main.c index e041919f..2c5c9ff8 100644 --- a/examples/mqtt-client/main.c +++ b/examples/mqtt-client/main.c @@ -17,7 +17,9 @@ static const char *s_topic = "mg/mq-clnt-test"; static int s_qos = 1; static void fn(struct mg_connection *c, int ev, void *ev_data, void *fn_data) { - if (ev == MG_EV_ERROR) { + if (ev == MG_EV_OPEN) { + // c->is_hexdumping = 1; + } else if (ev == MG_EV_ERROR) { // On error, log error message LOG(LL_ERROR, ("%p %s", c->fd, (char *) ev_data)); } else if (ev == MG_EV_CONNECT) { @@ -30,9 +32,9 @@ static void fn(struct mg_connection *c, int ev, void *ev_data, void *fn_data) { // MQTT connect is successful struct mg_str topic = mg_str(s_topic), data = mg_str("hello"); LOG(LL_INFO, ("CONNECTED to %s", s_url)); - mg_mqtt_sub(c, &topic, 1); + mg_mqtt_sub(c, &topic, s_qos); LOG(LL_INFO, ("SUBSCRIBED to %.*s", (int) topic.len, topic.ptr)); - mg_mqtt_pub(c, &topic, &data, 1, false); + mg_mqtt_pub(c, &topic, &data, s_qos, false); LOG(LL_INFO, ("PUBSLISHED %.*s -> %.*s", (int) data.len, data.ptr, (int) topic.len, topic.ptr)); } else if (ev == MG_EV_MQTT_MSG) { @@ -43,6 +45,7 @@ static void fn(struct mg_connection *c, int ev, void *ev_data, void *fn_data) { } if (ev == MG_EV_ERROR || ev == MG_EV_CLOSE || ev == MG_EV_MQTT_MSG) { + LOG(LL_INFO, ("Got event %d, stopping...", ev)); *(bool *) fn_data = true; // Signal that we're done } } diff --git a/mongoose.c b/mongoose.c index ab899350..42f30579 100644 --- a/mongoose.c +++ b/mongoose.c @@ -2094,8 +2094,6 @@ void mg_md5_final(mg_md5_ctx *ctx, unsigned char digest[16]) { #define MQTT_WILL_RETAIN 0x20 #define MQTT_HAS_PASSWORD 0x40 #define MQTT_HAS_USER_NAME 0x80 -#define MQTT_GET_WILL_QOS(flags) (((flags) &0x18) >> 3) -#define MQTT_SET_WILL_QOS(flags, qos) (flags) = ((flags) & ~0x18) | ((qos) << 3) enum { MQTT_OK, MQTT_INCOMPLETE, MQTT_MALFORMED }; @@ -2117,34 +2115,43 @@ static void mg_send_u16(struct mg_connection *c, uint16_t value) { } void mg_mqtt_login(struct mg_connection *c, struct mg_mqtt_opts *opts) { + char rnd[9], client_id[16]; + struct mg_str cid = opts->client_id; uint32_t total_len = 7 + 1 + 2 + 2; - uint16_t flags = (uint16_t) (((uint16_t) opts->qos & 3) << 3); + uint8_t connflag = (uint8_t) ((opts->qos & 3) << 1); + + if (cid.len == 0) { + mg_random(rnd, sizeof(rnd)); + mg_base64_encode((unsigned char *) rnd, sizeof(rnd), client_id); + client_id[sizeof(client_id) - 1] = '\0'; + cid = mg_str(client_id); + } if (opts->user.len > 0) { total_len += 2 + (uint32_t) opts->user.len; - flags |= MQTT_HAS_USER_NAME; + connflag |= MQTT_HAS_USER_NAME; } if (opts->pass.len > 0) { total_len += 2 + (uint32_t) opts->pass.len; - flags |= MQTT_HAS_PASSWORD; + connflag |= MQTT_HAS_PASSWORD; } if (opts->will_topic.len > 0 && opts->will_message.len > 0) { total_len += 4 + (uint32_t) opts->will_topic.len + (uint32_t) opts->will_message.len; - flags |= MQTT_HAS_WILL; + connflag |= MQTT_HAS_WILL; } - if (opts->clean || opts->client_id.len == 0) flags |= MQTT_CLEAN_SESSION; - if (opts->will_retain) flags |= MQTT_WILL_RETAIN; - total_len += (uint32_t) opts->client_id.len; + if (opts->clean || cid.len == 0) connflag |= MQTT_CLEAN_SESSION; + if (opts->will_retain) connflag |= MQTT_WILL_RETAIN; + total_len += (uint32_t) cid.len; mg_mqtt_send_header(c, MQTT_CMD_CONNECT, 0, total_len); mg_send(c, "\00\04MQTT\04", 7); - mg_send(c, &flags, 1); + mg_send(c, &connflag, sizeof(connflag)); // keepalive == 0 means "do not disconnect us!" mg_send_u16(c, mg_htons((uint16_t) opts->keepalive)); - mg_send_u16(c, mg_htons((uint16_t) opts->client_id.len)); - mg_send(c, opts->client_id.ptr, opts->client_id.len); - if (flags & MQTT_HAS_WILL) { + mg_send_u16(c, mg_htons((uint16_t) cid.len)); + mg_send(c, cid.ptr, cid.len); + if (connflag & MQTT_HAS_WILL) { mg_send_u16(c, mg_htons((uint16_t) opts->will_topic.len)); mg_send(c, opts->will_topic.ptr, opts->will_topic.len); mg_send_u16(c, mg_htons((uint16_t) opts->will_message.len)); @@ -2166,11 +2173,11 @@ void mg_mqtt_pub(struct mg_connection *c, struct mg_str *topic, uint32_t total_len = 2 + (uint32_t) topic->len + (uint32_t) data->len; LOG(LL_DEBUG, ("%lu [%.*s] -> [%.*s]", c->id, (int) topic->len, (char *) topic->ptr, (int) data->len, (char *) data->ptr)); - if (MQTT_GET_QOS(flags) > 0) total_len += 2; + if (qos > 0) total_len += 2; mg_mqtt_send_header(c, MQTT_CMD_PUBLISH, flags, total_len); mg_send_u16(c, mg_htons((uint16_t) topic->len)); mg_send(c, topic->ptr, topic->len); - if (MQTT_GET_QOS(flags) > 0) { + if (qos > 0) { static uint16_t s_id; if (++s_id == 0) s_id++; mg_send_u16(c, mg_htons(s_id)); @@ -2182,8 +2189,7 @@ void mg_mqtt_sub(struct mg_connection *c, struct mg_str *topic, int qos) { static uint16_t s_id; uint8_t qos_ = qos & 3; uint32_t total_len = 2 + (uint32_t) topic->len + 2 + 1; - mg_mqtt_send_header(c, MQTT_CMD_SUBSCRIBE, (uint8_t) MQTT_QOS(qos_), - total_len); + mg_mqtt_send_header(c, MQTT_CMD_SUBSCRIBE, 2, total_len); if (++s_id == 0) ++s_id; mg_send_u16(c, mg_htons(s_id)); mg_send_u16(c, mg_htons((uint16_t) topic->len)); @@ -3222,6 +3228,7 @@ struct mg_connection *mg_connect(struct mg_mgr *mgr, const char *url, c->fn = fn; c->fn_data = fn_data; LOG(LL_DEBUG, ("%lu -> %s", c->id, url)); + mg_call(c, MG_EV_OPEN, NULL); mg_resolve(c, &host, mgr->dnstimeout); } return c; @@ -3262,6 +3269,7 @@ static void accept_conn(struct mg_mgr *mgr, struct mg_connection *lsn) { c->pfn_data = lsn->pfn_data; c->fn = lsn->fn; c->fn_data = lsn->fn_data; + mg_call(c, MG_EV_OPEN, NULL); mg_call(c, MG_EV_ACCEPT, NULL); } } @@ -3322,6 +3330,7 @@ struct mg_connection *mg_mkpipe(struct mg_mgr *mgr, mg_event_handler_t fn, c->pfn_data = (void *) (size_t) sp[0]; c->fn = fn; c->fn_data = fn_data; + mg_call(c, MG_EV_OPEN, NULL); LIST_ADD_HEAD(struct mg_connection, &mgr->conns, c); } return c; @@ -3346,7 +3355,8 @@ struct mg_connection *mg_listen(struct mg_mgr *mgr, const char *url, LIST_ADD_HEAD(struct mg_connection, &mgr->conns, c); c->fn = fn; c->fn_data = fn_data; - LOG(LL_INFO, + mg_call(c, MG_EV_OPEN, NULL); + LOG(LL_DEBUG, ("%lu accepting on %s (port %u)", c->id, url, mg_ntohs(c->peer.port))); } return c; diff --git a/mongoose.h b/mongoose.h index db4fad1e..419f080c 100644 --- a/mongoose.h +++ b/mongoose.h @@ -737,6 +737,7 @@ void mg_error(struct mg_connection *c, const char *fmt, ...); enum { MG_EV_ERROR, // Error char *error_message + MG_EV_OPEN, // Connection created NULL MG_EV_POLL, // mg_mgr_poll iteration unsigned long *millis MG_EV_RESOLVE, // Host name is resolved NULL MG_EV_CONNECT, // Connection established NULL @@ -969,10 +970,6 @@ int mg_sntp_parse(const unsigned char *buf, size_t len, struct timeval *tv); #define MQTT_CMD_PINGRESP 13 #define MQTT_CMD_DISCONNECT 14 -#define MQTT_QOS(qos) ((qos) << 1) -#define MQTT_GET_QOS(flags) (((flags) &0x6) >> 1) -#define MQTT_SET_QOS(flags, qos) (flags) = ((flags) & ~0x6) | ((qos) << 1) - struct mg_mqtt_opts { struct mg_str user; // Username, can be empty struct mg_str pass; // Password, can be empty diff --git a/src/event.h b/src/event.h index 566fd53f..ddb136ee 100644 --- a/src/event.h +++ b/src/event.h @@ -8,6 +8,7 @@ void mg_error(struct mg_connection *c, const char *fmt, ...); enum { MG_EV_ERROR, // Error char *error_message + MG_EV_OPEN, // Connection created NULL MG_EV_POLL, // mg_mgr_poll iteration unsigned long *millis MG_EV_RESOLVE, // Host name is resolved NULL MG_EV_CONNECT, // Connection established NULL diff --git a/src/mqtt.c b/src/mqtt.c index 064addc9..8f99b8ed 100644 --- a/src/mqtt.c +++ b/src/mqtt.c @@ -11,8 +11,6 @@ #define MQTT_WILL_RETAIN 0x20 #define MQTT_HAS_PASSWORD 0x40 #define MQTT_HAS_USER_NAME 0x80 -#define MQTT_GET_WILL_QOS(flags) (((flags) &0x18) >> 3) -#define MQTT_SET_WILL_QOS(flags, qos) (flags) = ((flags) & ~0x18) | ((qos) << 3) enum { MQTT_OK, MQTT_INCOMPLETE, MQTT_MALFORMED }; @@ -34,34 +32,43 @@ static void mg_send_u16(struct mg_connection *c, uint16_t value) { } void mg_mqtt_login(struct mg_connection *c, struct mg_mqtt_opts *opts) { + char rnd[9], client_id[16]; + struct mg_str cid = opts->client_id; uint32_t total_len = 7 + 1 + 2 + 2; - uint16_t flags = (uint16_t) (((uint16_t) opts->qos & 3) << 3); + uint8_t connflag = (uint8_t) ((opts->qos & 3) << 1); + + if (cid.len == 0) { + mg_random(rnd, sizeof(rnd)); + mg_base64_encode((unsigned char *) rnd, sizeof(rnd), client_id); + client_id[sizeof(client_id) - 1] = '\0'; + cid = mg_str(client_id); + } if (opts->user.len > 0) { total_len += 2 + (uint32_t) opts->user.len; - flags |= MQTT_HAS_USER_NAME; + connflag |= MQTT_HAS_USER_NAME; } if (opts->pass.len > 0) { total_len += 2 + (uint32_t) opts->pass.len; - flags |= MQTT_HAS_PASSWORD; + connflag |= MQTT_HAS_PASSWORD; } if (opts->will_topic.len > 0 && opts->will_message.len > 0) { total_len += 4 + (uint32_t) opts->will_topic.len + (uint32_t) opts->will_message.len; - flags |= MQTT_HAS_WILL; + connflag |= MQTT_HAS_WILL; } - if (opts->clean || opts->client_id.len == 0) flags |= MQTT_CLEAN_SESSION; - if (opts->will_retain) flags |= MQTT_WILL_RETAIN; - total_len += (uint32_t) opts->client_id.len; + if (opts->clean || cid.len == 0) connflag |= MQTT_CLEAN_SESSION; + if (opts->will_retain) connflag |= MQTT_WILL_RETAIN; + total_len += (uint32_t) cid.len; mg_mqtt_send_header(c, MQTT_CMD_CONNECT, 0, total_len); mg_send(c, "\00\04MQTT\04", 7); - mg_send(c, &flags, 1); + mg_send(c, &connflag, sizeof(connflag)); // keepalive == 0 means "do not disconnect us!" mg_send_u16(c, mg_htons((uint16_t) opts->keepalive)); - mg_send_u16(c, mg_htons((uint16_t) opts->client_id.len)); - mg_send(c, opts->client_id.ptr, opts->client_id.len); - if (flags & MQTT_HAS_WILL) { + mg_send_u16(c, mg_htons((uint16_t) cid.len)); + mg_send(c, cid.ptr, cid.len); + if (connflag & MQTT_HAS_WILL) { mg_send_u16(c, mg_htons((uint16_t) opts->will_topic.len)); mg_send(c, opts->will_topic.ptr, opts->will_topic.len); mg_send_u16(c, mg_htons((uint16_t) opts->will_message.len)); @@ -83,11 +90,11 @@ void mg_mqtt_pub(struct mg_connection *c, struct mg_str *topic, uint32_t total_len = 2 + (uint32_t) topic->len + (uint32_t) data->len; LOG(LL_DEBUG, ("%lu [%.*s] -> [%.*s]", c->id, (int) topic->len, (char *) topic->ptr, (int) data->len, (char *) data->ptr)); - if (MQTT_GET_QOS(flags) > 0) total_len += 2; + if (qos > 0) total_len += 2; mg_mqtt_send_header(c, MQTT_CMD_PUBLISH, flags, total_len); mg_send_u16(c, mg_htons((uint16_t) topic->len)); mg_send(c, topic->ptr, topic->len); - if (MQTT_GET_QOS(flags) > 0) { + if (qos > 0) { static uint16_t s_id; if (++s_id == 0) s_id++; mg_send_u16(c, mg_htons(s_id)); @@ -99,8 +106,7 @@ void mg_mqtt_sub(struct mg_connection *c, struct mg_str *topic, int qos) { static uint16_t s_id; uint8_t qos_ = qos & 3; uint32_t total_len = 2 + (uint32_t) topic->len + 2 + 1; - mg_mqtt_send_header(c, MQTT_CMD_SUBSCRIBE, (uint8_t) MQTT_QOS(qos_), - total_len); + mg_mqtt_send_header(c, MQTT_CMD_SUBSCRIBE, 2, total_len); if (++s_id == 0) ++s_id; mg_send_u16(c, mg_htons(s_id)); mg_send_u16(c, mg_htons((uint16_t) topic->len)); diff --git a/src/mqtt.h b/src/mqtt.h index a9b3c3ac..5fb98c14 100644 --- a/src/mqtt.h +++ b/src/mqtt.h @@ -18,10 +18,6 @@ #define MQTT_CMD_PINGRESP 13 #define MQTT_CMD_DISCONNECT 14 -#define MQTT_QOS(qos) ((qos) << 1) -#define MQTT_GET_QOS(flags) (((flags) &0x6) >> 1) -#define MQTT_SET_QOS(flags, qos) (flags) = ((flags) & ~0x6) | ((qos) << 1) - struct mg_mqtt_opts { struct mg_str user; // Username, can be empty struct mg_str pass; // Password, can be empty diff --git a/src/sock.c b/src/sock.c index c56a478f..2552799a 100644 --- a/src/sock.c +++ b/src/sock.c @@ -369,6 +369,7 @@ struct mg_connection *mg_connect(struct mg_mgr *mgr, const char *url, c->fn = fn; c->fn_data = fn_data; LOG(LL_DEBUG, ("%lu -> %s", c->id, url)); + mg_call(c, MG_EV_OPEN, NULL); mg_resolve(c, &host, mgr->dnstimeout); } return c; @@ -409,6 +410,7 @@ static void accept_conn(struct mg_mgr *mgr, struct mg_connection *lsn) { c->pfn_data = lsn->pfn_data; c->fn = lsn->fn; c->fn_data = lsn->fn_data; + mg_call(c, MG_EV_OPEN, NULL); mg_call(c, MG_EV_ACCEPT, NULL); } } @@ -469,6 +471,7 @@ struct mg_connection *mg_mkpipe(struct mg_mgr *mgr, mg_event_handler_t fn, c->pfn_data = (void *) (size_t) sp[0]; c->fn = fn; c->fn_data = fn_data; + mg_call(c, MG_EV_OPEN, NULL); LIST_ADD_HEAD(struct mg_connection, &mgr->conns, c); } return c; @@ -493,7 +496,8 @@ struct mg_connection *mg_listen(struct mg_mgr *mgr, const char *url, LIST_ADD_HEAD(struct mg_connection, &mgr->conns, c); c->fn = fn; c->fn_data = fn_data; - LOG(LL_INFO, + mg_call(c, MG_EV_OPEN, NULL); + LOG(LL_DEBUG, ("%lu accepting on %s (port %u)", c->id, url, mg_ntohs(c->peer.port))); } return c;