Move mg_mqtt_next-* to the example, and some nits

This commit is contained in:
cpq 2022-08-26 14:19:13 +01:00
parent ffa033cca9
commit c7b9ca68c2
10 changed files with 45 additions and 150 deletions

View File

@ -9,8 +9,7 @@ INCS ?= -Isrc -I.
SSL ?= MBEDTLS
CWD ?= $(realpath $(CURDIR))
ENV ?= -e Tmp=. -e WINEDEBUG=-all
D ?= docker
DOCKER ?= $(D) run --rm $(ENV) -v $(CWD):$(CWD) -w $(CWD)
DOCKER ?= docker run --platform linux/amd64 --rm $(ENV) -v $(CWD):$(CWD) -w $(CWD)
VCFLAGS = /nologo /W3 /O2 /MD /I. $(DEFS) $(TFLAGS)
IPV6 ?= 1
ASAN ?= -fsanitize=address,undefined -fno-sanitize-recover=all
@ -42,13 +41,13 @@ examples:
test/packed_fs.c: Makefile src/ssi.h test/fuzz.c test/data/a.txt
$(CC) $(CFLAGS) test/pack.c -o pack
./pack Makefile src/ssi.h test/fuzz.c test/data/a.txt test/data/range.txt > $@
$(RUN) ./pack Makefile src/ssi.h test/fuzz.c test/data/a.txt test/data/range.txt > $@
DIR ?= test/data/
OUT ?= fs_packed.c
mkfs:
$(CC) $(CFLAGS) test/pack.c -o pack
./pack -s $(DIR) `find $(DIR) -type f` > $(OUT)
$(RUN) ./pack -s $(DIR) `find $(DIR) -type f` > $(OUT)
# find $(DIR) -type f | sed -e s,^$(DIR),,g -e s,^/,,g
# Check that all external (exported) symbols have "mg_" prefix
@ -56,14 +55,15 @@ mg_prefix: mongoose.c mongoose.h
$(CC) mongoose.c $(CFLAGS) -c -o /tmp/x.o && nm /tmp/x.o | grep ' T' | grep -v 'mg_' ; test $$? = 1
# C++ build
test++: test
test++: CC = g++
test++: C_WARN = -std=c++2a -Wno-vla -Wno-shadow -Wno-missing-field-initializers -Wno-deprecated
test++: test
musl: test
musl: ASAN =
musl: WARN += -Wno-sign-conversion
musl: CC = $(DOCKER) mdashnet/cc1 gcc
musl: RUN = $(DOCKER) mdashnet/cc1
# Make sure we can build from an unamalgamated sources
unamalgamated: $(HDRS) Makefile test/packed_fs.c

View File

@ -1799,81 +1799,6 @@ Return value: None
mg_mqtt_sub(c, mg_str("my/topic"), 1);
```
### mg\_mqtt\_next\_sub()
```c
size_t mg_mqtt_next_sub(struct mg_mqtt_message *msg, struct mg_str *topic, uint8_t *qos, size_t pos);
```
Traverse list of subscribed topics.
Used to implement MQTT server when `MQTT_CMD_SUBSCRIBE` is received.
Initial position `pos` should be 4.
Parameters:
- `mgs` - MQTT message
- `topic` - Pointer to `mg_str` to receive topic
- `qos` - Pointer to `uint8_t` to receive QoS
- `pos` - Position to list from
Return value: Next position, or 0 when done
Usage example:
```c
// Mongoose events handler
void fn(struct mg_connection *c, int ev, void *evd, void *fnd) {
if (ev == MG_EV_MQTT_CMD) {
struct mg_mqtt_message *mm = (struct mg_mqtt_message *) ev_data;
if (mm->cmd == MQTT_CMD_SUBSCRIBE) {
size_t pos = 4;
uint8_t qos;
struct mg_str topic;
// Iterate over all subscribed topics
while ((pos = mg_mqtt_next_sub(mm, &topic, &qos, pos)) > 0) {
LOG(LL_INFO, ("SUB [%.*s]", (int) topic.len, topic.ptr));
}
}
}
}
```
### mg\_mqtt\_next\_unsub()
```c
size_t mg_mqtt_next_unsub(struct mg_mqtt_message *msg, struct mg_str *topic, size_t pos);
```
Same as `mg_mqtt_next_sub()`, but for unsubscribed topics. The difference
is that there is no QoS in unsubscribe request.
Parameters:
- `mgs` - MQTT message
- `topic` - Pointer to `mg_str` to receive topic
- `pos` - Position from which to list
Return value: Next position, or 0 when done
Usage example:
```c
// Mongoose events handler
void fn(struct mg_connection *c, int ev, void *evd, void *fnd) {
if (ev == MG_EV_MQTT_CMD) {
struct mg_mqtt_message *mm = (struct mg_mqtt_message *) ev_data;
if (mm->cmd == MQTT_CMD_UNSUBSCRIBE) {
size_t pos = 4;
struct mg_str topic;
if (mm->cmd == MQTT_CMD_UNSUBSCRIBE) {
// Iterate over all unsubscribed topics
while ((pos = mg_mqtt_next_unsub(mm, &topic, pos)) > 0) {
LOG(LL_INFO, ("SUB [%.*s]", (int) topic.len, topic.ptr));
}
}
}
}
}
```
### mg\_mqtt\_send_header()
```c

