mirror of
https://github.com/cesanta/mongoose.git
synced 2025-01-02 20:05:24 +08:00
Handle multiple MQTT messages per RECV event
PUBLISHED_FROM=fc98c51254dd94d3f443fb66e49449da7d9e754c
This commit is contained in:
parent
bd130136bc
commit
08a10a8af5
24
mongoose.c
24
mongoose.c
@ -9740,21 +9740,27 @@ static const char *scanto(const char *p, struct mg_str *s) {
|
|||||||
|
|
||||||
MG_INTERNAL int parse_mqtt(struct mbuf *io, struct mg_mqtt_message *mm) {
|
MG_INTERNAL int parse_mqtt(struct mbuf *io, struct mg_mqtt_message *mm) {
|
||||||
uint8_t header;
|
uint8_t header;
|
||||||
size_t len = 0;
|
size_t len = 0, len_len = 0;
|
||||||
|
const char *p, *end;
|
||||||
|
unsigned char lc = 0;
|
||||||
int cmd;
|
int cmd;
|
||||||
const char *p = &io->buf[1], *end;
|
|
||||||
|
|
||||||
if (io->len < 2) return -1;
|
if (io->len < 2) return -1;
|
||||||
header = io->buf[0];
|
header = io->buf[0];
|
||||||
cmd = header >> 4;
|
cmd = header >> 4;
|
||||||
|
|
||||||
/* decode mqtt variable length */
|
/* decode mqtt variable length */
|
||||||
do {
|
len = len_len = 0;
|
||||||
len += (*p & 127) << 7 * (p - &io->buf[1]);
|
p = io->buf + 1;
|
||||||
} while ((*p++ & 128) != 0 && ((size_t)(p - io->buf) <= io->len));
|
while ((size_t)(p - io->buf) < io->len) {
|
||||||
|
lc = *((const unsigned char *) p++);
|
||||||
|
len += (lc & 0x7f) << 7 * len_len;
|
||||||
|
len_len++;
|
||||||
|
if (!(lc & 0x80) || (len_len > sizeof(len))) break;
|
||||||
|
}
|
||||||
|
|
||||||
end = p + len;
|
end = p + len;
|
||||||
if (end > io->buf + io->len + 1) {
|
if (lc & 0x80 || end > io->buf + io->len) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -9829,7 +9835,6 @@ MG_INTERNAL int parse_mqtt(struct mbuf *io, struct mg_mqtt_message *mm) {
|
|||||||
|
|
||||||
static void mqtt_handler(struct mg_connection *nc, int ev,
|
static void mqtt_handler(struct mg_connection *nc, int ev,
|
||||||
void *ev_data MG_UD_ARG(void *user_data)) {
|
void *ev_data MG_UD_ARG(void *user_data)) {
|
||||||
int len;
|
|
||||||
struct mbuf *io = &nc->recv_mbuf;
|
struct mbuf *io = &nc->recv_mbuf;
|
||||||
struct mg_mqtt_message mm;
|
struct mg_mqtt_message mm;
|
||||||
memset(&mm, 0, sizeof(mm));
|
memset(&mm, 0, sizeof(mm));
|
||||||
@ -9838,10 +9843,13 @@ static void mqtt_handler(struct mg_connection *nc, int ev,
|
|||||||
|
|
||||||
switch (ev) {
|
switch (ev) {
|
||||||
case MG_EV_RECV:
|
case MG_EV_RECV:
|
||||||
len = parse_mqtt(io, &mm);
|
/* There can be multiple messages in the buffer, process them all. */
|
||||||
|
while (1) {
|
||||||
|
int len = parse_mqtt(io, &mm);
|
||||||
if (len == -1) break; /* not fully buffered */
|
if (len == -1) break; /* not fully buffered */
|
||||||
nc->handler(nc, MG_MQTT_EVENT_BASE + mm.cmd, &mm MG_UD_ARG(user_data));
|
nc->handler(nc, MG_MQTT_EVENT_BASE + mm.cmd, &mm MG_UD_ARG(user_data));
|
||||||
mbuf_remove(io, len);
|
mbuf_remove(io, len);
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user