From bf9d69ab56db6da434024929bce249575f50ba71 Mon Sep 17 00:00:00 2001 From: cpq Date: Wed, 8 Feb 2023 15:00:04 +0000 Subject: [PATCH] Add struct mg_queue and API, change examples/multi-threaded, add tests --- .github/workflows/test.yml | 2 +- Makefile | 4 +- examples/README.md | 9 +-- examples/http-server/Makefile | 17 ++-- examples/http-server/mongoose.c | 1 + examples/http-server/mongoose.h | 1 + examples/multi-threaded/Makefile | 33 +++++--- examples/multi-threaded/main.c | 102 ++++++++++------------- examples/multi-threaded/mongoose.c | 1 + examples/multi-threaded/mongoose.h | 1 + mongoose.c | 65 +++++++++++++++ mongoose.h | 50 +++++++++++- src/arch_newlib.h | 1 + src/arch_rp2040.h | 1 + src/config.h | 4 + src/fmt.h | 4 +- src/printf.c | 22 +++++ src/queue.c | 40 +++++++++ src/queue.h | 38 +++++++++ test/unit_test.c | 125 ++++++++++++++++++++++++++++- 20 files changed, 427 insertions(+), 94 deletions(-) create mode 120000 examples/http-server/mongoose.c create mode 120000 examples/http-server/mongoose.h create mode 120000 examples/multi-threaded/mongoose.c create mode 120000 examples/multi-threaded/mongoose.h create mode 100644 src/queue.c create mode 100644 src/queue.h diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index c2a5a6ad..42ff3e35 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -92,7 +92,7 @@ jobs: - if: steps.check.outputs.MATCH == 1 run: make test upload-coverage TFLAGS=-DNO_SNTP_CHECK SSL=OPENSSL ASAN_OPTIONS= OPENSSL=`echo /usr/local/Cellar/openssl*/*` - if: steps.check.outputs.MATCH == 1 - run: make test SSL=MBEDTLS TFLAGS=-DNO_SNTP_CHECK ASAN_OPTIONS= MBEDTLS=`echo /usr/local/Cellar/mbedtls*/*` + run: make test SSL=MBEDTLS TFLAGS="-DNO_SNTP_CHECK -DMG_ENABLE_ATOMIC=1" ASAN_OPTIONS= MBEDTLS=`echo /usr/local/Cellar/mbedtls*/*` #- run: make mip_test ASAN_OPTIONS= - if: steps.check.outputs.MATCH == 1 run: make mg_prefix diff --git a/Makefile b/Makefile index 18cca0ee..84466ac4 100644 --- a/Makefile +++ b/Makefile @@ -16,7 +16,7 @@ EXAMPLES := $(dir $(wildcard examples/*/Makefile)) PREFIX ?= /usr/local VERSION ?= $(shell cut -d'"' -f2 src/version.h) COMMON_CFLAGS ?= $(C_WARN) $(WARN) $(INCS) $(DEFS) -DMG_ENABLE_IPV6=$(IPV6) $(TFLAGS) -CFLAGS ?= $(OPTS) $(ASAN) $(COMMON_CFLAGS) +CFLAGS ?= $(OPTS) $(ASAN) $(COMMON_CFLAGS) -pthread VALGRIND_CFLAGS ?= $(OPTS) $(COMMON_CFLAGS) VALGRIND_RUN ?= valgrind --tool=memcheck --gen-suppressions=all --leak-check=full --show-leak-kinds=all --leak-resolution=high --track-origins=yes --error-exitcode=1 --exit-on-first-error=yes .PHONY: examples test valgrind mip_test @@ -179,7 +179,7 @@ mongoose.c: Makefile $(wildcard src/*.c) $(wildcard src/tcpip/*.c) (cat src/license.h; echo; echo '#include "mongoose.h"' ; (for F in src/*.c src/tcpip/*.c ; do echo; echo '#ifdef MG_ENABLE_LINES'; echo "#line 1 \"$$F\""; echo '#endif'; cat $$F | sed -e 's,#include ".*,,'; done))> $@ mongoose.h: $(HDRS) Makefile - (cat src/license.h; echo; echo '#ifndef MONGOOSE_H'; echo '#define MONGOOSE_H'; echo; cat src/version.h ; echo; echo '#ifdef __cplusplus'; echo 'extern "C" {'; echo '#endif'; cat src/arch.h src/arch_*.h src/net_*.h src/config.h src/str.h src/fmt.h src/log.h src/timer.h src/fs.h src/util.h src/url.h src/iobuf.h src/base64.h src/md5.h src/sha1.h src/event.h src/net.h src/http.h src/ssi.h src/tls.h src/tls_mbed.h src/tls_openssl.h src/ws.h src/sntp.h src/mqtt.h src/dns.h src/json.h src/rpc.h src/tcpip/tcpip.h src/tcpip/driver_*.h | sed -e '/keep/! s,#include ".*,,' -e 's,^#pragma once,,'; echo; echo '#ifdef __cplusplus'; echo '}'; echo '#endif'; echo '#endif // MONGOOSE_H')> $@ + (cat src/license.h; echo; echo '#ifndef MONGOOSE_H'; echo '#define MONGOOSE_H'; echo; cat src/version.h ; echo; echo '#ifdef __cplusplus'; echo 'extern "C" {'; echo '#endif'; cat src/arch.h src/arch_*.h src/net_*.h src/config.h src/str.h src/queue.h src/fmt.h src/log.h src/timer.h src/fs.h src/util.h src/url.h src/iobuf.h src/base64.h src/md5.h src/sha1.h src/event.h src/net.h src/http.h src/ssi.h src/tls.h src/tls_mbed.h src/tls_openssl.h src/ws.h src/sntp.h src/mqtt.h src/dns.h src/json.h src/rpc.h src/tcpip/tcpip.h src/tcpip/driver_*.h | sed -e '/keep/! s,#include ".*,,' -e 's,^#pragma once,,'; echo; echo '#ifdef __cplusplus'; echo '}'; echo '#endif'; echo '#endif // MONGOOSE_H')> $@ clean: rm -rf $(PROG) *.exe *.o *.dSYM *_test* ut fuzzer *.gcov *.gcno *.gcda *.obj *.exe *.ilk *.pdb slow-unit* _CL_* infer-out data.txt crash-* test/packed_fs.c pack arduino tmp diff --git a/examples/README.md b/examples/README.md index 1d6d5838..06e24caf 100644 --- a/examples/README.md +++ b/examples/README.md @@ -13,12 +13,9 @@ Rules for creating a new example: - An example must build on Windows, Mac and Ubuntu Linux systems - Assume that user installed tools according to https://mongoose.ws/tutorials/tools/ - Makefile must not include any other make files -- Use `CFLAGS` for compilation options -- Add `CFLAGS_EXTRA` to the end of `CFLAGS`, to allow user to specify - extra options, for example: - ``` - make CFLAGS_EXTRA="-pedantic -O2" - ``` +- Use `CFLAGS` for system-specific compilation options +- Use `CFLAGS_MONGOOSE` for mongoose-specific compilation options +- Use `$(CFLAGS) $(CFLAGS_MONGOOSE) $(CFLAGS_EXTRA)` to compile - If external repository is required, download it on demand using git shallow clone. See embedded example golden reference - Keep Makefile as short as possible, but verbose to understand it easily diff --git a/examples/http-server/Makefile b/examples/http-server/Makefile index b5ea8c09..91de167b 100644 --- a/examples/http-server/Makefile +++ b/examples/http-server/Makefile @@ -1,17 +1,16 @@ -SOURCES = main.c ../../mongoose.c # Source code files -CFLAGS = -W -Wall -Wextra -g # Build options +SOURCES = main.c mongoose.c # Source code files +CFLAGS = -W -Wall -Wextra -g -I. # Build options # Mongoose build options. See https://mongoose.ws/documentation/#build-options -CFLAGS += -I ../.. -CFLAGS += -DMG_HTTP_DIRLIST_TIME=1 -DMG_ENABLE_SSI=1 -CFLAGS += -DMG_ENABLE_LINES=1 -DMG_ENABLE_IPV6=1 +CFLAGS_MONGOOSE += -DMG_HTTP_DIRLIST_TIME=1 -DMG_ENABLE_SSI=1 +CFLAGS_MONGOOSE += -DMG_ENABLE_LINES=1 -DMG_ENABLE_IPV6=1 ifeq ($(OS),Windows_NT) - # Windows settings. Assume MinGW compiler + # Windows settings. Assume MinGW compiler. To use VC: make CC=cl CFLAGS=/MD PROG ?= example.exe # Use .exe suffix for the binary CC = gcc # Use MinGW gcc compiler CFLAGS += -lws2_32 # Link against Winsock library - DELETE = cmd /C del /Q /F /S # Command prompt command to delete files + DELETE = cmd /C del /q /f /s # Command prompt command to delete files else # Mac, Linux PROG ?= example @@ -22,7 +21,7 @@ all: $(PROG) $(RUN) ./$(PROG) $(ARGS) $(PROG): $(SOURCES) - $(CC) $(SOURCES) $(CFLAGS) $(CFLAGS_EXTRA) -o $@ + $(CC) $(SOURCES) $(CFLAGS) $(CFLAGS_MONGOOSE) $(CFLAGS_EXTRA) -o $@ clean: - $(DELETE) $(PROG) *.o *.dSYM + $(DELETE) $(PROG) *.o *.obj *.exe *.dSYM diff --git a/examples/http-server/mongoose.c b/examples/http-server/mongoose.c new file mode 120000 index 00000000..8ef6e62d --- /dev/null +++ b/examples/http-server/mongoose.c @@ -0,0 +1 @@ +../../mongoose.c \ No newline at end of file diff --git a/examples/http-server/mongoose.h b/examples/http-server/mongoose.h new file mode 120000 index 00000000..488ef358 --- /dev/null +++ b/examples/http-server/mongoose.h @@ -0,0 +1 @@ +../../mongoose.h \ No newline at end of file diff --git a/examples/multi-threaded/Makefile b/examples/multi-threaded/Makefile index 6b999313..0abba701 100644 --- a/examples/multi-threaded/Makefile +++ b/examples/multi-threaded/Makefile @@ -1,17 +1,26 @@ -PROG ?= example -ROOT ?= $(realpath $(CURDIR)/../..) -CWD ?= $(realpath $(CURDIR)) -DOCKER ?= docker run --rm -it -e Tmp=. -e WINEDEBUG=-all -v $(ROOT):$(ROOT) -w $(CWD) +SOURCES = main.c mongoose.c # Source code files +CFLAGS = -W -Wall -Wextra -g -I. # Build options + +# Mongoose build options. See https://mongoose.ws/documentation/#build-options +CFLAGS_MONGOOSE += -DMG_ENABLE_ATOMIC=1 + +ifeq ($(OS),Windows_NT) + # Windows settings. Assume MinGW compiler. To use VC: make CC=cl CFLAGS=/MD + PROG ?= example.exe # Use .exe suffix for the binary + CC = gcc # Use MinGW gcc compiler + CFLAGS += -lws2_32 # Link against Winsock library + DELETE = cmd /C del /f /q /s # Command prompt command to delete files +else + # Mac, Linux + PROG ?= example + DELETE = rm -rf +endif all: $(PROG) - $(RUN) ./$(PROG) + $(RUN) ./$(PROG) $(ARGS) -$(PROG): - $(CC) ../../mongoose.c -I../.. -pthread $(CFLAGS) -o $(PROG) main.c - -$(PROG).exe: - $(DOCKER) mdashnet/vc98 wine cl ../../mongoose.c main.c -I../.. /MD ws2_32.lib /Fe$@ - $(DOCKER) mdashnet/vc98 wine $@ +$(PROG): $(SOURCES) + $(CC) $(SOURCES) $(CFLAGS) $(CFLAGS_MONGOOSE) $(CFLAGS_EXTRA) -o $@ clean: - rm -rf $(PROG) *.o *.dSYM *.gcov *.gcno *.gcda *.obj *.exe *.ilk *.pdb log.txt + $(DELETE) $(PROG) *.o *.obj *.exe *.dSYM diff --git a/examples/multi-threaded/main.c b/examples/multi-threaded/main.c index 2b7913f7..e584cbf0 100644 --- a/examples/multi-threaded/main.c +++ b/examples/multi-threaded/main.c @@ -1,4 +1,4 @@ -// Copyright (c) 2020 Cesanta Software Limited +// Copyright (c) 2020-2023 Cesanta Software Limited // All rights reserved // // Multithreading example. @@ -6,14 +6,20 @@ // some time to simulate long processing time, produces an output and // hands over that output to the request handler function. // +// We pass POST body to the worker thread, and respond with a calculated CRC #include "mongoose.h" +struct thread_data { + struct mg_queue queue; // Worker -> Connection queue + struct mg_str body; // Copy of message body +}; + static void start_thread(void (*f)(void *), void *p) { #ifdef _WIN32 +#define usleep(x) Sleep((x) / 1000) _beginthread((void(__cdecl *)(void *)) f, 0, p); #else -#define closesocket(x) close(x) #include pthread_t thread_id = (pthread_t) 0; pthread_attr_t attr; @@ -24,71 +30,51 @@ static void start_thread(void (*f)(void *), void *p) { #endif } -struct userdata { - int sock; // Paired socket, worker thread owns it - struct mg_str - body; // data to be processed -}; +static void worker_thread(void *param) { + struct thread_data *d = (struct thread_data *) param; + char buf[100]; // On-stack buffer for the message queue -static void thread_function(void *param) { - struct userdata *p = (struct userdata *) param; - sleep(2); // Simulate long execution - // Respond, wakeup event manager - if (p->body.len) - send(p->sock, p->body.ptr, p->body.len, 0); // if there is a body, echo it - else - send(p->sock, "hi", 2, 0); // otherwise just respond "hi" - close(p->sock); // Close the connection - free((void *) p->body.ptr); // free body - free(p); // free userdata -} + d->queue.buf = buf; // Caller passed us an empty queue with NULL + d->queue.len = sizeof(buf); // buffer. Initialise it now + usleep(1 * 1000 * 1000); // Simulate long execution time -static void link_conns(struct mg_connection *c1, struct mg_connection *c2) { - c1->fn_data = c2; - c2->fn_data = c1; -} - -static void unlink_conns(struct mg_connection *c1, struct mg_connection *c2) { - c1->fn_data = c2->fn_data = NULL; -} - -// Pipe event handler -static void pcb(struct mg_connection *c, int ev, void *ev_data, void *fn_data) { - struct mg_connection *parent = (struct mg_connection *) fn_data; - MG_INFO(("%lu %p %d %p", c->id, c->fd, ev, parent)); - if (parent == NULL) { // If parent connection closed, close too - c->is_closing = 1; - } else if (ev == MG_EV_READ) { // Got data from the worker thread - mg_http_reply(parent, 200, "Host: foo.com\r\n", "%.*s\n", c->recv.len, - c->recv.buf); // Respond! - c->recv.len = 0; // Tell Mongoose we've consumed data - } else if (ev == MG_EV_OPEN) { - link_conns(c, parent); - } else if (ev == MG_EV_CLOSE) { - unlink_conns(c, parent); + // Send a response to the connection + if (d->body.len == 0) { + mg_queue_printf(&d->queue, "Send me POST data"); + } else { + uint32_t crc = mg_crc32(0, d->body.ptr, d->body.len); + mg_queue_printf(&d->queue, "crc32: %#x", crc); + free((char *) d->body.ptr); } + + // Wait until connection reads our message, then it is safe to quit + while (mg_queue_next(&d->queue, NULL) != MG_QUEUE_EMPTY) usleep(1000); + MG_INFO(("done, cleaning up...")); + free(d); } // HTTP request callback static void fn(struct mg_connection *c, int ev, void *ev_data, void *fn_data) { if (ev == MG_EV_HTTP_MSG) { + // Received HTTP request. Allocate thread data and spawn a worker thread struct mg_http_message *hm = (struct mg_http_message *) ev_data; - if (mg_http_match_uri(hm, "/fast")) { - // Single-threaded code path, for performance comparison - // The /fast URI responds immediately - mg_http_reply(c, 200, "Host: foo.com\r\n", "hi\n"); - } else { - // Multithreading code path - struct userdata *data = calloc(1, sizeof(*data)); // worker will free it - // parse headers or use request body, duplicate data and pass it to worker - // thread - data->body = mg_strdup(hm->body); // worker will free it - data->sock = mg_mkpipe(c->mgr, pcb, c, true); // Create pipe - start_thread(thread_function, data); // Start thread and pass data + struct thread_data *d = (struct thread_data *) calloc(1, sizeof(*d)); + d->body = mg_strdup(hm->body); // Pass received body to the worker + start_thread(worker_thread, d); // Start a thread + *(void **) c->data = d; // Memorise data pointer in c->data + } else if (ev == MG_EV_POLL) { + // Poll event. Delivered to us every mg_mgr_poll interval or faster + struct thread_data *d = *(struct thread_data **) c->data; + size_t len; + char *buf; + // Check if we have a message from the worker + if (d != NULL && (len = mg_queue_next(&d->queue, &buf)) != MG_QUEUE_EMPTY) { + // Got message from worker. Send a response and cleanup + mg_http_reply(c, 200, "", "%.*s\n", (int) len, buf); + mg_queue_del(&d->queue); // Delete message: signal worker that we're done } - } else if (ev == MG_EV_CLOSE) { - if (c->fn_data != NULL) unlink_conns(c, c->fn_data); } + (void) fn_data; } int main(void) { @@ -96,7 +82,7 @@ int main(void) { mg_mgr_init(&mgr); mg_log_set(MG_LL_DEBUG); // Set debug log level mg_http_listen(&mgr, "http://localhost:8000", fn, NULL); // Create listener - for (;;) mg_mgr_poll(&mgr, 1000); // Event loop - mg_mgr_free(&mgr); // Cleanup + for (;;) mg_mgr_poll(&mgr, 10); // Event loop. Use 10ms poll interval + mg_mgr_free(&mgr); // Cleanup return 0; } diff --git a/examples/multi-threaded/mongoose.c b/examples/multi-threaded/mongoose.c new file mode 120000 index 00000000..8ef6e62d --- /dev/null +++ b/examples/multi-threaded/mongoose.c @@ -0,0 +1 @@ +../../mongoose.c \ No newline at end of file diff --git a/examples/multi-threaded/mongoose.h b/examples/multi-threaded/mongoose.h new file mode 120000 index 00000000..488ef358 --- /dev/null +++ b/examples/multi-threaded/mongoose.h @@ -0,0 +1 @@ +../../mongoose.h \ No newline at end of file diff --git a/mongoose.c b/mongoose.c index 45891923..dfb4b651 100644 --- a/mongoose.c +++ b/mongoose.c @@ -3490,6 +3490,27 @@ void mg_mgr_init(struct mg_mgr *mgr) { +size_t mg_queue_vprintf(struct mg_queue *q, const char *fmt, va_list *ap) { + size_t len = mg_snprintf(NULL, 0, fmt, ap); + char *buf; + if (mg_queue_space(q, &buf) < len + 1) { + len = 0; // Nah. Not enough space + } else { + len = mg_vsnprintf((char *)buf, len + 1, fmt, ap); + mg_queue_add(q, len); + } + return len; +} + +size_t mg_queue_printf(struct mg_queue *q, const char *fmt, ...) { + va_list ap; + size_t len; + va_start(ap, fmt); + len = mg_queue_vprintf(q, fmt, &ap); + va_end(ap); + return len; +} + static void mg_pfn_iobuf_private(char ch, void *param, bool expand) { struct mg_iobuf *io = (struct mg_iobuf *) param; if (expand && io->len + 2 > io->size) mg_iobuf_resize(io, io->len + 2); @@ -3583,6 +3604,50 @@ size_t mg_print_mac(void (*out)(char, void *), void *arg, va_list *ap) { p[3], p[4], p[5]); } +#ifdef MG_ENABLE_LINES +#line 1 "src/queue.c" +#endif + + +// Every message in the queue is prepended by the message length (ML) +// ML is sizeof(size_t) in size +// Tail points to the message data +// +// |------| ML | message1 | ML | message2 |--- free space ---| +// ^ ^ ^ ^ +// buf tail head len + +size_t mg_queue_space(struct mg_queue *q, char **buf) { + size_t ofs; + if (q->head > 0 && q->tail >= q->head) { // All messages read? + q->head = 0; // Yes. Reset head first + q->tail = 0; // Now reset the tail + } + ofs = q->head + sizeof(size_t); + if (buf != NULL) *buf = q->buf + ofs; + return ofs > q->len ? 0 : q->len - ofs; +} + +size_t mg_queue_next(struct mg_queue *q, char **buf) { + size_t len = MG_QUEUE_EMPTY; + if (q->tail < q->head) memcpy(&len, &q->buf[q->tail], sizeof(len)); + if (buf != NULL) *buf = &q->buf[q->tail + sizeof(len)]; + return len; +} + +void mg_queue_add(struct mg_queue *q, size_t len) { + size_t head = q->head + len + (size_t) sizeof(head); // New head + if (head <= q->len) { // Have space ? + memcpy(q->buf + q->head, &len, sizeof(len)); // Yes. Store ML + q->head = head; // Advance head + } +} + +void mg_queue_del(struct mg_queue *q) { + size_t len = mg_queue_next(q, NULL), tail = q->tail + len + sizeof(size_t); + if (len != MG_QUEUE_EMPTY) q->tail = tail; +} + #ifdef MG_ENABLE_LINES #line 1 "src/rpc.c" #endif diff --git a/mongoose.h b/mongoose.h index 4ea28615..d66bbb32 100644 --- a/mongoose.h +++ b/mongoose.h @@ -223,6 +223,7 @@ static inline int mg_mkdir(const char *path, mode_t mode) { #define MG_PATH_MAX 100 #define MG_ENABLE_SOCKET 0 #define MG_ENABLE_DIRLIST 0 +#define MG_ENABLE_ATOMIC 1 #endif @@ -239,6 +240,7 @@ static inline int mg_mkdir(const char *path, mode_t mode) { #include int mkdir(const char *, mode_t); +#define MG_ENABLE_ATOMIC 1 #endif @@ -597,6 +599,10 @@ struct timeval { #define MG_ENABLE_TCPIP 0 // Mongoose built-in network stack #endif +#ifndef MG_ENABLE_ATOMIC +#define MG_ENABLE_ATOMIC 0 // Required by mg_queue +#endif + #ifndef MG_ENABLE_LWIP #define MG_ENABLE_LWIP 0 // lWIP network stack #endif @@ -787,19 +793,58 @@ char *mg_remove_double_dots(char *s); +#if MG_ENABLE_ATOMIC +#include +#elif !defined(_Atomic) +#define _Atomic +#endif + +// Single producer, single consumer non-blocking queue +// +// Producer: +// void *buf; +// while (mg_queue_space(q, &buf) < len) WAIT(); // Wait for free space +// memcpy(buf, data, len); // Copy data to the queue +// mg_queue_add(q, len); // Advance q->head +// +// Consumer: +// void *buf; +// while ((len = mg_queue_next(q, &buf)) == MG_QUEUE_EMPTY) WAIT(); +// mg_hexdump(buf, len); // Handle message +// mg_queue_del(q); // Delete message +// +struct mg_queue { + char *buf; + size_t len; + volatile _Atomic size_t tail; + volatile _Atomic size_t head; +}; + +#define MG_QUEUE_EMPTY ((size_t) ~0ul) // Next message size when queue is empty + +void mg_queue_add(struct mg_queue *, size_t len); // Advance head +void mg_queue_del(struct mg_queue *); // Advance tail +size_t mg_queue_space(struct mg_queue *, char **); // Get free space +size_t mg_queue_next(struct mg_queue *, char **); // Get next message size + + + + + + typedef void (*mg_pfn_t)(char, void *); // Output function typedef size_t (*mg_pm_t)(mg_pfn_t, void *, va_list *); // %M printer size_t mg_vxprintf(void (*)(char, void *), void *, const char *fmt, va_list *); size_t mg_xprintf(void (*fn)(char, void *), void *, const char *fmt, ...); - - // Convenience wrappers around mg_xprintf size_t mg_vsnprintf(char *buf, size_t len, const char *fmt, va_list *ap); size_t mg_snprintf(char *, size_t, const char *fmt, ...); char *mg_vmprintf(const char *fmt, va_list *ap); char *mg_mprintf(const char *fmt, ...); +size_t mg_queue_vprintf(struct mg_queue *, const char *fmt, va_list *); +size_t mg_queue_printf(struct mg_queue *, const char *fmt, ...); // %M print helper functions size_t mg_print_ip(void (*out)(char, void *), void *arg, va_list *ap); @@ -810,7 +855,6 @@ size_t mg_print_mac(void (*out)(char, void *), void *arg, va_list *ap); // Various output functions void mg_pfn_iobuf(char ch, void *param); // param: struct mg_iobuf * -void mg_pfn_queue(char ch, void *param); // param: struct mg_queue * void mg_pfn_stdout(char c, void *param); // param: ignored diff --git a/src/arch_newlib.h b/src/arch_newlib.h index bb0281b9..6618e9bd 100644 --- a/src/arch_newlib.h +++ b/src/arch_newlib.h @@ -19,5 +19,6 @@ #define MG_PATH_MAX 100 #define MG_ENABLE_SOCKET 0 #define MG_ENABLE_DIRLIST 0 +#define MG_ENABLE_ATOMIC 1 #endif diff --git a/src/arch_rp2040.h b/src/arch_rp2040.h index ace069bf..d5c28eba 100644 --- a/src/arch_rp2040.h +++ b/src/arch_rp2040.h @@ -12,4 +12,5 @@ #include int mkdir(const char *, mode_t); +#define MG_ENABLE_ATOMIC 1 #endif diff --git a/src/config.h b/src/config.h index 3e974fa3..f0e38d26 100644 --- a/src/config.h +++ b/src/config.h @@ -8,6 +8,10 @@ #define MG_ENABLE_TCPIP 0 // Mongoose built-in network stack #endif +#ifndef MG_ENABLE_ATOMIC +#define MG_ENABLE_ATOMIC 0 // Required by mg_queue +#endif + #ifndef MG_ENABLE_LWIP #define MG_ENABLE_LWIP 0 // lWIP network stack #endif diff --git a/src/fmt.h b/src/fmt.h index 7a217e8b..b9afee3c 100644 --- a/src/fmt.h +++ b/src/fmt.h @@ -2,11 +2,11 @@ #include "arch.h" #include "iobuf.h" +#include "queue.h" typedef void (*mg_pfn_t)(char, void *); // Output function typedef size_t (*mg_pm_t)(mg_pfn_t, void *, va_list *); // %M printer -// The lowest level size_t mg_vxprintf(void (*)(char, void *), void *, const char *fmt, va_list *); size_t mg_xprintf(void (*fn)(char, void *), void *, const char *fmt, ...); @@ -15,6 +15,8 @@ size_t mg_vsnprintf(char *buf, size_t len, const char *fmt, va_list *ap); size_t mg_snprintf(char *, size_t, const char *fmt, ...); char *mg_vmprintf(const char *fmt, va_list *ap); char *mg_mprintf(const char *fmt, ...); +size_t mg_queue_vprintf(struct mg_queue *, const char *fmt, va_list *); +size_t mg_queue_printf(struct mg_queue *, const char *fmt, ...); // %M print helper functions size_t mg_print_ip(void (*out)(char, void *), void *arg, va_list *ap); diff --git a/src/printf.c b/src/printf.c index a4de5153..79a86ff8 100644 --- a/src/printf.c +++ b/src/printf.c @@ -1,7 +1,29 @@ #include "fmt.h" #include "iobuf.h" +#include "queue.h" #include "util.h" +size_t mg_queue_vprintf(struct mg_queue *q, const char *fmt, va_list *ap) { + size_t len = mg_snprintf(NULL, 0, fmt, ap); + char *buf; + if (mg_queue_space(q, &buf) < len + 1) { + len = 0; // Nah. Not enough space + } else { + len = mg_vsnprintf((char *)buf, len + 1, fmt, ap); + mg_queue_add(q, len); + } + return len; +} + +size_t mg_queue_printf(struct mg_queue *q, const char *fmt, ...) { + va_list ap; + size_t len; + va_start(ap, fmt); + len = mg_queue_vprintf(q, fmt, &ap); + va_end(ap); + return len; +} + static void mg_pfn_iobuf_private(char ch, void *param, bool expand) { struct mg_iobuf *io = (struct mg_iobuf *) param; if (expand && io->len + 2 > io->size) mg_iobuf_resize(io, io->len + 2); diff --git a/src/queue.c b/src/queue.c new file mode 100644 index 00000000..faf26d6f --- /dev/null +++ b/src/queue.c @@ -0,0 +1,40 @@ +#include "queue.h" + +// Every message in the queue is prepended by the message length (ML) +// ML is sizeof(size_t) in size +// Tail points to the message data +// +// |------| ML | message1 | ML | message2 |--- free space ---| +// ^ ^ ^ ^ +// buf tail head len + +size_t mg_queue_space(struct mg_queue *q, char **buf) { + size_t ofs; + if (q->head > 0 && q->tail >= q->head) { // All messages read? + q->head = 0; // Yes. Reset head first + q->tail = 0; // Now reset the tail + } + ofs = q->head + sizeof(size_t); + if (buf != NULL) *buf = q->buf + ofs; + return ofs > q->len ? 0 : q->len - ofs; +} + +size_t mg_queue_next(struct mg_queue *q, char **buf) { + size_t len = MG_QUEUE_EMPTY; + if (q->tail < q->head) memcpy(&len, &q->buf[q->tail], sizeof(len)); + if (buf != NULL) *buf = &q->buf[q->tail + sizeof(len)]; + return len; +} + +void mg_queue_add(struct mg_queue *q, size_t len) { + size_t head = q->head + len + (size_t) sizeof(head); // New head + if (head <= q->len) { // Have space ? + memcpy(q->buf + q->head, &len, sizeof(len)); // Yes. Store ML + q->head = head; // Advance head + } +} + +void mg_queue_del(struct mg_queue *q) { + size_t len = mg_queue_next(q, NULL), tail = q->tail + len + sizeof(size_t); + if (len != MG_QUEUE_EMPTY) q->tail = tail; +} diff --git a/src/queue.h b/src/queue.h new file mode 100644 index 00000000..fbd905ab --- /dev/null +++ b/src/queue.h @@ -0,0 +1,38 @@ +#pragma once + +#include "arch.h" // For size_t +#include "config.h" // For MG_ENABLE_ATOMIC + +#if MG_ENABLE_ATOMIC +#include +#elif !defined(_Atomic) +#define _Atomic +#endif + +// Single producer, single consumer non-blocking queue +// +// Producer: +// void *buf; +// while (mg_queue_space(q, &buf) < len) WAIT(); // Wait for free space +// memcpy(buf, data, len); // Copy data to the queue +// mg_queue_add(q, len); // Advance q->head +// +// Consumer: +// void *buf; +// while ((len = mg_queue_next(q, &buf)) == MG_QUEUE_EMPTY) WAIT(); +// mg_hexdump(buf, len); // Handle message +// mg_queue_del(q); // Delete message +// +struct mg_queue { + char *buf; + size_t len; + volatile _Atomic size_t tail; + volatile _Atomic size_t head; +}; + +#define MG_QUEUE_EMPTY ((size_t) ~0ul) // Next message size when queue is empty + +void mg_queue_add(struct mg_queue *, size_t len); // Advance head +void mg_queue_del(struct mg_queue *); // Advance tail +size_t mg_queue_space(struct mg_queue *, char **); // Get free space +size_t mg_queue_next(struct mg_queue *, char **); // Get next message size diff --git a/test/unit_test.c b/test/unit_test.c index 6f0836ed..d0e0430b 100644 --- a/test/unit_test.c +++ b/test/unit_test.c @@ -381,7 +381,8 @@ static void test_mqtt_base() { // Ping the client c = mg_mqtt_connect(&mgr, url, NULL, mqtt_cb, &test_data); mg_mqtt_ping(c); - for (i = 0; i < 300 && !(c->is_client && !c->is_connecting); i++) mg_mgr_poll(&mgr, 10); + for (i = 0; i < 300 && !(c->is_client && !c->is_connecting); i++) + mg_mgr_poll(&mgr, 10); ASSERT(c->is_client && !c->is_connecting); mg_mgr_free(&mgr); @@ -1308,7 +1309,9 @@ static void test_http_range(void) { ASSERT(mgr.conns == NULL); } -static void f1(void *arg) { (*(int *) arg)++; } +static void f1(void *arg) { + (*(int *) arg)++; +} static void test_timer(void) { int v1 = 0, v2 = 0, v3 = 0; @@ -2683,12 +2686,130 @@ static void test_poll(void) { mg_mgr_free(&mgr); } +#if MG_ENABLE_ATOMIC +#define NMESSAGES 49999 +static uint32_t s_qcrc = 0; +static void producer(void *param) { + struct mg_queue *q = (struct mg_queue *) param; + volatile size_t i, n, len; + for (i = 0; i < NMESSAGES; i++) { + char buf[100]; + char *p; + mg_random(buf, sizeof(buf)); + n = ((unsigned char *) buf)[0] % sizeof(buf); + while ((len = mg_queue_space(q, &p)) < n) (void) 0; + memcpy(p, buf, n); + mg_queue_add(q, n); + s_qcrc = mg_crc32(s_qcrc, buf, n); + } +} + +static uint32_t consumer(struct mg_queue *q) { + uint32_t i, crc = 0; + for (i = 0; i < NMESSAGES; i++) { + char *p; + volatile size_t len; + while ((len = mg_queue_next(q, &p)) == MG_QUEUE_EMPTY) (void) 0; + crc = mg_crc32(crc, (char *) p, len); + mg_queue_del(q); + } + return crc; +} + +#if MG_ARCH == MG_ARCH_WIN32 +static void start_thread(void (*f)(void *), void *p) { + _beginthread((void(__cdecl *)(void *)) f, 0, p); +} +#elif MG_ARCH == MG_ARCH_UNIX +#include +static void start_thread(void (*f)(void *), void *p) { + union { + void (*f1)(void *); + void *(*f2)(void *); + } u = {f}; + pthread_t thread_id = (pthread_t) 0; + pthread_attr_t attr; + (void) pthread_attr_init(&attr); + (void) pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); + pthread_create(&thread_id, &attr, u.f2, p); + pthread_attr_destroy(&attr); +} +#else +static void start_thread(void (*f)(void *), void *p) { + (void) f, (void) p; +} +#endif +#endif + +static void test_queue(void) { + char buf[512], *p; + size_t size = sizeof(size); + struct mg_queue queue, *q = &queue; + + memset(q, 0, sizeof(*q)); + q->buf = buf; + q->len = sizeof(buf); + + ASSERT(mg_queue_next(q, &p) == MG_QUEUE_EMPTY); + + // Write "hi" + ASSERT(mg_queue_space(q, &p) == sizeof(buf) - size); + ASSERT(mg_queue_printf(q, "hi") == 2); + ASSERT(q->head == size + 2); + ASSERT(mg_queue_next(q, &p) == 2); + + // Write zero-length message + ASSERT(mg_queue_space(q, &p) == sizeof(buf) - 2 - 2 * size); + ASSERT(mg_queue_printf(q, "") == 0); + ASSERT(q->head == size * 2 + 2); + ASSERT(mg_queue_next(q, &p) == 2); + + // Write "dude" + ASSERT(mg_queue_space(q, &p) == sizeof(buf) - 2 - 3 * size); + ASSERT(mg_queue_printf(q, "dude") == 4); + ASSERT(q->head == size * 3 + 2 + 4); + ASSERT(mg_queue_next(q, &p) == 2); + + // Read "hi" + ASSERT(mg_queue_next(q, &p) == 2); + ASSERT(memcmp(p, "hi", 2) == 0); + mg_queue_del(q); + ASSERT(q->head == size * 3 + 2 + 4); + ASSERT(q->tail == size + 2); + + // Read empty message + ASSERT(mg_queue_next(q, &p) == 0); + mg_queue_del(q); + ASSERT(q->tail == size * 2 + 2); + + ASSERT(mg_queue_next(q, &p) == 4); + ASSERT(memcmp(p, "dude", 4) == 0); + mg_queue_del(q); + ASSERT(q->tail == q->head); + + ASSERT(mg_queue_space(q, &p) == sizeof(buf) - size); + ASSERT(q->tail == 0 && q->head == 0); + q->tail = q->head = 0; + +#if MG_ENABLE_ATOMIC + { + // Test concurrent queue access + uint32_t crc; + start_thread(producer, q); + crc = consumer(q); + MG_INFO(("DONE. %x %x", s_qcrc, crc)); + ASSERT(s_qcrc == crc); + } +#endif +} + int main(void) { const char *debug_level = getenv("V"); if (debug_level == NULL) debug_level = "3"; mg_log_set(atoi(debug_level)); test_json(); + test_queue(); test_rpc(); test_str(); test_globmatch();