View File

@ -26,6 +26,32 @@ static void signal_handler(int signo) {
s_signo = signo;
}
static size_t mg_mqtt_next_topic(struct mg_mqtt_message *msg,
struct mg_str *topic, uint8_t *qos,
size_t pos) {
unsigned char *buf = (unsigned char *) msg->dgram.ptr + pos;
size_t new_pos;
if (pos >= msg->dgram.len) return 0;
topic->len = (size_t) (((unsigned) buf[0]) << 8 | buf[1]);
topic->ptr = (char *) buf + 2;
new_pos = pos + 2 + topic->len + (qos == NULL ? 0 : 1);
if ((size_t) new_pos > msg->dgram.len) return 0;
if (qos != NULL) *qos = buf[2 + topic->len];
return new_pos;
}
size_t mg_mqtt_next_sub(struct mg_mqtt_message *msg, struct mg_str *topic,
uint8_t *qos, size_t pos) {
uint8_t tmp;
return mg_mqtt_next_topic(msg, topic, qos == NULL ? &tmp : qos, pos);
}
size_t mg_mqtt_next_unsub(struct mg_mqtt_message *msg, struct mg_str *topic,
size_t pos) {
return mg_mqtt_next_topic(msg, topic, NULL, pos);
}
// Event handler function
static void fn(struct mg_connection *c, int ev, void *ev_data, void *fn_data) {
if (ev == MG_EV_MQTT_CMD) {

View File

@ -3172,32 +3172,6 @@ int mg_mqtt_parse(const uint8_t *buf, size_t len, uint8_t version,
return MQTT_OK;
}
static size_t mg_mqtt_next_topic(struct mg_mqtt_message *msg,
struct mg_str *topic, uint8_t *qos,
size_t pos) {
unsigned char *buf = (unsigned char *) msg->dgram.ptr + pos;
size_t new_pos;
if (pos >= msg->dgram.len) return 0;
topic->len = (size_t) (((unsigned) buf[0]) << 8 | buf[1]);
topic->ptr = (char *) buf + 2;
new_pos = pos + 2 + topic->len + (qos == NULL ? 0 : 1);
if ((size_t) new_pos > msg->dgram.len) return 0;
if (qos != NULL) *qos = buf[2 + topic->len];
return new_pos;
}
size_t mg_mqtt_next_sub(struct mg_mqtt_message *msg, struct mg_str *topic,
uint8_t *qos, size_t pos) {
uint8_t tmp;
return mg_mqtt_next_topic(msg, topic, qos == NULL ? &tmp : qos, pos);
}
size_t mg_mqtt_next_unsub(struct mg_mqtt_message *msg, struct mg_str *topic,
size_t pos) {
return mg_mqtt_next_topic(msg, topic, NULL, pos);
}
static void mqtt_cb(struct mg_connection *c, int ev, void *ev_data,
void *fn_data) {
if (ev == MG_EV_READ) {
@ -3439,7 +3413,7 @@ void mg_close_conn(struct mg_connection *c) {
// Order of operations is important. `MG_EV_CLOSE` event must be fired
// before we deallocate received data, see #1331
mg_call(c, MG_EV_CLOSE, NULL);
MG_DEBUG(("%lu closed", c->id));
MG_DEBUG(("%lu %p closed", c->id, c->fd));
mg_tls_free(c);
mg_iobuf_free(&c->recv);
@ -4280,7 +4254,6 @@ static void close_conn(struct mg_connection *c) {
#if MG_ARCH == MG_ARCH_FREERTOS_TCP
FreeRTOS_FD_CLR(c->fd, c->mgr->ss, eSELECT_ALL);
#endif
c->fd = NULL;
}
mg_close_conn(c);
}
@ -4380,10 +4353,9 @@ static void accept_conn(struct mg_mgr *mgr, struct mg_connection *lsn) {
MG_ERROR(("%lu OOM", lsn->id));
closesocket(fd);
} else {
char buf[40];
// char buf[40];
tomgaddr(&usa, &c->rem, sa_len != sizeof(usa.sin));
mg_straddr(&c->rem, buf, sizeof(buf));
MG_DEBUG(("%lu accepted %s", c->id, buf));
// mg_straddr(&c->rem, buf, sizeof(buf));
LIST_ADD_HEAD(struct mg_connection, &mgr->conns, c);
c->fd = S2PTR(fd);
MG_EPOLL_ADD(c);
@ -4396,6 +4368,9 @@ static void accept_conn(struct mg_mgr *mgr, struct mg_connection *lsn) {
c->pfn_data = lsn->pfn_data;
c->fn = lsn->fn;
c->fn_data = lsn->fn_data;
MG_DEBUG(("%lu %p accepted %x.%hu -> %x.%hu", c->id, c->fd,
mg_ntohl(c->rem.ip), mg_ntohs(c->rem.port), mg_ntohl(c->loc.ip),
mg_ntohs(c->loc.port)));
mg_call(c, MG_EV_OPEN, NULL);
mg_call(c, MG_EV_ACCEPT, NULL);
}
@ -5918,6 +5893,7 @@ void mg_ws_upgrade(struct mg_connection *c, struct mg_http_message *hm,
ws_handshake(c, wskey, wsproto, fmt, &ap);
va_end(ap);
c->is_websocket = 1;
c->is_resp = 0;
mg_call(c, MG_EV_WS_OPEN, hm);
}
}

View File

@ -1324,10 +1324,6 @@ void mg_mqtt_sub(struct mg_connection *, struct mg_str topic, int qos);
int mg_mqtt_parse(const uint8_t *, size_t, uint8_t, struct mg_mqtt_message *);
void mg_mqtt_send_header(struct mg_connection *, uint8_t cmd, uint8_t flags,
uint32_t len);
size_t mg_mqtt_next_sub(struct mg_mqtt_message *msg, struct mg_str *topic,
uint8_t *qos, size_t pos);
size_t mg_mqtt_next_unsub(struct mg_mqtt_message *msg, struct mg_str *topic,
size_t pos);
void mg_mqtt_ping(struct mg_connection *);
void mg_mqtt_pong(struct mg_connection *);
void mg_mqtt_disconnect(struct mg_connection *);

View File

@ -185,32 +185,6 @@ int mg_mqtt_parse(const uint8_t *buf, size_t len, uint8_t version,
return MQTT_OK;
}
static size_t mg_mqtt_next_topic(struct mg_mqtt_message *msg,
struct mg_str *topic, uint8_t *qos,
size_t pos) {
unsigned char *buf = (unsigned char *) msg->dgram.ptr + pos;
size_t new_pos;
if (pos >= msg->dgram.len) return 0;
topic->len = (size_t) (((unsigned) buf[0]) << 8 | buf[1]);
topic->ptr = (char *) buf + 2;
new_pos = pos + 2 + topic->len + (qos == NULL ? 0 : 1);
if ((size_t) new_pos > msg->dgram.len) return 0;
if (qos != NULL) *qos = buf[2 + topic->len];
return new_pos;
}
size_t mg_mqtt_next_sub(struct mg_mqtt_message *msg, struct mg_str *topic,
uint8_t *qos, size_t pos) {
uint8_t tmp;
return mg_mqtt_next_topic(msg, topic, qos == NULL ? &tmp : qos, pos);
}
size_t mg_mqtt_next_unsub(struct mg_mqtt_message *msg, struct mg_str *topic,
size_t pos) {
return mg_mqtt_next_topic(msg, topic, NULL, pos);
}
static void mqtt_cb(struct mg_connection *c, int ev, void *ev_data,
void *fn_data) {
if (ev == MG_EV_READ) {

View File

@ -56,10 +56,6 @@ void mg_mqtt_sub(struct mg_connection *, struct mg_str topic, int qos);
int mg_mqtt_parse(const uint8_t *, size_t, uint8_t, struct mg_mqtt_message *);
void mg_mqtt_send_header(struct mg_connection *, uint8_t cmd, uint8_t flags,
uint32_t len);
size_t mg_mqtt_next_sub(struct mg_mqtt_message *msg, struct mg_str *topic,
uint8_t *qos, size_t pos);
size_t mg_mqtt_next_unsub(struct mg_mqtt_message *msg, struct mg_str *topic,
size_t pos);
void mg_mqtt_ping(struct mg_connection *);
void mg_mqtt_pong(struct mg_connection *);
void mg_mqtt_disconnect(struct mg_connection *);

View File

@ -157,7 +157,7 @@ void mg_close_conn(struct mg_connection *c) {
// Order of operations is important. `MG_EV_CLOSE` event must be fired
// before we deallocate received data, see #1331
mg_call(c, MG_EV_CLOSE, NULL);
MG_DEBUG(("%lu closed", c->id));
MG_DEBUG(("%lu %p closed", c->id, c->fd));
mg_tls_free(c);
mg_iobuf_free(&c->recv);

View File

@ -322,7 +322,6 @@ static void close_conn(struct mg_connection *c) {
#if MG_ARCH == MG_ARCH_FREERTOS_TCP
FreeRTOS_FD_CLR(c->fd, c->mgr->ss, eSELECT_ALL);
#endif
c->fd = NULL;
}
mg_close_conn(c);
}
@ -422,10 +421,9 @@ static void accept_conn(struct mg_mgr *mgr, struct mg_connection *lsn) {
MG_ERROR(("%lu OOM", lsn->id));
closesocket(fd);
} else {
char buf[40];
// char buf[40];
tomgaddr(&usa, &c->rem, sa_len != sizeof(usa.sin));
mg_straddr(&c->rem, buf, sizeof(buf));
MG_DEBUG(("%lu accepted %s", c->id, buf));
// mg_straddr(&c->rem, buf, sizeof(buf));
LIST_ADD_HEAD(struct mg_connection, &mgr->conns, c);
c->fd = S2PTR(fd);
MG_EPOLL_ADD(c);
@ -438,6 +436,9 @@ static void accept_conn(struct mg_mgr *mgr, struct mg_connection *lsn) {
c->pfn_data = lsn->pfn_data;
c->fn = lsn->fn;
c->fn_data = lsn->fn_data;
MG_DEBUG(("%lu %p accepted %x.%hu -> %x.%hu", c->id, c->fd,
mg_ntohl(c->rem.ip), mg_ntohs(c->rem.port), mg_ntohl(c->loc.ip),
mg_ntohs(c->loc.port)));
mg_call(c, MG_EV_OPEN, NULL);
mg_call(c, MG_EV_ACCEPT, NULL);
}

View File

@ -272,6 +272,7 @@ void mg_ws_upgrade(struct mg_connection *c, struct mg_http_message *hm,
ws_handshake(c, wskey, wsproto, fmt, &ap);
va_end(ap);
c->is_websocket = 1;
c->is_resp = 0;
mg_call(c, MG_EV_WS_OPEN, hm);
}
}