diff --git a/mongoose.c b/mongoose.c index e1e97fbc..22d59c83 100644 --- a/mongoose.c +++ b/mongoose.c @@ -2917,17 +2917,51 @@ void mg_md5_final(mg_md5_ctx *ctx, unsigned char digest[16]) { #define MQTT_HAS_PASSWORD 0x40 #define MQTT_HAS_USER_NAME 0x80 +struct mg_mqtt_pmap { + uint8_t id; + uint8_t type; +}; + +static const struct mg_mqtt_pmap s_prop_map[] = { + {MQTT_PROP_PAYLOAD_FORMAT_INDICATOR, MQTT_PROP_TYPE_BYTE}, + {MQTT_PROP_MESSAGE_EXPIRY_INTERVAL, MQTT_PROP_TYPE_INT}, + {MQTT_PROP_CONTENT_TYPE, MQTT_PROP_TYPE_STRING}, + {MQTT_PROP_RESPONSE_TOPIC, MQTT_PROP_TYPE_STRING}, + {MQTT_PROP_CORRELATION_DATA, MQTT_PROP_TYPE_BINARY_DATA}, + {MQTT_PROP_SUBSCRIPTION_IDENTIFIER, MQTT_PROP_TYPE_VARIABLE_INT}, + {MQTT_PROP_SESSION_EXPIRY_INTERVAL, MQTT_PROP_TYPE_INT}, + {MQTT_PROP_ASSIGNED_CLIENT_IDENTIFIER, MQTT_PROP_TYPE_STRING}, + {MQTT_PROP_SERVER_KEEP_ALIVE, MQTT_PROP_TYPE_SHORT}, + {MQTT_PROP_AUTHENTICATION_METHOD, MQTT_PROP_TYPE_STRING}, + {MQTT_PROP_AUTHENTICATION_DATA, MQTT_PROP_TYPE_BINARY_DATA}, + {MQTT_PROP_REQUEST_PROBLEM_INFORMATION, MQTT_PROP_TYPE_BYTE}, + {MQTT_PROP_WILL_DELAY_INTERVAL, MQTT_PROP_TYPE_INT}, + {MQTT_PROP_REQUEST_RESPONSE_INFORMATION, MQTT_PROP_TYPE_BYTE}, + {MQTT_PROP_RESPONSE_INFORMATION, MQTT_PROP_TYPE_STRING}, + {MQTT_PROP_SERVER_REFERENCE, MQTT_PROP_TYPE_STRING}, + {MQTT_PROP_REASON_STRING, MQTT_PROP_TYPE_STRING}, + {MQTT_PROP_RECEIVE_MAXIMUM, MQTT_PROP_TYPE_SHORT}, + {MQTT_PROP_TOPIC_ALIAS_MAXIMUM, MQTT_PROP_TYPE_SHORT}, + {MQTT_PROP_TOPIC_ALIAS, MQTT_PROP_TYPE_SHORT}, + {MQTT_PROP_MAXIMUM_QOS, MQTT_PROP_TYPE_BYTE}, + {MQTT_PROP_RETAIN_AVAILABLE, MQTT_PROP_TYPE_BYTE}, + {MQTT_PROP_USER_PROPERTY, MQTT_PROP_TYPE_STRING_PAIR}, + {MQTT_PROP_MAXIMUM_PACKET_SIZE, MQTT_PROP_TYPE_INT}, + {MQTT_PROP_WILDCARD_SUBSCRIPTION_AVAILABLE, MQTT_PROP_TYPE_BYTE}, + {MQTT_PROP_SUBSCRIPTION_IDENTIFIER_AVAILABLE, MQTT_PROP_TYPE_BYTE}, + {MQTT_PROP_SHARED_SUBSCRIPTION_AVAILABLE, MQTT_PROP_TYPE_BYTE}}; + void mg_mqtt_send_header(struct mg_connection *c, uint8_t cmd, uint8_t flags, uint32_t len) { uint8_t buf[1 + sizeof(len)], *vlen = &buf[1]; - buf[0] = (uint8_t)((cmd << 4) | flags); + buf[0] = (uint8_t) ((cmd << 4) | flags); do { *vlen = len % 0x80; len /= 0x80; if (len > 0) *vlen |= 0x80; vlen++; } while (len > 0 && vlen < &buf[sizeof(buf)]); - mg_send(c, buf, (size_t)(vlen - buf)); + mg_send(c, buf, (size_t) (vlen - buf)); } static void mg_send_u16(struct mg_connection *c, uint16_t value) { @@ -2938,21 +2972,20 @@ static void mg_send_u32(struct mg_connection *c, uint32_t value) { mg_send(c, &value, sizeof(value)); } -static uint8_t compute_variable_length_size(uint32_t length) { +static uint8_t compute_variable_length_size(size_t length) { uint8_t bytes_needed = 0; do { bytes_needed++; length /= 0x80; } while (length > 0); - return bytes_needed; } -static int encode_variable_length(uint8_t *buf, int value) { +static int encode_variable_length(uint8_t *buf, size_t value) { int len = 0; do { - uint8_t byte = (uint8_t)(value % 128); + uint8_t byte = (uint8_t) (value % 128); value /= 128; if (value > 0) byte |= 0x80; buf[len++] = byte; @@ -2963,14 +2996,10 @@ static int encode_variable_length(uint8_t *buf, int value) { static uint32_t decode_variable_length(const char *buf, uint32_t *bytes_consumed) { - const uint8_t *p = (const uint8_t *) buf; - uint32_t value = 0; - uint32_t multiplier = 1; - uint8_t encoded_byte; - uint32_t offset; + uint32_t value = 0, multiplier = 1, offset; for (offset = 0; offset < 4; offset++) { - encoded_byte = p[offset]; + uint8_t encoded_byte = ((uint8_t *) buf)[offset]; value += (encoded_byte & 0x7F) * multiplier; multiplier *= 128; @@ -2983,72 +3012,59 @@ static uint32_t decode_variable_length(const char *buf, } static int mqtt_prop_type_by_id(uint8_t prop_id) { - size_t num_properties = sizeof(mqtt_prop_map) / sizeof(mqtt_prop_map[0]); - for (size_t i = 0; i < num_properties; ++i) { - if (mqtt_prop_map[i].id == prop_id) { - return mqtt_prop_map[i].type; - } + size_t i, num_properties = sizeof(s_prop_map) / sizeof(s_prop_map[0]); + for (i = 0; i < num_properties; ++i) { + if (s_prop_map[i].id == prop_id) return s_prop_map[i].type; } - return -1; // Property ID not found } -/* -returns the size of the properties section, without the -size of the content's length -*/ -static uint32_t get_properties_length(struct mg_mqtt_prop *props, int count) { - uint32_t size = 0; - for (int i = 0; i < count; i++) { +// Returns the size of the properties section, without the +// size of the content's length +static size_t get_properties_length(struct mg_mqtt_prop *props, size_t count) { + size_t i, size = 0; + for (i = 0; i < count; i++) { size++; // identifier switch (mqtt_prop_type_by_id(props[i].id)) { case MQTT_PROP_TYPE_STRING_PAIR: - size += (uint32_t)(props[i].val.len + props[i].key.len + - 2 * sizeof(uint16_t)); + size += (uint32_t) (props[i].val.len + props[i].key.len + + 2 * sizeof(uint16_t)); break; case MQTT_PROP_TYPE_STRING: - size += (uint32_t)(props[i].val.len + sizeof(uint16_t)); + size += (uint32_t) (props[i].val.len + sizeof(uint16_t)); break; case MQTT_PROP_TYPE_BINARY_DATA: - size += (uint32_t)(props[i].val.len + sizeof(uint16_t)); + size += (uint32_t) (props[i].val.len + sizeof(uint16_t)); break; case MQTT_PROP_TYPE_VARIABLE_INT: size += compute_variable_length_size((uint32_t) props[i].iv); break; - case MQTT_PROP_TYPE_INT: - size += (uint32_t) sizeof(uint32_t); - break; - case MQTT_PROP_TYPE_SHORT: - size += (uint32_t) sizeof(uint16_t); - break; - default: - return size; // cannot parse further down + case MQTT_PROP_TYPE_INT: size += (uint32_t) sizeof(uint32_t); break; + case MQTT_PROP_TYPE_SHORT: size += (uint32_t) sizeof(uint16_t); break; + default: return size; // cannot parse further down } } return size; } -/* -returns the entire size of the properties section, including the -size of the variable length of the content -*/ -static uint32_t get_full_properties_size(struct mg_mqtt_prop *props, - int count) { - uint32_t size = get_properties_length(props, count); +// returns the entire size of the properties section, including the +// size of the variable length of the content +static size_t get_props_size(struct mg_mqtt_prop *props, size_t count) { + size_t size = get_properties_length(props, count); size += compute_variable_length_size(size); return size; } static void mg_send_mqtt_properties(struct mg_connection *c, - struct mg_mqtt_prop *props, int nr_props) { - uint32_t total_size = get_properties_length(props, nr_props); + struct mg_mqtt_prop *props, size_t nprops) { + size_t total_size = get_properties_length(props, nprops); uint8_t buf_v[4] = {0, 0, 0, 0}; uint8_t buf[4] = {0, 0, 0, 0}; - int len = encode_variable_length(buf, (int) total_size); + int i, len = encode_variable_length(buf, total_size); mg_send(c, buf, (size_t) len); - for (int i = 0; i < nr_props; i++) { + for (i = 0; i < (int) nprops; i++) { mg_send(c, &props[i].id, sizeof(props[i].id)); switch (mqtt_prop_type_by_id(props[i].id)) { case MQTT_PROP_TYPE_STRING_PAIR: @@ -3075,7 +3091,7 @@ static void mg_send_mqtt_properties(struct mg_connection *c, mg_send(c, props[i].val.ptr, props[i].val.len); break; case MQTT_PROP_TYPE_VARIABLE_INT: - len = encode_variable_length(buf_v, (int) props[i].iv); + len = encode_variable_length(buf_v, props[i].iv); mg_send(c, buf_v, (size_t) len); break; } @@ -3083,26 +3099,22 @@ static void mg_send_mqtt_properties(struct mg_connection *c, } size_t mg_mqtt_next_prop(struct mg_mqtt_message *msg, struct mg_mqtt_prop *prop, - size_t crt_pos) { - unsigned char *i = - (unsigned char *) msg->dgram.ptr + msg->props_start + crt_pos; - size_t new_pos = crt_pos; + size_t ofs) { + uint8_t *i = (uint8_t *) msg->dgram.ptr + msg->props_start + ofs; + size_t new_pos = ofs; uint32_t bytes_consumed; + prop->id = i[0]; - if (crt_pos >= msg->dgram.len || - crt_pos >= msg->props_start + msg->props_size) + if (ofs >= msg->dgram.len || ofs >= msg->props_start + msg->props_size) return 0; - - uint8_t id = i[0]; i++, new_pos++; - prop->id = id; - switch (mqtt_prop_type_by_id(id)) { + switch (mqtt_prop_type_by_id(prop->id)) { case MQTT_PROP_TYPE_STRING_PAIR: - prop->key.len = (uint16_t)((((uint16_t) i[0]) << 8) | i[1]); + prop->key.len = (uint16_t) ((((uint16_t) i[0]) << 8) | i[1]); prop->key.ptr = (char *) i + 2; i += 2 + prop->key.len; - prop->val.len = (uint16_t)((((uint16_t) i[0]) << 8) | i[1]); + prop->val.len = (uint16_t) ((((uint16_t) i[0]) << 8) | i[1]); prop->val.ptr = (char *) i + 2; new_pos += 2 * sizeof(uint16_t) + prop->val.len + prop->key.len; break; @@ -3111,7 +3123,7 @@ size_t mg_mqtt_next_prop(struct mg_mqtt_message *msg, struct mg_mqtt_prop *prop, new_pos++; break; case MQTT_PROP_TYPE_SHORT: - prop->iv = (uint16_t)((((uint16_t) i[0]) << 8) | i[1]); + prop->iv = (uint16_t) ((((uint16_t) i[0]) << 8) | i[1]); new_pos += sizeof(uint16_t); break; case MQTT_PROP_TYPE_INT: @@ -3120,12 +3132,12 @@ size_t mg_mqtt_next_prop(struct mg_mqtt_message *msg, struct mg_mqtt_prop *prop, new_pos += sizeof(uint32_t); break; case MQTT_PROP_TYPE_STRING: - prop->val.len = (uint16_t)((((uint16_t) i[0]) << 8) | i[1]); + prop->val.len = (uint16_t) ((((uint16_t) i[0]) << 8) | i[1]); prop->val.ptr = (char *) i + 2; new_pos += 2 + prop->val.len; break; case MQTT_PROP_TYPE_BINARY_DATA: - prop->val.len = (uint16_t)((((uint16_t) i[0]) << 8) | i[1]); + prop->val.len = (uint16_t) ((((uint16_t) i[0]) << 8) | i[1]); prop->val.ptr = (char *) i + 2; new_pos += 2 + prop->val.len; break; @@ -3133,8 +3145,7 @@ size_t mg_mqtt_next_prop(struct mg_mqtt_message *msg, struct mg_mqtt_prop *prop, prop->iv = decode_variable_length((char *) i, &bytes_consumed); new_pos += bytes_consumed; break; - default: - new_pos = 0; + default: new_pos = 0; } return new_pos; @@ -3143,7 +3154,7 @@ size_t mg_mqtt_next_prop(struct mg_mqtt_message *msg, struct mg_mqtt_prop *prop, void mg_mqtt_login(struct mg_connection *c, const struct mg_mqtt_opts *opts) { char rnd[10], client_id[21]; struct mg_str cid = opts->client_id; - uint32_t total_len = 7 + 1 + 2 + 2; + size_t total_len = 7 + 1 + 2 + 2; uint8_t hdr[8] = {0, 4, 'M', 'Q', 'T', 'T', opts->version, 0}; if (cid.len == 0) { @@ -3155,7 +3166,7 @@ void mg_mqtt_login(struct mg_connection *c, const struct mg_mqtt_opts *opts) { if (hdr[6] == 0) hdr[6] = 4; // If version is not set, use 4 (3.1.1) c->is_mqtt5 = hdr[6] == 5; // Set version 5 flag - hdr[7] = (uint8_t)((opts->qos & 3) << 3); // Connection flags + hdr[7] = (uint8_t) ((opts->qos & 3) << 3); // Connection flags if (opts->user.len > 0) { total_len += 2 + (uint32_t) opts->user.len; hdr[7] |= MQTT_HAS_USER_NAME; @@ -3172,13 +3183,12 @@ void mg_mqtt_login(struct mg_connection *c, const struct mg_mqtt_opts *opts) { if (opts->retain) hdr[7] |= MQTT_WILL_RETAIN; total_len += (uint32_t) cid.len; if (c->is_mqtt5) { - total_len += get_full_properties_size(opts->props, opts->num_props); + total_len += get_props_size(opts->props, opts->num_props); if (hdr[7] & MQTT_HAS_WILL) - total_len += get_full_properties_size(opts->will_props, - opts->num_will_props); + total_len += get_props_size(opts->will_props, opts->num_will_props); } - mg_mqtt_send_header(c, MQTT_CMD_CONNECT, 0, total_len); + mg_mqtt_send_header(c, MQTT_CMD_CONNECT, 0, (uint32_t) total_len); mg_send(c, hdr, sizeof(hdr)); // keepalive == 0 means "do not disconnect us!" mg_send_u16(c, mg_htons((uint16_t) opts->keepalive)); @@ -3208,16 +3218,15 @@ void mg_mqtt_login(struct mg_connection *c, const struct mg_mqtt_opts *opts) { } void mg_mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts) { - uint8_t flags = (uint8_t)(((opts->qos & 3) << 1) | (opts->retain ? 1 : 0)); - uint32_t len = 2 + (uint32_t) opts->topic.len + (uint32_t) opts->message.len; + uint8_t flags = (uint8_t) (((opts->qos & 3) << 1) | (opts->retain ? 1 : 0)); + size_t len = 2 + opts->topic.len + opts->message.len; MG_DEBUG(("%lu [%.*s] -> [%.*s]", c->id, (int) opts->topic.len, (char *) opts->topic.ptr, (int) opts->message.len, (char *) opts->message.ptr)); if (opts->qos > 0) len += 2; - if (c->is_mqtt5) - len += get_full_properties_size(opts->props, opts->num_props); + if (c->is_mqtt5) len += get_props_size(opts->props, opts->num_props); - mg_mqtt_send_header(c, MQTT_CMD_PUBLISH, flags, len); + mg_mqtt_send_header(c, MQTT_CMD_PUBLISH, flags, (uint32_t) len); mg_send_u16(c, mg_htons((uint16_t) opts->topic.len)); mg_send(c, opts->topic.ptr, opts->topic.len); if (opts->qos > 0) { @@ -3232,12 +3241,10 @@ void mg_mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts) { void mg_mqtt_sub(struct mg_connection *c, const struct mg_mqtt_opts *opts) { uint8_t qos_ = opts->qos & 3; - uint32_t len = - 2 + (uint32_t) opts->topic.len + 2 + 1 + - (c->is_mqtt5 ? get_full_properties_size(opts->props, opts->num_props) - : 0); + size_t plen = c->is_mqtt5 ? get_props_size(opts->props, opts->num_props) : 0; + size_t len = 2 + opts->topic.len + 2 + 1 + plen; - mg_mqtt_send_header(c, MQTT_CMD_SUBSCRIBE, 2, len); + mg_mqtt_send_header(c, MQTT_CMD_SUBSCRIBE, 2, (uint32_t) len); if (++c->mgr->mqtt_id == 0) ++c->mgr->mqtt_id; mg_send_u16(c, mg_htons(c->mgr->mqtt_id)); if (c->is_mqtt5) mg_send_mqtt_properties(c, opts->props, opts->num_props); @@ -3255,21 +3262,21 @@ int mg_mqtt_parse(const uint8_t *buf, size_t len, uint8_t version, memset(m, 0, sizeof(*m)); m->dgram.ptr = (char *) buf; if (len < 2) return MQTT_INCOMPLETE; - m->cmd = (uint8_t)(buf[0] >> 4); + m->cmd = (uint8_t) (buf[0] >> 4); m->qos = (buf[0] >> 1) & 3; n = len_len = 0; p = (uint8_t *) buf + 1; - while ((size_t)(p - buf) < len) { + while ((size_t) (p - buf) < len) { lc = *((uint8_t *) p++); - n += (uint32_t)((lc & 0x7f) << 7 * len_len); + n += (uint32_t) ((lc & 0x7f) << 7 * len_len); len_len++; if (!(lc & 0x80)) break; if (len_len >= 4) return MQTT_MALFORMED; } end = p + n; if ((lc & 0x80) || (end > buf + len)) return MQTT_INCOMPLETE; - m->dgram.len = (size_t)(end - buf); + m->dgram.len = (size_t) (end - buf); switch (m->cmd) { case MQTT_CMD_CONNACK: @@ -3285,33 +3292,32 @@ int mg_mqtt_parse(const uint8_t *buf, size_t len, uint8_t version, case MQTT_CMD_UNSUBSCRIBE: case MQTT_CMD_UNSUBACK: if (p + 2 > end) return MQTT_MALFORMED; - m->id = (uint16_t)((((uint16_t) p[0]) << 8) | p[1]); + m->id = (uint16_t) ((((uint16_t) p[0]) << 8) | p[1]); p += 2; break; case MQTT_CMD_PUBLISH: { if (p + 2 > end) return MQTT_MALFORMED; - m->topic.len = (uint16_t)((((uint16_t) p[0]) << 8) | p[1]); + m->topic.len = (uint16_t) ((((uint16_t) p[0]) << 8) | p[1]); m->topic.ptr = (char *) p + 2; p += 2 + m->topic.len; if (p > end) return MQTT_MALFORMED; if (m->qos > 0) { if (p + 2 > end) return MQTT_MALFORMED; - m->id = (uint16_t)((((uint16_t) p[0]) << 8) | p[1]); + m->id = (uint16_t) ((((uint16_t) p[0]) << 8) | p[1]); p += 2; } if (p > end) return MQTT_MALFORMED; if (version == 5 && p + 2 < end) { m->props_size = decode_variable_length((char *) p, &len_len); - m->props_start = (size_t)(p + len_len - buf); + m->props_start = (size_t) (p + len_len - buf); p += len_len + m->props_size; } if (p > end) return MQTT_MALFORMED; m->data.ptr = (char *) p; - m->data.len = (size_t)(end - p); + m->data.len = (size_t) (end - p); break; } - default: - break; + default: break; } return MQTT_OK; } @@ -3379,21 +3385,16 @@ void mg_mqtt_pong(struct mg_connection *nc) { mg_mqtt_send_header(nc, MQTT_CMD_PINGRESP, 0, 0); } -void mg_mqtt_disconnect(struct mg_connection *nc, +void mg_mqtt_disconnect(struct mg_connection *c, const struct mg_mqtt_opts *opts) { - uint32_t len; + size_t len = 0; + if (c->is_mqtt5) len = 1 + get_props_size(opts->props, opts->num_props); + mg_mqtt_send_header(c, MQTT_CMD_DISCONNECT, 0, (uint32_t) len); - if (nc->is_mqtt5) - len = 1 + get_full_properties_size(opts->props, opts->num_props); - else - len = 0; - - mg_mqtt_send_header(nc, MQTT_CMD_DISCONNECT, 0, len); - - if (nc->is_mqtt5) { + if (c->is_mqtt5) { uint8_t zero = 0; - mg_send(nc, &zero, sizeof(zero)); // reason code - mg_send_mqtt_properties(nc, opts->props, opts->num_props); + mg_send(c, &zero, sizeof(zero)); // reason code + mg_send_mqtt_properties(c, opts->props, opts->num_props); } } diff --git a/mongoose.h b/mongoose.h index f3b7bd11..365d1b49 100644 --- a/mongoose.h +++ b/mongoose.h @@ -1454,74 +1454,40 @@ enum { enum { MQTT_OK, MQTT_INCOMPLETE, MQTT_MALFORMED }; -struct mqtt_prop_map_t { - uint8_t id; - uint8_t type; -}; - -static const struct mqtt_prop_map_t mqtt_prop_map[] = { - {MQTT_PROP_PAYLOAD_FORMAT_INDICATOR, MQTT_PROP_TYPE_BYTE}, - {MQTT_PROP_MESSAGE_EXPIRY_INTERVAL, MQTT_PROP_TYPE_INT}, - {MQTT_PROP_CONTENT_TYPE, MQTT_PROP_TYPE_STRING}, - {MQTT_PROP_RESPONSE_TOPIC, MQTT_PROP_TYPE_STRING}, - {MQTT_PROP_CORRELATION_DATA, MQTT_PROP_TYPE_BINARY_DATA}, - {MQTT_PROP_SUBSCRIPTION_IDENTIFIER, MQTT_PROP_TYPE_VARIABLE_INT}, - {MQTT_PROP_SESSION_EXPIRY_INTERVAL, MQTT_PROP_TYPE_INT}, - {MQTT_PROP_ASSIGNED_CLIENT_IDENTIFIER, MQTT_PROP_TYPE_STRING}, - {MQTT_PROP_SERVER_KEEP_ALIVE, MQTT_PROP_TYPE_SHORT}, - {MQTT_PROP_AUTHENTICATION_METHOD, MQTT_PROP_TYPE_STRING}, - {MQTT_PROP_AUTHENTICATION_DATA, MQTT_PROP_TYPE_BINARY_DATA}, - {MQTT_PROP_REQUEST_PROBLEM_INFORMATION, MQTT_PROP_TYPE_BYTE}, - {MQTT_PROP_WILL_DELAY_INTERVAL, MQTT_PROP_TYPE_INT}, - {MQTT_PROP_REQUEST_RESPONSE_INFORMATION, MQTT_PROP_TYPE_BYTE}, - {MQTT_PROP_RESPONSE_INFORMATION, MQTT_PROP_TYPE_STRING}, - {MQTT_PROP_SERVER_REFERENCE, MQTT_PROP_TYPE_STRING}, - {MQTT_PROP_REASON_STRING, MQTT_PROP_TYPE_STRING}, - {MQTT_PROP_RECEIVE_MAXIMUM, MQTT_PROP_TYPE_SHORT}, - {MQTT_PROP_TOPIC_ALIAS_MAXIMUM, MQTT_PROP_TYPE_SHORT}, - {MQTT_PROP_TOPIC_ALIAS, MQTT_PROP_TYPE_SHORT}, - {MQTT_PROP_MAXIMUM_QOS, MQTT_PROP_TYPE_BYTE}, - {MQTT_PROP_RETAIN_AVAILABLE, MQTT_PROP_TYPE_BYTE}, - {MQTT_PROP_USER_PROPERTY, MQTT_PROP_TYPE_STRING_PAIR}, - {MQTT_PROP_MAXIMUM_PACKET_SIZE, MQTT_PROP_TYPE_INT}, - {MQTT_PROP_WILDCARD_SUBSCRIPTION_AVAILABLE, MQTT_PROP_TYPE_BYTE}, - {MQTT_PROP_SUBSCRIPTION_IDENTIFIER_AVAILABLE, MQTT_PROP_TYPE_BYTE}, - {MQTT_PROP_SHARED_SUBSCRIPTION_AVAILABLE, MQTT_PROP_TYPE_BYTE}}; - struct mg_mqtt_prop { uint8_t id; // Enumerated at MQTT5 Reference - uint32_t iv; // integer value for 8-, 16-, 32-bit integers types - struct mg_str key; // non-NULL only for user property type - struct mg_str val; // non-NULL only for UTF-8 types and user properties + uint32_t iv; // Integer value for 8-, 16-, 32-bit integers types + struct mg_str key; // Non-NULL only for user property type + struct mg_str val; // Non-NULL only for UTF-8 types and user properties }; struct mg_mqtt_opts { - struct mg_str user; // Username, can be empty - struct mg_str pass; // Password, can be empty - struct mg_str client_id; // Client ID - struct mg_str topic; // topic - struct mg_str message; // message - uint8_t qos; // message quality of service - uint8_t version; // Can be 4 (3.1.1), or 5. If 0, assume 4. - uint16_t keepalive; // Keep-alive timer in seconds - bool retain; // Retain last will - bool clean; // Use clean session, 0 or 1 - struct mg_mqtt_prop *props; // MQTT5 props array - int num_props; // number of props - struct mg_mqtt_prop *will_props; // only found in the CONNECT packet - int num_will_props; // number of will props + struct mg_str user; // Username, can be empty + struct mg_str pass; // Password, can be empty + struct mg_str client_id; // Client ID + struct mg_str topic; // topic + struct mg_str message; // message + uint8_t qos; // message quality of service + uint8_t version; // Can be 4 (3.1.1), or 5. If 0, assume 4. + uint16_t keepalive; // Keep-alive timer in seconds + bool retain; // Retain last will + bool clean; // Use clean session, 0 or 1 + struct mg_mqtt_prop *props; // MQTT5 props array + size_t num_props; // number of props + struct mg_mqtt_prop *will_props; // Valid only for CONNECT packet + size_t num_will_props; // Number of will props }; struct mg_mqtt_message { struct mg_str topic; // Parsed topic struct mg_str data; // Parsed message struct mg_str dgram; // Whole MQTT datagram, including headers - uint16_t id; // Set for PUBACK, PUBREC, PUBREL, PUBCOMP, SUBACK, PUBLISH - uint8_t cmd; // MQTT command, one of MQTT_CMD_* - uint8_t qos; // Quality of service - uint8_t ack; // Connack return code. 0 - success - size_t props_start; // offset to the start of the properties - size_t props_size; // length of the properties + uint16_t id; // For PUBACK, PUBREC, PUBREL, PUBCOMP, SUBACK, PUBLISH + uint8_t cmd; // MQTT command, one of MQTT_CMD_* + uint8_t qos; // Quality of service + uint8_t ack; // Connack return code. 0 - success + size_t props_start; // Offset to the start of the properties + size_t props_size; // Length of the properties }; struct mg_connection *mg_mqtt_connect(struct mg_mgr *, const char *url, @@ -1537,8 +1503,7 @@ void mg_mqtt_send_header(struct mg_connection *, uint8_t cmd, uint8_t flags, uint32_t len); void mg_mqtt_ping(struct mg_connection *); void mg_mqtt_pong(struct mg_connection *); -void mg_mqtt_disconnect(struct mg_connection *, - const struct mg_mqtt_opts *opts); +void mg_mqtt_disconnect(struct mg_connection *, const struct mg_mqtt_opts *); size_t mg_mqtt_next_prop(struct mg_mqtt_message *, struct mg_mqtt_prop *, size_t ofs); diff --git a/src/mqtt.c b/src/mqtt.c index c003bbcc..50f6aaee 100644 --- a/src/mqtt.c +++ b/src/mqtt.c @@ -1,8 +1,8 @@ -#include "mqtt.h" #include "arch.h" #include "base64.h" #include "event.h" #include "log.h" +#include "mqtt.h" #include "url.h" #include "util.h" @@ -12,17 +12,51 @@ #define MQTT_HAS_PASSWORD 0x40 #define MQTT_HAS_USER_NAME 0x80 +struct mg_mqtt_pmap { + uint8_t id; + uint8_t type; +}; + +static const struct mg_mqtt_pmap s_prop_map[] = { + {MQTT_PROP_PAYLOAD_FORMAT_INDICATOR, MQTT_PROP_TYPE_BYTE}, + {MQTT_PROP_MESSAGE_EXPIRY_INTERVAL, MQTT_PROP_TYPE_INT}, + {MQTT_PROP_CONTENT_TYPE, MQTT_PROP_TYPE_STRING}, + {MQTT_PROP_RESPONSE_TOPIC, MQTT_PROP_TYPE_STRING}, + {MQTT_PROP_CORRELATION_DATA, MQTT_PROP_TYPE_BINARY_DATA}, + {MQTT_PROP_SUBSCRIPTION_IDENTIFIER, MQTT_PROP_TYPE_VARIABLE_INT}, + {MQTT_PROP_SESSION_EXPIRY_INTERVAL, MQTT_PROP_TYPE_INT}, + {MQTT_PROP_ASSIGNED_CLIENT_IDENTIFIER, MQTT_PROP_TYPE_STRING}, + {MQTT_PROP_SERVER_KEEP_ALIVE, MQTT_PROP_TYPE_SHORT}, + {MQTT_PROP_AUTHENTICATION_METHOD, MQTT_PROP_TYPE_STRING}, + {MQTT_PROP_AUTHENTICATION_DATA, MQTT_PROP_TYPE_BINARY_DATA}, + {MQTT_PROP_REQUEST_PROBLEM_INFORMATION, MQTT_PROP_TYPE_BYTE}, + {MQTT_PROP_WILL_DELAY_INTERVAL, MQTT_PROP_TYPE_INT}, + {MQTT_PROP_REQUEST_RESPONSE_INFORMATION, MQTT_PROP_TYPE_BYTE}, + {MQTT_PROP_RESPONSE_INFORMATION, MQTT_PROP_TYPE_STRING}, + {MQTT_PROP_SERVER_REFERENCE, MQTT_PROP_TYPE_STRING}, + {MQTT_PROP_REASON_STRING, MQTT_PROP_TYPE_STRING}, + {MQTT_PROP_RECEIVE_MAXIMUM, MQTT_PROP_TYPE_SHORT}, + {MQTT_PROP_TOPIC_ALIAS_MAXIMUM, MQTT_PROP_TYPE_SHORT}, + {MQTT_PROP_TOPIC_ALIAS, MQTT_PROP_TYPE_SHORT}, + {MQTT_PROP_MAXIMUM_QOS, MQTT_PROP_TYPE_BYTE}, + {MQTT_PROP_RETAIN_AVAILABLE, MQTT_PROP_TYPE_BYTE}, + {MQTT_PROP_USER_PROPERTY, MQTT_PROP_TYPE_STRING_PAIR}, + {MQTT_PROP_MAXIMUM_PACKET_SIZE, MQTT_PROP_TYPE_INT}, + {MQTT_PROP_WILDCARD_SUBSCRIPTION_AVAILABLE, MQTT_PROP_TYPE_BYTE}, + {MQTT_PROP_SUBSCRIPTION_IDENTIFIER_AVAILABLE, MQTT_PROP_TYPE_BYTE}, + {MQTT_PROP_SHARED_SUBSCRIPTION_AVAILABLE, MQTT_PROP_TYPE_BYTE}}; + void mg_mqtt_send_header(struct mg_connection *c, uint8_t cmd, uint8_t flags, uint32_t len) { uint8_t buf[1 + sizeof(len)], *vlen = &buf[1]; - buf[0] = (uint8_t)((cmd << 4) | flags); + buf[0] = (uint8_t) ((cmd << 4) | flags); do { *vlen = len % 0x80; len /= 0x80; if (len > 0) *vlen |= 0x80; vlen++; } while (len > 0 && vlen < &buf[sizeof(buf)]); - mg_send(c, buf, (size_t)(vlen - buf)); + mg_send(c, buf, (size_t) (vlen - buf)); } static void mg_send_u16(struct mg_connection *c, uint16_t value) { @@ -33,21 +67,20 @@ static void mg_send_u32(struct mg_connection *c, uint32_t value) { mg_send(c, &value, sizeof(value)); } -static uint8_t compute_variable_length_size(uint32_t length) { +static uint8_t compute_variable_length_size(size_t length) { uint8_t bytes_needed = 0; do { bytes_needed++; length /= 0x80; } while (length > 0); - return bytes_needed; } -static int encode_variable_length(uint8_t *buf, int value) { +static int encode_variable_length(uint8_t *buf, size_t value) { int len = 0; do { - uint8_t byte = (uint8_t)(value % 128); + uint8_t byte = (uint8_t) (value % 128); value /= 128; if (value > 0) byte |= 0x80; buf[len++] = byte; @@ -58,14 +91,10 @@ static int encode_variable_length(uint8_t *buf, int value) { static uint32_t decode_variable_length(const char *buf, uint32_t *bytes_consumed) { - const uint8_t *p = (const uint8_t *) buf; - uint32_t value = 0; - uint32_t multiplier = 1; - uint8_t encoded_byte; - uint32_t offset; + uint32_t value = 0, multiplier = 1, offset; for (offset = 0; offset < 4; offset++) { - encoded_byte = p[offset]; + uint8_t encoded_byte = ((uint8_t *) buf)[offset]; value += (encoded_byte & 0x7F) * multiplier; multiplier *= 128; @@ -78,72 +107,59 @@ static uint32_t decode_variable_length(const char *buf, } static int mqtt_prop_type_by_id(uint8_t prop_id) { - size_t num_properties = sizeof(mqtt_prop_map) / sizeof(mqtt_prop_map[0]); - for (size_t i = 0; i < num_properties; ++i) { - if (mqtt_prop_map[i].id == prop_id) { - return mqtt_prop_map[i].type; - } + size_t i, num_properties = sizeof(s_prop_map) / sizeof(s_prop_map[0]); + for (i = 0; i < num_properties; ++i) { + if (s_prop_map[i].id == prop_id) return s_prop_map[i].type; } - return -1; // Property ID not found } -/* -returns the size of the properties section, without the -size of the content's length -*/ -static uint32_t get_properties_length(struct mg_mqtt_prop *props, int count) { - uint32_t size = 0; - for (int i = 0; i < count; i++) { +// Returns the size of the properties section, without the +// size of the content's length +static size_t get_properties_length(struct mg_mqtt_prop *props, size_t count) { + size_t i, size = 0; + for (i = 0; i < count; i++) { size++; // identifier switch (mqtt_prop_type_by_id(props[i].id)) { case MQTT_PROP_TYPE_STRING_PAIR: - size += (uint32_t)(props[i].val.len + props[i].key.len + - 2 * sizeof(uint16_t)); + size += (uint32_t) (props[i].val.len + props[i].key.len + + 2 * sizeof(uint16_t)); break; case MQTT_PROP_TYPE_STRING: - size += (uint32_t)(props[i].val.len + sizeof(uint16_t)); + size += (uint32_t) (props[i].val.len + sizeof(uint16_t)); break; case MQTT_PROP_TYPE_BINARY_DATA: - size += (uint32_t)(props[i].val.len + sizeof(uint16_t)); + size += (uint32_t) (props[i].val.len + sizeof(uint16_t)); break; case MQTT_PROP_TYPE_VARIABLE_INT: size += compute_variable_length_size((uint32_t) props[i].iv); break; - case MQTT_PROP_TYPE_INT: - size += (uint32_t) sizeof(uint32_t); - break; - case MQTT_PROP_TYPE_SHORT: - size += (uint32_t) sizeof(uint16_t); - break; - default: - return size; // cannot parse further down + case MQTT_PROP_TYPE_INT: size += (uint32_t) sizeof(uint32_t); break; + case MQTT_PROP_TYPE_SHORT: size += (uint32_t) sizeof(uint16_t); break; + default: return size; // cannot parse further down } } return size; } -/* -returns the entire size of the properties section, including the -size of the variable length of the content -*/ -static uint32_t get_full_properties_size(struct mg_mqtt_prop *props, - int count) { - uint32_t size = get_properties_length(props, count); +// returns the entire size of the properties section, including the +// size of the variable length of the content +static size_t get_props_size(struct mg_mqtt_prop *props, size_t count) { + size_t size = get_properties_length(props, count); size += compute_variable_length_size(size); return size; } static void mg_send_mqtt_properties(struct mg_connection *c, - struct mg_mqtt_prop *props, int nr_props) { - uint32_t total_size = get_properties_length(props, nr_props); + struct mg_mqtt_prop *props, size_t nprops) { + size_t total_size = get_properties_length(props, nprops); uint8_t buf_v[4] = {0, 0, 0, 0}; uint8_t buf[4] = {0, 0, 0, 0}; - int len = encode_variable_length(buf, (int) total_size); + int i, len = encode_variable_length(buf, total_size); mg_send(c, buf, (size_t) len); - for (int i = 0; i < nr_props; i++) { + for (i = 0; i < (int) nprops; i++) { mg_send(c, &props[i].id, sizeof(props[i].id)); switch (mqtt_prop_type_by_id(props[i].id)) { case MQTT_PROP_TYPE_STRING_PAIR: @@ -170,7 +186,7 @@ static void mg_send_mqtt_properties(struct mg_connection *c, mg_send(c, props[i].val.ptr, props[i].val.len); break; case MQTT_PROP_TYPE_VARIABLE_INT: - len = encode_variable_length(buf_v, (int) props[i].iv); + len = encode_variable_length(buf_v, props[i].iv); mg_send(c, buf_v, (size_t) len); break; } @@ -178,26 +194,22 @@ static void mg_send_mqtt_properties(struct mg_connection *c, } size_t mg_mqtt_next_prop(struct mg_mqtt_message *msg, struct mg_mqtt_prop *prop, - size_t crt_pos) { - unsigned char *i = - (unsigned char *) msg->dgram.ptr + msg->props_start + crt_pos; - size_t new_pos = crt_pos; + size_t ofs) { + uint8_t *i = (uint8_t *) msg->dgram.ptr + msg->props_start + ofs; + size_t new_pos = ofs; uint32_t bytes_consumed; + prop->id = i[0]; - if (crt_pos >= msg->dgram.len || - crt_pos >= msg->props_start + msg->props_size) + if (ofs >= msg->dgram.len || ofs >= msg->props_start + msg->props_size) return 0; - - uint8_t id = i[0]; i++, new_pos++; - prop->id = id; - switch (mqtt_prop_type_by_id(id)) { + switch (mqtt_prop_type_by_id(prop->id)) { case MQTT_PROP_TYPE_STRING_PAIR: - prop->key.len = (uint16_t)((((uint16_t) i[0]) << 8) | i[1]); + prop->key.len = (uint16_t) ((((uint16_t) i[0]) << 8) | i[1]); prop->key.ptr = (char *) i + 2; i += 2 + prop->key.len; - prop->val.len = (uint16_t)((((uint16_t) i[0]) << 8) | i[1]); + prop->val.len = (uint16_t) ((((uint16_t) i[0]) << 8) | i[1]); prop->val.ptr = (char *) i + 2; new_pos += 2 * sizeof(uint16_t) + prop->val.len + prop->key.len; break; @@ -206,7 +218,7 @@ size_t mg_mqtt_next_prop(struct mg_mqtt_message *msg, struct mg_mqtt_prop *prop, new_pos++; break; case MQTT_PROP_TYPE_SHORT: - prop->iv = (uint16_t)((((uint16_t) i[0]) << 8) | i[1]); + prop->iv = (uint16_t) ((((uint16_t) i[0]) << 8) | i[1]); new_pos += sizeof(uint16_t); break; case MQTT_PROP_TYPE_INT: @@ -215,12 +227,12 @@ size_t mg_mqtt_next_prop(struct mg_mqtt_message *msg, struct mg_mqtt_prop *prop, new_pos += sizeof(uint32_t); break; case MQTT_PROP_TYPE_STRING: - prop->val.len = (uint16_t)((((uint16_t) i[0]) << 8) | i[1]); + prop->val.len = (uint16_t) ((((uint16_t) i[0]) << 8) | i[1]); prop->val.ptr = (char *) i + 2; new_pos += 2 + prop->val.len; break; case MQTT_PROP_TYPE_BINARY_DATA: - prop->val.len = (uint16_t)((((uint16_t) i[0]) << 8) | i[1]); + prop->val.len = (uint16_t) ((((uint16_t) i[0]) << 8) | i[1]); prop->val.ptr = (char *) i + 2; new_pos += 2 + prop->val.len; break; @@ -228,8 +240,7 @@ size_t mg_mqtt_next_prop(struct mg_mqtt_message *msg, struct mg_mqtt_prop *prop, prop->iv = decode_variable_length((char *) i, &bytes_consumed); new_pos += bytes_consumed; break; - default: - new_pos = 0; + default: new_pos = 0; } return new_pos; @@ -238,7 +249,7 @@ size_t mg_mqtt_next_prop(struct mg_mqtt_message *msg, struct mg_mqtt_prop *prop, void mg_mqtt_login(struct mg_connection *c, const struct mg_mqtt_opts *opts) { char rnd[10], client_id[21]; struct mg_str cid = opts->client_id; - uint32_t total_len = 7 + 1 + 2 + 2; + size_t total_len = 7 + 1 + 2 + 2; uint8_t hdr[8] = {0, 4, 'M', 'Q', 'T', 'T', opts->version, 0}; if (cid.len == 0) { @@ -250,7 +261,7 @@ void mg_mqtt_login(struct mg_connection *c, const struct mg_mqtt_opts *opts) { if (hdr[6] == 0) hdr[6] = 4; // If version is not set, use 4 (3.1.1) c->is_mqtt5 = hdr[6] == 5; // Set version 5 flag - hdr[7] = (uint8_t)((opts->qos & 3) << 3); // Connection flags + hdr[7] = (uint8_t) ((opts->qos & 3) << 3); // Connection flags if (opts->user.len > 0) { total_len += 2 + (uint32_t) opts->user.len; hdr[7] |= MQTT_HAS_USER_NAME; @@ -267,13 +278,12 @@ void mg_mqtt_login(struct mg_connection *c, const struct mg_mqtt_opts *opts) { if (opts->retain) hdr[7] |= MQTT_WILL_RETAIN; total_len += (uint32_t) cid.len; if (c->is_mqtt5) { - total_len += get_full_properties_size(opts->props, opts->num_props); + total_len += get_props_size(opts->props, opts->num_props); if (hdr[7] & MQTT_HAS_WILL) - total_len += get_full_properties_size(opts->will_props, - opts->num_will_props); + total_len += get_props_size(opts->will_props, opts->num_will_props); } - mg_mqtt_send_header(c, MQTT_CMD_CONNECT, 0, total_len); + mg_mqtt_send_header(c, MQTT_CMD_CONNECT, 0, (uint32_t) total_len); mg_send(c, hdr, sizeof(hdr)); // keepalive == 0 means "do not disconnect us!" mg_send_u16(c, mg_htons((uint16_t) opts->keepalive)); @@ -303,16 +313,15 @@ void mg_mqtt_login(struct mg_connection *c, const struct mg_mqtt_opts *opts) { } void mg_mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts) { - uint8_t flags = (uint8_t)(((opts->qos & 3) << 1) | (opts->retain ? 1 : 0)); - uint32_t len = 2 + (uint32_t) opts->topic.len + (uint32_t) opts->message.len; + uint8_t flags = (uint8_t) (((opts->qos & 3) << 1) | (opts->retain ? 1 : 0)); + size_t len = 2 + opts->topic.len + opts->message.len; MG_DEBUG(("%lu [%.*s] -> [%.*s]", c->id, (int) opts->topic.len, (char *) opts->topic.ptr, (int) opts->message.len, (char *) opts->message.ptr)); if (opts->qos > 0) len += 2; - if (c->is_mqtt5) - len += get_full_properties_size(opts->props, opts->num_props); + if (c->is_mqtt5) len += get_props_size(opts->props, opts->num_props); - mg_mqtt_send_header(c, MQTT_CMD_PUBLISH, flags, len); + mg_mqtt_send_header(c, MQTT_CMD_PUBLISH, flags, (uint32_t) len); mg_send_u16(c, mg_htons((uint16_t) opts->topic.len)); mg_send(c, opts->topic.ptr, opts->topic.len); if (opts->qos > 0) { @@ -327,12 +336,10 @@ void mg_mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts) { void mg_mqtt_sub(struct mg_connection *c, const struct mg_mqtt_opts *opts) { uint8_t qos_ = opts->qos & 3; - uint32_t len = - 2 + (uint32_t) opts->topic.len + 2 + 1 + - (c->is_mqtt5 ? get_full_properties_size(opts->props, opts->num_props) - : 0); + size_t plen = c->is_mqtt5 ? get_props_size(opts->props, opts->num_props) : 0; + size_t len = 2 + opts->topic.len + 2 + 1 + plen; - mg_mqtt_send_header(c, MQTT_CMD_SUBSCRIBE, 2, len); + mg_mqtt_send_header(c, MQTT_CMD_SUBSCRIBE, 2, (uint32_t) len); if (++c->mgr->mqtt_id == 0) ++c->mgr->mqtt_id; mg_send_u16(c, mg_htons(c->mgr->mqtt_id)); if (c->is_mqtt5) mg_send_mqtt_properties(c, opts->props, opts->num_props); @@ -350,21 +357,21 @@ int mg_mqtt_parse(const uint8_t *buf, size_t len, uint8_t version, memset(m, 0, sizeof(*m)); m->dgram.ptr = (char *) buf; if (len < 2) return MQTT_INCOMPLETE; - m->cmd = (uint8_t)(buf[0] >> 4); + m->cmd = (uint8_t) (buf[0] >> 4); m->qos = (buf[0] >> 1) & 3; n = len_len = 0; p = (uint8_t *) buf + 1; - while ((size_t)(p - buf) < len) { + while ((size_t) (p - buf) < len) { lc = *((uint8_t *) p++); - n += (uint32_t)((lc & 0x7f) << 7 * len_len); + n += (uint32_t) ((lc & 0x7f) << 7 * len_len); len_len++; if (!(lc & 0x80)) break; if (len_len >= 4) return MQTT_MALFORMED; } end = p + n; if ((lc & 0x80) || (end > buf + len)) return MQTT_INCOMPLETE; - m->dgram.len = (size_t)(end - buf); + m->dgram.len = (size_t) (end - buf); switch (m->cmd) { case MQTT_CMD_CONNACK: @@ -380,33 +387,32 @@ int mg_mqtt_parse(const uint8_t *buf, size_t len, uint8_t version, case MQTT_CMD_UNSUBSCRIBE: case MQTT_CMD_UNSUBACK: if (p + 2 > end) return MQTT_MALFORMED; - m->id = (uint16_t)((((uint16_t) p[0]) << 8) | p[1]); + m->id = (uint16_t) ((((uint16_t) p[0]) << 8) | p[1]); p += 2; break; case MQTT_CMD_PUBLISH: { if (p + 2 > end) return MQTT_MALFORMED; - m->topic.len = (uint16_t)((((uint16_t) p[0]) << 8) | p[1]); + m->topic.len = (uint16_t) ((((uint16_t) p[0]) << 8) | p[1]); m->topic.ptr = (char *) p + 2; p += 2 + m->topic.len; if (p > end) return MQTT_MALFORMED; if (m->qos > 0) { if (p + 2 > end) return MQTT_MALFORMED; - m->id = (uint16_t)((((uint16_t) p[0]) << 8) | p[1]); + m->id = (uint16_t) ((((uint16_t) p[0]) << 8) | p[1]); p += 2; } if (p > end) return MQTT_MALFORMED; if (version == 5 && p + 2 < end) { m->props_size = decode_variable_length((char *) p, &len_len); - m->props_start = (size_t)(p + len_len - buf); + m->props_start = (size_t) (p + len_len - buf); p += len_len + m->props_size; } if (p > end) return MQTT_MALFORMED; m->data.ptr = (char *) p; - m->data.len = (size_t)(end - p); + m->data.len = (size_t) (end - p); break; } - default: - break; + default: break; } return MQTT_OK; } @@ -474,21 +480,16 @@ void mg_mqtt_pong(struct mg_connection *nc) { mg_mqtt_send_header(nc, MQTT_CMD_PINGRESP, 0, 0); } -void mg_mqtt_disconnect(struct mg_connection *nc, +void mg_mqtt_disconnect(struct mg_connection *c, const struct mg_mqtt_opts *opts) { - uint32_t len; + size_t len = 0; + if (c->is_mqtt5) len = 1 + get_props_size(opts->props, opts->num_props); + mg_mqtt_send_header(c, MQTT_CMD_DISCONNECT, 0, (uint32_t) len); - if (nc->is_mqtt5) - len = 1 + get_full_properties_size(opts->props, opts->num_props); - else - len = 0; - - mg_mqtt_send_header(nc, MQTT_CMD_DISCONNECT, 0, len); - - if (nc->is_mqtt5) { + if (c->is_mqtt5) { uint8_t zero = 0; - mg_send(nc, &zero, sizeof(zero)); // reason code - mg_send_mqtt_properties(nc, opts->props, opts->num_props); + mg_send(c, &zero, sizeof(zero)); // reason code + mg_send_mqtt_properties(c, opts->props, opts->num_props); } } diff --git a/src/mqtt.h b/src/mqtt.h index f77906dd..a104fe63 100644 --- a/src/mqtt.h +++ b/src/mqtt.h @@ -59,74 +59,40 @@ enum { enum { MQTT_OK, MQTT_INCOMPLETE, MQTT_MALFORMED }; -struct mqtt_prop_map_t { - uint8_t id; - uint8_t type; -}; - -static const struct mqtt_prop_map_t mqtt_prop_map[] = { - {MQTT_PROP_PAYLOAD_FORMAT_INDICATOR, MQTT_PROP_TYPE_BYTE}, - {MQTT_PROP_MESSAGE_EXPIRY_INTERVAL, MQTT_PROP_TYPE_INT}, - {MQTT_PROP_CONTENT_TYPE, MQTT_PROP_TYPE_STRING}, - {MQTT_PROP_RESPONSE_TOPIC, MQTT_PROP_TYPE_STRING}, - {MQTT_PROP_CORRELATION_DATA, MQTT_PROP_TYPE_BINARY_DATA}, - {MQTT_PROP_SUBSCRIPTION_IDENTIFIER, MQTT_PROP_TYPE_VARIABLE_INT}, - {MQTT_PROP_SESSION_EXPIRY_INTERVAL, MQTT_PROP_TYPE_INT}, - {MQTT_PROP_ASSIGNED_CLIENT_IDENTIFIER, MQTT_PROP_TYPE_STRING}, - {MQTT_PROP_SERVER_KEEP_ALIVE, MQTT_PROP_TYPE_SHORT}, - {MQTT_PROP_AUTHENTICATION_METHOD, MQTT_PROP_TYPE_STRING}, - {MQTT_PROP_AUTHENTICATION_DATA, MQTT_PROP_TYPE_BINARY_DATA}, - {MQTT_PROP_REQUEST_PROBLEM_INFORMATION, MQTT_PROP_TYPE_BYTE}, - {MQTT_PROP_WILL_DELAY_INTERVAL, MQTT_PROP_TYPE_INT}, - {MQTT_PROP_REQUEST_RESPONSE_INFORMATION, MQTT_PROP_TYPE_BYTE}, - {MQTT_PROP_RESPONSE_INFORMATION, MQTT_PROP_TYPE_STRING}, - {MQTT_PROP_SERVER_REFERENCE, MQTT_PROP_TYPE_STRING}, - {MQTT_PROP_REASON_STRING, MQTT_PROP_TYPE_STRING}, - {MQTT_PROP_RECEIVE_MAXIMUM, MQTT_PROP_TYPE_SHORT}, - {MQTT_PROP_TOPIC_ALIAS_MAXIMUM, MQTT_PROP_TYPE_SHORT}, - {MQTT_PROP_TOPIC_ALIAS, MQTT_PROP_TYPE_SHORT}, - {MQTT_PROP_MAXIMUM_QOS, MQTT_PROP_TYPE_BYTE}, - {MQTT_PROP_RETAIN_AVAILABLE, MQTT_PROP_TYPE_BYTE}, - {MQTT_PROP_USER_PROPERTY, MQTT_PROP_TYPE_STRING_PAIR}, - {MQTT_PROP_MAXIMUM_PACKET_SIZE, MQTT_PROP_TYPE_INT}, - {MQTT_PROP_WILDCARD_SUBSCRIPTION_AVAILABLE, MQTT_PROP_TYPE_BYTE}, - {MQTT_PROP_SUBSCRIPTION_IDENTIFIER_AVAILABLE, MQTT_PROP_TYPE_BYTE}, - {MQTT_PROP_SHARED_SUBSCRIPTION_AVAILABLE, MQTT_PROP_TYPE_BYTE}}; - struct mg_mqtt_prop { uint8_t id; // Enumerated at MQTT5 Reference - uint32_t iv; // integer value for 8-, 16-, 32-bit integers types - struct mg_str key; // non-NULL only for user property type - struct mg_str val; // non-NULL only for UTF-8 types and user properties + uint32_t iv; // Integer value for 8-, 16-, 32-bit integers types + struct mg_str key; // Non-NULL only for user property type + struct mg_str val; // Non-NULL only for UTF-8 types and user properties }; struct mg_mqtt_opts { - struct mg_str user; // Username, can be empty - struct mg_str pass; // Password, can be empty - struct mg_str client_id; // Client ID - struct mg_str topic; // topic - struct mg_str message; // message - uint8_t qos; // message quality of service - uint8_t version; // Can be 4 (3.1.1), or 5. If 0, assume 4. - uint16_t keepalive; // Keep-alive timer in seconds - bool retain; // Retain last will - bool clean; // Use clean session, 0 or 1 - struct mg_mqtt_prop *props; // MQTT5 props array - int num_props; // number of props - struct mg_mqtt_prop *will_props; // only found in the CONNECT packet - int num_will_props; // number of will props + struct mg_str user; // Username, can be empty + struct mg_str pass; // Password, can be empty + struct mg_str client_id; // Client ID + struct mg_str topic; // topic + struct mg_str message; // message + uint8_t qos; // message quality of service + uint8_t version; // Can be 4 (3.1.1), or 5. If 0, assume 4. + uint16_t keepalive; // Keep-alive timer in seconds + bool retain; // Retain last will + bool clean; // Use clean session, 0 or 1 + struct mg_mqtt_prop *props; // MQTT5 props array + size_t num_props; // number of props + struct mg_mqtt_prop *will_props; // Valid only for CONNECT packet + size_t num_will_props; // Number of will props }; struct mg_mqtt_message { struct mg_str topic; // Parsed topic struct mg_str data; // Parsed message struct mg_str dgram; // Whole MQTT datagram, including headers - uint16_t id; // Set for PUBACK, PUBREC, PUBREL, PUBCOMP, SUBACK, PUBLISH - uint8_t cmd; // MQTT command, one of MQTT_CMD_* - uint8_t qos; // Quality of service - uint8_t ack; // Connack return code. 0 - success - size_t props_start; // offset to the start of the properties - size_t props_size; // length of the properties + uint16_t id; // For PUBACK, PUBREC, PUBREL, PUBCOMP, SUBACK, PUBLISH + uint8_t cmd; // MQTT command, one of MQTT_CMD_* + uint8_t qos; // Quality of service + uint8_t ack; // Connack return code. 0 - success + size_t props_start; // Offset to the start of the properties + size_t props_size; // Length of the properties }; struct mg_connection *mg_mqtt_connect(struct mg_mgr *, const char *url, @@ -142,7 +108,6 @@ void mg_mqtt_send_header(struct mg_connection *, uint8_t cmd, uint8_t flags, uint32_t len); void mg_mqtt_ping(struct mg_connection *); void mg_mqtt_pong(struct mg_connection *); -void mg_mqtt_disconnect(struct mg_connection *, - const struct mg_mqtt_opts *opts); +void mg_mqtt_disconnect(struct mg_connection *, const struct mg_mqtt_opts *); size_t mg_mqtt_next_prop(struct mg_mqtt_message *, struct mg_mqtt_prop *, size_t ofs);