Merge pull request #2165 from cesanta/mqtt

mqtt fixes
This commit is contained in:
Sergey Lyubka 2023-04-25 08:27:01 +01:00 committed by GitHub
commit b0d44bf0ba
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 261 additions and 329 deletions

View File

@ -2917,17 +2917,51 @@ void mg_md5_final(mg_md5_ctx *ctx, unsigned char digest[16]) {
#define MQTT_HAS_PASSWORD 0x40
#define MQTT_HAS_USER_NAME 0x80
struct mg_mqtt_pmap {
uint8_t id;
uint8_t type;
};
static const struct mg_mqtt_pmap s_prop_map[] = {
{MQTT_PROP_PAYLOAD_FORMAT_INDICATOR, MQTT_PROP_TYPE_BYTE},
{MQTT_PROP_MESSAGE_EXPIRY_INTERVAL, MQTT_PROP_TYPE_INT},
{MQTT_PROP_CONTENT_TYPE, MQTT_PROP_TYPE_STRING},
{MQTT_PROP_RESPONSE_TOPIC, MQTT_PROP_TYPE_STRING},
{MQTT_PROP_CORRELATION_DATA, MQTT_PROP_TYPE_BINARY_DATA},
{MQTT_PROP_SUBSCRIPTION_IDENTIFIER, MQTT_PROP_TYPE_VARIABLE_INT},
{MQTT_PROP_SESSION_EXPIRY_INTERVAL, MQTT_PROP_TYPE_INT},
{MQTT_PROP_ASSIGNED_CLIENT_IDENTIFIER, MQTT_PROP_TYPE_STRING},
{MQTT_PROP_SERVER_KEEP_ALIVE, MQTT_PROP_TYPE_SHORT},
{MQTT_PROP_AUTHENTICATION_METHOD, MQTT_PROP_TYPE_STRING},
{MQTT_PROP_AUTHENTICATION_DATA, MQTT_PROP_TYPE_BINARY_DATA},
{MQTT_PROP_REQUEST_PROBLEM_INFORMATION, MQTT_PROP_TYPE_BYTE},
{MQTT_PROP_WILL_DELAY_INTERVAL, MQTT_PROP_TYPE_INT},
{MQTT_PROP_REQUEST_RESPONSE_INFORMATION, MQTT_PROP_TYPE_BYTE},
{MQTT_PROP_RESPONSE_INFORMATION, MQTT_PROP_TYPE_STRING},
{MQTT_PROP_SERVER_REFERENCE, MQTT_PROP_TYPE_STRING},
{MQTT_PROP_REASON_STRING, MQTT_PROP_TYPE_STRING},
{MQTT_PROP_RECEIVE_MAXIMUM, MQTT_PROP_TYPE_SHORT},
{MQTT_PROP_TOPIC_ALIAS_MAXIMUM, MQTT_PROP_TYPE_SHORT},
{MQTT_PROP_TOPIC_ALIAS, MQTT_PROP_TYPE_SHORT},
{MQTT_PROP_MAXIMUM_QOS, MQTT_PROP_TYPE_BYTE},
{MQTT_PROP_RETAIN_AVAILABLE, MQTT_PROP_TYPE_BYTE},
{MQTT_PROP_USER_PROPERTY, MQTT_PROP_TYPE_STRING_PAIR},
{MQTT_PROP_MAXIMUM_PACKET_SIZE, MQTT_PROP_TYPE_INT},
{MQTT_PROP_WILDCARD_SUBSCRIPTION_AVAILABLE, MQTT_PROP_TYPE_BYTE},
{MQTT_PROP_SUBSCRIPTION_IDENTIFIER_AVAILABLE, MQTT_PROP_TYPE_BYTE},
{MQTT_PROP_SHARED_SUBSCRIPTION_AVAILABLE, MQTT_PROP_TYPE_BYTE}};
void mg_mqtt_send_header(struct mg_connection *c, uint8_t cmd, uint8_t flags,
uint32_t len) {
uint8_t buf[1 + sizeof(len)], *vlen = &buf[1];
buf[0] = (uint8_t)((cmd << 4) | flags);
buf[0] = (uint8_t) ((cmd << 4) | flags);
do {
*vlen = len % 0x80;
len /= 0x80;
if (len > 0) *vlen |= 0x80;
vlen++;
} while (len > 0 && vlen < &buf[sizeof(buf)]);
mg_send(c, buf, (size_t)(vlen - buf));
mg_send(c, buf, (size_t) (vlen - buf));
}
static void mg_send_u16(struct mg_connection *c, uint16_t value) {
@ -2938,21 +2972,20 @@ static void mg_send_u32(struct mg_connection *c, uint32_t value) {
mg_send(c, &value, sizeof(value));
}
static uint8_t compute_variable_length_size(uint32_t length) {
static uint8_t compute_variable_length_size(size_t length) {
uint8_t bytes_needed = 0;
do {
bytes_needed++;
length /= 0x80;
} while (length > 0);
return bytes_needed;
}
static int encode_variable_length(uint8_t *buf, int value) {
static int encode_variable_length(uint8_t *buf, size_t value) {
int len = 0;
do {
uint8_t byte = (uint8_t)(value % 128);
uint8_t byte = (uint8_t) (value % 128);
value /= 128;
if (value > 0) byte |= 0x80;
buf[len++] = byte;
@ -2963,14 +2996,10 @@ static int encode_variable_length(uint8_t *buf, int value) {
static uint32_t decode_variable_length(const char *buf,
uint32_t *bytes_consumed) {
const uint8_t *p = (const uint8_t *) buf;
uint32_t value = 0;
uint32_t multiplier = 1;
uint8_t encoded_byte;
uint32_t offset;
uint32_t value = 0, multiplier = 1, offset;
for (offset = 0; offset < 4; offset++) {
encoded_byte = p[offset];
uint8_t encoded_byte = ((uint8_t *) buf)[offset];
value += (encoded_byte & 0x7F) * multiplier;
multiplier *= 128;
@ -2983,72 +3012,59 @@ static uint32_t decode_variable_length(const char *buf,
}
static int mqtt_prop_type_by_id(uint8_t prop_id) {
size_t num_properties = sizeof(mqtt_prop_map) / sizeof(mqtt_prop_map[0]);
for (size_t i = 0; i < num_properties; ++i) {
if (mqtt_prop_map[i].id == prop_id) {
return mqtt_prop_map[i].type;
}
size_t i, num_properties = sizeof(s_prop_map) / sizeof(s_prop_map[0]);
for (i = 0; i < num_properties; ++i) {
if (s_prop_map[i].id == prop_id) return s_prop_map[i].type;
}
return -1; // Property ID not found
}
/*
returns the size of the properties section, without the
size of the content's length
*/
static uint32_t get_properties_length(struct mg_mqtt_prop *props, int count) {
uint32_t size = 0;
for (int i = 0; i < count; i++) {
// Returns the size of the properties section, without the
// size of the content's length
static size_t get_properties_length(struct mg_mqtt_prop *props, size_t count) {
size_t i, size = 0;
for (i = 0; i < count; i++) {
size++; // identifier
switch (mqtt_prop_type_by_id(props[i].id)) {
case MQTT_PROP_TYPE_STRING_PAIR:
size += (uint32_t)(props[i].val.len + props[i].key.len +
2 * sizeof(uint16_t));
size += (uint32_t) (props[i].val.len + props[i].key.len +
2 * sizeof(uint16_t));
break;
case MQTT_PROP_TYPE_STRING:
size += (uint32_t)(props[i].val.len + sizeof(uint16_t));
size += (uint32_t) (props[i].val.len + sizeof(uint16_t));
break;
case MQTT_PROP_TYPE_BINARY_DATA:
size += (uint32_t)(props[i].val.len + sizeof(uint16_t));
size += (uint32_t) (props[i].val.len + sizeof(uint16_t));
break;
case MQTT_PROP_TYPE_VARIABLE_INT:
size += compute_variable_length_size((uint32_t) props[i].iv);
break;
case MQTT_PROP_TYPE_INT:
size += (uint32_t) sizeof(uint32_t);
break;
case MQTT_PROP_TYPE_SHORT:
size += (uint32_t) sizeof(uint16_t);
break;
default:
return size; // cannot parse further down
case MQTT_PROP_TYPE_INT: size += (uint32_t) sizeof(uint32_t); break;
case MQTT_PROP_TYPE_SHORT: size += (uint32_t) sizeof(uint16_t); break;
default: return size; // cannot parse further down
}
}
return size;
}
/*
returns the entire size of the properties section, including the
size of the variable length of the content
*/
static uint32_t get_full_properties_size(struct mg_mqtt_prop *props,
int count) {
uint32_t size = get_properties_length(props, count);
// returns the entire size of the properties section, including the
// size of the variable length of the content
static size_t get_props_size(struct mg_mqtt_prop *props, size_t count) {
size_t size = get_properties_length(props, count);
size += compute_variable_length_size(size);
return size;
}
static void mg_send_mqtt_properties(struct mg_connection *c,
struct mg_mqtt_prop *props, int nr_props) {
uint32_t total_size = get_properties_length(props, nr_props);
struct mg_mqtt_prop *props, size_t nprops) {
size_t total_size = get_properties_length(props, nprops);
uint8_t buf_v[4] = {0, 0, 0, 0};
uint8_t buf[4] = {0, 0, 0, 0};
int len = encode_variable_length(buf, (int) total_size);
int i, len = encode_variable_length(buf, total_size);
mg_send(c, buf, (size_t) len);
for (int i = 0; i < nr_props; i++) {
for (i = 0; i < (int) nprops; i++) {
mg_send(c, &props[i].id, sizeof(props[i].id));
switch (mqtt_prop_type_by_id(props[i].id)) {
case MQTT_PROP_TYPE_STRING_PAIR:
@ -3075,7 +3091,7 @@ static void mg_send_mqtt_properties(struct mg_connection *c,
mg_send(c, props[i].val.ptr, props[i].val.len);
break;
case MQTT_PROP_TYPE_VARIABLE_INT:
len = encode_variable_length(buf_v, (int) props[i].iv);
len = encode_variable_length(buf_v, props[i].iv);
mg_send(c, buf_v, (size_t) len);
break;
}
@ -3083,26 +3099,22 @@ static void mg_send_mqtt_properties(struct mg_connection *c,
}
size_t mg_mqtt_next_prop(struct mg_mqtt_message *msg, struct mg_mqtt_prop *prop,
size_t crt_pos) {
unsigned char *i =
(unsigned char *) msg->dgram.ptr + msg->props_start + crt_pos;
size_t new_pos = crt_pos;
size_t ofs) {
uint8_t *i = (uint8_t *) msg->dgram.ptr + msg->props_start + ofs;
size_t new_pos = ofs;
uint32_t bytes_consumed;
prop->id = i[0];
if (crt_pos >= msg->dgram.len ||
crt_pos >= msg->props_start + msg->props_size)
if (ofs >= msg->dgram.len || ofs >= msg->props_start + msg->props_size)
return 0;
uint8_t id = i[0];
i++, new_pos++;
prop->id = id;
switch (mqtt_prop_type_by_id(id)) {
switch (mqtt_prop_type_by_id(prop->id)) {
case MQTT_PROP_TYPE_STRING_PAIR:
prop->key.len = (uint16_t)((((uint16_t) i[0]) << 8) | i[1]);
prop->key.len = (uint16_t) ((((uint16_t) i[0]) << 8) | i[1]);
prop->key.ptr = (char *) i + 2;
i += 2 + prop->key.len;
prop->val.len = (uint16_t)((((uint16_t) i[0]) << 8) | i[1]);
prop->val.len = (uint16_t) ((((uint16_t) i[0]) << 8) | i[1]);
prop->val.ptr = (char *) i + 2;
new_pos += 2 * sizeof(uint16_t) + prop->val.len + prop->key.len;
break;
@ -3111,7 +3123,7 @@ size_t mg_mqtt_next_prop(struct mg_mqtt_message *msg, struct mg_mqtt_prop *prop,
new_pos++;
break;
case MQTT_PROP_TYPE_SHORT:
prop->iv = (uint16_t)((((uint16_t) i[0]) << 8) | i[1]);
prop->iv = (uint16_t) ((((uint16_t) i[0]) << 8) | i[1]);
new_pos += sizeof(uint16_t);
break;
case MQTT_PROP_TYPE_INT:
@ -3120,12 +3132,12 @@ size_t mg_mqtt_next_prop(struct mg_mqtt_message *msg, struct mg_mqtt_prop *prop,
new_pos += sizeof(uint32_t);
break;
case MQTT_PROP_TYPE_STRING:
prop->val.len = (uint16_t)((((uint16_t) i[0]) << 8) | i[1]);
prop->val.len = (uint16_t) ((((uint16_t) i[0]) << 8) | i[1]);
prop->val.ptr = (char *) i + 2;
new_pos += 2 + prop->val.len;
break;
case MQTT_PROP_TYPE_BINARY_DATA:
prop->val.len = (uint16_t)((((uint16_t) i[0]) << 8) | i[1]);
prop->val.len = (uint16_t) ((((uint16_t) i[0]) << 8) | i[1]);
prop->val.ptr = (char *) i + 2;
new_pos += 2 + prop->val.len;
break;
@ -3133,8 +3145,7 @@ size_t mg_mqtt_next_prop(struct mg_mqtt_message *msg, struct mg_mqtt_prop *prop,
prop->iv = decode_variable_length((char *) i, &bytes_consumed);
new_pos += bytes_consumed;
break;
default:
new_pos = 0;
default: new_pos = 0;
}
return new_pos;
@ -3143,7 +3154,7 @@ size_t mg_mqtt_next_prop(struct mg_mqtt_message *msg, struct mg_mqtt_prop *prop,
void mg_mqtt_login(struct mg_connection *c, const struct mg_mqtt_opts *opts) {
char rnd[10], client_id[21];
struct mg_str cid = opts->client_id;
uint32_t total_len = 7 + 1 + 2 + 2;
size_t total_len = 7 + 1 + 2 + 2;
uint8_t hdr[8] = {0, 4, 'M', 'Q', 'T', 'T', opts->version, 0};
if (cid.len == 0) {
@ -3155,7 +3166,7 @@ void mg_mqtt_login(struct mg_connection *c, const struct mg_mqtt_opts *opts) {
if (hdr[6] == 0) hdr[6] = 4; // If version is not set, use 4 (3.1.1)
c->is_mqtt5 = hdr[6] == 5; // Set version 5 flag
hdr[7] = (uint8_t)((opts->qos & 3) << 3); // Connection flags
hdr[7] = (uint8_t) ((opts->qos & 3) << 3); // Connection flags
if (opts->user.len > 0) {
total_len += 2 + (uint32_t) opts->user.len;
hdr[7] |= MQTT_HAS_USER_NAME;
@ -3172,13 +3183,12 @@ void mg_mqtt_login(struct mg_connection *c, const struct mg_mqtt_opts *opts) {
if (opts->retain) hdr[7] |= MQTT_WILL_RETAIN;
total_len += (uint32_t) cid.len;
if (c->is_mqtt5) {
total_len += get_full_properties_size(opts->props, opts->num_props);
total_len += get_props_size(opts->props, opts->num_props);
if (hdr[7] & MQTT_HAS_WILL)
total_len += get_full_properties_size(opts->will_props,
opts->num_will_props);
total_len += get_props_size(opts->will_props, opts->num_will_props);
}
mg_mqtt_send_header(c, MQTT_CMD_CONNECT, 0, total_len);
mg_mqtt_send_header(c, MQTT_CMD_CONNECT, 0, (uint32_t) total_len);
mg_send(c, hdr, sizeof(hdr));
// keepalive == 0 means "do not disconnect us!"
mg_send_u16(c, mg_htons((uint16_t) opts->keepalive));
@ -3208,16 +3218,15 @@ void mg_mqtt_login(struct mg_connection *c, const struct mg_mqtt_opts *opts) {
}
void mg_mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts) {
uint8_t flags = (uint8_t)(((opts->qos & 3) << 1) | (opts->retain ? 1 : 0));
uint32_t len = 2 + (uint32_t) opts->topic.len + (uint32_t) opts->message.len;
uint8_t flags = (uint8_t) (((opts->qos & 3) << 1) | (opts->retain ? 1 : 0));
size_t len = 2 + opts->topic.len + opts->message.len;
MG_DEBUG(("%lu [%.*s] -> [%.*s]", c->id, (int) opts->topic.len,
(char *) opts->topic.ptr, (int) opts->message.len,
(char *) opts->message.ptr));
if (opts->qos > 0) len += 2;
if (c->is_mqtt5)
len += get_full_properties_size(opts->props, opts->num_props);
if (c->is_mqtt5) len += get_props_size(opts->props, opts->num_props);
mg_mqtt_send_header(c, MQTT_CMD_PUBLISH, flags, len);
mg_mqtt_send_header(c, MQTT_CMD_PUBLISH, flags, (uint32_t) len);
mg_send_u16(c, mg_htons((uint16_t) opts->topic.len));
mg_send(c, opts->topic.ptr, opts->topic.len);
if (opts->qos > 0) {
@ -3232,12 +3241,10 @@ void mg_mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts) {
void mg_mqtt_sub(struct mg_connection *c, const struct mg_mqtt_opts *opts) {
uint8_t qos_ = opts->qos & 3;
uint32_t len =
2 + (uint32_t) opts->topic.len + 2 + 1 +
(c->is_mqtt5 ? get_full_properties_size(opts->props, opts->num_props)
: 0);
size_t plen = c->is_mqtt5 ? get_props_size(opts->props, opts->num_props) : 0;
size_t len = 2 + opts->topic.len + 2 + 1 + plen;
mg_mqtt_send_header(c, MQTT_CMD_SUBSCRIBE, 2, len);
mg_mqtt_send_header(c, MQTT_CMD_SUBSCRIBE, 2, (uint32_t) len);
if (++c->mgr->mqtt_id == 0) ++c->mgr->mqtt_id;
mg_send_u16(c, mg_htons(c->mgr->mqtt_id));
if (c->is_mqtt5) mg_send_mqtt_properties(c, opts->props, opts->num_props);
@ -3255,21 +3262,21 @@ int mg_mqtt_parse(const uint8_t *buf, size_t len, uint8_t version,
memset(m, 0, sizeof(*m));
m->dgram.ptr = (char *) buf;
if (len < 2) return MQTT_INCOMPLETE;
m->cmd = (uint8_t)(buf[0] >> 4);
m->cmd = (uint8_t) (buf[0] >> 4);
m->qos = (buf[0] >> 1) & 3;
n = len_len = 0;
p = (uint8_t *) buf + 1;
while ((size_t)(p - buf) < len) {
while ((size_t) (p - buf) < len) {
lc = *((uint8_t *) p++);
n += (uint32_t)((lc & 0x7f) << 7 * len_len);
n += (uint32_t) ((lc & 0x7f) << 7 * len_len);
len_len++;
if (!(lc & 0x80)) break;
if (len_len >= 4) return MQTT_MALFORMED;
}
end = p + n;
if ((lc & 0x80) || (end > buf + len)) return MQTT_INCOMPLETE;
m->dgram.len = (size_t)(end - buf);
m->dgram.len = (size_t) (end - buf);
switch (m->cmd) {
case MQTT_CMD_CONNACK:
@ -3285,33 +3292,32 @@ int mg_mqtt_parse(const uint8_t *buf, size_t len, uint8_t version,
case MQTT_CMD_UNSUBSCRIBE:
case MQTT_CMD_UNSUBACK:
if (p + 2 > end) return MQTT_MALFORMED;
m->id = (uint16_t)((((uint16_t) p[0]) << 8) | p[1]);
m->id = (uint16_t) ((((uint16_t) p[0]) << 8) | p[1]);
p += 2;
break;
case MQTT_CMD_PUBLISH: {
if (p + 2 > end) return MQTT_MALFORMED;
m->topic.len = (uint16_t)((((uint16_t) p[0]) << 8) | p[1]);
m->topic.len = (uint16_t) ((((uint16_t) p[0]) << 8) | p[1]);
m->topic.ptr = (char *) p + 2;
p += 2 + m->topic.len;
if (p > end) return MQTT_MALFORMED;
if (m->qos > 0) {
if (p + 2 > end) return MQTT_MALFORMED;
m->id = (uint16_t)((((uint16_t) p[0]) << 8) | p[1]);
m->id = (uint16_t) ((((uint16_t) p[0]) << 8) | p[1]);
p += 2;
}
if (p > end) return MQTT_MALFORMED;
if (version == 5 && p + 2 < end) {
m->props_size = decode_variable_length((char *) p, &len_len);
m->props_start = (size_t)(p + len_len - buf);
m->props_start = (size_t) (p + len_len - buf);
p += len_len + m->props_size;
}
if (p > end) return MQTT_MALFORMED;
m->data.ptr = (char *) p;
m->data.len = (size_t)(end - p);
m->data.len = (size_t) (end - p);
break;
}
default:
break;
default: break;
}
return MQTT_OK;
}
@ -3379,21 +3385,16 @@ void mg_mqtt_pong(struct mg_connection *nc) {
mg_mqtt_send_header(nc, MQTT_CMD_PINGRESP, 0, 0);
}
void mg_mqtt_disconnect(struct mg_connection *nc,
void mg_mqtt_disconnect(struct mg_connection *c,
const struct mg_mqtt_opts *opts) {
uint32_t len;
size_t len = 0;
if (c->is_mqtt5) len = 1 + get_props_size(opts->props, opts->num_props);
mg_mqtt_send_header(c, MQTT_CMD_DISCONNECT, 0, (uint32_t) len);
if (nc->is_mqtt5)
len = 1 + get_full_properties_size(opts->props, opts->num_props);
else
len = 0;
mg_mqtt_send_header(nc, MQTT_CMD_DISCONNECT, 0, len);
if (nc->is_mqtt5) {
if (c->is_mqtt5) {
uint8_t zero = 0;
mg_send(nc, &zero, sizeof(zero)); // reason code
mg_send_mqtt_properties(nc, opts->props, opts->num_props);
mg_send(c, &zero, sizeof(zero)); // reason code
mg_send_mqtt_properties(c, opts->props, opts->num_props);
}
}

View File

@ -1454,74 +1454,40 @@ enum {
enum { MQTT_OK, MQTT_INCOMPLETE, MQTT_MALFORMED };
struct mqtt_prop_map_t {
uint8_t id;
uint8_t type;
};
static const struct mqtt_prop_map_t mqtt_prop_map[] = {
{MQTT_PROP_PAYLOAD_FORMAT_INDICATOR, MQTT_PROP_TYPE_BYTE},
{MQTT_PROP_MESSAGE_EXPIRY_INTERVAL, MQTT_PROP_TYPE_INT},
{MQTT_PROP_CONTENT_TYPE, MQTT_PROP_TYPE_STRING},
{MQTT_PROP_RESPONSE_TOPIC, MQTT_PROP_TYPE_STRING},
{MQTT_PROP_CORRELATION_DATA, MQTT_PROP_TYPE_BINARY_DATA},
{MQTT_PROP_SUBSCRIPTION_IDENTIFIER, MQTT_PROP_TYPE_VARIABLE_INT},
{MQTT_PROP_SESSION_EXPIRY_INTERVAL, MQTT_PROP_TYPE_INT},
{MQTT_PROP_ASSIGNED_CLIENT_IDENTIFIER, MQTT_PROP_TYPE_STRING},
{MQTT_PROP_SERVER_KEEP_ALIVE, MQTT_PROP_TYPE_SHORT},
{MQTT_PROP_AUTHENTICATION_METHOD, MQTT_PROP_TYPE_STRING},
{MQTT_PROP_AUTHENTICATION_DATA, MQTT_PROP_TYPE_BINARY_DATA},
{MQTT_PROP_REQUEST_PROBLEM_INFORMATION, MQTT_PROP_TYPE_BYTE},
{MQTT_PROP_WILL_DELAY_INTERVAL, MQTT_PROP_TYPE_INT},
{MQTT_PROP_REQUEST_RESPONSE_INFORMATION, MQTT_PROP_TYPE_BYTE},
{MQTT_PROP_RESPONSE_INFORMATION, MQTT_PROP_TYPE_STRING},
{MQTT_PROP_SERVER_REFERENCE, MQTT_PROP_TYPE_STRING},
{MQTT_PROP_REASON_STRING, MQTT_PROP_TYPE_STRING},
{MQTT_PROP_RECEIVE_MAXIMUM, MQTT_PROP_TYPE_SHORT},
{MQTT_PROP_TOPIC_ALIAS_MAXIMUM, MQTT_PROP_TYPE_SHORT},
{MQTT_PROP_TOPIC_ALIAS, MQTT_PROP_TYPE_SHORT},
{MQTT_PROP_MAXIMUM_QOS, MQTT_PROP_TYPE_BYTE},
{MQTT_PROP_RETAIN_AVAILABLE, MQTT_PROP_TYPE_BYTE},
{MQTT_PROP_USER_PROPERTY, MQTT_PROP_TYPE_STRING_PAIR},
{MQTT_PROP_MAXIMUM_PACKET_SIZE, MQTT_PROP_TYPE_INT},
{MQTT_PROP_WILDCARD_SUBSCRIPTION_AVAILABLE, MQTT_PROP_TYPE_BYTE},
{MQTT_PROP_SUBSCRIPTION_IDENTIFIER_AVAILABLE, MQTT_PROP_TYPE_BYTE},
{MQTT_PROP_SHARED_SUBSCRIPTION_AVAILABLE, MQTT_PROP_TYPE_BYTE}};
struct mg_mqtt_prop {
uint8_t id; // Enumerated at MQTT5 Reference
uint32_t iv; // integer value for 8-, 16-, 32-bit integers types
struct mg_str key; // non-NULL only for user property type
struct mg_str val; // non-NULL only for UTF-8 types and user properties
uint32_t iv; // Integer value for 8-, 16-, 32-bit integers types
struct mg_str key; // Non-NULL only for user property type
struct mg_str val; // Non-NULL only for UTF-8 types and user properties
};
struct mg_mqtt_opts {
struct mg_str user; // Username, can be empty
struct mg_str pass; // Password, can be empty
struct mg_str client_id; // Client ID
struct mg_str topic; // topic
struct mg_str message; // message
uint8_t qos; // message quality of service
uint8_t version; // Can be 4 (3.1.1), or 5. If 0, assume 4.
uint16_t keepalive; // Keep-alive timer in seconds
bool retain; // Retain last will
bool clean; // Use clean session, 0 or 1
struct mg_mqtt_prop *props; // MQTT5 props array
int num_props; // number of props
struct mg_mqtt_prop *will_props; // only found in the CONNECT packet
int num_will_props; // number of will props
struct mg_str user; // Username, can be empty
struct mg_str pass; // Password, can be empty
struct mg_str client_id; // Client ID
struct mg_str topic; // topic
struct mg_str message; // message
uint8_t qos; // message quality of service
uint8_t version; // Can be 4 (3.1.1), or 5. If 0, assume 4.
uint16_t keepalive; // Keep-alive timer in seconds
bool retain; // Retain last will
bool clean; // Use clean session, 0 or 1
struct mg_mqtt_prop *props; // MQTT5 props array
size_t num_props; // number of props
struct mg_mqtt_prop *will_props; // Valid only for CONNECT packet
size_t num_will_props; // Number of will props
};
struct mg_mqtt_message {
struct mg_str topic; // Parsed topic
struct mg_str data; // Parsed message
struct mg_str dgram; // Whole MQTT datagram, including headers
uint16_t id; // Set for PUBACK, PUBREC, PUBREL, PUBCOMP, SUBACK, PUBLISH
uint8_t cmd; // MQTT command, one of MQTT_CMD_*
uint8_t qos; // Quality of service
uint8_t ack; // Connack return code. 0 - success
size_t props_start; // offset to the start of the properties
size_t props_size; // length of the properties
uint16_t id; // For PUBACK, PUBREC, PUBREL, PUBCOMP, SUBACK, PUBLISH
uint8_t cmd; // MQTT command, one of MQTT_CMD_*
uint8_t qos; // Quality of service
uint8_t ack; // Connack return code. 0 - success
size_t props_start; // Offset to the start of the properties
size_t props_size; // Length of the properties
};
struct mg_connection *mg_mqtt_connect(struct mg_mgr *, const char *url,
@ -1537,8 +1503,7 @@ void mg_mqtt_send_header(struct mg_connection *, uint8_t cmd, uint8_t flags,
uint32_t len);
void mg_mqtt_ping(struct mg_connection *);
void mg_mqtt_pong(struct mg_connection *);
void mg_mqtt_disconnect(struct mg_connection *,
const struct mg_mqtt_opts *opts);
void mg_mqtt_disconnect(struct mg_connection *, const struct mg_mqtt_opts *);
size_t mg_mqtt_next_prop(struct mg_mqtt_message *, struct mg_mqtt_prop *,
size_t ofs);

View File

@ -1,8 +1,8 @@
#include "mqtt.h"
#include "arch.h"
#include "base64.h"
#include "event.h"
#include "log.h"
#include "mqtt.h"
#include "url.h"
#include "util.h"
@ -12,17 +12,51 @@
#define MQTT_HAS_PASSWORD 0x40
#define MQTT_HAS_USER_NAME 0x80
struct mg_mqtt_pmap {
uint8_t id;
uint8_t type;
};
static const struct mg_mqtt_pmap s_prop_map[] = {
{MQTT_PROP_PAYLOAD_FORMAT_INDICATOR, MQTT_PROP_TYPE_BYTE},
{MQTT_PROP_MESSAGE_EXPIRY_INTERVAL, MQTT_PROP_TYPE_INT},
{MQTT_PROP_CONTENT_TYPE, MQTT_PROP_TYPE_STRING},
{MQTT_PROP_RESPONSE_TOPIC, MQTT_PROP_TYPE_STRING},
{MQTT_PROP_CORRELATION_DATA, MQTT_PROP_TYPE_BINARY_DATA},
{MQTT_PROP_SUBSCRIPTION_IDENTIFIER, MQTT_PROP_TYPE_VARIABLE_INT},
{MQTT_PROP_SESSION_EXPIRY_INTERVAL, MQTT_PROP_TYPE_INT},
{MQTT_PROP_ASSIGNED_CLIENT_IDENTIFIER, MQTT_PROP_TYPE_STRING},
{MQTT_PROP_SERVER_KEEP_ALIVE, MQTT_PROP_TYPE_SHORT},
{MQTT_PROP_AUTHENTICATION_METHOD, MQTT_PROP_TYPE_STRING},
{MQTT_PROP_AUTHENTICATION_DATA, MQTT_PROP_TYPE_BINARY_DATA},
{MQTT_PROP_REQUEST_PROBLEM_INFORMATION, MQTT_PROP_TYPE_BYTE},
{MQTT_PROP_WILL_DELAY_INTERVAL, MQTT_PROP_TYPE_INT},
{MQTT_PROP_REQUEST_RESPONSE_INFORMATION, MQTT_PROP_TYPE_BYTE},
{MQTT_PROP_RESPONSE_INFORMATION, MQTT_PROP_TYPE_STRING},
{MQTT_PROP_SERVER_REFERENCE, MQTT_PROP_TYPE_STRING},
{MQTT_PROP_REASON_STRING, MQTT_PROP_TYPE_STRING},
{MQTT_PROP_RECEIVE_MAXIMUM, MQTT_PROP_TYPE_SHORT},
{MQTT_PROP_TOPIC_ALIAS_MAXIMUM, MQTT_PROP_TYPE_SHORT},
{MQTT_PROP_TOPIC_ALIAS, MQTT_PROP_TYPE_SHORT},
{MQTT_PROP_MAXIMUM_QOS, MQTT_PROP_TYPE_BYTE},
{MQTT_PROP_RETAIN_AVAILABLE, MQTT_PROP_TYPE_BYTE},
{MQTT_PROP_USER_PROPERTY, MQTT_PROP_TYPE_STRING_PAIR},
{MQTT_PROP_MAXIMUM_PACKET_SIZE, MQTT_PROP_TYPE_INT},
{MQTT_PROP_WILDCARD_SUBSCRIPTION_AVAILABLE, MQTT_PROP_TYPE_BYTE},
{MQTT_PROP_SUBSCRIPTION_IDENTIFIER_AVAILABLE, MQTT_PROP_TYPE_BYTE},
{MQTT_PROP_SHARED_SUBSCRIPTION_AVAILABLE, MQTT_PROP_TYPE_BYTE}};
void mg_mqtt_send_header(struct mg_connection *c, uint8_t cmd, uint8_t flags,
uint32_t len) {
uint8_t buf[1 + sizeof(len)], *vlen = &buf[1];
buf[0] = (uint8_t)((cmd << 4) | flags);
buf[0] = (uint8_t) ((cmd << 4) | flags);
do {
*vlen = len % 0x80;
len /= 0x80;
if (len > 0) *vlen |= 0x80;
vlen++;
} while (len > 0 && vlen < &buf[sizeof(buf)]);
mg_send(c, buf, (size_t)(vlen - buf));
mg_send(c, buf, (size_t) (vlen - buf));
}
static void mg_send_u16(struct mg_connection *c, uint16_t value) {
@ -33,21 +67,20 @@ static void mg_send_u32(struct mg_connection *c, uint32_t value) {
mg_send(c, &value, sizeof(value));
}
static uint8_t compute_variable_length_size(uint32_t length) {
static uint8_t compute_variable_length_size(size_t length) {
uint8_t bytes_needed = 0;
do {
bytes_needed++;
length /= 0x80;
} while (length > 0);
return bytes_needed;
}
static int encode_variable_length(uint8_t *buf, int value) {
static int encode_variable_length(uint8_t *buf, size_t value) {
int len = 0;
do {
uint8_t byte = (uint8_t)(value % 128);
uint8_t byte = (uint8_t) (value % 128);
value /= 128;
if (value > 0) byte |= 0x80;
buf[len++] = byte;
@ -58,14 +91,10 @@ static int encode_variable_length(uint8_t *buf, int value) {
static uint32_t decode_variable_length(const char *buf,
uint32_t *bytes_consumed) {
const uint8_t *p = (const uint8_t *) buf;
uint32_t value = 0;
uint32_t multiplier = 1;
uint8_t encoded_byte;
uint32_t offset;
uint32_t value = 0, multiplier = 1, offset;
for (offset = 0; offset < 4; offset++) {
encoded_byte = p[offset];
uint8_t encoded_byte = ((uint8_t *) buf)[offset];
value += (encoded_byte & 0x7F) * multiplier;
multiplier *= 128;
@ -78,72 +107,59 @@ static uint32_t decode_variable_length(const char *buf,
}
static int mqtt_prop_type_by_id(uint8_t prop_id) {
size_t num_properties = sizeof(mqtt_prop_map) / sizeof(mqtt_prop_map[0]);
for (size_t i = 0; i < num_properties; ++i) {
if (mqtt_prop_map[i].id == prop_id) {
return mqtt_prop_map[i].type;
}
size_t i, num_properties = sizeof(s_prop_map) / sizeof(s_prop_map[0]);
for (i = 0; i < num_properties; ++i) {
if (s_prop_map[i].id == prop_id) return s_prop_map[i].type;
}
return -1; // Property ID not found
}
/*
returns the size of the properties section, without the
size of the content's length
*/
static uint32_t get_properties_length(struct mg_mqtt_prop *props, int count) {
uint32_t size = 0;
for (int i = 0; i < count; i++) {
// Returns the size of the properties section, without the
// size of the content's length
static size_t get_properties_length(struct mg_mqtt_prop *props, size_t count) {
size_t i, size = 0;
for (i = 0; i < count; i++) {
size++; // identifier
switch (mqtt_prop_type_by_id(props[i].id)) {
case MQTT_PROP_TYPE_STRING_PAIR:
size += (uint32_t)(props[i].val.len + props[i].key.len +
2 * sizeof(uint16_t));
size += (uint32_t) (props[i].val.len + props[i].key.len +
2 * sizeof(uint16_t));
break;
case MQTT_PROP_TYPE_STRING:
size += (uint32_t)(props[i].val.len + sizeof(uint16_t));
size += (uint32_t) (props[i].val.len + sizeof(uint16_t));
break;
case MQTT_PROP_TYPE_BINARY_DATA:
size += (uint32_t)(props[i].val.len + sizeof(uint16_t));
size += (uint32_t) (props[i].val.len + sizeof(uint16_t));
break;
case MQTT_PROP_TYPE_VARIABLE_INT:
size += compute_variable_length_size((uint32_t) props[i].iv);
break;
case MQTT_PROP_TYPE_INT:
size += (uint32_t) sizeof(uint32_t);
break;
case MQTT_PROP_TYPE_SHORT:
size += (uint32_t) sizeof(uint16_t);
break;
default:
return size; // cannot parse further down
case MQTT_PROP_TYPE_INT: size += (uint32_t) sizeof(uint32_t); break;
case MQTT_PROP_TYPE_SHORT: size += (uint32_t) sizeof(uint16_t); break;
default: return size; // cannot parse further down
}
}
return size;
}
/*
returns the entire size of the properties section, including the
size of the variable length of the content
*/
static uint32_t get_full_properties_size(struct mg_mqtt_prop *props,
int count) {
uint32_t size = get_properties_length(props, count);
// returns the entire size of the properties section, including the
// size of the variable length of the content
static size_t get_props_size(struct mg_mqtt_prop *props, size_t count) {
size_t size = get_properties_length(props, count);
size += compute_variable_length_size(size);
return size;
}
static void mg_send_mqtt_properties(struct mg_connection *c,
struct mg_mqtt_prop *props, int nr_props) {
uint32_t total_size = get_properties_length(props, nr_props);
struct mg_mqtt_prop *props, size_t nprops) {
size_t total_size = get_properties_length(props, nprops);
uint8_t buf_v[4] = {0, 0, 0, 0};
uint8_t buf[4] = {0, 0, 0, 0};
int len = encode_variable_length(buf, (int) total_size);
int i, len = encode_variable_length(buf, total_size);
mg_send(c, buf, (size_t) len);
for (int i = 0; i < nr_props; i++) {
for (i = 0; i < (int) nprops; i++) {
mg_send(c, &props[i].id, sizeof(props[i].id));
switch (mqtt_prop_type_by_id(props[i].id)) {
case MQTT_PROP_TYPE_STRING_PAIR:
@ -170,7 +186,7 @@ static void mg_send_mqtt_properties(struct mg_connection *c,
mg_send(c, props[i].val.ptr, props[i].val.len);
break;
case MQTT_PROP_TYPE_VARIABLE_INT:
len = encode_variable_length(buf_v, (int) props[i].iv);
len = encode_variable_length(buf_v, props[i].iv);
mg_send(c, buf_v, (size_t) len);
break;
}
@ -178,26 +194,22 @@ static void mg_send_mqtt_properties(struct mg_connection *c,
}
size_t mg_mqtt_next_prop(struct mg_mqtt_message *msg, struct mg_mqtt_prop *prop,
size_t crt_pos) {
unsigned char *i =
(unsigned char *) msg->dgram.ptr + msg->props_start + crt_pos;
size_t new_pos = crt_pos;
size_t ofs) {
uint8_t *i = (uint8_t *) msg->dgram.ptr + msg->props_start + ofs;
size_t new_pos = ofs;
uint32_t bytes_consumed;
prop->id = i[0];
if (crt_pos >= msg->dgram.len ||
crt_pos >= msg->props_start + msg->props_size)
if (ofs >= msg->dgram.len || ofs >= msg->props_start + msg->props_size)
return 0;
uint8_t id = i[0];
i++, new_pos++;
prop->id = id;
switch (mqtt_prop_type_by_id(id)) {
switch (mqtt_prop_type_by_id(prop->id)) {
case MQTT_PROP_TYPE_STRING_PAIR:
prop->key.len = (uint16_t)((((uint16_t) i[0]) << 8) | i[1]);
prop->key.len = (uint16_t) ((((uint16_t) i[0]) << 8) | i[1]);
prop->key.ptr = (char *) i + 2;
i += 2 + prop->key.len;
prop->val.len = (uint16_t)((((uint16_t) i[0]) << 8) | i[1]);
prop->val.len = (uint16_t) ((((uint16_t) i[0]) << 8) | i[1]);
prop->val.ptr = (char *) i + 2;
new_pos += 2 * sizeof(uint16_t) + prop->val.len + prop->key.len;
break;
@ -206,7 +218,7 @@ size_t mg_mqtt_next_prop(struct mg_mqtt_message *msg, struct mg_mqtt_prop *prop,
new_pos++;
break;
case MQTT_PROP_TYPE_SHORT:
prop->iv = (uint16_t)((((uint16_t) i[0]) << 8) | i[1]);
prop->iv = (uint16_t) ((((uint16_t) i[0]) << 8) | i[1]);
new_pos += sizeof(uint16_t);
break;
case MQTT_PROP_TYPE_INT:
@ -215,12 +227,12 @@ size_t mg_mqtt_next_prop(struct mg_mqtt_message *msg, struct mg_mqtt_prop *prop,
new_pos += sizeof(uint32_t);
break;
case MQTT_PROP_TYPE_STRING:
prop->val.len = (uint16_t)((((uint16_t) i[0]) << 8) | i[1]);
prop->val.len = (uint16_t) ((((uint16_t) i[0]) << 8) | i[1]);
prop->val.ptr = (char *) i + 2;
new_pos += 2 + prop->val.len;
break;
case MQTT_PROP_TYPE_BINARY_DATA:
prop->val.len = (uint16_t)((((uint16_t) i[0]) << 8) | i[1]);
prop->val.len = (uint16_t) ((((uint16_t) i[0]) << 8) | i[1]);
prop->val.ptr = (char *) i + 2;
new_pos += 2 + prop->val.len;
break;
@ -228,8 +240,7 @@ size_t mg_mqtt_next_prop(struct mg_mqtt_message *msg, struct mg_mqtt_prop *prop,
prop->iv = decode_variable_length((char *) i, &bytes_consumed);
new_pos += bytes_consumed;
break;
default:
new_pos = 0;
default: new_pos = 0;
}
return new_pos;
@ -238,7 +249,7 @@ size_t mg_mqtt_next_prop(struct mg_mqtt_message *msg, struct mg_mqtt_prop *prop,
void mg_mqtt_login(struct mg_connection *c, const struct mg_mqtt_opts *opts) {
char rnd[10], client_id[21];
struct mg_str cid = opts->client_id;
uint32_t total_len = 7 + 1 + 2 + 2;
size_t total_len = 7 + 1 + 2 + 2;
uint8_t hdr[8] = {0, 4, 'M', 'Q', 'T', 'T', opts->version, 0};
if (cid.len == 0) {
@ -250,7 +261,7 @@ void mg_mqtt_login(struct mg_connection *c, const struct mg_mqtt_opts *opts) {
if (hdr[6] == 0) hdr[6] = 4; // If version is not set, use 4 (3.1.1)
c->is_mqtt5 = hdr[6] == 5; // Set version 5 flag
hdr[7] = (uint8_t)((opts->qos & 3) << 3); // Connection flags
hdr[7] = (uint8_t) ((opts->qos & 3) << 3); // Connection flags
if (opts->user.len > 0) {
total_len += 2 + (uint32_t) opts->user.len;
hdr[7] |= MQTT_HAS_USER_NAME;
@ -267,13 +278,12 @@ void mg_mqtt_login(struct mg_connection *c, const struct mg_mqtt_opts *opts) {
if (opts->retain) hdr[7] |= MQTT_WILL_RETAIN;
total_len += (uint32_t) cid.len;
if (c->is_mqtt5) {
total_len += get_full_properties_size(opts->props, opts->num_props);
total_len += get_props_size(opts->props, opts->num_props);
if (hdr[7] & MQTT_HAS_WILL)
total_len += get_full_properties_size(opts->will_props,
opts->num_will_props);
total_len += get_props_size(opts->will_props, opts->num_will_props);
}
mg_mqtt_send_header(c, MQTT_CMD_CONNECT, 0, total_len);
mg_mqtt_send_header(c, MQTT_CMD_CONNECT, 0, (uint32_t) total_len);
mg_send(c, hdr, sizeof(hdr));
// keepalive == 0 means "do not disconnect us!"
mg_send_u16(c, mg_htons((uint16_t) opts->keepalive));
@ -303,16 +313,15 @@ void mg_mqtt_login(struct mg_connection *c, const struct mg_mqtt_opts *opts) {
}
void mg_mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts) {
uint8_t flags = (uint8_t)(((opts->qos & 3) << 1) | (opts->retain ? 1 : 0));
uint32_t len = 2 + (uint32_t) opts->topic.len + (uint32_t) opts->message.len;
uint8_t flags = (uint8_t) (((opts->qos & 3) << 1) | (opts->retain ? 1 : 0));
size_t len = 2 + opts->topic.len + opts->message.len;
MG_DEBUG(("%lu [%.*s] -> [%.*s]", c->id, (int) opts->topic.len,
(char *) opts->topic.ptr, (int) opts->message.len,
(char *) opts->message.ptr));
if (opts->qos > 0) len += 2;
if (c->is_mqtt5)
len += get_full_properties_size(opts->props, opts->num_props);
if (c->is_mqtt5) len += get_props_size(opts->props, opts->num_props);
mg_mqtt_send_header(c, MQTT_CMD_PUBLISH, flags, len);
mg_mqtt_send_header(c, MQTT_CMD_PUBLISH, flags, (uint32_t) len);
mg_send_u16(c, mg_htons((uint16_t) opts->topic.len));
mg_send(c, opts->topic.ptr, opts->topic.len);
if (opts->qos > 0) {
@ -327,12 +336,10 @@ void mg_mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts) {
void mg_mqtt_sub(struct mg_connection *c, const struct mg_mqtt_opts *opts) {
uint8_t qos_ = opts->qos & 3;
uint32_t len =
2 + (uint32_t) opts->topic.len + 2 + 1 +
(c->is_mqtt5 ? get_full_properties_size(opts->props, opts->num_props)
: 0);
size_t plen = c->is_mqtt5 ? get_props_size(opts->props, opts->num_props) : 0;
size_t len = 2 + opts->topic.len + 2 + 1 + plen;
mg_mqtt_send_header(c, MQTT_CMD_SUBSCRIBE, 2, len);
mg_mqtt_send_header(c, MQTT_CMD_SUBSCRIBE, 2, (uint32_t) len);
if (++c->mgr->mqtt_id == 0) ++c->mgr->mqtt_id;
mg_send_u16(c, mg_htons(c->mgr->mqtt_id));
if (c->is_mqtt5) mg_send_mqtt_properties(c, opts->props, opts->num_props);
@ -350,21 +357,21 @@ int mg_mqtt_parse(const uint8_t *buf, size_t len, uint8_t version,
memset(m, 0, sizeof(*m));
m->dgram.ptr = (char *) buf;
if (len < 2) return MQTT_INCOMPLETE;
m->cmd = (uint8_t)(buf[0] >> 4);
m->cmd = (uint8_t) (buf[0] >> 4);
m->qos = (buf[0] >> 1) & 3;
n = len_len = 0;
p = (uint8_t *) buf + 1;
while ((size_t)(p - buf) < len) {
while ((size_t) (p - buf) < len) {
lc = *((uint8_t *) p++);
n += (uint32_t)((lc & 0x7f) << 7 * len_len);
n += (uint32_t) ((lc & 0x7f) << 7 * len_len);
len_len++;
if (!(lc & 0x80)) break;
if (len_len >= 4) return MQTT_MALFORMED;
}
end = p + n;
if ((lc & 0x80) || (end > buf + len)) return MQTT_INCOMPLETE;
m->dgram.len = (size_t)(end - buf);
m->dgram.len = (size_t) (end - buf);
switch (m->cmd) {
case MQTT_CMD_CONNACK:
@ -380,33 +387,32 @@ int mg_mqtt_parse(const uint8_t *buf, size_t len, uint8_t version,
case MQTT_CMD_UNSUBSCRIBE:
case MQTT_CMD_UNSUBACK:
if (p + 2 > end) return MQTT_MALFORMED;
m->id = (uint16_t)((((uint16_t) p[0]) << 8) | p[1]);
m->id = (uint16_t) ((((uint16_t) p[0]) << 8) | p[1]);
p += 2;
break;
case MQTT_CMD_PUBLISH: {
if (p + 2 > end) return MQTT_MALFORMED;
m->topic.len = (uint16_t)((((uint16_t) p[0]) << 8) | p[1]);
m->topic.len = (uint16_t) ((((uint16_t) p[0]) << 8) | p[1]);
m->topic.ptr = (char *) p + 2;
p += 2 + m->topic.len;
if (p > end) return MQTT_MALFORMED;
if (m->qos > 0) {
if (p + 2 > end) return MQTT_MALFORMED;
m->id = (uint16_t)((((uint16_t) p[0]) << 8) | p[1]);
m->id = (uint16_t) ((((uint16_t) p[0]) << 8) | p[1]);
p += 2;
}
if (p > end) return MQTT_MALFORMED;
if (version == 5 && p + 2 < end) {
m->props_size = decode_variable_length((char *) p, &len_len);
m->props_start = (size_t)(p + len_len - buf);
m->props_start = (size_t) (p + len_len - buf);
p += len_len + m->props_size;
}
if (p > end) return MQTT_MALFORMED;
m->data.ptr = (char *) p;
m->data.len = (size_t)(end - p);
m->data.len = (size_t) (end - p);
break;
}
default:
break;
default: break;
}
return MQTT_OK;
}
@ -474,21 +480,16 @@ void mg_mqtt_pong(struct mg_connection *nc) {
mg_mqtt_send_header(nc, MQTT_CMD_PINGRESP, 0, 0);
}
void mg_mqtt_disconnect(struct mg_connection *nc,
void mg_mqtt_disconnect(struct mg_connection *c,
const struct mg_mqtt_opts *opts) {
uint32_t len;
size_t len = 0;
if (c->is_mqtt5) len = 1 + get_props_size(opts->props, opts->num_props);
mg_mqtt_send_header(c, MQTT_CMD_DISCONNECT, 0, (uint32_t) len);
if (nc->is_mqtt5)
len = 1 + get_full_properties_size(opts->props, opts->num_props);
else
len = 0;
mg_mqtt_send_header(nc, MQTT_CMD_DISCONNECT, 0, len);
if (nc->is_mqtt5) {
if (c->is_mqtt5) {
uint8_t zero = 0;
mg_send(nc, &zero, sizeof(zero)); // reason code
mg_send_mqtt_properties(nc, opts->props, opts->num_props);
mg_send(c, &zero, sizeof(zero)); // reason code
mg_send_mqtt_properties(c, opts->props, opts->num_props);
}
}

View File

@ -59,74 +59,40 @@ enum {
enum { MQTT_OK, MQTT_INCOMPLETE, MQTT_MALFORMED };
struct mqtt_prop_map_t {
uint8_t id;
uint8_t type;
};
static const struct mqtt_prop_map_t mqtt_prop_map[] = {
{MQTT_PROP_PAYLOAD_FORMAT_INDICATOR, MQTT_PROP_TYPE_BYTE},
{MQTT_PROP_MESSAGE_EXPIRY_INTERVAL, MQTT_PROP_TYPE_INT},
{MQTT_PROP_CONTENT_TYPE, MQTT_PROP_TYPE_STRING},
{MQTT_PROP_RESPONSE_TOPIC, MQTT_PROP_TYPE_STRING},
{MQTT_PROP_CORRELATION_DATA, MQTT_PROP_TYPE_BINARY_DATA},
{MQTT_PROP_SUBSCRIPTION_IDENTIFIER, MQTT_PROP_TYPE_VARIABLE_INT},
{MQTT_PROP_SESSION_EXPIRY_INTERVAL, MQTT_PROP_TYPE_INT},
{MQTT_PROP_ASSIGNED_CLIENT_IDENTIFIER, MQTT_PROP_TYPE_STRING},
{MQTT_PROP_SERVER_KEEP_ALIVE, MQTT_PROP_TYPE_SHORT},
{MQTT_PROP_AUTHENTICATION_METHOD, MQTT_PROP_TYPE_STRING},
{MQTT_PROP_AUTHENTICATION_DATA, MQTT_PROP_TYPE_BINARY_DATA},
{MQTT_PROP_REQUEST_PROBLEM_INFORMATION, MQTT_PROP_TYPE_BYTE},
{MQTT_PROP_WILL_DELAY_INTERVAL, MQTT_PROP_TYPE_INT},
{MQTT_PROP_REQUEST_RESPONSE_INFORMATION, MQTT_PROP_TYPE_BYTE},
{MQTT_PROP_RESPONSE_INFORMATION, MQTT_PROP_TYPE_STRING},
{MQTT_PROP_SERVER_REFERENCE, MQTT_PROP_TYPE_STRING},
{MQTT_PROP_REASON_STRING, MQTT_PROP_TYPE_STRING},
{MQTT_PROP_RECEIVE_MAXIMUM, MQTT_PROP_TYPE_SHORT},
{MQTT_PROP_TOPIC_ALIAS_MAXIMUM, MQTT_PROP_TYPE_SHORT},
{MQTT_PROP_TOPIC_ALIAS, MQTT_PROP_TYPE_SHORT},
{MQTT_PROP_MAXIMUM_QOS, MQTT_PROP_TYPE_BYTE},
{MQTT_PROP_RETAIN_AVAILABLE, MQTT_PROP_TYPE_BYTE},
{MQTT_PROP_USER_PROPERTY, MQTT_PROP_TYPE_STRING_PAIR},
{MQTT_PROP_MAXIMUM_PACKET_SIZE, MQTT_PROP_TYPE_INT},
{MQTT_PROP_WILDCARD_SUBSCRIPTION_AVAILABLE, MQTT_PROP_TYPE_BYTE},
{MQTT_PROP_SUBSCRIPTION_IDENTIFIER_AVAILABLE, MQTT_PROP_TYPE_BYTE},
{MQTT_PROP_SHARED_SUBSCRIPTION_AVAILABLE, MQTT_PROP_TYPE_BYTE}};
struct mg_mqtt_prop {
uint8_t id; // Enumerated at MQTT5 Reference
uint32_t iv; // integer value for 8-, 16-, 32-bit integers types
struct mg_str key; // non-NULL only for user property type
struct mg_str val; // non-NULL only for UTF-8 types and user properties
uint32_t iv; // Integer value for 8-, 16-, 32-bit integers types
struct mg_str key; // Non-NULL only for user property type
struct mg_str val; // Non-NULL only for UTF-8 types and user properties
};
struct mg_mqtt_opts {
struct mg_str user; // Username, can be empty
struct mg_str pass; // Password, can be empty
struct mg_str client_id; // Client ID
struct mg_str topic; // topic
struct mg_str message; // message
uint8_t qos; // message quality of service
uint8_t version; // Can be 4 (3.1.1), or 5. If 0, assume 4.
uint16_t keepalive; // Keep-alive timer in seconds
bool retain; // Retain last will
bool clean; // Use clean session, 0 or 1
struct mg_mqtt_prop *props; // MQTT5 props array
int num_props; // number of props
struct mg_mqtt_prop *will_props; // only found in the CONNECT packet
int num_will_props; // number of will props
struct mg_str user; // Username, can be empty
struct mg_str pass; // Password, can be empty
struct mg_str client_id; // Client ID
struct mg_str topic; // topic
struct mg_str message; // message
uint8_t qos; // message quality of service
uint8_t version; // Can be 4 (3.1.1), or 5. If 0, assume 4.
uint16_t keepalive; // Keep-alive timer in seconds
bool retain; // Retain last will
bool clean; // Use clean session, 0 or 1
struct mg_mqtt_prop *props; // MQTT5 props array
size_t num_props; // number of props
struct mg_mqtt_prop *will_props; // Valid only for CONNECT packet
size_t num_will_props; // Number of will props
};
struct mg_mqtt_message {
struct mg_str topic; // Parsed topic
struct mg_str data; // Parsed message
struct mg_str dgram; // Whole MQTT datagram, including headers
uint16_t id; // Set for PUBACK, PUBREC, PUBREL, PUBCOMP, SUBACK, PUBLISH
uint8_t cmd; // MQTT command, one of MQTT_CMD_*
uint8_t qos; // Quality of service
uint8_t ack; // Connack return code. 0 - success
size_t props_start; // offset to the start of the properties
size_t props_size; // length of the properties
uint16_t id; // For PUBACK, PUBREC, PUBREL, PUBCOMP, SUBACK, PUBLISH
uint8_t cmd; // MQTT command, one of MQTT_CMD_*
uint8_t qos; // Quality of service
uint8_t ack; // Connack return code. 0 - success
size_t props_start; // Offset to the start of the properties
size_t props_size; // Length of the properties
};
struct mg_connection *mg_mqtt_connect(struct mg_mgr *, const char *url,
@ -142,7 +108,6 @@ void mg_mqtt_send_header(struct mg_connection *, uint8_t cmd, uint8_t flags,
uint32_t len);
void mg_mqtt_ping(struct mg_connection *);
void mg_mqtt_pong(struct mg_connection *);
void mg_mqtt_disconnect(struct mg_connection *,
const struct mg_mqtt_opts *opts);
void mg_mqtt_disconnect(struct mg_connection *, const struct mg_mqtt_opts *);
size_t mg_mqtt_next_prop(struct mg_mqtt_message *, struct mg_mqtt_prop *,
size_t ofs);