Refactor MQTT parsing

This commit is contained in:
cpq 2020-12-16 10:14:00 +00:00
parent 8d45cf6972
commit d9551c75b1
4 changed files with 40 additions and 90 deletions

View File

@ -1658,12 +1658,6 @@ void mg_md5_final(mg_md5_ctx *ctx, unsigned char digest[16]) {
enum { MQTT_OK, MQTT_INCOMPLETE, MQTT_MALFORMED };
struct mqtt_message {
size_t len, topic_offset, topic_len, data_offset, data_len;
uint16_t id;
uint8_t cmd, qos, connack_ret_code;
};
void mg_mqtt_send_header(struct mg_connection *c, uint8_t cmd, uint8_t flags,
uint32_t len) {
uint8_t buf[1 + sizeof(len)], *vlen = &buf[1];
@ -1759,33 +1753,33 @@ void mg_mqtt_sub(struct mg_connection *c, struct mg_str *topic) {
mg_send(c, &qos, sizeof(qos));
}
static int parse(const uint8_t *in, size_t inlen, struct mqtt_message *m) {
int mg_mqtt_parse(const uint8_t *buf, size_t len, struct mg_mqtt_message *m) {
uint8_t lc = 0, *p, *end;
uint32_t len = 0, len_len = 0;
uint32_t n = 0, len_len = 0;
memset(m, 0, sizeof(*m));
if (inlen < 2) return MQTT_INCOMPLETE;
m->cmd = in[0] >> 4;
m->qos = (in[0] >> 1) & 3;
m->dgram.ptr = (char *) buf;
if (len < 2) return MQTT_INCOMPLETE;
m->cmd = buf[0] >> 4;
m->qos = (buf[0] >> 1) & 3;
len = len_len = 0;
p = (uint8_t *) in + 1;
while ((size_t)(p - in) < inlen) {
n = len_len = 0;
p = (uint8_t *) buf + 1;
while ((size_t)(p - buf) < len) {
lc = *((uint8_t *) p++);
len += (lc & 0x7f) << 7 * len_len;
n += (lc & 0x7f) << 7 * len_len;
len_len++;
if (!(lc & 0x80)) break;
if (len_len >= 4) return MQTT_MALFORMED;
}
end = p + len;
if (lc & 0x80 || end > in + inlen) return MQTT_INCOMPLETE;
m->len = (int) (end - in);
end = p + n;
if (lc & 0x80 || end > buf + len) return MQTT_INCOMPLETE;
m->dgram.len = end - buf;
switch (m->cmd) {
case MQTT_CMD_CONNACK:
if (end - p < 2) return MQTT_MALFORMED;
m->connack_ret_code = p[1];
m->ack = p[1];
break;
case MQTT_CMD_PUBACK:
case MQTT_CMD_PUBREC:
@ -1803,9 +1797,9 @@ static int parse(const uint8_t *in, size_t inlen, struct mqtt_message *m) {
}
case MQTT_CMD_PUBLISH: {
if (p + 2 > end) return MQTT_MALFORMED;
m->topic_len = (p[0] << 8) | p[1];
m->topic_offset = p + 2 - in;
p += 2 + m->topic_len;
m->topic.len = (p[0] << 8) | p[1];
m->topic.ptr = (char *) p + 2;
p += 2 + m->topic.len;
if (p > end) return MQTT_MALFORMED;
if (m->qos > 0) {
if (p + 2 > end) return MQTT_MALFORMED;
@ -1813,8 +1807,8 @@ static int parse(const uint8_t *in, size_t inlen, struct mqtt_message *m) {
p += 2;
}
if (p > end) return MQTT_MALFORMED;
m->data_offset = p - in;
m->data_len = end - p;
m->data.ptr = (char *) p;
m->data.len = end - p;
break;
}
default:
@ -1837,31 +1831,12 @@ int mg_mqtt_next_sub(struct mg_mqtt_message *msg, struct mg_str *topic,
return new_pos;
}
int mg_mqtt_parse(const unsigned char *s, int n, struct mg_mqtt_message *mm) {
struct mqtt_message m;
int rc = parse(s, n, &m);
if (rc == MQTT_OK) {
mm->dgram.ptr = (char *) s;
mm->dgram.len = m.len;
mm->topic.ptr = (char *) s + m.topic_offset;
mm->topic.len = m.topic_len;
mm->data.ptr = (char *) s + m.data_offset;
mm->data.len = m.data_len;
mm->id = m.id;
mm->cmd = m.cmd;
mm->qos = m.qos;
mm->ack = m.connack_ret_code;
}
return rc;
}
static void mqtt_cb(struct mg_connection *c, int ev, void *ev_data,
void *fn_data) {
if (ev == MG_EV_READ) {
for (;;) {
struct mg_mqtt_message mm;
int rc = mg_mqtt_parse(c->recv.buf, c->recv.len, &mm);
LOG(LL_DEBUG, ("rc %d", rc));
if (rc == MQTT_MALFORMED) {
LOG(LL_ERROR, ("%p MQTT malformed message", c->fd));
c->is_closing = 1;

View File

@ -786,7 +786,7 @@ struct mg_connection *mg_mqtt_listen(struct mg_mgr *mgr, const char *url,
void mg_mqtt_pub(struct mg_connection *, struct mg_str *topic,
struct mg_str *data);
void mg_mqtt_sub(struct mg_connection *, struct mg_str *topic);
int mg_mqtt_parse(const unsigned char *buf, int len, struct mg_mqtt_message *m);
int mg_mqtt_parse(const uint8_t *buf, size_t len, struct mg_mqtt_message *m);
void mg_mqtt_send_header(struct mg_connection *, uint8_t cmd, uint8_t flags,
uint32_t len);
int mg_mqtt_next_sub(struct mg_mqtt_message *msg, struct mg_str *topic,

View File

@ -17,12 +17,6 @@
enum { MQTT_OK, MQTT_INCOMPLETE, MQTT_MALFORMED };
struct mqtt_message {
size_t len, topic_offset, topic_len, data_offset, data_len;
uint16_t id;
uint8_t cmd, qos, connack_ret_code;
};
void mg_mqtt_send_header(struct mg_connection *c, uint8_t cmd, uint8_t flags,
uint32_t len) {
uint8_t buf[1 + sizeof(len)], *vlen = &buf[1];
@ -118,33 +112,33 @@ void mg_mqtt_sub(struct mg_connection *c, struct mg_str *topic) {
mg_send(c, &qos, sizeof(qos));
}
static int parse(const uint8_t *in, size_t inlen, struct mqtt_message *m) {
int mg_mqtt_parse(const uint8_t *buf, size_t len, struct mg_mqtt_message *m) {
uint8_t lc = 0, *p, *end;
uint32_t len = 0, len_len = 0;
uint32_t n = 0, len_len = 0;
memset(m, 0, sizeof(*m));
if (inlen < 2) return MQTT_INCOMPLETE;
m->cmd = in[0] >> 4;
m->qos = (in[0] >> 1) & 3;
m->dgram.ptr = (char *) buf;
if (len < 2) return MQTT_INCOMPLETE;
m->cmd = buf[0] >> 4;
m->qos = (buf[0] >> 1) & 3;
len = len_len = 0;
p = (uint8_t *) in + 1;
while ((size_t)(p - in) < inlen) {
n = len_len = 0;
p = (uint8_t *) buf + 1;
while ((size_t)(p - buf) < len) {
lc = *((uint8_t *) p++);
len += (lc & 0x7f) << 7 * len_len;
n += (lc & 0x7f) << 7 * len_len;
len_len++;
if (!(lc & 0x80)) break;
if (len_len >= 4) return MQTT_MALFORMED;
}
end = p + len;
if (lc & 0x80 || end > in + inlen) return MQTT_INCOMPLETE;
m->len = (int) (end - in);
end = p + n;
if (lc & 0x80 || end > buf + len) return MQTT_INCOMPLETE;
m->dgram.len = end - buf;
switch (m->cmd) {
case MQTT_CMD_CONNACK:
if (end - p < 2) return MQTT_MALFORMED;
m->connack_ret_code = p[1];
m->ack = p[1];
break;
case MQTT_CMD_PUBACK:
case MQTT_CMD_PUBREC:
@ -162,9 +156,9 @@ static int parse(const uint8_t *in, size_t inlen, struct mqtt_message *m) {
}
case MQTT_CMD_PUBLISH: {
if (p + 2 > end) return MQTT_MALFORMED;
m->topic_len = (p[0] << 8) | p[1];
m->topic_offset = p + 2 - in;
p += 2 + m->topic_len;
m->topic.len = (p[0] << 8) | p[1];
m->topic.ptr = (char *) p + 2;
p += 2 + m->topic.len;
if (p > end) return MQTT_MALFORMED;
if (m->qos > 0) {
if (p + 2 > end) return MQTT_MALFORMED;
@ -172,8 +166,8 @@ static int parse(const uint8_t *in, size_t inlen, struct mqtt_message *m) {
p += 2;
}
if (p > end) return MQTT_MALFORMED;
m->data_offset = p - in;
m->data_len = end - p;
m->data.ptr = (char *) p;
m->data.len = end - p;
break;
}
default:
@ -196,31 +190,12 @@ int mg_mqtt_next_sub(struct mg_mqtt_message *msg, struct mg_str *topic,
return new_pos;
}
int mg_mqtt_parse(const unsigned char *s, int n, struct mg_mqtt_message *mm) {
struct mqtt_message m;
int rc = parse(s, n, &m);
if (rc == MQTT_OK) {
mm->dgram.ptr = (char *) s;
mm->dgram.len = m.len;
mm->topic.ptr = (char *) s + m.topic_offset;
mm->topic.len = m.topic_len;
mm->data.ptr = (char *) s + m.data_offset;
mm->data.len = m.data_len;
mm->id = m.id;
mm->cmd = m.cmd;
mm->qos = m.qos;
mm->ack = m.connack_ret_code;
}
return rc;
}
static void mqtt_cb(struct mg_connection *c, int ev, void *ev_data,
void *fn_data) {
if (ev == MG_EV_READ) {
for (;;) {
struct mg_mqtt_message mm;
int rc = mg_mqtt_parse(c->recv.buf, c->recv.len, &mm);
LOG(LL_DEBUG, ("rc %d", rc));
if (rc == MQTT_MALFORMED) {
LOG(LL_ERROR, ("%p MQTT malformed message", c->fd));
c->is_closing = 1;

View File

@ -50,7 +50,7 @@ struct mg_connection *mg_mqtt_listen(struct mg_mgr *mgr, const char *url,
void mg_mqtt_pub(struct mg_connection *, struct mg_str *topic,
struct mg_str *data);
void mg_mqtt_sub(struct mg_connection *, struct mg_str *topic);
int mg_mqtt_parse(const unsigned char *buf, int len, struct mg_mqtt_message *m);
int mg_mqtt_parse(const uint8_t *buf, size_t len, struct mg_mqtt_message *m);
void mg_mqtt_send_header(struct mg_connection *, uint8_t cmd, uint8_t flags,
uint32_t len);
int mg_mqtt_next_sub(struct mg_mqtt_message *msg, struct mg_str *topic,