mirror of
https://github.com/cesanta/mongoose.git
synced 2025-01-14 09:48:01 +08:00
Add mqtt-server example
This commit is contained in:
parent
f176e1ae59
commit
8d45cf6972
10
examples/mqtt-server/Makefile
Normal file
10
examples/mqtt-server/Makefile
Normal file
@ -0,0 +1,10 @@
|
||||
PROG ?= example
|
||||
|
||||
all: $(PROG)
|
||||
$(DEBUGGER) ./$(PROG) $(ARGS)
|
||||
|
||||
$(PROG): main.c
|
||||
$(CC) ../../mongoose.c -I../.. -W -Wall -DMG_ENABLE_LINES=1 $(CFLAGS) -o $(PROG) main.c
|
||||
|
||||
clean:
|
||||
rm -rf $(PROG) *.o *.dSYM *.gcov *.gcno *.gcda *.obj *.exe *.ilk *.pdb
|
83
examples/mqtt-server/main.c
Normal file
83
examples/mqtt-server/main.c
Normal file
@ -0,0 +1,83 @@
|
||||
// Copyright (c) 2020 Cesanta Software Limited
|
||||
// All rights reserved
|
||||
//
|
||||
// Example MQTT server. Usage:
|
||||
// 1. Start this server, type `make`
|
||||
// 2. Install mosquitto MQTT client
|
||||
// 3. In one terminal, run: mosquitto_sub -h localhost -t foo -t bar
|
||||
// 4. In another, run: mosquitto_pub -h localhost -t foo -m hi
|
||||
|
||||
#include "mongoose.h"
|
||||
|
||||
static const char *s_listen_on = "mqtt://0.0.0.0:1883";
|
||||
|
||||
// A list of subscription, held in memory
|
||||
struct sub {
|
||||
struct sub *next;
|
||||
struct mg_connection *c;
|
||||
struct mg_str topic;
|
||||
uint8_t qos;
|
||||
};
|
||||
static struct sub *s_subs = NULL;
|
||||
|
||||
// Event handler function
|
||||
static void fn(struct mg_connection *c, int ev, void *ev_data, void *fn_data) {
|
||||
if (ev == MG_EV_MQTT_CMD) {
|
||||
struct mg_mqtt_message *mm = (struct mg_mqtt_message *) ev_data;
|
||||
LOG(LL_DEBUG, ("cmd %d qos %d", mm->cmd, mm->qos));
|
||||
switch (mm->cmd) {
|
||||
case MQTT_CMD_CONNECT: {
|
||||
// Client connects. Return success, do not check user/password
|
||||
uint8_t response[] = {0, 0};
|
||||
mg_mqtt_send_header(c, MQTT_CMD_CONNACK, 0, sizeof(response));
|
||||
mg_send(c, response, sizeof(response));
|
||||
break;
|
||||
}
|
||||
case MQTT_CMD_SUBSCRIBE: {
|
||||
// Client subscribes
|
||||
int pos = 4; // Initial topic offset, where ID ends
|
||||
uint8_t qos;
|
||||
struct mg_str topic;
|
||||
while ((pos = mg_mqtt_next_sub(mm, &topic, &qos, pos)) > 0) {
|
||||
struct sub *sub = calloc(1, sizeof(*sub));
|
||||
sub->c = c;
|
||||
sub->topic = mg_strdup(topic);
|
||||
sub->qos = qos;
|
||||
LIST_ADD_HEAD(struct sub, &s_subs, sub);
|
||||
LOG(LL_INFO,
|
||||
("SUB %p [%.*s]", c->fd, (int) sub->topic.len, sub->topic.ptr));
|
||||
}
|
||||
break;
|
||||
}
|
||||
case MQTT_CMD_PUBLISH: {
|
||||
// Client published message. Push to all subscribed channels
|
||||
LOG(LL_INFO, ("PUB %p [%.*s] -> [%.*s]", c->fd, (int) mm->data.len,
|
||||
mm->data.ptr, (int) mm->topic.len, mm->topic.ptr));
|
||||
for (struct sub *sub = s_subs; sub != NULL; sub = sub->next) {
|
||||
if (mg_strcmp(mm->topic, sub->topic) != 0) continue;
|
||||
mg_mqtt_pub(sub->c, &mm->topic, &mm->data);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
} else if (ev == MG_EV_CLOSE) {
|
||||
// Client disconnects. Remove from the subscription list
|
||||
for (struct sub *next, *sub = s_subs; sub != NULL; sub = next) {
|
||||
next = sub->next;
|
||||
if (c != sub->c) continue;
|
||||
LOG(LL_INFO,
|
||||
("UNSUB %p [%.*s]", c->fd, (int) sub->topic.len, sub->topic.ptr));
|
||||
LIST_DELETE(struct sub, &s_subs, sub);
|
||||
}
|
||||
}
|
||||
(void) fn_data;
|
||||
}
|
||||
|
||||
int main(void) {
|
||||
struct mg_mgr mgr; // Event manager
|
||||
mg_mgr_init(&mgr); // Initialise event manager
|
||||
mg_mqtt_listen(&mgr, s_listen_on, fn, NULL); // Create MQTT listener
|
||||
for (;;) mg_mgr_poll(&mgr, 1000); // Infinite event loop
|
||||
mg_mgr_free(&mgr);
|
||||
return 0;
|
||||
}
|
101
mongoose.c
101
mongoose.c
@ -664,6 +664,7 @@ static const char *guess_content_type(const char *filename) {
|
||||
const char *type;
|
||||
} * t, types[] = {
|
||||
MIME_ENTRY("html", "text/html"),
|
||||
MIME_ENTRY("htm", "text/html"),
|
||||
MIME_ENTRY("shtml", "text/html"),
|
||||
MIME_ENTRY("css", "text/css"),
|
||||
MIME_ENTRY("js", "text/javascript"),
|
||||
@ -676,7 +677,6 @@ static const char *guess_content_type(const char *filename) {
|
||||
MIME_ENTRY("png", "image/png"),
|
||||
MIME_ENTRY("svg", "image/svg+xml"),
|
||||
MIME_ENTRY("txt", "text/plain"),
|
||||
MIME_ENTRY("torrent", "application/x-bittorrent"),
|
||||
MIME_ENTRY("wav", "audio/wav"),
|
||||
MIME_ENTRY("mp3", "audio/mpeg"),
|
||||
MIME_ENTRY("mid", "audio/mid"),
|
||||
@ -700,10 +700,10 @@ static const char *guess_content_type(const char *filename) {
|
||||
MIME_ENTRY("mpeg", "video/mpeg"),
|
||||
MIME_ENTRY("mov", "video/quicktime"),
|
||||
MIME_ENTRY("mp4", "video/mp4"),
|
||||
MIME_ENTRY("m4v", "video/x-m4v"),
|
||||
MIME_ENTRY("asf", "video/x-ms-asf"),
|
||||
MIME_ENTRY("avi", "video/x-msvideo"),
|
||||
MIME_ENTRY("csv", "text/csv"),
|
||||
MIME_ENTRY("bmp", "image/bmp"),
|
||||
MIME_ENTRY("bin", "application/octet-stream"),
|
||||
MIME_ENTRY("wasm", "application/wasm"),
|
||||
{NULL, 0, NULL},
|
||||
};
|
||||
@ -1648,21 +1648,6 @@ void mg_md5_final(mg_md5_ctx *ctx, unsigned char digest[16]) {
|
||||
|
||||
|
||||
|
||||
#define MQTT_CMD_CONNECT 1
|
||||
#define MQTT_CMD_CONNACK 2
|
||||
#define MQTT_CMD_PUBLISH 3
|
||||
#define MQTT_CMD_PUBACK 4
|
||||
#define MQTT_CMD_PUBREC 5
|
||||
#define MQTT_CMD_PUBREL 6
|
||||
#define MQTT_CMD_PUBCOMP 7
|
||||
#define MQTT_CMD_SUBSCRIBE 8
|
||||
#define MQTT_CMD_SUBACK 9
|
||||
#define MQTT_CMD_UNSUBSCRIBE 10
|
||||
#define MQTT_CMD_UNSUBACK 11
|
||||
#define MQTT_CMD_PINGREQ 12
|
||||
#define MQTT_CMD_PINGRESP 13
|
||||
#define MQTT_CMD_DISCONNECT 14
|
||||
|
||||
#define MQTT_CLEAN_SESSION 0x02
|
||||
#define MQTT_HAS_WILL 0x04
|
||||
#define MQTT_WILL_RETAIN 0x20
|
||||
@ -1670,9 +1655,6 @@ void mg_md5_final(mg_md5_ctx *ctx, unsigned char digest[16]) {
|
||||
#define MQTT_HAS_USER_NAME 0x80
|
||||
#define MQTT_GET_WILL_QOS(flags) (((flags) &0x18) >> 3)
|
||||
#define MQTT_SET_WILL_QOS(flags, qos) (flags) = ((flags) & ~0x18) | ((qos) << 3)
|
||||
#define MQTT_QOS(qos) ((qos) << 1)
|
||||
#define MQTT_GET_QOS(flags) (((flags) &0x6) >> 1)
|
||||
#define MQTT_SET_QOS(flags, qos) (flags) = ((flags) & ~0x6) | ((qos) << 1)
|
||||
|
||||
enum { MQTT_OK, MQTT_INCOMPLETE, MQTT_MALFORMED };
|
||||
|
||||
@ -1682,8 +1664,8 @@ struct mqtt_message {
|
||||
uint8_t cmd, qos, connack_ret_code;
|
||||
};
|
||||
|
||||
static void mqtt_send_header(struct mg_connection *c, uint8_t cmd,
|
||||
uint8_t flags, uint32_t len) {
|
||||
void mg_mqtt_send_header(struct mg_connection *c, uint8_t cmd, uint8_t flags,
|
||||
uint32_t len) {
|
||||
uint8_t buf[1 + sizeof(len)], *vlen = &buf[1];
|
||||
buf[0] = (cmd << 4) | flags;
|
||||
do {
|
||||
@ -1723,7 +1705,7 @@ static void mqtt_login(struct mg_connection *c, const char *url,
|
||||
if (opts->will_retain) flags |= MQTT_WILL_RETAIN;
|
||||
total_len += (uint32_t) opts->client_id.len;
|
||||
|
||||
mqtt_send_header(c, MQTT_CMD_CONNECT, 0, total_len);
|
||||
mg_mqtt_send_header(c, MQTT_CMD_CONNECT, 0, total_len);
|
||||
mg_send(c, "\00\04MQTT\04", 7);
|
||||
mg_send(c, &flags, 1);
|
||||
// keepalive == 0 means "do not disconnect us!"
|
||||
@ -1753,7 +1735,7 @@ void mg_mqtt_pub(struct mg_connection *c, struct mg_str *topic,
|
||||
LOG(LL_DEBUG, ("%p [%.*s] -> [%.*s]", c->fd, (int) topic->len,
|
||||
(char *) topic->ptr, (int) data->len, (char *) data->ptr));
|
||||
if (MQTT_GET_QOS(flags) > 0) total_len += 2;
|
||||
mqtt_send_header(c, MQTT_CMD_PUBLISH, flags, total_len);
|
||||
mg_mqtt_send_header(c, MQTT_CMD_PUBLISH, flags, total_len);
|
||||
mg_send_u16(c, mg_htons((uint16_t) topic->len));
|
||||
mg_send(c, topic->ptr, topic->len);
|
||||
if (MQTT_GET_QOS(flags) > 0) {
|
||||
@ -1768,7 +1750,8 @@ void mg_mqtt_sub(struct mg_connection *c, struct mg_str *topic) {
|
||||
static uint16_t s_id;
|
||||
uint8_t qos = 1;
|
||||
uint32_t total_len = 2 + (uint32_t) topic->len + 2 + 1;
|
||||
mqtt_send_header(c, MQTT_CMD_SUBSCRIBE, (uint8_t) MQTT_QOS(qos), total_len);
|
||||
mg_mqtt_send_header(c, MQTT_CMD_SUBSCRIBE, (uint8_t) MQTT_QOS(qos),
|
||||
total_len);
|
||||
if (++s_id == 0) ++s_id;
|
||||
mg_send_u16(c, mg_htons(s_id));
|
||||
mg_send_u16(c, mg_htons((uint16_t) topic->len));
|
||||
@ -1780,6 +1763,7 @@ static int parse(const uint8_t *in, size_t inlen, struct mqtt_message *m) {
|
||||
uint8_t lc = 0, *p, *end;
|
||||
uint32_t len = 0, len_len = 0;
|
||||
|
||||
memset(m, 0, sizeof(*m));
|
||||
if (inlen < 2) return MQTT_INCOMPLETE;
|
||||
m->cmd = in[0] >> 4;
|
||||
m->qos = (in[0] >> 1) & 3;
|
||||
@ -1811,6 +1795,12 @@ static int parse(const uint8_t *in, size_t inlen, struct mqtt_message *m) {
|
||||
if (p + 2 > end) return MQTT_MALFORMED;
|
||||
m->id = (p[0] << 8) | p[1];
|
||||
break;
|
||||
case MQTT_CMD_SUBSCRIBE: {
|
||||
if (p + 2 > end) return MQTT_MALFORMED;
|
||||
m->id = (p[0] << 8) | p[1];
|
||||
p += 2;
|
||||
break;
|
||||
}
|
||||
case MQTT_CMD_PUBLISH: {
|
||||
if (p + 2 > end) return MQTT_MALFORMED;
|
||||
m->topic_len = (p[0] << 8) | p[1];
|
||||
@ -1825,6 +1815,7 @@ static int parse(const uint8_t *in, size_t inlen, struct mqtt_message *m) {
|
||||
if (p > end) return MQTT_MALFORMED;
|
||||
m->data_offset = p - in;
|
||||
m->data_len = end - p;
|
||||
break;
|
||||
}
|
||||
default:
|
||||
break;
|
||||
@ -1832,14 +1823,34 @@ static int parse(const uint8_t *in, size_t inlen, struct mqtt_message *m) {
|
||||
return MQTT_OK;
|
||||
}
|
||||
|
||||
int mg_mqtt_next_sub(struct mg_mqtt_message *msg, struct mg_str *topic,
|
||||
uint8_t *qos, int pos) {
|
||||
unsigned char *buf = (unsigned char *) msg->dgram.ptr + pos;
|
||||
int new_pos;
|
||||
if ((size_t) pos >= msg->dgram.len) return -1;
|
||||
|
||||
topic->len = buf[0] << 8 | buf[1];
|
||||
topic->ptr = (char *) buf + 2;
|
||||
new_pos = pos + 2 + topic->len + 1;
|
||||
if ((size_t) new_pos > msg->dgram.len) return -1;
|
||||
*qos = buf[2 + topic->len];
|
||||
return new_pos;
|
||||
}
|
||||
|
||||
int mg_mqtt_parse(const unsigned char *s, int n, struct mg_mqtt_message *mm) {
|
||||
struct mqtt_message m;
|
||||
int rc = parse(s, n, &m);
|
||||
if (rc == MQTT_OK) {
|
||||
mm->dgram.ptr = (char *) s;
|
||||
mm->dgram.len = m.len;
|
||||
mm->topic.ptr = (char *) s + m.topic_offset;
|
||||
mm->topic.len = m.topic_len;
|
||||
mm->data.ptr = (char *) s + m.data_offset;
|
||||
mm->data.len = m.data_len;
|
||||
mm->id = m.id;
|
||||
mm->cmd = m.cmd;
|
||||
mm->qos = m.qos;
|
||||
mm->ack = m.connack_ret_code;
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
@ -1847,40 +1858,37 @@ int mg_mqtt_parse(const unsigned char *s, int n, struct mg_mqtt_message *mm) {
|
||||
static void mqtt_cb(struct mg_connection *c, int ev, void *ev_data,
|
||||
void *fn_data) {
|
||||
if (ev == MG_EV_READ) {
|
||||
struct mqtt_message m;
|
||||
memset(&m, 0, sizeof(m));
|
||||
for (;;) {
|
||||
int rc = parse(c->recv.buf, c->recv.len, &m);
|
||||
struct mg_mqtt_message mm;
|
||||
int rc = mg_mqtt_parse(c->recv.buf, c->recv.len, &mm);
|
||||
LOG(LL_DEBUG, ("rc %d", rc));
|
||||
if (rc == MQTT_MALFORMED) {
|
||||
LOG(LL_ERROR, ("%p MQTT malformed message", c->fd));
|
||||
c->is_closing = 1;
|
||||
break;
|
||||
} else if (rc == MQTT_OK) {
|
||||
LOG(LL_VERBOSE_DEBUG,
|
||||
("%p MQTT CMD %d len %d [%.*s]", c->fd, m.cmd, (int) m.len,
|
||||
(int) m.data_len, (char *) c->recv.buf + m.data_offset));
|
||||
switch (m.cmd) {
|
||||
("%p MQTT CMD %d len %d [%.*s]", c->fd, mm.cmd, (int) mm.dgram.len,
|
||||
(int) mm.data.len, mm.data.ptr));
|
||||
switch (mm.cmd) {
|
||||
case MQTT_CMD_CONNACK:
|
||||
mg_call(c, MG_EV_MQTT_OPEN, &m.connack_ret_code);
|
||||
if (m.connack_ret_code == 0) {
|
||||
mg_call(c, MG_EV_MQTT_OPEN, &mm.ack);
|
||||
if (mm.ack == 0) {
|
||||
LOG(LL_INFO, ("%p Connected", c->fd));
|
||||
} else {
|
||||
LOG(LL_ERROR,
|
||||
("%p MQTT auth failed, code %d", c->fd, m.connack_ret_code));
|
||||
LOG(LL_ERROR, ("%p MQTT auth failed, code %d", c->fd, mm.ack));
|
||||
c->is_closing = 1;
|
||||
}
|
||||
break;
|
||||
case MQTT_CMD_PUBLISH: {
|
||||
struct mg_mqtt_message evd = {
|
||||
{(char *) c->recv.buf + m.topic_offset, m.topic_len},
|
||||
{(char *) c->recv.buf + m.data_offset, m.data_len}};
|
||||
LOG(LL_DEBUG, ("%p [%.*s] -> [%.*s]", c->fd, (int) evd.topic.len,
|
||||
evd.topic.ptr, (int) evd.data.len, evd.data.ptr));
|
||||
mg_call(c, MG_EV_MQTT_MSG, &evd);
|
||||
LOG(LL_DEBUG, ("%p [%.*s] -> [%.*s]", c->fd, (int) mm.topic.len,
|
||||
mm.topic.ptr, (int) mm.data.len, mm.data.ptr));
|
||||
mg_call(c, MG_EV_MQTT_MSG, &mm);
|
||||
break;
|
||||
}
|
||||
}
|
||||
mg_iobuf_delete(&c->recv, m.len);
|
||||
mg_call(c, MG_EV_MQTT_CMD, &mm);
|
||||
mg_iobuf_delete(&c->recv, mm.dgram.len);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
@ -1903,6 +1911,13 @@ struct mg_connection *mg_mqtt_connect(struct mg_mgr *mgr, const char *url,
|
||||
return c;
|
||||
}
|
||||
|
||||
struct mg_connection *mg_mqtt_listen(struct mg_mgr *mgr, const char *url,
|
||||
mg_event_handler_t fn, void *fn_data) {
|
||||
struct mg_connection *c = mg_listen(mgr, url, fn, fn_data);
|
||||
if (c != NULL) c->pfn = mqtt_cb, c->pfn_data = mgr;
|
||||
return c;
|
||||
}
|
||||
|
||||
#ifdef MG_ENABLE_LINES
|
||||
#line 1 "src/net.c"
|
||||
#endif
|
||||
|
56
mongoose.h
56
mongoose.h
@ -562,18 +562,19 @@ void mg_call(struct mg_connection *c, int ev, void *ev_data);
|
||||
void mg_error(struct mg_connection *c, const char *fmt, ...);
|
||||
|
||||
enum {
|
||||
MG_EV_ERROR, // Error char *error_message
|
||||
MG_EV_POLL, // mg_mgr_poll iteration unsigned long *millis
|
||||
MG_EV_RESOLVE, // Host name is resolved NULL
|
||||
MG_EV_CONNECT, // Connection established NULL
|
||||
MG_EV_ACCEPT, // Connection accepted NULL
|
||||
MG_EV_READ, // Data received from socket struct mg_str *
|
||||
MG_EV_WRITE, // Data written to socket int *num_bytes_written
|
||||
MG_EV_CLOSE, // Connection closed NULL
|
||||
MG_EV_HTTP_MSG, // HTTP request/response struct mg_http_message *
|
||||
MG_EV_ERROR, // Error char *error_message
|
||||
MG_EV_POLL, // mg_mgr_poll iteration unsigned long *millis
|
||||
MG_EV_RESOLVE, // Host name is resolved NULL
|
||||
MG_EV_CONNECT, // Connection established NULL
|
||||
MG_EV_ACCEPT, // Connection accepted NULL
|
||||
MG_EV_READ, // Data received from socket struct mg_str *
|
||||
MG_EV_WRITE, // Data written to socket int *num_bytes_written
|
||||
MG_EV_CLOSE, // Connection closed NULL
|
||||
MG_EV_HTTP_MSG, // HTTP request/response struct mg_http_message *
|
||||
MG_EV_WS_OPEN, // Websocket handshake done NULL
|
||||
MG_EV_WS_MSG, // Websocket message received struct mg_ws_message *
|
||||
MG_EV_MQTT_MSG, // MQTT message struct mg_mqtt_message *
|
||||
MG_EV_MQTT_CMD, // MQTT low-level command struct mg_mqtt_message *
|
||||
MG_EV_MQTT_MSG, // MQTT PUBLISH received struct mg_mqtt_message *
|
||||
MG_EV_MQTT_OPEN, // MQTT CONNACK received int *connack_status_code
|
||||
MG_EV_SNTP_TIME, // SNTP time received struct timeval *
|
||||
MG_EV_USER, // Starting ID for user events
|
||||
@ -584,7 +585,6 @@ enum {
|
||||
|
||||
|
||||
|
||||
|
||||
struct mg_mgr {
|
||||
struct mg_connection *conns; // List of active connections
|
||||
struct mg_connection *dnsc; // DNS resolver connection
|
||||
@ -739,6 +739,25 @@ int mg_sntp_parse(const unsigned char *buf, size_t len, struct timeval *tv);
|
||||
|
||||
|
||||
|
||||
#define MQTT_CMD_CONNECT 1
|
||||
#define MQTT_CMD_CONNACK 2
|
||||
#define MQTT_CMD_PUBLISH 3
|
||||
#define MQTT_CMD_PUBACK 4
|
||||
#define MQTT_CMD_PUBREC 5
|
||||
#define MQTT_CMD_PUBREL 6
|
||||
#define MQTT_CMD_PUBCOMP 7
|
||||
#define MQTT_CMD_SUBSCRIBE 8
|
||||
#define MQTT_CMD_SUBACK 9
|
||||
#define MQTT_CMD_UNSUBSCRIBE 10
|
||||
#define MQTT_CMD_UNSUBACK 11
|
||||
#define MQTT_CMD_PINGREQ 12
|
||||
#define MQTT_CMD_PINGRESP 13
|
||||
#define MQTT_CMD_DISCONNECT 14
|
||||
|
||||
#define MQTT_QOS(qos) ((qos) << 1)
|
||||
#define MQTT_GET_QOS(flags) (((flags) &0x6) >> 1)
|
||||
#define MQTT_SET_QOS(flags, qos) (flags) = ((flags) & ~0x6) | ((qos) << 1)
|
||||
|
||||
struct mg_mqtt_opts {
|
||||
struct mg_str client_id;
|
||||
struct mg_str will_topic;
|
||||
@ -750,17 +769,28 @@ struct mg_mqtt_opts {
|
||||
};
|
||||
|
||||
struct mg_mqtt_message {
|
||||
struct mg_str topic;
|
||||
struct mg_str data;
|
||||
struct mg_str topic; // Parsed topic
|
||||
struct mg_str data; // Parsed message
|
||||
struct mg_str dgram; // Whole MQTT datagram, including headers
|
||||
uint16_t id; // Set for PUBACK, PUBREC, PUBREL, PUBCOMP, SUBACK, PUBLISH
|
||||
uint8_t cmd; // MQTT command, one of MQTT_CMD_*
|
||||
uint8_t qos; // Quality of service
|
||||
uint8_t ack; // Connack return code. 0 - success
|
||||
};
|
||||
|
||||
struct mg_connection *mg_mqtt_connect(struct mg_mgr *, const char *url,
|
||||
struct mg_mqtt_opts *opts,
|
||||
mg_event_handler_t fn, void *fn_data);
|
||||
struct mg_connection *mg_mqtt_listen(struct mg_mgr *mgr, const char *url,
|
||||
mg_event_handler_t fn, void *fn_data);
|
||||
void mg_mqtt_pub(struct mg_connection *, struct mg_str *topic,
|
||||
struct mg_str *data);
|
||||
void mg_mqtt_sub(struct mg_connection *, struct mg_str *topic);
|
||||
int mg_mqtt_parse(const unsigned char *buf, int len, struct mg_mqtt_message *m);
|
||||
void mg_mqtt_send_header(struct mg_connection *, uint8_t cmd, uint8_t flags,
|
||||
uint32_t len);
|
||||
int mg_mqtt_next_sub(struct mg_mqtt_message *msg, struct mg_str *topic,
|
||||
uint8_t *qos, int pos);
|
||||
|
||||
|
||||
|
||||
|
22
src/event.h
22
src/event.h
@ -18,20 +18,20 @@ void mg_call(struct mg_connection *c, int ev, void *ev_data);
|
||||
void mg_error(struct mg_connection *c, const char *fmt, ...);
|
||||
|
||||
enum {
|
||||
MG_EV_ERROR, // Error char *error_message
|
||||
MG_EV_POLL, // mg_mgr_poll iteration unsigned long *millis
|
||||
MG_EV_RESOLVE, // Host name is resolved NULL
|
||||
MG_EV_CONNECT, // Connection established NULL
|
||||
MG_EV_ACCEPT, // Connection accepted NULL
|
||||
MG_EV_READ, // Data received from socket struct mg_str *
|
||||
MG_EV_WRITE, // Data written to socket int *num_bytes_written
|
||||
MG_EV_CLOSE, // Connection closed NULL
|
||||
MG_EV_HTTP_MSG, // HTTP request/response struct mg_http_message *
|
||||
MG_EV_ERROR, // Error char *error_message
|
||||
MG_EV_POLL, // mg_mgr_poll iteration unsigned long *millis
|
||||
MG_EV_RESOLVE, // Host name is resolved NULL
|
||||
MG_EV_CONNECT, // Connection established NULL
|
||||
MG_EV_ACCEPT, // Connection accepted NULL
|
||||
MG_EV_READ, // Data received from socket struct mg_str *
|
||||
MG_EV_WRITE, // Data written to socket int *num_bytes_written
|
||||
MG_EV_CLOSE, // Connection closed NULL
|
||||
MG_EV_HTTP_MSG, // HTTP request/response struct mg_http_message *
|
||||
MG_EV_WS_OPEN, // Websocket handshake done NULL
|
||||
MG_EV_WS_MSG, // Websocket message received struct mg_ws_message *
|
||||
MG_EV_MQTT_MSG, // MQTT message struct mg_mqtt_message *
|
||||
MG_EV_MQTT_CMD, // MQTT low-level command struct mg_mqtt_message *
|
||||
MG_EV_MQTT_MSG, // MQTT PUBLISH received struct mg_mqtt_message *
|
||||
MG_EV_MQTT_OPEN, // MQTT CONNACK received int *connack_status_code
|
||||
MG_EV_SNTP_TIME, // SNTP time received struct timeval *
|
||||
MG_EV_USER, // Starting ID for user events
|
||||
};
|
||||
|
||||
|
95
src/mqtt.c
95
src/mqtt.c
@ -7,21 +7,6 @@
|
||||
#include "url.h"
|
||||
#include "util.h"
|
||||
|
||||
#define MQTT_CMD_CONNECT 1
|
||||
#define MQTT_CMD_CONNACK 2
|
||||
#define MQTT_CMD_PUBLISH 3
|
||||
#define MQTT_CMD_PUBACK 4
|
||||
#define MQTT_CMD_PUBREC 5
|
||||
#define MQTT_CMD_PUBREL 6
|
||||
#define MQTT_CMD_PUBCOMP 7
|
||||
#define MQTT_CMD_SUBSCRIBE 8
|
||||
#define MQTT_CMD_SUBACK 9
|
||||
#define MQTT_CMD_UNSUBSCRIBE 10
|
||||
#define MQTT_CMD_UNSUBACK 11
|
||||
#define MQTT_CMD_PINGREQ 12
|
||||
#define MQTT_CMD_PINGRESP 13
|
||||
#define MQTT_CMD_DISCONNECT 14
|
||||
|
||||
#define MQTT_CLEAN_SESSION 0x02
|
||||
#define MQTT_HAS_WILL 0x04
|
||||
#define MQTT_WILL_RETAIN 0x20
|
||||
@ -29,9 +14,6 @@
|
||||
#define MQTT_HAS_USER_NAME 0x80
|
||||
#define MQTT_GET_WILL_QOS(flags) (((flags) &0x18) >> 3)
|
||||
#define MQTT_SET_WILL_QOS(flags, qos) (flags) = ((flags) & ~0x18) | ((qos) << 3)
|
||||
#define MQTT_QOS(qos) ((qos) << 1)
|
||||
#define MQTT_GET_QOS(flags) (((flags) &0x6) >> 1)
|
||||
#define MQTT_SET_QOS(flags, qos) (flags) = ((flags) & ~0x6) | ((qos) << 1)
|
||||
|
||||
enum { MQTT_OK, MQTT_INCOMPLETE, MQTT_MALFORMED };
|
||||
|
||||
@ -41,8 +23,8 @@ struct mqtt_message {
|
||||
uint8_t cmd, qos, connack_ret_code;
|
||||
};
|
||||
|
||||
static void mqtt_send_header(struct mg_connection *c, uint8_t cmd,
|
||||
uint8_t flags, uint32_t len) {
|
||||
void mg_mqtt_send_header(struct mg_connection *c, uint8_t cmd, uint8_t flags,
|
||||
uint32_t len) {
|
||||
uint8_t buf[1 + sizeof(len)], *vlen = &buf[1];
|
||||
buf[0] = (cmd << 4) | flags;
|
||||
do {
|
||||
@ -82,7 +64,7 @@ static void mqtt_login(struct mg_connection *c, const char *url,
|
||||
if (opts->will_retain) flags |= MQTT_WILL_RETAIN;
|
||||
total_len += (uint32_t) opts->client_id.len;
|
||||
|
||||
mqtt_send_header(c, MQTT_CMD_CONNECT, 0, total_len);
|
||||
mg_mqtt_send_header(c, MQTT_CMD_CONNECT, 0, total_len);
|
||||
mg_send(c, "\00\04MQTT\04", 7);
|
||||
mg_send(c, &flags, 1);
|
||||
// keepalive == 0 means "do not disconnect us!"
|
||||
@ -112,7 +94,7 @@ void mg_mqtt_pub(struct mg_connection *c, struct mg_str *topic,
|
||||
LOG(LL_DEBUG, ("%p [%.*s] -> [%.*s]", c->fd, (int) topic->len,
|
||||
(char *) topic->ptr, (int) data->len, (char *) data->ptr));
|
||||
if (MQTT_GET_QOS(flags) > 0) total_len += 2;
|
||||
mqtt_send_header(c, MQTT_CMD_PUBLISH, flags, total_len);
|
||||
mg_mqtt_send_header(c, MQTT_CMD_PUBLISH, flags, total_len);
|
||||
mg_send_u16(c, mg_htons((uint16_t) topic->len));
|
||||
mg_send(c, topic->ptr, topic->len);
|
||||
if (MQTT_GET_QOS(flags) > 0) {
|
||||
@ -127,7 +109,8 @@ void mg_mqtt_sub(struct mg_connection *c, struct mg_str *topic) {
|
||||
static uint16_t s_id;
|
||||
uint8_t qos = 1;
|
||||
uint32_t total_len = 2 + (uint32_t) topic->len + 2 + 1;
|
||||
mqtt_send_header(c, MQTT_CMD_SUBSCRIBE, (uint8_t) MQTT_QOS(qos), total_len);
|
||||
mg_mqtt_send_header(c, MQTT_CMD_SUBSCRIBE, (uint8_t) MQTT_QOS(qos),
|
||||
total_len);
|
||||
if (++s_id == 0) ++s_id;
|
||||
mg_send_u16(c, mg_htons(s_id));
|
||||
mg_send_u16(c, mg_htons((uint16_t) topic->len));
|
||||
@ -139,6 +122,7 @@ static int parse(const uint8_t *in, size_t inlen, struct mqtt_message *m) {
|
||||
uint8_t lc = 0, *p, *end;
|
||||
uint32_t len = 0, len_len = 0;
|
||||
|
||||
memset(m, 0, sizeof(*m));
|
||||
if (inlen < 2) return MQTT_INCOMPLETE;
|
||||
m->cmd = in[0] >> 4;
|
||||
m->qos = (in[0] >> 1) & 3;
|
||||
@ -170,6 +154,12 @@ static int parse(const uint8_t *in, size_t inlen, struct mqtt_message *m) {
|
||||
if (p + 2 > end) return MQTT_MALFORMED;
|
||||
m->id = (p[0] << 8) | p[1];
|
||||
break;
|
||||
case MQTT_CMD_SUBSCRIBE: {
|
||||
if (p + 2 > end) return MQTT_MALFORMED;
|
||||
m->id = (p[0] << 8) | p[1];
|
||||
p += 2;
|
||||
break;
|
||||
}
|
||||
case MQTT_CMD_PUBLISH: {
|
||||
if (p + 2 > end) return MQTT_MALFORMED;
|
||||
m->topic_len = (p[0] << 8) | p[1];
|
||||
@ -184,6 +174,7 @@ static int parse(const uint8_t *in, size_t inlen, struct mqtt_message *m) {
|
||||
if (p > end) return MQTT_MALFORMED;
|
||||
m->data_offset = p - in;
|
||||
m->data_len = end - p;
|
||||
break;
|
||||
}
|
||||
default:
|
||||
break;
|
||||
@ -191,14 +182,34 @@ static int parse(const uint8_t *in, size_t inlen, struct mqtt_message *m) {
|
||||
return MQTT_OK;
|
||||
}
|
||||
|
||||
int mg_mqtt_next_sub(struct mg_mqtt_message *msg, struct mg_str *topic,
|
||||
uint8_t *qos, int pos) {
|
||||
unsigned char *buf = (unsigned char *) msg->dgram.ptr + pos;
|
||||
int new_pos;
|
||||
if ((size_t) pos >= msg->dgram.len) return -1;
|
||||
|
||||
topic->len = buf[0] << 8 | buf[1];
|
||||
topic->ptr = (char *) buf + 2;
|
||||
new_pos = pos + 2 + topic->len + 1;
|
||||
if ((size_t) new_pos > msg->dgram.len) return -1;
|
||||
*qos = buf[2 + topic->len];
|
||||
return new_pos;
|
||||
}
|
||||
|
||||
int mg_mqtt_parse(const unsigned char *s, int n, struct mg_mqtt_message *mm) {
|
||||
struct mqtt_message m;
|
||||
int rc = parse(s, n, &m);
|
||||
if (rc == MQTT_OK) {
|
||||
mm->dgram.ptr = (char *) s;
|
||||
mm->dgram.len = m.len;
|
||||
mm->topic.ptr = (char *) s + m.topic_offset;
|
||||
mm->topic.len = m.topic_len;
|
||||
mm->data.ptr = (char *) s + m.data_offset;
|
||||
mm->data.len = m.data_len;
|
||||
mm->id = m.id;
|
||||
mm->cmd = m.cmd;
|
||||
mm->qos = m.qos;
|
||||
mm->ack = m.connack_ret_code;
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
@ -206,40 +217,37 @@ int mg_mqtt_parse(const unsigned char *s, int n, struct mg_mqtt_message *mm) {
|
||||
static void mqtt_cb(struct mg_connection *c, int ev, void *ev_data,
|
||||
void *fn_data) {
|
||||
if (ev == MG_EV_READ) {
|
||||
struct mqtt_message m;
|
||||
memset(&m, 0, sizeof(m));
|
||||
for (;;) {
|
||||
int rc = parse(c->recv.buf, c->recv.len, &m);
|
||||
struct mg_mqtt_message mm;
|
||||
int rc = mg_mqtt_parse(c->recv.buf, c->recv.len, &mm);
|
||||
LOG(LL_DEBUG, ("rc %d", rc));
|
||||
if (rc == MQTT_MALFORMED) {
|
||||
LOG(LL_ERROR, ("%p MQTT malformed message", c->fd));
|
||||
c->is_closing = 1;
|
||||
break;
|
||||
} else if (rc == MQTT_OK) {
|
||||
LOG(LL_VERBOSE_DEBUG,
|
||||
("%p MQTT CMD %d len %d [%.*s]", c->fd, m.cmd, (int) m.len,
|
||||
(int) m.data_len, (char *) c->recv.buf + m.data_offset));
|
||||
switch (m.cmd) {
|
||||
("%p MQTT CMD %d len %d [%.*s]", c->fd, mm.cmd, (int) mm.dgram.len,
|
||||
(int) mm.data.len, mm.data.ptr));
|
||||
switch (mm.cmd) {
|
||||
case MQTT_CMD_CONNACK:
|
||||
mg_call(c, MG_EV_MQTT_OPEN, &m.connack_ret_code);
|
||||
if (m.connack_ret_code == 0) {
|
||||
mg_call(c, MG_EV_MQTT_OPEN, &mm.ack);
|
||||
if (mm.ack == 0) {
|
||||
LOG(LL_INFO, ("%p Connected", c->fd));
|
||||
} else {
|
||||
LOG(LL_ERROR,
|
||||
("%p MQTT auth failed, code %d", c->fd, m.connack_ret_code));
|
||||
LOG(LL_ERROR, ("%p MQTT auth failed, code %d", c->fd, mm.ack));
|
||||
c->is_closing = 1;
|
||||
}
|
||||
break;
|
||||
case MQTT_CMD_PUBLISH: {
|
||||
struct mg_mqtt_message evd = {
|
||||
{(char *) c->recv.buf + m.topic_offset, m.topic_len},
|
||||
{(char *) c->recv.buf + m.data_offset, m.data_len}};
|
||||
LOG(LL_DEBUG, ("%p [%.*s] -> [%.*s]", c->fd, (int) evd.topic.len,
|
||||
evd.topic.ptr, (int) evd.data.len, evd.data.ptr));
|
||||
mg_call(c, MG_EV_MQTT_MSG, &evd);
|
||||
LOG(LL_DEBUG, ("%p [%.*s] -> [%.*s]", c->fd, (int) mm.topic.len,
|
||||
mm.topic.ptr, (int) mm.data.len, mm.data.ptr));
|
||||
mg_call(c, MG_EV_MQTT_MSG, &mm);
|
||||
break;
|
||||
}
|
||||
}
|
||||
mg_iobuf_delete(&c->recv, m.len);
|
||||
mg_call(c, MG_EV_MQTT_CMD, &mm);
|
||||
mg_iobuf_delete(&c->recv, mm.dgram.len);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
@ -261,3 +269,10 @@ struct mg_connection *mg_mqtt_connect(struct mg_mgr *mgr, const char *url,
|
||||
}
|
||||
return c;
|
||||
}
|
||||
|
||||
struct mg_connection *mg_mqtt_listen(struct mg_mgr *mgr, const char *url,
|
||||
mg_event_handler_t fn, void *fn_data) {
|
||||
struct mg_connection *c = mg_listen(mgr, url, fn, fn_data);
|
||||
if (c != NULL) c->pfn = mqtt_cb, c->pfn_data = mgr;
|
||||
return c;
|
||||
}
|
||||
|
34
src/mqtt.h
34
src/mqtt.h
@ -3,6 +3,25 @@
|
||||
#include "net.h"
|
||||
#include "str.h"
|
||||
|
||||
#define MQTT_CMD_CONNECT 1
|
||||
#define MQTT_CMD_CONNACK 2
|
||||
#define MQTT_CMD_PUBLISH 3
|
||||
#define MQTT_CMD_PUBACK 4
|
||||
#define MQTT_CMD_PUBREC 5
|
||||
#define MQTT_CMD_PUBREL 6
|
||||
#define MQTT_CMD_PUBCOMP 7
|
||||
#define MQTT_CMD_SUBSCRIBE 8
|
||||
#define MQTT_CMD_SUBACK 9
|
||||
#define MQTT_CMD_UNSUBSCRIBE 10
|
||||
#define MQTT_CMD_UNSUBACK 11
|
||||
#define MQTT_CMD_PINGREQ 12
|
||||
#define MQTT_CMD_PINGRESP 13
|
||||
#define MQTT_CMD_DISCONNECT 14
|
||||
|
||||
#define MQTT_QOS(qos) ((qos) << 1)
|
||||
#define MQTT_GET_QOS(flags) (((flags) &0x6) >> 1)
|
||||
#define MQTT_SET_QOS(flags, qos) (flags) = ((flags) & ~0x6) | ((qos) << 1)
|
||||
|
||||
struct mg_mqtt_opts {
|
||||
struct mg_str client_id;
|
||||
struct mg_str will_topic;
|
||||
@ -14,14 +33,25 @@ struct mg_mqtt_opts {
|
||||
};
|
||||
|
||||
struct mg_mqtt_message {
|
||||
struct mg_str topic;
|
||||
struct mg_str data;
|
||||
struct mg_str topic; // Parsed topic
|
||||
struct mg_str data; // Parsed message
|
||||
struct mg_str dgram; // Whole MQTT datagram, including headers
|
||||
uint16_t id; // Set for PUBACK, PUBREC, PUBREL, PUBCOMP, SUBACK, PUBLISH
|
||||
uint8_t cmd; // MQTT command, one of MQTT_CMD_*
|
||||
uint8_t qos; // Quality of service
|
||||
uint8_t ack; // Connack return code. 0 - success
|
||||
};
|
||||
|
||||
struct mg_connection *mg_mqtt_connect(struct mg_mgr *, const char *url,
|
||||
struct mg_mqtt_opts *opts,
|
||||
mg_event_handler_t fn, void *fn_data);
|
||||
struct mg_connection *mg_mqtt_listen(struct mg_mgr *mgr, const char *url,
|
||||
mg_event_handler_t fn, void *fn_data);
|
||||
void mg_mqtt_pub(struct mg_connection *, struct mg_str *topic,
|
||||
struct mg_str *data);
|
||||
void mg_mqtt_sub(struct mg_connection *, struct mg_str *topic);
|
||||
int mg_mqtt_parse(const unsigned char *buf, int len, struct mg_mqtt_message *m);
|
||||
void mg_mqtt_send_header(struct mg_connection *, uint8_t cmd, uint8_t flags,
|
||||
uint32_t len);
|
||||
int mg_mqtt_next_sub(struct mg_mqtt_message *msg, struct mg_str *topic,
|
||||
uint8_t *qos, int pos);
|
||||
|
Loading…
x
Reference in New Issue
Block a user