MQTT: support for properties feature added

This commit is contained in:
robert 2023-04-20 09:59:27 +03:00
parent 44bf059b2f
commit 6c13df1aff
13 changed files with 1740 additions and 1341 deletions

View File

@ -132,7 +132,12 @@ static void mqtt_fn(struct mg_connection *c, int ev, void *ev_data, void *fnd) {
} else if (ev == MG_EV_MQTT_OPEN) {
s_connected = true;
c->is_hexdumping = 1;
mg_mqtt_sub(s_mqtt, mg_str(s_config.sub), 2);
struct mg_mqtt_opts sub_opts;
memset(&sub_opts, 0, sizeof(sub_opts));
sub_opts.topic = mg_str(s_config.sub);
sub_opts.qos = 2;
mg_mqtt_sub(s_mqtt, &sub_opts);
send_notification(c->mgr, "{%m:%m,%m:null}", mg_print_esc, 0, "name",
mg_print_esc, 0, "config", mg_print_esc, 0, "data");
MG_INFO(("MQTT connected, server %s", MQTT_SERVER));
@ -249,7 +254,13 @@ void device_dashboard_fn(struct mg_connection *c, int ev, void *ev_data,
char buf[256];
if (s_connected &&
mg_http_get_var(&hm->body, "message", buf, sizeof(buf)) > 0) {
mg_mqtt_pub(s_mqtt, mg_str(s_config.pub), mg_str(buf), 1, false);
struct mg_mqtt_opts pub_opts;
memset(&pub_opts, 0, sizeof(pub_opts));
pub_opts.topic = mg_str(s_config.pub);
pub_opts.message = mg_str(buf);
pub_opts.qos = 2, pub_opts.retain = false;
mg_mqtt_pub(s_mqtt, &pub_opts);
}
mg_http_reply(c, 200, "", "ok\n");
} else if (mg_http_match_uri(hm, "/api/watch")) {

File diff suppressed because it is too large Load Diff

View File

@ -54,7 +54,11 @@ static void fn(struct mg_connection *c, int ev, void *ev_data, void *fn_data) {
struct mg_str topic = mg_str(s_rx_topic);
MG_INFO(("Connected to %s", s_url));
MG_INFO(("Subscribing to %s", s_rx_topic));
mg_mqtt_sub(c, topic, s_qos);
struct mg_mqtt_opts sub_opts;
memset(&sub_opts, 0, sizeof(sub_opts));
sub_opts.topic = topic;
sub_opts.qos = s_qos;
mg_mqtt_sub(c, &sub_opts);
c->data[0] = 'X'; // Set a label that we're logged in
} else if (ev == MG_EV_MQTT_MSG) {
// When we receive MQTT message, print it
@ -67,7 +71,12 @@ static void fn(struct mg_connection *c, int ev, void *ev_data, void *fn_data) {
if (now_second != prev_second) {
struct mg_str topic = mg_str(s_tx_topic), data = mg_str("{\"a\":123}");
MG_INFO(("Publishing to %s", s_tx_topic));
mg_mqtt_pub(c, topic, data, s_qos, false);
struct mg_mqtt_opts pub_opts;
memset(&pub_opts, 0, sizeof(pub_opts));
pub_opts.topic = topic;
pub_opts.message = data;
pub_opts.qos = s_qos, pub_opts.retain = false;
mg_mqtt_pub(c, &pub_opts);
prev_second = now_second;
}
}

View File

@ -40,10 +40,18 @@ static void fn(struct mg_connection *c, int ev, void *ev_data, void *fn_data) {
struct mg_str subt = mg_str(s_sub_topic);
struct mg_str pubt = mg_str(s_pub_topic), data = mg_str("hello");
MG_INFO(("%lu CONNECTED to %s", c->id, s_url));
mg_mqtt_sub(c, subt, s_qos);
struct mg_mqtt_opts sub_opts;
memset(&sub_opts, 0, sizeof(sub_opts));
sub_opts.topic = subt;
sub_opts.qos = s_qos;
mg_mqtt_sub(c, &sub_opts);
MG_INFO(("%lu SUBSCRIBED to %.*s", c->id, (int) subt.len, subt.ptr));
mg_mqtt_pub(c, pubt, data, s_qos, false);
struct mg_mqtt_opts pub_opts;
memset(&pub_opts, 0, sizeof(pub_opts));
pub_opts.topic = pubt;
pub_opts.message = data;
pub_opts.qos = s_qos, pub_opts.retain = false;
mg_mqtt_pub(c, &pub_opts);
MG_INFO(("%lu PUBLISHED %.*s -> %.*s", c->id, (int) data.len, data.ptr,
(int) pubt.len, pubt.ptr));
} else if (ev == MG_EV_MQTT_MSG) {
@ -62,9 +70,10 @@ static void fn(struct mg_connection *c, int ev, void *ev_data, void *fn_data) {
static void timer_fn(void *arg) {
struct mg_mgr *mgr = (struct mg_mgr *) arg;
struct mg_mqtt_opts opts = {.clean = true,
.will_qos = s_qos,
.will_topic = mg_str(s_pub_topic),
.will_message = mg_str("bye")};
.qos = s_qos,
.topic = mg_str(s_pub_topic),
.version = 4,
.message = mg_str("bye")};
if (s_conn == NULL) s_conn = mg_mqtt_connect(mgr, s_url, &opts, fn, NULL);
}

View File

@ -33,9 +33,9 @@ static void fn(struct mg_connection *c, int ev, void *ev_data, void *fn_data) {
} else if (ev == MG_EV_WS_OPEN) {
// WS connection established. Perform MQTT login
MG_INFO(("Connected to WS. Logging in to MQTT..."));
struct mg_mqtt_opts opts = {.will_qos = 1,
.will_topic = mg_str(s_topic),
.will_message = mg_str("goodbye")};
struct mg_mqtt_opts opts = {.qos = 1,
.topic = mg_str(s_topic),
.message = mg_str("goodbye")};
size_t len = c->send.len;
mg_mqtt_login(c, &opts);
mg_ws_wrap(c, c->send.len - len, WEBSOCKET_OP_BINARY);
@ -54,10 +54,19 @@ static void fn(struct mg_connection *c, int ev, void *ev_data, void *fn_data) {
struct mg_str topic = mg_str(s_topic), data = mg_str("hello");
size_t len = c->send.len;
MG_INFO(("CONNECTED to %s", s_url));
mg_mqtt_sub(c, topic, 1);
struct mg_mqtt_opts sub_opts;
memset(&sub_opts, 0, sizeof(sub_opts));
sub_opts.topic = topic;
sub_opts.qos = 1;
mg_mqtt_sub(c, &sub_opts);
len = mg_ws_wrap(c, c->send.len - len, WEBSOCKET_OP_BINARY);
MG_INFO(("SUBSCRIBED to %.*s", (int) topic.len, topic.ptr));
mg_mqtt_pub(c, topic, data, 1, false);
struct mg_mqtt_opts pub_opts;
memset(&pub_opts, 0, sizeof(pub_opts));
pub_opts.topic = topic;
pub_opts.message = data;
pub_opts.qos = 1, pub_opts.retain = false;
mg_mqtt_pub(c, &pub_opts);
MG_INFO(("PUBLISHED %.*s -> %.*s", (int) data.len, data.ptr,
(int) topic.len, topic.ptr));
len = mg_ws_wrap(c, c->send.len - len, WEBSOCKET_OP_BINARY);

View File

@ -103,7 +103,12 @@ static void fn(struct mg_connection *c, int ev, void *ev_data, void *fn_data) {
mm->data.ptr, (int) mm->topic.len, mm->topic.ptr));
for (struct sub *sub = s_subs; sub != NULL; sub = sub->next) {
if (mg_match(mm->topic, sub->topic, NULL)) {
mg_mqtt_pub(sub->c, mm->topic, mm->data, 1, false);
struct mg_mqtt_opts pub_opts;
memset(&pub_opts, 0, sizeof(pub_opts));
pub_opts.topic = mm->topic;
pub_opts.message = mm->data;
pub_opts.qos = 1, pub_opts.retain = false;
mg_mqtt_pub(sub->c, &pub_opts);
}
}
break;

View File

@ -107,7 +107,11 @@ static void mq_fn(struct mg_connection *c, int ev, void *evd, void *fnd) {
// c->is_hexdumping = 1;
} else if (ev == MG_EV_MQTT_OPEN) {
c->data[0] = 'M';
mg_mqtt_sub(c, mqtt_topic("rx", "b/rx"), 1); // Subscribe to RX topic
struct mg_mqtt_opts sub_opts;
memset(&sub_opts, 0, sizeof(sub_opts));
sub_opts.topic = mqtt_topic("rx", "b/rx");
sub_opts.qos = 1;
mg_mqtt_sub(c, &sub_opts); // Subscribe to RX topic
} else if (ev == MG_EV_MQTT_MSG) {
struct mg_mqtt_message *mm = evd; // MQTT message
uart_write(mm->data.ptr, mm->data.len); // Send to UART
@ -140,8 +144,14 @@ static void timer_fn(void *param) {
for (struct mg_connection *c = mgr->conns; c != NULL; c = c->next) {
if (c->data[0] == 'W') mg_ws_send(c, buf, len, WEBSOCKET_OP_TEXT);
if (c->data[0] == 'T') mg_send(c, buf, len);
if (c->data[0] == 'M')
mg_mqtt_pub(c, mqtt_topic("tx", "b/tx"), mg_str_n(buf, len), 1, false);
if (c->data[0] == 'M') {
struct mg_mqtt_opts pub_opts;
memset(&pub_opts, 0, sizeof(pub_opts));
pub_opts.topic = mqtt_topic("tx", "b/tx");
pub_opts.message = mg_str_n(buf, len);
pub_opts.qos = 1, pub_opts.retain = false;
mg_mqtt_pub(c, &pub_opts);
}
}
}
}

View File

@ -31,7 +31,11 @@ static void fn(struct mg_connection *c, int ev, void *ev_data, void *fn_data) {
struct mg_str topic = mg_str(s_rx_topic);
MG_INFO(("Connected to %s", s_url));
MG_INFO(("Subscribing to %s", s_rx_topic));
mg_mqtt_sub(c, topic, s_qos);
struct mg_mqtt_opts sub_opts;
memset(&sub_opts, 0, sizeof(sub_opts));
sub_opts.topic = topic;
sub_opts.qos = s_qos;
mg_mqtt_sub(c, &sub_opts);
c->data[0] = 'X'; // Set a label that we're logged in
} else if (ev == MG_EV_MQTT_MSG) {
// When we receive MQTT message, print it
@ -44,7 +48,12 @@ static void fn(struct mg_connection *c, int ev, void *ev_data, void *fn_data) {
if (now_second != prev_second) {
struct mg_str topic = mg_str(s_tx_topic), data = mg_str("{\"a\":123}");
MG_INFO(("Publishing to %s", s_tx_topic));
mg_mqtt_pub(c, topic, data, s_qos, false);
struct mg_mqtt_opts pub_opts;
memset(&pub_opts, 0, sizeof(pub_opts));
pub_opts.topic = topic;
pub_opts.message = data;
pub_opts.qos = s_qos, pub_opts.retain = false;
mg_mqtt_pub(c, &pub_opts);
prev_second = now_second;
}
}

View File

@ -2920,22 +2920,228 @@ void mg_md5_final(mg_md5_ctx *ctx, unsigned char digest[16]) {
void mg_mqtt_send_header(struct mg_connection *c, uint8_t cmd, uint8_t flags,
uint32_t len) {
uint8_t buf[1 + sizeof(len)], *vlen = &buf[1];
buf[0] = (uint8_t) ((cmd << 4) | flags);
buf[0] = (uint8_t)((cmd << 4) | flags);
do {
*vlen = len % 0x80;
len /= 0x80;
if (len > 0) *vlen |= 0x80;
vlen++;
} while (len > 0 && vlen < &buf[sizeof(buf)]);
mg_send(c, buf, (size_t) (vlen - buf));
mg_send(c, buf, (size_t)(vlen - buf));
}
static void mg_send_u16(struct mg_connection *c, uint16_t value) {
mg_send(c, &value, sizeof(value));
}
static void mg_send_u32(struct mg_connection *c, uint32_t value) {
mg_send(c, &value, sizeof(value));
}
static uint8_t compute_variable_length_size(uint32_t length) {
uint8_t bytes_needed = 0;
do {
bytes_needed++;
length /= 0x80;
} while (length > 0);
return bytes_needed;
}
static int encode_variable_length(uint8_t *buf, int value) {
int len = 0;
do {
uint8_t byte = (uint8_t)(value % 128);
value /= 128;
if (value > 0) byte |= 0x80;
buf[len++] = byte;
} while (value > 0);
return len;
}
static uint32_t decode_variable_length(const char *buf,
uint32_t *bytes_consumed) {
const uint8_t *p = (const uint8_t *) buf;
uint32_t value = 0;
uint32_t multiplier = 1;
uint8_t encoded_byte;
uint32_t offset;
for (offset = 0; offset < 4; offset++) {
encoded_byte = p[offset];
value += (encoded_byte & 0x7F) * multiplier;
multiplier *= 128;
if (!(encoded_byte & 0x80)) break;
}
if (bytes_consumed != NULL) *bytes_consumed = offset + 1;
return value;
}
static int mqtt_prop_type_by_id(uint8_t prop_id) {
size_t num_properties = sizeof(mqtt_prop_map) / sizeof(mqtt_prop_map[0]);
for (size_t i = 0; i < num_properties; ++i) {
if (mqtt_prop_map[i].id == prop_id) {
return mqtt_prop_map[i].type;
}
}
return -1; // Property ID not found
}
/*
returns the size of the properties section, without the
size of the content's length
*/
static uint32_t get_properties_length(struct mg_mqtt_prop *props, int count) {
uint32_t size = 0;
for (int i = 0; i < count; i++) {
size++; // identifier
switch (mqtt_prop_type_by_id(props[i].id)) {
case MQTT_PROP_TYPE_STRING_PAIR:
size += (uint32_t)(props[i].val.len + props[i].key.len +
2 * sizeof(uint16_t));
break;
case MQTT_PROP_TYPE_STRING:
size += (uint32_t)(props[i].val.len + sizeof(uint16_t));
break;
case MQTT_PROP_TYPE_BINARY_DATA:
size += (uint32_t)(props[i].val.len + sizeof(uint16_t));
break;
case MQTT_PROP_TYPE_VARIABLE_INT:
size += compute_variable_length_size((uint32_t) props[i].iv);
break;
case MQTT_PROP_TYPE_INT:
size += (uint32_t) sizeof(uint32_t);
break;
case MQTT_PROP_TYPE_SHORT:
size += (uint32_t) sizeof(uint16_t);
break;
default:
return size; // cannot parse further down
}
}
return size;
}
/*
returns the entire size of the properties section, including the
size of the variable length of the content
*/
static uint32_t get_full_properties_size(struct mg_mqtt_prop *props,
int count) {
uint32_t size = get_properties_length(props, count);
size += compute_variable_length_size(size);
return size;
}
static void mg_send_mqtt_properties(struct mg_connection *c,
struct mg_mqtt_prop *props, int nr_props) {
uint32_t total_size = get_properties_length(props, nr_props);
uint8_t buf_v[4] = {0, 0, 0, 0};
uint8_t buf[4] = {0, 0, 0, 0};
int len = encode_variable_length(buf, (int) total_size);
mg_send(c, buf, (size_t) len);
for (int i = 0; i < nr_props; i++) {
mg_send(c, &props[i].id, sizeof(props[i].id));
switch (mqtt_prop_type_by_id(props[i].id)) {
case MQTT_PROP_TYPE_STRING_PAIR:
mg_send_u16(c, mg_htons((uint16_t) props[i].key.len));
mg_send(c, props[i].key.ptr, props[i].key.len);
mg_send_u16(c, mg_htons((uint16_t) props[i].val.len));
mg_send(c, props[i].val.ptr, props[i].val.len);
break;
case MQTT_PROP_TYPE_BYTE:
mg_send(c, &props[i].iv, sizeof(uint8_t));
break;
case MQTT_PROP_TYPE_SHORT:
mg_send_u16(c, mg_htons((uint16_t) props[i].iv));
break;
case MQTT_PROP_TYPE_INT:
mg_send_u32(c, mg_htonl((uint32_t) props[i].iv));
break;
case MQTT_PROP_TYPE_STRING:
mg_send_u16(c, mg_htons((uint16_t) props[i].val.len));
mg_send(c, props[i].val.ptr, props[i].val.len);
break;
case MQTT_PROP_TYPE_BINARY_DATA:
mg_send_u16(c, mg_htons((uint16_t) props[i].val.len));
mg_send(c, props[i].val.ptr, props[i].val.len);
break;
case MQTT_PROP_TYPE_VARIABLE_INT:
len = encode_variable_length(buf_v, (int) props[i].iv);
mg_send(c, buf_v, (size_t) len);
break;
}
}
}
size_t mg_mqtt_next_prop(struct mg_mqtt_message *msg, struct mg_mqtt_prop *prop,
size_t crt_pos) {
unsigned char *i =
(unsigned char *) msg->dgram.ptr + msg->props_start + crt_pos;
size_t new_pos = crt_pos;
uint32_t bytes_consumed;
if (crt_pos >= msg->dgram.len ||
crt_pos >= msg->props_start + msg->props_size)
return 0;
uint8_t id = i[0];
i++, new_pos++;
prop->id = id;
switch (mqtt_prop_type_by_id(id)) {
case MQTT_PROP_TYPE_STRING_PAIR:
prop->key.len = (uint16_t)((((uint16_t) i[0]) << 8) | i[1]);
prop->key.ptr = (char *) i + 2;
i += 2 + prop->key.len;
prop->val.len = (uint16_t)((((uint16_t) i[0]) << 8) | i[1]);
prop->val.ptr = (char *) i + 2;
new_pos += 2 * sizeof(uint16_t) + prop->val.len + prop->key.len;
break;
case MQTT_PROP_TYPE_BYTE:
prop->iv = (uint8_t) i[0];
new_pos++;
break;
case MQTT_PROP_TYPE_SHORT:
prop->iv = (uint16_t)((((uint16_t) i[0]) << 8) | i[1]);
new_pos += sizeof(uint16_t);
break;
case MQTT_PROP_TYPE_INT:
prop->iv = ((uint32_t) i[0] << 24) | ((uint32_t) i[1] << 16) |
((uint32_t) i[2] << 8) | i[3];
new_pos += sizeof(uint32_t);
break;
case MQTT_PROP_TYPE_STRING:
prop->val.len = (uint16_t)((((uint16_t) i[0]) << 8) | i[1]);
prop->val.ptr = (char *) i + 2;
new_pos += 2 + prop->val.len;
break;
case MQTT_PROP_TYPE_BINARY_DATA:
prop->val.len = (uint16_t)((((uint16_t) i[0]) << 8) | i[1]);
prop->val.ptr = (char *) i + 2;
new_pos += 2 + prop->val.len;
break;
case MQTT_PROP_TYPE_VARIABLE_INT:
prop->iv = decode_variable_length((char *) i, &bytes_consumed);
new_pos += bytes_consumed;
break;
default:
new_pos = 0;
}
return new_pos;
}
void mg_mqtt_login(struct mg_connection *c, const struct mg_mqtt_opts *opts) {
char rnd[10], client_id[21], zero = 0;
char rnd[10], client_id[21];
struct mg_str cid = opts->client_id;
uint32_t total_len = 7 + 1 + 2 + 2;
uint8_t hdr[8] = {0, 4, 'M', 'Q', 'T', 'T', opts->version, 0};
@ -2949,7 +3155,7 @@ void mg_mqtt_login(struct mg_connection *c, const struct mg_mqtt_opts *opts) {
if (hdr[6] == 0) hdr[6] = 4; // If version is not set, use 4 (3.1.1)
c->is_mqtt5 = hdr[6] == 5; // Set version 5 flag
hdr[7] = (uint8_t) ((opts->will_qos & 3) << 3); // Connection flags
hdr[7] = (uint8_t)((opts->qos & 3) << 3); // Connection flags
if (opts->user.len > 0) {
total_len += 2 + (uint32_t) opts->user.len;
hdr[7] |= MQTT_HAS_USER_NAME;
@ -2958,31 +3164,38 @@ void mg_mqtt_login(struct mg_connection *c, const struct mg_mqtt_opts *opts) {
total_len += 2 + (uint32_t) opts->pass.len;
hdr[7] |= MQTT_HAS_PASSWORD;
}
if (opts->will_topic.len > 0 && opts->will_message.len > 0) {
total_len +=
4 + (uint32_t) opts->will_topic.len + (uint32_t) opts->will_message.len;
if (opts->topic.len > 0 && opts->message.len > 0) {
total_len += 4 + (uint32_t) opts->topic.len + (uint32_t) opts->message.len;
hdr[7] |= MQTT_HAS_WILL;
}
if (opts->clean || cid.len == 0) hdr[7] |= MQTT_CLEAN_SESSION;
if (opts->will_retain) hdr[7] |= MQTT_WILL_RETAIN;
if (opts->retain) hdr[7] |= MQTT_WILL_RETAIN;
total_len += (uint32_t) cid.len;
if (c->is_mqtt5) total_len += 1U + (hdr[7] & MQTT_HAS_WILL ? 1U : 0);
if (c->is_mqtt5) {
total_len += get_full_properties_size(opts->props, opts->num_props);
if (hdr[7] & MQTT_HAS_WILL)
total_len += get_full_properties_size(opts->will_props,
opts->num_will_props);
}
mg_mqtt_send_header(c, MQTT_CMD_CONNECT, 0, total_len);
mg_send(c, hdr, sizeof(hdr));
// keepalive == 0 means "do not disconnect us!"
mg_send_u16(c, mg_htons((uint16_t) opts->keepalive));
if (c->is_mqtt5) mg_send(c, &zero, sizeof(zero)); // V5 properties
if (c->is_mqtt5) mg_send_mqtt_properties(c, opts->props, opts->num_props);
mg_send_u16(c, mg_htons((uint16_t) cid.len));
mg_send(c, cid.ptr, cid.len);
if (hdr[7] & MQTT_HAS_WILL) {
if (c->is_mqtt5) mg_send(c, &zero, sizeof(zero)); // will props
mg_send_u16(c, mg_htons((uint16_t) opts->will_topic.len));
mg_send(c, opts->will_topic.ptr, opts->will_topic.len);
mg_send_u16(c, mg_htons((uint16_t) opts->will_message.len));
mg_send(c, opts->will_message.ptr, opts->will_message.len);
if (c->is_mqtt5)
mg_send_mqtt_properties(c, opts->will_props, opts->num_will_props);
mg_send_u16(c, mg_htons((uint16_t) opts->topic.len));
mg_send(c, opts->topic.ptr, opts->topic.len);
mg_send_u16(c, mg_htons((uint16_t) opts->message.len));
mg_send(c, opts->message.ptr, opts->message.len);
}
if (opts->user.len > 0) {
mg_send_u16(c, mg_htons((uint16_t) opts->user.len));
@ -2994,34 +3207,43 @@ void mg_mqtt_login(struct mg_connection *c, const struct mg_mqtt_opts *opts) {
}
}
void mg_mqtt_pub(struct mg_connection *c, struct mg_str topic,
struct mg_str data, int qos, bool retain) {
uint8_t flags = (uint8_t) (((qos & 3) << 1) | (retain ? 1 : 0)), zero = 0;
uint32_t len = 2 + (uint32_t) topic.len + (uint32_t) data.len;
MG_DEBUG(("%lu [%.*s] -> [%.*s]", c->id, (int) topic.len, (char *) topic.ptr,
(int) data.len, (char *) data.ptr));
if (qos > 0) len += 2;
if (c->is_mqtt5) len++;
void mg_mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts) {
uint8_t flags = (uint8_t)(((opts->qos & 3) << 1) | (opts->retain ? 1 : 0));
uint32_t len = 2 + (uint32_t) opts->topic.len + (uint32_t) opts->message.len;
MG_DEBUG(("%lu [%.*s] -> [%.*s]", c->id, (int) opts->topic.len,
(char *) opts->topic.ptr, (int) opts->message.len,
(char *) opts->message.ptr));
if (opts->qos > 0) len += 2;
if (c->is_mqtt5)
len += get_full_properties_size(opts->props, opts->num_props);
mg_mqtt_send_header(c, MQTT_CMD_PUBLISH, flags, len);
mg_send_u16(c, mg_htons((uint16_t) topic.len));
mg_send(c, topic.ptr, topic.len);
if (qos > 0) {
mg_send_u16(c, mg_htons((uint16_t) opts->topic.len));
mg_send(c, opts->topic.ptr, opts->topic.len);
if (opts->qos > 0) {
if (++c->mgr->mqtt_id == 0) ++c->mgr->mqtt_id;
mg_send_u16(c, mg_htons(c->mgr->mqtt_id));
}
if (c->is_mqtt5) mg_send(c, &zero, sizeof(zero));
mg_send(c, data.ptr, data.len);
if (c->is_mqtt5) mg_send_mqtt_properties(c, opts->props, opts->num_props);
mg_send(c, opts->message.ptr, opts->message.len);
}
void mg_mqtt_sub(struct mg_connection *c, struct mg_str topic, int qos) {
uint8_t qos_ = qos & 3, zero = 0;
uint32_t len = 2 + (uint32_t) topic.len + 2 + 1 + (c->is_mqtt5 ? 1 : 0);
void mg_mqtt_sub(struct mg_connection *c, const struct mg_mqtt_opts *opts) {
uint8_t qos_ = opts->qos & 3;
uint32_t len =
2 + (uint32_t) opts->topic.len + 2 + 1 +
(c->is_mqtt5 ? get_full_properties_size(opts->props, opts->num_props)
: 0);
mg_mqtt_send_header(c, MQTT_CMD_SUBSCRIBE, 2, len);
if (++c->mgr->mqtt_id == 0) ++c->mgr->mqtt_id;
mg_send_u16(c, mg_htons(c->mgr->mqtt_id));
if (c->is_mqtt5) mg_send(c, &zero, sizeof(zero));
mg_send_u16(c, mg_htons((uint16_t) topic.len));
mg_send(c, topic.ptr, topic.len);
if (c->is_mqtt5) mg_send_mqtt_properties(c, opts->props, opts->num_props);
mg_send_u16(c, mg_htons((uint16_t) opts->topic.len));
mg_send(c, opts->topic.ptr, opts->topic.len);
mg_send(c, &qos_, sizeof(qos_));
}
@ -3033,21 +3255,21 @@ int mg_mqtt_parse(const uint8_t *buf, size_t len, uint8_t version,
memset(m, 0, sizeof(*m));
m->dgram.ptr = (char *) buf;
if (len < 2) return MQTT_INCOMPLETE;
m->cmd = (uint8_t) (buf[0] >> 4);
m->cmd = (uint8_t)(buf[0] >> 4);
m->qos = (buf[0] >> 1) & 3;
n = len_len = 0;
p = (uint8_t *) buf + 1;
while ((size_t) (p - buf) < len) {
while ((size_t)(p - buf) < len) {
lc = *((uint8_t *) p++);
n += (uint32_t) ((lc & 0x7f) << 7 * len_len);
n += (uint32_t)((lc & 0x7f) << 7 * len_len);
len_len++;
if (!(lc & 0x80)) break;
if (len_len >= 4) return MQTT_MALFORMED;
}
end = p + n;
if ((lc & 0x80) || (end > buf + len)) return MQTT_INCOMPLETE;
m->dgram.len = (size_t) (end - buf);
m->dgram.len = (size_t)(end - buf);
switch (m->cmd) {
case MQTT_CMD_CONNACK:
@ -3063,25 +3285,29 @@ int mg_mqtt_parse(const uint8_t *buf, size_t len, uint8_t version,
case MQTT_CMD_UNSUBSCRIBE:
case MQTT_CMD_UNSUBACK:
if (p + 2 > end) return MQTT_MALFORMED;
m->id = (uint16_t) ((((uint16_t) p[0]) << 8) | p[1]);
m->id = (uint16_t)((((uint16_t) p[0]) << 8) | p[1]);
p += 2;
break;
case MQTT_CMD_PUBLISH: {
if (p + 2 > end) return MQTT_MALFORMED;
m->topic.len = (uint16_t) ((((uint16_t) p[0]) << 8) | p[1]);
m->topic.len = (uint16_t)((((uint16_t) p[0]) << 8) | p[1]);
m->topic.ptr = (char *) p + 2;
p += 2 + m->topic.len;
if (p > end) return MQTT_MALFORMED;
if (m->qos > 0) {
if (p + 2 > end) return MQTT_MALFORMED;
m->id = (uint16_t) ((((uint16_t) p[0]) << 8) | p[1]);
m->id = (uint16_t)((((uint16_t) p[0]) << 8) | p[1]);
p += 2;
}
if (p > end) return MQTT_MALFORMED;
if (version == 5 && p + 2 < end) p += 1 + p[0]; // Skip options
if (version == 5 && p + 2 < end) {
m->props_size = decode_variable_length((char *) p, &len_len);
m->props_start = (size_t)(p + len_len - buf);
p += len_len + m->props_size;
}
if (p > end) return MQTT_MALFORMED;
m->data.ptr = (char *) p;
m->data.len = (size_t) (end - p);
m->data.len = (size_t)(end - p);
break;
}
default:
@ -3119,8 +3345,16 @@ static void mqtt_cb(struct mg_connection *c, int ev, void *ev_data,
mm.topic.ptr, (int) mm.data.len, mm.data.ptr));
if (mm.qos > 0) {
uint16_t id = mg_htons(mm.id);
mg_mqtt_send_header(c, MQTT_CMD_PUBACK, 0, sizeof(id));
uint32_t remaining_len = sizeof(id);
if (c->is_mqtt5) remaining_len += 1;
mg_mqtt_send_header(c, MQTT_CMD_PUBACK, 0, remaining_len);
mg_send(c, &id, sizeof(id));
if (c->is_mqtt5) {
uint16_t zero = 0;
mg_send(c, &zero, sizeof(zero));
}
}
mg_call(c, MG_EV_MQTT_MSG, &mm);
break;
@ -3145,8 +3379,22 @@ void mg_mqtt_pong(struct mg_connection *nc) {
mg_mqtt_send_header(nc, MQTT_CMD_PINGRESP, 0, 0);
}
void mg_mqtt_disconnect(struct mg_connection *nc) {
mg_mqtt_send_header(nc, MQTT_CMD_DISCONNECT, 0, 0);
void mg_mqtt_disconnect(struct mg_connection *nc,
const struct mg_mqtt_opts *opts) {
uint32_t len;
if (nc->is_mqtt5)
len = 1 + get_full_properties_size(opts->props, opts->num_props);
else
len = 0;
mg_mqtt_send_header(nc, MQTT_CMD_DISCONNECT, 0, len);
if (nc->is_mqtt5) {
uint8_t zero = 0;
mg_send(nc, &zero, sizeof(zero)); // reason code
mg_send_mqtt_properties(nc, opts->props, opts->num_props);
}
}
struct mg_connection *mg_mqtt_connect(struct mg_mgr *mgr, const char *url,
@ -4297,7 +4545,7 @@ bool mg_open_listener(struct mg_connection *c, const char *url) {
#if defined(SO_REUSEADDR) && (!defined(LWIP_SOCKET) || SO_REUSE)
} else if ((rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *) &on,
sizeof(on))) != 0) {
// 1. SO_RESUSEADDR semantics on UNIX and Windows is different. On
// 1. SO_REUSEADDR semantics on UNIX and Windows is different. On
// Windows, SO_REUSEADDR allows to bind a socket to a port without error
// even if the port is already open by another program. This is not the
// behavior SO_REUSEADDR was designed for, and leads to hard-to-track

View File

@ -1414,19 +1414,102 @@ int64_t mg_sntp_parse(const unsigned char *buf, size_t len);
#define MQTT_CMD_DISCONNECT 14
#define MQTT_CMD_AUTH 15
#define MQTT_PROP_PAYLOAD_FORMAT_INDICATOR 0x01
#define MQTT_PROP_MESSAGE_EXPIRY_INTERVAL 0x02
#define MQTT_PROP_CONTENT_TYPE 0x03
#define MQTT_PROP_RESPONSE_TOPIC 0x08
#define MQTT_PROP_CORRELATION_DATA 0x09
#define MQTT_PROP_SUBSCRIPTION_IDENTIFIER 0x0B
#define MQTT_PROP_SESSION_EXPIRY_INTERVAL 0x11
#define MQTT_PROP_ASSIGNED_CLIENT_IDENTIFIER 0x12
#define MQTT_PROP_SERVER_KEEP_ALIVE 0x13
#define MQTT_PROP_AUTHENTICATION_METHOD 0x15
#define MQTT_PROP_AUTHENTICATION_DATA 0x16
#define MQTT_PROP_REQUEST_PROBLEM_INFORMATION 0x17
#define MQTT_PROP_WILL_DELAY_INTERVAL 0x18
#define MQTT_PROP_REQUEST_RESPONSE_INFORMATION 0x19
#define MQTT_PROP_RESPONSE_INFORMATION 0x1A
#define MQTT_PROP_SERVER_REFERENCE 0x1C
#define MQTT_PROP_REASON_STRING 0x1F
#define MQTT_PROP_RECEIVE_MAXIMUM 0x21
#define MQTT_PROP_TOPIC_ALIAS_MAXIMUM 0x22
#define MQTT_PROP_TOPIC_ALIAS 0x23
#define MQTT_PROP_MAXIMUM_QOS 0x24
#define MQTT_PROP_RETAIN_AVAILABLE 0x25
#define MQTT_PROP_USER_PROPERTY 0x26
#define MQTT_PROP_MAXIMUM_PACKET_SIZE 0x27
#define MQTT_PROP_WILDCARD_SUBSCRIPTION_AVAILABLE 0x28
#define MQTT_PROP_SUBSCRIPTION_IDENTIFIER_AVAILABLE 0x29
#define MQTT_PROP_SHARED_SUBSCRIPTION_AVAILABLE 0x2A
enum {
MQTT_PROP_TYPE_BYTE,
MQTT_PROP_TYPE_STRING,
MQTT_PROP_TYPE_STRING_PAIR,
MQTT_PROP_TYPE_BINARY_DATA,
MQTT_PROP_TYPE_VARIABLE_INT,
MQTT_PROP_TYPE_INT,
MQTT_PROP_TYPE_SHORT
};
enum { MQTT_OK, MQTT_INCOMPLETE, MQTT_MALFORMED };
struct mqtt_prop_map_t {
uint8_t id;
uint8_t type;
};
static const struct mqtt_prop_map_t mqtt_prop_map[] = {
{MQTT_PROP_PAYLOAD_FORMAT_INDICATOR, MQTT_PROP_TYPE_BYTE},
{MQTT_PROP_MESSAGE_EXPIRY_INTERVAL, MQTT_PROP_TYPE_INT},
{MQTT_PROP_CONTENT_TYPE, MQTT_PROP_TYPE_STRING},
{MQTT_PROP_RESPONSE_TOPIC, MQTT_PROP_TYPE_STRING},
{MQTT_PROP_CORRELATION_DATA, MQTT_PROP_TYPE_BINARY_DATA},
{MQTT_PROP_SUBSCRIPTION_IDENTIFIER, MQTT_PROP_TYPE_VARIABLE_INT},
{MQTT_PROP_SESSION_EXPIRY_INTERVAL, MQTT_PROP_TYPE_INT},
{MQTT_PROP_ASSIGNED_CLIENT_IDENTIFIER, MQTT_PROP_TYPE_STRING},
{MQTT_PROP_SERVER_KEEP_ALIVE, MQTT_PROP_TYPE_SHORT},
{MQTT_PROP_AUTHENTICATION_METHOD, MQTT_PROP_TYPE_STRING},
{MQTT_PROP_AUTHENTICATION_DATA, MQTT_PROP_TYPE_BINARY_DATA},
{MQTT_PROP_REQUEST_PROBLEM_INFORMATION, MQTT_PROP_TYPE_BYTE},
{MQTT_PROP_WILL_DELAY_INTERVAL, MQTT_PROP_TYPE_INT},
{MQTT_PROP_REQUEST_RESPONSE_INFORMATION, MQTT_PROP_TYPE_BYTE},
{MQTT_PROP_RESPONSE_INFORMATION, MQTT_PROP_TYPE_STRING},
{MQTT_PROP_SERVER_REFERENCE, MQTT_PROP_TYPE_STRING},
{MQTT_PROP_REASON_STRING, MQTT_PROP_TYPE_STRING},
{MQTT_PROP_RECEIVE_MAXIMUM, MQTT_PROP_TYPE_SHORT},
{MQTT_PROP_TOPIC_ALIAS_MAXIMUM, MQTT_PROP_TYPE_SHORT},
{MQTT_PROP_TOPIC_ALIAS, MQTT_PROP_TYPE_SHORT},
{MQTT_PROP_MAXIMUM_QOS, MQTT_PROP_TYPE_BYTE},
{MQTT_PROP_RETAIN_AVAILABLE, MQTT_PROP_TYPE_BYTE},
{MQTT_PROP_USER_PROPERTY, MQTT_PROP_TYPE_STRING_PAIR},
{MQTT_PROP_MAXIMUM_PACKET_SIZE, MQTT_PROP_TYPE_INT},
{MQTT_PROP_WILDCARD_SUBSCRIPTION_AVAILABLE, MQTT_PROP_TYPE_BYTE},
{MQTT_PROP_SUBSCRIPTION_IDENTIFIER_AVAILABLE, MQTT_PROP_TYPE_BYTE},
{MQTT_PROP_SHARED_SUBSCRIPTION_AVAILABLE, MQTT_PROP_TYPE_BYTE}};
struct mg_mqtt_prop {
uint8_t id; // Enumerated at MQTT5 Reference
uint32_t iv; // integer value for 8-, 16-, 32-bit integers types
struct mg_str key; // non-NULL only for user property type
struct mg_str val; // non-NULL only for UTF-8 types and user properties
};
struct mg_mqtt_opts {
struct mg_str user; // Username, can be empty
struct mg_str pass; // Password, can be empty
struct mg_str client_id; // Client ID
struct mg_str will_topic; // Will topic
struct mg_str will_message; // Will message
uint8_t will_qos; // Will message quality of service
struct mg_str topic; // topic
struct mg_str message; // message
uint8_t qos; // message quality of service
uint8_t version; // Can be 4 (3.1.1), or 5. If 0, assume 4.
uint16_t keepalive; // Keep-alive timer in seconds
bool will_retain; // Retain last will
bool retain; // Retain last will
bool clean; // Use clean session, 0 or 1
struct mg_mqtt_prop *props; // MQTT5 props array
int num_props; // number of props
struct mg_mqtt_prop *will_props; // only found in the CONNECT packet
int num_will_props; // number of will props
};
struct mg_mqtt_message {
@ -1437,6 +1520,8 @@ struct mg_mqtt_message {
uint8_t cmd; // MQTT command, one of MQTT_CMD_*
uint8_t qos; // Quality of service
uint8_t ack; // Connack return code. 0 - success
size_t props_start; // offset to the start of the properties
size_t props_size; // length of the properties
};
struct mg_connection *mg_mqtt_connect(struct mg_mgr *, const char *url,
@ -1445,15 +1530,17 @@ struct mg_connection *mg_mqtt_connect(struct mg_mgr *, const char *url,
struct mg_connection *mg_mqtt_listen(struct mg_mgr *mgr, const char *url,
mg_event_handler_t fn, void *fn_data);
void mg_mqtt_login(struct mg_connection *c, const struct mg_mqtt_opts *opts);
void mg_mqtt_pub(struct mg_connection *c, struct mg_str topic,
struct mg_str data, int qos, bool retain);
void mg_mqtt_sub(struct mg_connection *, struct mg_str topic, int qos);
void mg_mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts);
void mg_mqtt_sub(struct mg_connection *, const struct mg_mqtt_opts *opts);
int mg_mqtt_parse(const uint8_t *, size_t, uint8_t, struct mg_mqtt_message *);
void mg_mqtt_send_header(struct mg_connection *, uint8_t cmd, uint8_t flags,
uint32_t len);
void mg_mqtt_ping(struct mg_connection *);
void mg_mqtt_pong(struct mg_connection *);
void mg_mqtt_disconnect(struct mg_connection *);
void mg_mqtt_disconnect(struct mg_connection *,
const struct mg_mqtt_opts *opts);
size_t mg_mqtt_next_prop(struct mg_mqtt_message *, struct mg_mqtt_prop *,
size_t ofs);

View File

@ -15,22 +15,228 @@
void mg_mqtt_send_header(struct mg_connection *c, uint8_t cmd, uint8_t flags,
uint32_t len) {
uint8_t buf[1 + sizeof(len)], *vlen = &buf[1];
buf[0] = (uint8_t) ((cmd << 4) | flags);
buf[0] = (uint8_t)((cmd << 4) | flags);
do {
*vlen = len % 0x80;
len /= 0x80;
if (len > 0) *vlen |= 0x80;
vlen++;
} while (len > 0 && vlen < &buf[sizeof(buf)]);
mg_send(c, buf, (size_t) (vlen - buf));
mg_send(c, buf, (size_t)(vlen - buf));
}
static void mg_send_u16(struct mg_connection *c, uint16_t value) {
mg_send(c, &value, sizeof(value));
}
static void mg_send_u32(struct mg_connection *c, uint32_t value) {
mg_send(c, &value, sizeof(value));
}
static uint8_t compute_variable_length_size(uint32_t length) {
uint8_t bytes_needed = 0;
do {
bytes_needed++;
length /= 0x80;
} while (length > 0);
return bytes_needed;
}
static int encode_variable_length(uint8_t *buf, int value) {
int len = 0;
do {
uint8_t byte = (uint8_t)(value % 128);
value /= 128;
if (value > 0) byte |= 0x80;
buf[len++] = byte;
} while (value > 0);
return len;
}
static uint32_t decode_variable_length(const char *buf,
uint32_t *bytes_consumed) {
const uint8_t *p = (const uint8_t *) buf;
uint32_t value = 0;
uint32_t multiplier = 1;
uint8_t encoded_byte;
uint32_t offset;
for (offset = 0; offset < 4; offset++) {
encoded_byte = p[offset];
value += (encoded_byte & 0x7F) * multiplier;
multiplier *= 128;
if (!(encoded_byte & 0x80)) break;
}
if (bytes_consumed != NULL) *bytes_consumed = offset + 1;
return value;
}
static int mqtt_prop_type_by_id(uint8_t prop_id) {
size_t num_properties = sizeof(mqtt_prop_map) / sizeof(mqtt_prop_map[0]);
for (size_t i = 0; i < num_properties; ++i) {
if (mqtt_prop_map[i].id == prop_id) {
return mqtt_prop_map[i].type;
}
}
return -1; // Property ID not found
}
/*
returns the size of the properties section, without the
size of the content's length
*/
static uint32_t get_properties_length(struct mg_mqtt_prop *props, int count) {
uint32_t size = 0;
for (int i = 0; i < count; i++) {
size++; // identifier
switch (mqtt_prop_type_by_id(props[i].id)) {
case MQTT_PROP_TYPE_STRING_PAIR:
size += (uint32_t)(props[i].val.len + props[i].key.len +
2 * sizeof(uint16_t));
break;
case MQTT_PROP_TYPE_STRING:
size += (uint32_t)(props[i].val.len + sizeof(uint16_t));
break;
case MQTT_PROP_TYPE_BINARY_DATA:
size += (uint32_t)(props[i].val.len + sizeof(uint16_t));
break;
case MQTT_PROP_TYPE_VARIABLE_INT:
size += compute_variable_length_size((uint32_t) props[i].iv);
break;
case MQTT_PROP_TYPE_INT:
size += (uint32_t) sizeof(uint32_t);
break;
case MQTT_PROP_TYPE_SHORT:
size += (uint32_t) sizeof(uint16_t);
break;
default:
return size; // cannot parse further down
}
}
return size;
}
/*
returns the entire size of the properties section, including the
size of the variable length of the content
*/
static uint32_t get_full_properties_size(struct mg_mqtt_prop *props,
int count) {
uint32_t size = get_properties_length(props, count);
size += compute_variable_length_size(size);
return size;
}
static void mg_send_mqtt_properties(struct mg_connection *c,
struct mg_mqtt_prop *props, int nr_props) {
uint32_t total_size = get_properties_length(props, nr_props);
uint8_t buf_v[4] = {0, 0, 0, 0};
uint8_t buf[4] = {0, 0, 0, 0};
int len = encode_variable_length(buf, (int) total_size);
mg_send(c, buf, (size_t) len);
for (int i = 0; i < nr_props; i++) {
mg_send(c, &props[i].id, sizeof(props[i].id));
switch (mqtt_prop_type_by_id(props[i].id)) {
case MQTT_PROP_TYPE_STRING_PAIR:
mg_send_u16(c, mg_htons((uint16_t) props[i].key.len));
mg_send(c, props[i].key.ptr, props[i].key.len);
mg_send_u16(c, mg_htons((uint16_t) props[i].val.len));
mg_send(c, props[i].val.ptr, props[i].val.len);
break;
case MQTT_PROP_TYPE_BYTE:
mg_send(c, &props[i].iv, sizeof(uint8_t));
break;
case MQTT_PROP_TYPE_SHORT:
mg_send_u16(c, mg_htons((uint16_t) props[i].iv));
break;
case MQTT_PROP_TYPE_INT:
mg_send_u32(c, mg_htonl((uint32_t) props[i].iv));
break;
case MQTT_PROP_TYPE_STRING:
mg_send_u16(c, mg_htons((uint16_t) props[i].val.len));
mg_send(c, props[i].val.ptr, props[i].val.len);
break;
case MQTT_PROP_TYPE_BINARY_DATA:
mg_send_u16(c, mg_htons((uint16_t) props[i].val.len));
mg_send(c, props[i].val.ptr, props[i].val.len);
break;
case MQTT_PROP_TYPE_VARIABLE_INT:
len = encode_variable_length(buf_v, (int) props[i].iv);
mg_send(c, buf_v, (size_t) len);
break;
}
}
}
size_t mg_mqtt_next_prop(struct mg_mqtt_message *msg, struct mg_mqtt_prop *prop,
size_t crt_pos) {
unsigned char *i =
(unsigned char *) msg->dgram.ptr + msg->props_start + crt_pos;
size_t new_pos = crt_pos;
uint32_t bytes_consumed;
if (crt_pos >= msg->dgram.len ||
crt_pos >= msg->props_start + msg->props_size)
return 0;
uint8_t id = i[0];
i++, new_pos++;
prop->id = id;
switch (mqtt_prop_type_by_id(id)) {
case MQTT_PROP_TYPE_STRING_PAIR:
prop->key.len = (uint16_t)((((uint16_t) i[0]) << 8) | i[1]);
prop->key.ptr = (char *) i + 2;
i += 2 + prop->key.len;
prop->val.len = (uint16_t)((((uint16_t) i[0]) << 8) | i[1]);
prop->val.ptr = (char *) i + 2;
new_pos += 2 * sizeof(uint16_t) + prop->val.len + prop->key.len;
break;
case MQTT_PROP_TYPE_BYTE:
prop->iv = (uint8_t) i[0];
new_pos++;
break;
case MQTT_PROP_TYPE_SHORT:
prop->iv = (uint16_t)((((uint16_t) i[0]) << 8) | i[1]);
new_pos += sizeof(uint16_t);
break;
case MQTT_PROP_TYPE_INT:
prop->iv = ((uint32_t) i[0] << 24) | ((uint32_t) i[1] << 16) |
((uint32_t) i[2] << 8) | i[3];
new_pos += sizeof(uint32_t);
break;
case MQTT_PROP_TYPE_STRING:
prop->val.len = (uint16_t)((((uint16_t) i[0]) << 8) | i[1]);
prop->val.ptr = (char *) i + 2;
new_pos += 2 + prop->val.len;
break;
case MQTT_PROP_TYPE_BINARY_DATA:
prop->val.len = (uint16_t)((((uint16_t) i[0]) << 8) | i[1]);
prop->val.ptr = (char *) i + 2;
new_pos += 2 + prop->val.len;
break;
case MQTT_PROP_TYPE_VARIABLE_INT:
prop->iv = decode_variable_length((char *) i, &bytes_consumed);
new_pos += bytes_consumed;
break;
default:
new_pos = 0;
}
return new_pos;
}
void mg_mqtt_login(struct mg_connection *c, const struct mg_mqtt_opts *opts) {
char rnd[10], client_id[21], zero = 0;
char rnd[10], client_id[21];
struct mg_str cid = opts->client_id;
uint32_t total_len = 7 + 1 + 2 + 2;
uint8_t hdr[8] = {0, 4, 'M', 'Q', 'T', 'T', opts->version, 0};
@ -44,7 +250,7 @@ void mg_mqtt_login(struct mg_connection *c, const struct mg_mqtt_opts *opts) {
if (hdr[6] == 0) hdr[6] = 4; // If version is not set, use 4 (3.1.1)
c->is_mqtt5 = hdr[6] == 5; // Set version 5 flag
hdr[7] = (uint8_t) ((opts->will_qos & 3) << 3); // Connection flags
hdr[7] = (uint8_t)((opts->qos & 3) << 3); // Connection flags
if (opts->user.len > 0) {
total_len += 2 + (uint32_t) opts->user.len;
hdr[7] |= MQTT_HAS_USER_NAME;
@ -53,31 +259,38 @@ void mg_mqtt_login(struct mg_connection *c, const struct mg_mqtt_opts *opts) {
total_len += 2 + (uint32_t) opts->pass.len;
hdr[7] |= MQTT_HAS_PASSWORD;
}
if (opts->will_topic.len > 0 && opts->will_message.len > 0) {
total_len +=
4 + (uint32_t) opts->will_topic.len + (uint32_t) opts->will_message.len;
if (opts->topic.len > 0 && opts->message.len > 0) {
total_len += 4 + (uint32_t) opts->topic.len + (uint32_t) opts->message.len;
hdr[7] |= MQTT_HAS_WILL;
}
if (opts->clean || cid.len == 0) hdr[7] |= MQTT_CLEAN_SESSION;
if (opts->will_retain) hdr[7] |= MQTT_WILL_RETAIN;
if (opts->retain) hdr[7] |= MQTT_WILL_RETAIN;
total_len += (uint32_t) cid.len;
if (c->is_mqtt5) total_len += 1U + (hdr[7] & MQTT_HAS_WILL ? 1U : 0);
if (c->is_mqtt5) {
total_len += get_full_properties_size(opts->props, opts->num_props);
if (hdr[7] & MQTT_HAS_WILL)
total_len += get_full_properties_size(opts->will_props,
opts->num_will_props);
}
mg_mqtt_send_header(c, MQTT_CMD_CONNECT, 0, total_len);
mg_send(c, hdr, sizeof(hdr));
// keepalive == 0 means "do not disconnect us!"
mg_send_u16(c, mg_htons((uint16_t) opts->keepalive));
if (c->is_mqtt5) mg_send(c, &zero, sizeof(zero)); // V5 properties
if (c->is_mqtt5) mg_send_mqtt_properties(c, opts->props, opts->num_props);
mg_send_u16(c, mg_htons((uint16_t) cid.len));
mg_send(c, cid.ptr, cid.len);
if (hdr[7] & MQTT_HAS_WILL) {
if (c->is_mqtt5) mg_send(c, &zero, sizeof(zero)); // will props
mg_send_u16(c, mg_htons((uint16_t) opts->will_topic.len));
mg_send(c, opts->will_topic.ptr, opts->will_topic.len);
mg_send_u16(c, mg_htons((uint16_t) opts->will_message.len));
mg_send(c, opts->will_message.ptr, opts->will_message.len);
if (c->is_mqtt5)
mg_send_mqtt_properties(c, opts->will_props, opts->num_will_props);
mg_send_u16(c, mg_htons((uint16_t) opts->topic.len));
mg_send(c, opts->topic.ptr, opts->topic.len);
mg_send_u16(c, mg_htons((uint16_t) opts->message.len));
mg_send(c, opts->message.ptr, opts->message.len);
}
if (opts->user.len > 0) {
mg_send_u16(c, mg_htons((uint16_t) opts->user.len));
@ -89,34 +302,43 @@ void mg_mqtt_login(struct mg_connection *c, const struct mg_mqtt_opts *opts) {
}
}
void mg_mqtt_pub(struct mg_connection *c, struct mg_str topic,
struct mg_str data, int qos, bool retain) {
uint8_t flags = (uint8_t) (((qos & 3) << 1) | (retain ? 1 : 0)), zero = 0;
uint32_t len = 2 + (uint32_t) topic.len + (uint32_t) data.len;
MG_DEBUG(("%lu [%.*s] -> [%.*s]", c->id, (int) topic.len, (char *) topic.ptr,
(int) data.len, (char *) data.ptr));
if (qos > 0) len += 2;
if (c->is_mqtt5) len++;
void mg_mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts) {
uint8_t flags = (uint8_t)(((opts->qos & 3) << 1) | (opts->retain ? 1 : 0));
uint32_t len = 2 + (uint32_t) opts->topic.len + (uint32_t) opts->message.len;
MG_DEBUG(("%lu [%.*s] -> [%.*s]", c->id, (int) opts->topic.len,
(char *) opts->topic.ptr, (int) opts->message.len,
(char *) opts->message.ptr));
if (opts->qos > 0) len += 2;
if (c->is_mqtt5)
len += get_full_properties_size(opts->props, opts->num_props);
mg_mqtt_send_header(c, MQTT_CMD_PUBLISH, flags, len);
mg_send_u16(c, mg_htons((uint16_t) topic.len));
mg_send(c, topic.ptr, topic.len);
if (qos > 0) {
mg_send_u16(c, mg_htons((uint16_t) opts->topic.len));
mg_send(c, opts->topic.ptr, opts->topic.len);
if (opts->qos > 0) {
if (++c->mgr->mqtt_id == 0) ++c->mgr->mqtt_id;
mg_send_u16(c, mg_htons(c->mgr->mqtt_id));
}
if (c->is_mqtt5) mg_send(c, &zero, sizeof(zero));
mg_send(c, data.ptr, data.len);
if (c->is_mqtt5) mg_send_mqtt_properties(c, opts->props, opts->num_props);
mg_send(c, opts->message.ptr, opts->message.len);
}
void mg_mqtt_sub(struct mg_connection *c, struct mg_str topic, int qos) {
uint8_t qos_ = qos & 3, zero = 0;
uint32_t len = 2 + (uint32_t) topic.len + 2 + 1 + (c->is_mqtt5 ? 1 : 0);
void mg_mqtt_sub(struct mg_connection *c, const struct mg_mqtt_opts *opts) {
uint8_t qos_ = opts->qos & 3;
uint32_t len =
2 + (uint32_t) opts->topic.len + 2 + 1 +
(c->is_mqtt5 ? get_full_properties_size(opts->props, opts->num_props)
: 0);
mg_mqtt_send_header(c, MQTT_CMD_SUBSCRIBE, 2, len);
if (++c->mgr->mqtt_id == 0) ++c->mgr->mqtt_id;
mg_send_u16(c, mg_htons(c->mgr->mqtt_id));
if (c->is_mqtt5) mg_send(c, &zero, sizeof(zero));
mg_send_u16(c, mg_htons((uint16_t) topic.len));
mg_send(c, topic.ptr, topic.len);
if (c->is_mqtt5) mg_send_mqtt_properties(c, opts->props, opts->num_props);
mg_send_u16(c, mg_htons((uint16_t) opts->topic.len));
mg_send(c, opts->topic.ptr, opts->topic.len);
mg_send(c, &qos_, sizeof(qos_));
}
@ -128,21 +350,21 @@ int mg_mqtt_parse(const uint8_t *buf, size_t len, uint8_t version,
memset(m, 0, sizeof(*m));
m->dgram.ptr = (char *) buf;
if (len < 2) return MQTT_INCOMPLETE;
m->cmd = (uint8_t) (buf[0] >> 4);
m->cmd = (uint8_t)(buf[0] >> 4);
m->qos = (buf[0] >> 1) & 3;
n = len_len = 0;
p = (uint8_t *) buf + 1;
while ((size_t) (p - buf) < len) {
while ((size_t)(p - buf) < len) {
lc = *((uint8_t *) p++);
n += (uint32_t) ((lc & 0x7f) << 7 * len_len);
n += (uint32_t)((lc & 0x7f) << 7 * len_len);
len_len++;
if (!(lc & 0x80)) break;
if (len_len >= 4) return MQTT_MALFORMED;
}
end = p + n;
if ((lc & 0x80) || (end > buf + len)) return MQTT_INCOMPLETE;
m->dgram.len = (size_t) (end - buf);
m->dgram.len = (size_t)(end - buf);
switch (m->cmd) {
case MQTT_CMD_CONNACK:
@ -158,25 +380,29 @@ int mg_mqtt_parse(const uint8_t *buf, size_t len, uint8_t version,
case MQTT_CMD_UNSUBSCRIBE:
case MQTT_CMD_UNSUBACK:
if (p + 2 > end) return MQTT_MALFORMED;
m->id = (uint16_t) ((((uint16_t) p[0]) << 8) | p[1]);
m->id = (uint16_t)((((uint16_t) p[0]) << 8) | p[1]);
p += 2;
break;
case MQTT_CMD_PUBLISH: {
if (p + 2 > end) return MQTT_MALFORMED;
m->topic.len = (uint16_t) ((((uint16_t) p[0]) << 8) | p[1]);
m->topic.len = (uint16_t)((((uint16_t) p[0]) << 8) | p[1]);
m->topic.ptr = (char *) p + 2;
p += 2 + m->topic.len;
if (p > end) return MQTT_MALFORMED;
if (m->qos > 0) {
if (p + 2 > end) return MQTT_MALFORMED;
m->id = (uint16_t) ((((uint16_t) p[0]) << 8) | p[1]);
m->id = (uint16_t)((((uint16_t) p[0]) << 8) | p[1]);
p += 2;
}
if (p > end) return MQTT_MALFORMED;
if (version == 5 && p + 2 < end) p += 1 + p[0]; // Skip options
if (version == 5 && p + 2 < end) {
m->props_size = decode_variable_length((char *) p, &len_len);
m->props_start = (size_t)(p + len_len - buf);
p += len_len + m->props_size;
}
if (p > end) return MQTT_MALFORMED;
m->data.ptr = (char *) p;
m->data.len = (size_t) (end - p);
m->data.len = (size_t)(end - p);
break;
}
default:
@ -214,8 +440,16 @@ static void mqtt_cb(struct mg_connection *c, int ev, void *ev_data,
mm.topic.ptr, (int) mm.data.len, mm.data.ptr));
if (mm.qos > 0) {
uint16_t id = mg_htons(mm.id);
mg_mqtt_send_header(c, MQTT_CMD_PUBACK, 0, sizeof(id));
uint32_t remaining_len = sizeof(id);
if (c->is_mqtt5) remaining_len += 1;
mg_mqtt_send_header(c, MQTT_CMD_PUBACK, 0, remaining_len);
mg_send(c, &id, sizeof(id));
if (c->is_mqtt5) {
uint16_t zero = 0;
mg_send(c, &zero, sizeof(zero));
}
}
mg_call(c, MG_EV_MQTT_MSG, &mm);
break;
@ -240,8 +474,22 @@ void mg_mqtt_pong(struct mg_connection *nc) {
mg_mqtt_send_header(nc, MQTT_CMD_PINGRESP, 0, 0);
}
void mg_mqtt_disconnect(struct mg_connection *nc) {
mg_mqtt_send_header(nc, MQTT_CMD_DISCONNECT, 0, 0);
void mg_mqtt_disconnect(struct mg_connection *nc,
const struct mg_mqtt_opts *opts) {
uint32_t len;
if (nc->is_mqtt5)
len = 1 + get_full_properties_size(opts->props, opts->num_props);
else
len = 0;
mg_mqtt_send_header(nc, MQTT_CMD_DISCONNECT, 0, len);
if (nc->is_mqtt5) {
uint8_t zero = 0;
mg_send(nc, &zero, sizeof(zero)); // reason code
mg_send_mqtt_properties(nc, opts->props, opts->num_props);
}
}
struct mg_connection *mg_mqtt_connect(struct mg_mgr *mgr, const char *url,

View File

@ -19,19 +19,102 @@
#define MQTT_CMD_DISCONNECT 14
#define MQTT_CMD_AUTH 15
#define MQTT_PROP_PAYLOAD_FORMAT_INDICATOR 0x01
#define MQTT_PROP_MESSAGE_EXPIRY_INTERVAL 0x02
#define MQTT_PROP_CONTENT_TYPE 0x03
#define MQTT_PROP_RESPONSE_TOPIC 0x08
#define MQTT_PROP_CORRELATION_DATA 0x09
#define MQTT_PROP_SUBSCRIPTION_IDENTIFIER 0x0B
#define MQTT_PROP_SESSION_EXPIRY_INTERVAL 0x11
#define MQTT_PROP_ASSIGNED_CLIENT_IDENTIFIER 0x12
#define MQTT_PROP_SERVER_KEEP_ALIVE 0x13
#define MQTT_PROP_AUTHENTICATION_METHOD 0x15
#define MQTT_PROP_AUTHENTICATION_DATA 0x16
#define MQTT_PROP_REQUEST_PROBLEM_INFORMATION 0x17
#define MQTT_PROP_WILL_DELAY_INTERVAL 0x18
#define MQTT_PROP_REQUEST_RESPONSE_INFORMATION 0x19
#define MQTT_PROP_RESPONSE_INFORMATION 0x1A
#define MQTT_PROP_SERVER_REFERENCE 0x1C
#define MQTT_PROP_REASON_STRING 0x1F
#define MQTT_PROP_RECEIVE_MAXIMUM 0x21
#define MQTT_PROP_TOPIC_ALIAS_MAXIMUM 0x22
#define MQTT_PROP_TOPIC_ALIAS 0x23
#define MQTT_PROP_MAXIMUM_QOS 0x24
#define MQTT_PROP_RETAIN_AVAILABLE 0x25
#define MQTT_PROP_USER_PROPERTY 0x26
#define MQTT_PROP_MAXIMUM_PACKET_SIZE 0x27
#define MQTT_PROP_WILDCARD_SUBSCRIPTION_AVAILABLE 0x28
#define MQTT_PROP_SUBSCRIPTION_IDENTIFIER_AVAILABLE 0x29
#define MQTT_PROP_SHARED_SUBSCRIPTION_AVAILABLE 0x2A
enum {
MQTT_PROP_TYPE_BYTE,
MQTT_PROP_TYPE_STRING,
MQTT_PROP_TYPE_STRING_PAIR,
MQTT_PROP_TYPE_BINARY_DATA,
MQTT_PROP_TYPE_VARIABLE_INT,
MQTT_PROP_TYPE_INT,
MQTT_PROP_TYPE_SHORT
};
enum { MQTT_OK, MQTT_INCOMPLETE, MQTT_MALFORMED };
struct mqtt_prop_map_t {
uint8_t id;
uint8_t type;
};
static const struct mqtt_prop_map_t mqtt_prop_map[] = {
{MQTT_PROP_PAYLOAD_FORMAT_INDICATOR, MQTT_PROP_TYPE_BYTE},
{MQTT_PROP_MESSAGE_EXPIRY_INTERVAL, MQTT_PROP_TYPE_INT},
{MQTT_PROP_CONTENT_TYPE, MQTT_PROP_TYPE_STRING},
{MQTT_PROP_RESPONSE_TOPIC, MQTT_PROP_TYPE_STRING},
{MQTT_PROP_CORRELATION_DATA, MQTT_PROP_TYPE_BINARY_DATA},
{MQTT_PROP_SUBSCRIPTION_IDENTIFIER, MQTT_PROP_TYPE_VARIABLE_INT},
{MQTT_PROP_SESSION_EXPIRY_INTERVAL, MQTT_PROP_TYPE_INT},
{MQTT_PROP_ASSIGNED_CLIENT_IDENTIFIER, MQTT_PROP_TYPE_STRING},
{MQTT_PROP_SERVER_KEEP_ALIVE, MQTT_PROP_TYPE_SHORT},
{MQTT_PROP_AUTHENTICATION_METHOD, MQTT_PROP_TYPE_STRING},
{MQTT_PROP_AUTHENTICATION_DATA, MQTT_PROP_TYPE_BINARY_DATA},
{MQTT_PROP_REQUEST_PROBLEM_INFORMATION, MQTT_PROP_TYPE_BYTE},
{MQTT_PROP_WILL_DELAY_INTERVAL, MQTT_PROP_TYPE_INT},
{MQTT_PROP_REQUEST_RESPONSE_INFORMATION, MQTT_PROP_TYPE_BYTE},
{MQTT_PROP_RESPONSE_INFORMATION, MQTT_PROP_TYPE_STRING},
{MQTT_PROP_SERVER_REFERENCE, MQTT_PROP_TYPE_STRING},
{MQTT_PROP_REASON_STRING, MQTT_PROP_TYPE_STRING},
{MQTT_PROP_RECEIVE_MAXIMUM, MQTT_PROP_TYPE_SHORT},
{MQTT_PROP_TOPIC_ALIAS_MAXIMUM, MQTT_PROP_TYPE_SHORT},
{MQTT_PROP_TOPIC_ALIAS, MQTT_PROP_TYPE_SHORT},
{MQTT_PROP_MAXIMUM_QOS, MQTT_PROP_TYPE_BYTE},
{MQTT_PROP_RETAIN_AVAILABLE, MQTT_PROP_TYPE_BYTE},
{MQTT_PROP_USER_PROPERTY, MQTT_PROP_TYPE_STRING_PAIR},
{MQTT_PROP_MAXIMUM_PACKET_SIZE, MQTT_PROP_TYPE_INT},
{MQTT_PROP_WILDCARD_SUBSCRIPTION_AVAILABLE, MQTT_PROP_TYPE_BYTE},
{MQTT_PROP_SUBSCRIPTION_IDENTIFIER_AVAILABLE, MQTT_PROP_TYPE_BYTE},
{MQTT_PROP_SHARED_SUBSCRIPTION_AVAILABLE, MQTT_PROP_TYPE_BYTE}};
struct mg_mqtt_prop {
uint8_t id; // Enumerated at MQTT5 Reference
uint32_t iv; // integer value for 8-, 16-, 32-bit integers types
struct mg_str key; // non-NULL only for user property type
struct mg_str val; // non-NULL only for UTF-8 types and user properties
};
struct mg_mqtt_opts {
struct mg_str user; // Username, can be empty
struct mg_str pass; // Password, can be empty
struct mg_str client_id; // Client ID
struct mg_str will_topic; // Will topic
struct mg_str will_message; // Will message
uint8_t will_qos; // Will message quality of service
struct mg_str topic; // topic
struct mg_str message; // message
uint8_t qos; // message quality of service
uint8_t version; // Can be 4 (3.1.1), or 5. If 0, assume 4.
uint16_t keepalive; // Keep-alive timer in seconds
bool will_retain; // Retain last will
bool retain; // Retain last will
bool clean; // Use clean session, 0 or 1
struct mg_mqtt_prop *props; // MQTT5 props array
int num_props; // number of props
struct mg_mqtt_prop *will_props; // only found in the CONNECT packet
int num_will_props; // number of will props
};
struct mg_mqtt_message {
@ -42,6 +125,8 @@ struct mg_mqtt_message {
uint8_t cmd; // MQTT command, one of MQTT_CMD_*
uint8_t qos; // Quality of service
uint8_t ack; // Connack return code. 0 - success
size_t props_start; // offset to the start of the properties
size_t props_size; // length of the properties
};
struct mg_connection *mg_mqtt_connect(struct mg_mgr *, const char *url,
@ -50,12 +135,14 @@ struct mg_connection *mg_mqtt_connect(struct mg_mgr *, const char *url,
struct mg_connection *mg_mqtt_listen(struct mg_mgr *mgr, const char *url,
mg_event_handler_t fn, void *fn_data);
void mg_mqtt_login(struct mg_connection *c, const struct mg_mqtt_opts *opts);
void mg_mqtt_pub(struct mg_connection *c, struct mg_str topic,
struct mg_str data, int qos, bool retain);
void mg_mqtt_sub(struct mg_connection *, struct mg_str topic, int qos);
void mg_mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts);
void mg_mqtt_sub(struct mg_connection *, const struct mg_mqtt_opts *opts);
int mg_mqtt_parse(const uint8_t *, size_t, uint8_t, struct mg_mqtt_message *);
void mg_mqtt_send_header(struct mg_connection *, uint8_t cmd, uint8_t flags,
uint32_t len);
void mg_mqtt_ping(struct mg_connection *);
void mg_mqtt_pong(struct mg_connection *);
void mg_mqtt_disconnect(struct mg_connection *);
void mg_mqtt_disconnect(struct mg_connection *,
const struct mg_mqtt_opts *opts);
size_t mg_mqtt_next_prop(struct mg_mqtt_message *, struct mg_mqtt_prop *,
size_t ofs);

View File

@ -350,6 +350,7 @@ struct mqtt_data {
static void mqtt_cb(struct mg_connection *c, int ev, void *evd, void *fnd) {
struct mqtt_data *test_data = (struct mqtt_data *) fnd;
char *buf = test_data->buf;
if (ev == MG_EV_MQTT_OPEN) {
buf[0] = *(int *) evd == 0 ? 'X' : 'Y';
} else if (ev == MG_EV_MQTT_CMD) {
@ -364,10 +365,56 @@ static void mqtt_cb(struct mg_connection *c, int ev, void *evd, void *fnd) {
struct mg_mqtt_message *mm = (struct mg_mqtt_message *) evd;
snprintf(buf + 1, test_data->bufsize, "%.*s/%.*s", (int) mm->topic.len,
mm->topic.ptr, (int) mm->data.len, mm->data.ptr);
if (mm->cmd == MQTT_CMD_PUBLISH && c->is_mqtt5) {
size_t pos = 0;
struct mg_mqtt_prop prop;
// note: the server will send the properties sorted by their ID
ASSERT((pos = mg_mqtt_next_prop(mm, &prop, pos)) > 0);
ASSERT(prop.iv == 10 && prop.id == MQTT_PROP_MESSAGE_EXPIRY_INTERVAL);
ASSERT((pos = mg_mqtt_next_prop(mm, &prop, pos)) > 0);
ASSERT(prop.id == MQTT_PROP_CONTENT_TYPE);
ASSERT(strncmp(prop.val.ptr, "test_content_val_2", prop.val.len) == 0 &&
prop.val.len == strlen("test_content_val_2"));
ASSERT((pos = mg_mqtt_next_prop(mm, &prop, pos)) > 0);
ASSERT(prop.id == MQTT_PROP_USER_PROPERTY);
ASSERT(strncmp(prop.key.ptr, "test_key_1", prop.key.len) == 0 &&
prop.key.len == strlen("test_key_1"));
ASSERT(strncmp(prop.val.ptr, "test_value_1", prop.val.len) == 0 &&
prop.val.len == strlen("test_value_1"));
ASSERT((pos = mg_mqtt_next_prop(mm, &prop, pos)) > 0);
ASSERT(prop.id == MQTT_PROP_USER_PROPERTY);
ASSERT(strncmp(prop.key.ptr, "test_key_2", prop.key.len) == 0 &&
prop.key.len == strlen("test_key_2"));
ASSERT(strncmp(prop.val.ptr, "test_value_2", prop.val.len) == 0 &&
prop.val.len == strlen("test_value_2"));
ASSERT((pos = mg_mqtt_next_prop(mm, &prop, pos)) == 0);
}
}
(void) c;
}
static void construct_props(struct mg_mqtt_prop *props) {
props[0].id = MQTT_PROP_MESSAGE_EXPIRY_INTERVAL;
props[0].iv = 10;
props[1].id = MQTT_PROP_USER_PROPERTY;
props[1].key = mg_str("test_key_1");
props[1].val = mg_str("test_value_1");
props[2].id = MQTT_PROP_USER_PROPERTY;
props[2].key = mg_str("test_key_2");
props[2].val = mg_str("test_value_2");
props[3].id = MQTT_PROP_CONTENT_TYPE;
props[3].val = mg_str("test_content_val_2");
}
static void test_mqtt_base(void);
static void test_mqtt_base(void) {
char buf[50] = {0};
@ -396,6 +443,7 @@ static void test_mqtt_ver(uint8_t mqtt_version) {
struct mg_str topic = mg_str("x/f12"), data = mg_str("hi");
struct mg_connection *c;
struct mg_mqtt_opts opts;
struct mg_mqtt_prop properties[4];
const char *url = "mqtt://broker.hivemq.com:1883";
int i;
mg_mgr_init(&mgr);
@ -406,11 +454,15 @@ static void test_mqtt_ver(uint8_t mqtt_version) {
if (buf[0] != 'X') MG_INFO(("[%s]", buf));
ASSERT(buf[0] == 'X');
ASSERT(test_data.subscribed == 0);
mg_mqtt_sub(c, topic, 1);
opts.topic = topic, opts.qos = 1;
mg_mqtt_sub(c, &opts);
for (i = 0; i < 500 && test_data.subscribed == 0; i++) mg_mgr_poll(&mgr, 10);
ASSERT(test_data.subscribed == 1);
ASSERT(test_data.published == 0);
mg_mqtt_pub(c, topic, data, 1, false);
opts.topic = topic, opts.message = data, opts.qos = 1, opts.retain = false;
mg_mqtt_pub(c, &opts);
for (i = 0; i < 500 && test_data.published == 0; i++) mg_mgr_poll(&mgr, 10);
ASSERT(test_data.published == 1);
for (i = 0; i < 500 && buf[1] == 0; i++) mg_mgr_poll(&mgr, 10);
@ -422,24 +474,32 @@ static void test_mqtt_ver(uint8_t mqtt_version) {
test_data.published = 0;
memset(buf, 0, sizeof(buf));
memset(&opts, 0, sizeof(opts));
opts.clean = true;
opts.will_qos = 1;
opts.will_retain = true;
opts.keepalive = 20;
opts.clean = true, opts.qos = 1, opts.retain = true, opts.keepalive = 20;
opts.version = mqtt_version;
opts.will_topic = mg_str(mg_random_str(will_topic, sizeof(will_topic)));
opts.will_message = mg_str("mg_will_messsage");
opts.topic = mg_str(mg_random_str(will_topic, sizeof(will_topic)));
opts.message = mg_str("mg_will_messsage");
opts.client_id = mg_str(mg_random_str(client_id, sizeof(client_id)));
c = mg_mqtt_connect(&mgr, url, &opts, mqtt_cb, &test_data);
for (i = 0; i < 300 && buf[0] == 0; i++) mg_mgr_poll(&mgr, 10);
if (buf[0] != 'X') MG_INFO(("[%s]", buf));
ASSERT(buf[0] == 'X');
ASSERT(test_data.subscribed == 0);
mg_mqtt_sub(c, topic, 1);
opts.topic = topic, opts.qos = 1;
mg_mqtt_sub(c, &opts);
for (i = 0; i < 500 && test_data.subscribed == 0; i++) mg_mgr_poll(&mgr, 10);
ASSERT(test_data.subscribed == 1);
ASSERT(test_data.published == 0);
mg_mqtt_pub(c, topic, data, 1, false);
opts.topic = topic, opts.message = data, opts.qos = 1, opts.retain = false;
if (mqtt_version == 5) {
opts.props = properties;
opts.num_props = 4;
construct_props(properties);
}
mg_mqtt_pub(c, &opts);
for (i = 0; i < 500 && test_data.published == 0; i++) mg_mgr_poll(&mgr, 10);
ASSERT(test_data.published == 1);
for (i = 0; i < 500 && buf[1] == 0; i++) mg_mgr_poll(&mgr, 10);
@ -452,8 +512,8 @@ static void test_mqtt_ver(uint8_t mqtt_version) {
static void test_mqtt(void) {
test_mqtt_base();
test_mqtt_ver(5);
test_mqtt_ver(4);
test_mqtt_ver(5);
test_mqtt_base();
}
@ -580,7 +640,7 @@ static int cmpbody(const char *buf, const char *str) {
struct mg_str s = mg_str(str);
size_t len = strlen(buf);
mg_http_parse(buf, len, &hm);
if (hm.body.len > len) hm.body.len = len - (size_t) (hm.body.ptr - buf);
if (hm.body.len > len) hm.body.len = len - (size_t)(hm.body.ptr - buf);
return mg_strcmp(hm.body, s);
}