mirror of
https://github.com/cesanta/mongoose.git
synced 2024-12-27 06:51:04 +08:00
Add struct mg_queue and API, change examples/multi-threaded, add tests
This commit is contained in:
parent
c0f4bde5fc
commit
bf9d69ab56
2
.github/workflows/test.yml
vendored
2
.github/workflows/test.yml
vendored
@ -92,7 +92,7 @@ jobs:
|
||||
- if: steps.check.outputs.MATCH == 1
|
||||
run: make test upload-coverage TFLAGS=-DNO_SNTP_CHECK SSL=OPENSSL ASAN_OPTIONS= OPENSSL=`echo /usr/local/Cellar/openssl*/*`
|
||||
- if: steps.check.outputs.MATCH == 1
|
||||
run: make test SSL=MBEDTLS TFLAGS=-DNO_SNTP_CHECK ASAN_OPTIONS= MBEDTLS=`echo /usr/local/Cellar/mbedtls*/*`
|
||||
run: make test SSL=MBEDTLS TFLAGS="-DNO_SNTP_CHECK -DMG_ENABLE_ATOMIC=1" ASAN_OPTIONS= MBEDTLS=`echo /usr/local/Cellar/mbedtls*/*`
|
||||
#- run: make mip_test ASAN_OPTIONS=
|
||||
- if: steps.check.outputs.MATCH == 1
|
||||
run: make mg_prefix
|
||||
|
4
Makefile
4
Makefile
@ -16,7 +16,7 @@ EXAMPLES := $(dir $(wildcard examples/*/Makefile))
|
||||
PREFIX ?= /usr/local
|
||||
VERSION ?= $(shell cut -d'"' -f2 src/version.h)
|
||||
COMMON_CFLAGS ?= $(C_WARN) $(WARN) $(INCS) $(DEFS) -DMG_ENABLE_IPV6=$(IPV6) $(TFLAGS)
|
||||
CFLAGS ?= $(OPTS) $(ASAN) $(COMMON_CFLAGS)
|
||||
CFLAGS ?= $(OPTS) $(ASAN) $(COMMON_CFLAGS) -pthread
|
||||
VALGRIND_CFLAGS ?= $(OPTS) $(COMMON_CFLAGS)
|
||||
VALGRIND_RUN ?= valgrind --tool=memcheck --gen-suppressions=all --leak-check=full --show-leak-kinds=all --leak-resolution=high --track-origins=yes --error-exitcode=1 --exit-on-first-error=yes
|
||||
.PHONY: examples test valgrind mip_test
|
||||
@ -179,7 +179,7 @@ mongoose.c: Makefile $(wildcard src/*.c) $(wildcard src/tcpip/*.c)
|
||||
(cat src/license.h; echo; echo '#include "mongoose.h"' ; (for F in src/*.c src/tcpip/*.c ; do echo; echo '#ifdef MG_ENABLE_LINES'; echo "#line 1 \"$$F\""; echo '#endif'; cat $$F | sed -e 's,#include ".*,,'; done))> $@
|
||||
|
||||
mongoose.h: $(HDRS) Makefile
|
||||
(cat src/license.h; echo; echo '#ifndef MONGOOSE_H'; echo '#define MONGOOSE_H'; echo; cat src/version.h ; echo; echo '#ifdef __cplusplus'; echo 'extern "C" {'; echo '#endif'; cat src/arch.h src/arch_*.h src/net_*.h src/config.h src/str.h src/fmt.h src/log.h src/timer.h src/fs.h src/util.h src/url.h src/iobuf.h src/base64.h src/md5.h src/sha1.h src/event.h src/net.h src/http.h src/ssi.h src/tls.h src/tls_mbed.h src/tls_openssl.h src/ws.h src/sntp.h src/mqtt.h src/dns.h src/json.h src/rpc.h src/tcpip/tcpip.h src/tcpip/driver_*.h | sed -e '/keep/! s,#include ".*,,' -e 's,^#pragma once,,'; echo; echo '#ifdef __cplusplus'; echo '}'; echo '#endif'; echo '#endif // MONGOOSE_H')> $@
|
||||
(cat src/license.h; echo; echo '#ifndef MONGOOSE_H'; echo '#define MONGOOSE_H'; echo; cat src/version.h ; echo; echo '#ifdef __cplusplus'; echo 'extern "C" {'; echo '#endif'; cat src/arch.h src/arch_*.h src/net_*.h src/config.h src/str.h src/queue.h src/fmt.h src/log.h src/timer.h src/fs.h src/util.h src/url.h src/iobuf.h src/base64.h src/md5.h src/sha1.h src/event.h src/net.h src/http.h src/ssi.h src/tls.h src/tls_mbed.h src/tls_openssl.h src/ws.h src/sntp.h src/mqtt.h src/dns.h src/json.h src/rpc.h src/tcpip/tcpip.h src/tcpip/driver_*.h | sed -e '/keep/! s,#include ".*,,' -e 's,^#pragma once,,'; echo; echo '#ifdef __cplusplus'; echo '}'; echo '#endif'; echo '#endif // MONGOOSE_H')> $@
|
||||
|
||||
clean:
|
||||
rm -rf $(PROG) *.exe *.o *.dSYM *_test* ut fuzzer *.gcov *.gcno *.gcda *.obj *.exe *.ilk *.pdb slow-unit* _CL_* infer-out data.txt crash-* test/packed_fs.c pack arduino tmp
|
||||
|
@ -13,12 +13,9 @@ Rules for creating a new example:
|
||||
- An example must build on Windows, Mac and Ubuntu Linux systems
|
||||
- Assume that user installed tools according to https://mongoose.ws/tutorials/tools/
|
||||
- Makefile must not include any other make files
|
||||
- Use `CFLAGS` for compilation options
|
||||
- Add `CFLAGS_EXTRA` to the end of `CFLAGS`, to allow user to specify
|
||||
extra options, for example:
|
||||
```
|
||||
make CFLAGS_EXTRA="-pedantic -O2"
|
||||
```
|
||||
- Use `CFLAGS` for system-specific compilation options
|
||||
- Use `CFLAGS_MONGOOSE` for mongoose-specific compilation options
|
||||
- Use `$(CFLAGS) $(CFLAGS_MONGOOSE) $(CFLAGS_EXTRA)` to compile
|
||||
- If external repository is required, download it on demand using git
|
||||
shallow clone. See embedded example golden reference
|
||||
- Keep Makefile as short as possible, but verbose to understand it easily
|
||||
|
@ -1,17 +1,16 @@
|
||||
SOURCES = main.c ../../mongoose.c # Source code files
|
||||
CFLAGS = -W -Wall -Wextra -g # Build options
|
||||
SOURCES = main.c mongoose.c # Source code files
|
||||
CFLAGS = -W -Wall -Wextra -g -I. # Build options
|
||||
|
||||
# Mongoose build options. See https://mongoose.ws/documentation/#build-options
|
||||
CFLAGS += -I ../..
|
||||
CFLAGS += -DMG_HTTP_DIRLIST_TIME=1 -DMG_ENABLE_SSI=1
|
||||
CFLAGS += -DMG_ENABLE_LINES=1 -DMG_ENABLE_IPV6=1
|
||||
CFLAGS_MONGOOSE += -DMG_HTTP_DIRLIST_TIME=1 -DMG_ENABLE_SSI=1
|
||||
CFLAGS_MONGOOSE += -DMG_ENABLE_LINES=1 -DMG_ENABLE_IPV6=1
|
||||
|
||||
ifeq ($(OS),Windows_NT)
|
||||
# Windows settings. Assume MinGW compiler
|
||||
# Windows settings. Assume MinGW compiler. To use VC: make CC=cl CFLAGS=/MD
|
||||
PROG ?= example.exe # Use .exe suffix for the binary
|
||||
CC = gcc # Use MinGW gcc compiler
|
||||
CFLAGS += -lws2_32 # Link against Winsock library
|
||||
DELETE = cmd /C del /Q /F /S # Command prompt command to delete files
|
||||
DELETE = cmd /C del /q /f /s # Command prompt command to delete files
|
||||
else
|
||||
# Mac, Linux
|
||||
PROG ?= example
|
||||
@ -22,7 +21,7 @@ all: $(PROG)
|
||||
$(RUN) ./$(PROG) $(ARGS)
|
||||
|
||||
$(PROG): $(SOURCES)
|
||||
$(CC) $(SOURCES) $(CFLAGS) $(CFLAGS_EXTRA) -o $@
|
||||
$(CC) $(SOURCES) $(CFLAGS) $(CFLAGS_MONGOOSE) $(CFLAGS_EXTRA) -o $@
|
||||
|
||||
clean:
|
||||
$(DELETE) $(PROG) *.o *.dSYM
|
||||
$(DELETE) $(PROG) *.o *.obj *.exe *.dSYM
|
||||
|
1
examples/http-server/mongoose.c
Symbolic link
1
examples/http-server/mongoose.c
Symbolic link
@ -0,0 +1 @@
|
||||
../../mongoose.c
|
1
examples/http-server/mongoose.h
Symbolic link
1
examples/http-server/mongoose.h
Symbolic link
@ -0,0 +1 @@
|
||||
../../mongoose.h
|
@ -1,17 +1,26 @@
|
||||
PROG ?= example
|
||||
ROOT ?= $(realpath $(CURDIR)/../..)
|
||||
CWD ?= $(realpath $(CURDIR))
|
||||
DOCKER ?= docker run --rm -it -e Tmp=. -e WINEDEBUG=-all -v $(ROOT):$(ROOT) -w $(CWD)
|
||||
SOURCES = main.c mongoose.c # Source code files
|
||||
CFLAGS = -W -Wall -Wextra -g -I. # Build options
|
||||
|
||||
# Mongoose build options. See https://mongoose.ws/documentation/#build-options
|
||||
CFLAGS_MONGOOSE += -DMG_ENABLE_ATOMIC=1
|
||||
|
||||
ifeq ($(OS),Windows_NT)
|
||||
# Windows settings. Assume MinGW compiler. To use VC: make CC=cl CFLAGS=/MD
|
||||
PROG ?= example.exe # Use .exe suffix for the binary
|
||||
CC = gcc # Use MinGW gcc compiler
|
||||
CFLAGS += -lws2_32 # Link against Winsock library
|
||||
DELETE = cmd /C del /f /q /s # Command prompt command to delete files
|
||||
else
|
||||
# Mac, Linux
|
||||
PROG ?= example
|
||||
DELETE = rm -rf
|
||||
endif
|
||||
|
||||
all: $(PROG)
|
||||
$(RUN) ./$(PROG)
|
||||
$(RUN) ./$(PROG) $(ARGS)
|
||||
|
||||
$(PROG):
|
||||
$(CC) ../../mongoose.c -I../.. -pthread $(CFLAGS) -o $(PROG) main.c
|
||||
|
||||
$(PROG).exe:
|
||||
$(DOCKER) mdashnet/vc98 wine cl ../../mongoose.c main.c -I../.. /MD ws2_32.lib /Fe$@
|
||||
$(DOCKER) mdashnet/vc98 wine $@
|
||||
$(PROG): $(SOURCES)
|
||||
$(CC) $(SOURCES) $(CFLAGS) $(CFLAGS_MONGOOSE) $(CFLAGS_EXTRA) -o $@
|
||||
|
||||
clean:
|
||||
rm -rf $(PROG) *.o *.dSYM *.gcov *.gcno *.gcda *.obj *.exe *.ilk *.pdb log.txt
|
||||
$(DELETE) $(PROG) *.o *.obj *.exe *.dSYM
|
||||
|
@ -1,4 +1,4 @@
|
||||
// Copyright (c) 2020 Cesanta Software Limited
|
||||
// Copyright (c) 2020-2023 Cesanta Software Limited
|
||||
// All rights reserved
|
||||
//
|
||||
// Multithreading example.
|
||||
@ -6,14 +6,20 @@
|
||||
// some time to simulate long processing time, produces an output and
|
||||
// hands over that output to the request handler function.
|
||||
//
|
||||
// We pass POST body to the worker thread, and respond with a calculated CRC
|
||||
|
||||
#include "mongoose.h"
|
||||
|
||||
struct thread_data {
|
||||
struct mg_queue queue; // Worker -> Connection queue
|
||||
struct mg_str body; // Copy of message body
|
||||
};
|
||||
|
||||
static void start_thread(void (*f)(void *), void *p) {
|
||||
#ifdef _WIN32
|
||||
#define usleep(x) Sleep((x) / 1000)
|
||||
_beginthread((void(__cdecl *)(void *)) f, 0, p);
|
||||
#else
|
||||
#define closesocket(x) close(x)
|
||||
#include <pthread.h>
|
||||
pthread_t thread_id = (pthread_t) 0;
|
||||
pthread_attr_t attr;
|
||||
@ -24,71 +30,51 @@ static void start_thread(void (*f)(void *), void *p) {
|
||||
#endif
|
||||
}
|
||||
|
||||
struct userdata {
|
||||
int sock; // Paired socket, worker thread owns it
|
||||
struct mg_str
|
||||
body; // data to be processed
|
||||
};
|
||||
static void worker_thread(void *param) {
|
||||
struct thread_data *d = (struct thread_data *) param;
|
||||
char buf[100]; // On-stack buffer for the message queue
|
||||
|
||||
static void thread_function(void *param) {
|
||||
struct userdata *p = (struct userdata *) param;
|
||||
sleep(2); // Simulate long execution
|
||||
// Respond, wakeup event manager
|
||||
if (p->body.len)
|
||||
send(p->sock, p->body.ptr, p->body.len, 0); // if there is a body, echo it
|
||||
else
|
||||
send(p->sock, "hi", 2, 0); // otherwise just respond "hi"
|
||||
close(p->sock); // Close the connection
|
||||
free((void *) p->body.ptr); // free body
|
||||
free(p); // free userdata
|
||||
}
|
||||
d->queue.buf = buf; // Caller passed us an empty queue with NULL
|
||||
d->queue.len = sizeof(buf); // buffer. Initialise it now
|
||||
usleep(1 * 1000 * 1000); // Simulate long execution time
|
||||
|
||||
static void link_conns(struct mg_connection *c1, struct mg_connection *c2) {
|
||||
c1->fn_data = c2;
|
||||
c2->fn_data = c1;
|
||||
}
|
||||
|
||||
static void unlink_conns(struct mg_connection *c1, struct mg_connection *c2) {
|
||||
c1->fn_data = c2->fn_data = NULL;
|
||||
}
|
||||
|
||||
// Pipe event handler
|
||||
static void pcb(struct mg_connection *c, int ev, void *ev_data, void *fn_data) {
|
||||
struct mg_connection *parent = (struct mg_connection *) fn_data;
|
||||
MG_INFO(("%lu %p %d %p", c->id, c->fd, ev, parent));
|
||||
if (parent == NULL) { // If parent connection closed, close too
|
||||
c->is_closing = 1;
|
||||
} else if (ev == MG_EV_READ) { // Got data from the worker thread
|
||||
mg_http_reply(parent, 200, "Host: foo.com\r\n", "%.*s\n", c->recv.len,
|
||||
c->recv.buf); // Respond!
|
||||
c->recv.len = 0; // Tell Mongoose we've consumed data
|
||||
} else if (ev == MG_EV_OPEN) {
|
||||
link_conns(c, parent);
|
||||
} else if (ev == MG_EV_CLOSE) {
|
||||
unlink_conns(c, parent);
|
||||
// Send a response to the connection
|
||||
if (d->body.len == 0) {
|
||||
mg_queue_printf(&d->queue, "Send me POST data");
|
||||
} else {
|
||||
uint32_t crc = mg_crc32(0, d->body.ptr, d->body.len);
|
||||
mg_queue_printf(&d->queue, "crc32: %#x", crc);
|
||||
free((char *) d->body.ptr);
|
||||
}
|
||||
|
||||
// Wait until connection reads our message, then it is safe to quit
|
||||
while (mg_queue_next(&d->queue, NULL) != MG_QUEUE_EMPTY) usleep(1000);
|
||||
MG_INFO(("done, cleaning up..."));
|
||||
free(d);
|
||||
}
|
||||
|
||||
// HTTP request callback
|
||||
static void fn(struct mg_connection *c, int ev, void *ev_data, void *fn_data) {
|
||||
if (ev == MG_EV_HTTP_MSG) {
|
||||
// Received HTTP request. Allocate thread data and spawn a worker thread
|
||||
struct mg_http_message *hm = (struct mg_http_message *) ev_data;
|
||||
if (mg_http_match_uri(hm, "/fast")) {
|
||||
// Single-threaded code path, for performance comparison
|
||||
// The /fast URI responds immediately
|
||||
mg_http_reply(c, 200, "Host: foo.com\r\n", "hi\n");
|
||||
} else {
|
||||
// Multithreading code path
|
||||
struct userdata *data = calloc(1, sizeof(*data)); // worker will free it
|
||||
// parse headers or use request body, duplicate data and pass it to worker
|
||||
// thread
|
||||
data->body = mg_strdup(hm->body); // worker will free it
|
||||
data->sock = mg_mkpipe(c->mgr, pcb, c, true); // Create pipe
|
||||
start_thread(thread_function, data); // Start thread and pass data
|
||||
struct thread_data *d = (struct thread_data *) calloc(1, sizeof(*d));
|
||||
d->body = mg_strdup(hm->body); // Pass received body to the worker
|
||||
start_thread(worker_thread, d); // Start a thread
|
||||
*(void **) c->data = d; // Memorise data pointer in c->data
|
||||
} else if (ev == MG_EV_POLL) {
|
||||
// Poll event. Delivered to us every mg_mgr_poll interval or faster
|
||||
struct thread_data *d = *(struct thread_data **) c->data;
|
||||
size_t len;
|
||||
char *buf;
|
||||
// Check if we have a message from the worker
|
||||
if (d != NULL && (len = mg_queue_next(&d->queue, &buf)) != MG_QUEUE_EMPTY) {
|
||||
// Got message from worker. Send a response and cleanup
|
||||
mg_http_reply(c, 200, "", "%.*s\n", (int) len, buf);
|
||||
mg_queue_del(&d->queue); // Delete message: signal worker that we're done
|
||||
}
|
||||
} else if (ev == MG_EV_CLOSE) {
|
||||
if (c->fn_data != NULL) unlink_conns(c, c->fn_data);
|
||||
}
|
||||
(void) fn_data;
|
||||
}
|
||||
|
||||
int main(void) {
|
||||
@ -96,7 +82,7 @@ int main(void) {
|
||||
mg_mgr_init(&mgr);
|
||||
mg_log_set(MG_LL_DEBUG); // Set debug log level
|
||||
mg_http_listen(&mgr, "http://localhost:8000", fn, NULL); // Create listener
|
||||
for (;;) mg_mgr_poll(&mgr, 1000); // Event loop
|
||||
mg_mgr_free(&mgr); // Cleanup
|
||||
for (;;) mg_mgr_poll(&mgr, 10); // Event loop. Use 10ms poll interval
|
||||
mg_mgr_free(&mgr); // Cleanup
|
||||
return 0;
|
||||
}
|
||||
|
1
examples/multi-threaded/mongoose.c
Symbolic link
1
examples/multi-threaded/mongoose.c
Symbolic link
@ -0,0 +1 @@
|
||||
../../mongoose.c
|
1
examples/multi-threaded/mongoose.h
Symbolic link
1
examples/multi-threaded/mongoose.h
Symbolic link
@ -0,0 +1 @@
|
||||
../../mongoose.h
|
65
mongoose.c
65
mongoose.c
@ -3490,6 +3490,27 @@ void mg_mgr_init(struct mg_mgr *mgr) {
|
||||
|
||||
|
||||
|
||||
size_t mg_queue_vprintf(struct mg_queue *q, const char *fmt, va_list *ap) {
|
||||
size_t len = mg_snprintf(NULL, 0, fmt, ap);
|
||||
char *buf;
|
||||
if (mg_queue_space(q, &buf) < len + 1) {
|
||||
len = 0; // Nah. Not enough space
|
||||
} else {
|
||||
len = mg_vsnprintf((char *)buf, len + 1, fmt, ap);
|
||||
mg_queue_add(q, len);
|
||||
}
|
||||
return len;
|
||||
}
|
||||
|
||||
size_t mg_queue_printf(struct mg_queue *q, const char *fmt, ...) {
|
||||
va_list ap;
|
||||
size_t len;
|
||||
va_start(ap, fmt);
|
||||
len = mg_queue_vprintf(q, fmt, &ap);
|
||||
va_end(ap);
|
||||
return len;
|
||||
}
|
||||
|
||||
static void mg_pfn_iobuf_private(char ch, void *param, bool expand) {
|
||||
struct mg_iobuf *io = (struct mg_iobuf *) param;
|
||||
if (expand && io->len + 2 > io->size) mg_iobuf_resize(io, io->len + 2);
|
||||
@ -3583,6 +3604,50 @@ size_t mg_print_mac(void (*out)(char, void *), void *arg, va_list *ap) {
|
||||
p[3], p[4], p[5]);
|
||||
}
|
||||
|
||||
#ifdef MG_ENABLE_LINES
|
||||
#line 1 "src/queue.c"
|
||||
#endif
|
||||
|
||||
|
||||
// Every message in the queue is prepended by the message length (ML)
|
||||
// ML is sizeof(size_t) in size
|
||||
// Tail points to the message data
|
||||
//
|
||||
// |------| ML | message1 | ML | message2 |--- free space ---|
|
||||
// ^ ^ ^ ^
|
||||
// buf tail head len
|
||||
|
||||
size_t mg_queue_space(struct mg_queue *q, char **buf) {
|
||||
size_t ofs;
|
||||
if (q->head > 0 && q->tail >= q->head) { // All messages read?
|
||||
q->head = 0; // Yes. Reset head first
|
||||
q->tail = 0; // Now reset the tail
|
||||
}
|
||||
ofs = q->head + sizeof(size_t);
|
||||
if (buf != NULL) *buf = q->buf + ofs;
|
||||
return ofs > q->len ? 0 : q->len - ofs;
|
||||
}
|
||||
|
||||
size_t mg_queue_next(struct mg_queue *q, char **buf) {
|
||||
size_t len = MG_QUEUE_EMPTY;
|
||||
if (q->tail < q->head) memcpy(&len, &q->buf[q->tail], sizeof(len));
|
||||
if (buf != NULL) *buf = &q->buf[q->tail + sizeof(len)];
|
||||
return len;
|
||||
}
|
||||
|
||||
void mg_queue_add(struct mg_queue *q, size_t len) {
|
||||
size_t head = q->head + len + (size_t) sizeof(head); // New head
|
||||
if (head <= q->len) { // Have space ?
|
||||
memcpy(q->buf + q->head, &len, sizeof(len)); // Yes. Store ML
|
||||
q->head = head; // Advance head
|
||||
}
|
||||
}
|
||||
|
||||
void mg_queue_del(struct mg_queue *q) {
|
||||
size_t len = mg_queue_next(q, NULL), tail = q->tail + len + sizeof(size_t);
|
||||
if (len != MG_QUEUE_EMPTY) q->tail = tail;
|
||||
}
|
||||
|
||||
#ifdef MG_ENABLE_LINES
|
||||
#line 1 "src/rpc.c"
|
||||
#endif
|
||||
|
50
mongoose.h
50
mongoose.h
@ -223,6 +223,7 @@ static inline int mg_mkdir(const char *path, mode_t mode) {
|
||||
#define MG_PATH_MAX 100
|
||||
#define MG_ENABLE_SOCKET 0
|
||||
#define MG_ENABLE_DIRLIST 0
|
||||
#define MG_ENABLE_ATOMIC 1
|
||||
|
||||
#endif
|
||||
|
||||
@ -239,6 +240,7 @@ static inline int mg_mkdir(const char *path, mode_t mode) {
|
||||
|
||||
#include <pico/stdlib.h>
|
||||
int mkdir(const char *, mode_t);
|
||||
#define MG_ENABLE_ATOMIC 1
|
||||
#endif
|
||||
|
||||
|
||||
@ -597,6 +599,10 @@ struct timeval {
|
||||
#define MG_ENABLE_TCPIP 0 // Mongoose built-in network stack
|
||||
#endif
|
||||
|
||||
#ifndef MG_ENABLE_ATOMIC
|
||||
#define MG_ENABLE_ATOMIC 0 // Required by mg_queue
|
||||
#endif
|
||||
|
||||
#ifndef MG_ENABLE_LWIP
|
||||
#define MG_ENABLE_LWIP 0 // lWIP network stack
|
||||
#endif
|
||||
@ -787,19 +793,58 @@ char *mg_remove_double_dots(char *s);
|
||||
|
||||
|
||||
|
||||
#if MG_ENABLE_ATOMIC
|
||||
#include <stdatomic.h>
|
||||
#elif !defined(_Atomic)
|
||||
#define _Atomic
|
||||
#endif
|
||||
|
||||
// Single producer, single consumer non-blocking queue
|
||||
//
|
||||
// Producer:
|
||||
// void *buf;
|
||||
// while (mg_queue_space(q, &buf) < len) WAIT(); // Wait for free space
|
||||
// memcpy(buf, data, len); // Copy data to the queue
|
||||
// mg_queue_add(q, len); // Advance q->head
|
||||
//
|
||||
// Consumer:
|
||||
// void *buf;
|
||||
// while ((len = mg_queue_next(q, &buf)) == MG_QUEUE_EMPTY) WAIT();
|
||||
// mg_hexdump(buf, len); // Handle message
|
||||
// mg_queue_del(q); // Delete message
|
||||
//
|
||||
struct mg_queue {
|
||||
char *buf;
|
||||
size_t len;
|
||||
volatile _Atomic size_t tail;
|
||||
volatile _Atomic size_t head;
|
||||
};
|
||||
|
||||
#define MG_QUEUE_EMPTY ((size_t) ~0ul) // Next message size when queue is empty
|
||||
|
||||
void mg_queue_add(struct mg_queue *, size_t len); // Advance head
|
||||
void mg_queue_del(struct mg_queue *); // Advance tail
|
||||
size_t mg_queue_space(struct mg_queue *, char **); // Get free space
|
||||
size_t mg_queue_next(struct mg_queue *, char **); // Get next message size
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
typedef void (*mg_pfn_t)(char, void *); // Output function
|
||||
typedef size_t (*mg_pm_t)(mg_pfn_t, void *, va_list *); // %M printer
|
||||
|
||||
size_t mg_vxprintf(void (*)(char, void *), void *, const char *fmt, va_list *);
|
||||
size_t mg_xprintf(void (*fn)(char, void *), void *, const char *fmt, ...);
|
||||
|
||||
|
||||
|
||||
// Convenience wrappers around mg_xprintf
|
||||
size_t mg_vsnprintf(char *buf, size_t len, const char *fmt, va_list *ap);
|
||||
size_t mg_snprintf(char *, size_t, const char *fmt, ...);
|
||||
char *mg_vmprintf(const char *fmt, va_list *ap);
|
||||
char *mg_mprintf(const char *fmt, ...);
|
||||
size_t mg_queue_vprintf(struct mg_queue *, const char *fmt, va_list *);
|
||||
size_t mg_queue_printf(struct mg_queue *, const char *fmt, ...);
|
||||
|
||||
// %M print helper functions
|
||||
size_t mg_print_ip(void (*out)(char, void *), void *arg, va_list *ap);
|
||||
@ -810,7 +855,6 @@ size_t mg_print_mac(void (*out)(char, void *), void *arg, va_list *ap);
|
||||
|
||||
// Various output functions
|
||||
void mg_pfn_iobuf(char ch, void *param); // param: struct mg_iobuf *
|
||||
void mg_pfn_queue(char ch, void *param); // param: struct mg_queue *
|
||||
void mg_pfn_stdout(char c, void *param); // param: ignored
|
||||
|
||||
|
||||
|
@ -19,5 +19,6 @@
|
||||
#define MG_PATH_MAX 100
|
||||
#define MG_ENABLE_SOCKET 0
|
||||
#define MG_ENABLE_DIRLIST 0
|
||||
#define MG_ENABLE_ATOMIC 1
|
||||
|
||||
#endif
|
||||
|
@ -12,4 +12,5 @@
|
||||
|
||||
#include <pico/stdlib.h>
|
||||
int mkdir(const char *, mode_t);
|
||||
#define MG_ENABLE_ATOMIC 1
|
||||
#endif
|
||||
|
@ -8,6 +8,10 @@
|
||||
#define MG_ENABLE_TCPIP 0 // Mongoose built-in network stack
|
||||
#endif
|
||||
|
||||
#ifndef MG_ENABLE_ATOMIC
|
||||
#define MG_ENABLE_ATOMIC 0 // Required by mg_queue
|
||||
#endif
|
||||
|
||||
#ifndef MG_ENABLE_LWIP
|
||||
#define MG_ENABLE_LWIP 0 // lWIP network stack
|
||||
#endif
|
||||
|
@ -2,11 +2,11 @@
|
||||
|
||||
#include "arch.h"
|
||||
#include "iobuf.h"
|
||||
#include "queue.h"
|
||||
|
||||
typedef void (*mg_pfn_t)(char, void *); // Output function
|
||||
typedef size_t (*mg_pm_t)(mg_pfn_t, void *, va_list *); // %M printer
|
||||
|
||||
// The lowest level
|
||||
size_t mg_vxprintf(void (*)(char, void *), void *, const char *fmt, va_list *);
|
||||
size_t mg_xprintf(void (*fn)(char, void *), void *, const char *fmt, ...);
|
||||
|
||||
@ -15,6 +15,8 @@ size_t mg_vsnprintf(char *buf, size_t len, const char *fmt, va_list *ap);
|
||||
size_t mg_snprintf(char *, size_t, const char *fmt, ...);
|
||||
char *mg_vmprintf(const char *fmt, va_list *ap);
|
||||
char *mg_mprintf(const char *fmt, ...);
|
||||
size_t mg_queue_vprintf(struct mg_queue *, const char *fmt, va_list *);
|
||||
size_t mg_queue_printf(struct mg_queue *, const char *fmt, ...);
|
||||
|
||||
// %M print helper functions
|
||||
size_t mg_print_ip(void (*out)(char, void *), void *arg, va_list *ap);
|
||||
|
22
src/printf.c
22
src/printf.c
@ -1,7 +1,29 @@
|
||||
#include "fmt.h"
|
||||
#include "iobuf.h"
|
||||
#include "queue.h"
|
||||
#include "util.h"
|
||||
|
||||
size_t mg_queue_vprintf(struct mg_queue *q, const char *fmt, va_list *ap) {
|
||||
size_t len = mg_snprintf(NULL, 0, fmt, ap);
|
||||
char *buf;
|
||||
if (mg_queue_space(q, &buf) < len + 1) {
|
||||
len = 0; // Nah. Not enough space
|
||||
} else {
|
||||
len = mg_vsnprintf((char *)buf, len + 1, fmt, ap);
|
||||
mg_queue_add(q, len);
|
||||
}
|
||||
return len;
|
||||
}
|
||||
|
||||
size_t mg_queue_printf(struct mg_queue *q, const char *fmt, ...) {
|
||||
va_list ap;
|
||||
size_t len;
|
||||
va_start(ap, fmt);
|
||||
len = mg_queue_vprintf(q, fmt, &ap);
|
||||
va_end(ap);
|
||||
return len;
|
||||
}
|
||||
|
||||
static void mg_pfn_iobuf_private(char ch, void *param, bool expand) {
|
||||
struct mg_iobuf *io = (struct mg_iobuf *) param;
|
||||
if (expand && io->len + 2 > io->size) mg_iobuf_resize(io, io->len + 2);
|
||||
|
40
src/queue.c
Normal file
40
src/queue.c
Normal file
@ -0,0 +1,40 @@
|
||||
#include "queue.h"
|
||||
|
||||
// Every message in the queue is prepended by the message length (ML)
|
||||
// ML is sizeof(size_t) in size
|
||||
// Tail points to the message data
|
||||
//
|
||||
// |------| ML | message1 | ML | message2 |--- free space ---|
|
||||
// ^ ^ ^ ^
|
||||
// buf tail head len
|
||||
|
||||
size_t mg_queue_space(struct mg_queue *q, char **buf) {
|
||||
size_t ofs;
|
||||
if (q->head > 0 && q->tail >= q->head) { // All messages read?
|
||||
q->head = 0; // Yes. Reset head first
|
||||
q->tail = 0; // Now reset the tail
|
||||
}
|
||||
ofs = q->head + sizeof(size_t);
|
||||
if (buf != NULL) *buf = q->buf + ofs;
|
||||
return ofs > q->len ? 0 : q->len - ofs;
|
||||
}
|
||||
|
||||
size_t mg_queue_next(struct mg_queue *q, char **buf) {
|
||||
size_t len = MG_QUEUE_EMPTY;
|
||||
if (q->tail < q->head) memcpy(&len, &q->buf[q->tail], sizeof(len));
|
||||
if (buf != NULL) *buf = &q->buf[q->tail + sizeof(len)];
|
||||
return len;
|
||||
}
|
||||
|
||||
void mg_queue_add(struct mg_queue *q, size_t len) {
|
||||
size_t head = q->head + len + (size_t) sizeof(head); // New head
|
||||
if (head <= q->len) { // Have space ?
|
||||
memcpy(q->buf + q->head, &len, sizeof(len)); // Yes. Store ML
|
||||
q->head = head; // Advance head
|
||||
}
|
||||
}
|
||||
|
||||
void mg_queue_del(struct mg_queue *q) {
|
||||
size_t len = mg_queue_next(q, NULL), tail = q->tail + len + sizeof(size_t);
|
||||
if (len != MG_QUEUE_EMPTY) q->tail = tail;
|
||||
}
|
38
src/queue.h
Normal file
38
src/queue.h
Normal file
@ -0,0 +1,38 @@
|
||||
#pragma once
|
||||
|
||||
#include "arch.h" // For size_t
|
||||
#include "config.h" // For MG_ENABLE_ATOMIC
|
||||
|
||||
#if MG_ENABLE_ATOMIC
|
||||
#include <stdatomic.h>
|
||||
#elif !defined(_Atomic)
|
||||
#define _Atomic
|
||||
#endif
|
||||
|
||||
// Single producer, single consumer non-blocking queue
|
||||
//
|
||||
// Producer:
|
||||
// void *buf;
|
||||
// while (mg_queue_space(q, &buf) < len) WAIT(); // Wait for free space
|
||||
// memcpy(buf, data, len); // Copy data to the queue
|
||||
// mg_queue_add(q, len); // Advance q->head
|
||||
//
|
||||
// Consumer:
|
||||
// void *buf;
|
||||
// while ((len = mg_queue_next(q, &buf)) == MG_QUEUE_EMPTY) WAIT();
|
||||
// mg_hexdump(buf, len); // Handle message
|
||||
// mg_queue_del(q); // Delete message
|
||||
//
|
||||
struct mg_queue {
|
||||
char *buf;
|
||||
size_t len;
|
||||
volatile _Atomic size_t tail;
|
||||
volatile _Atomic size_t head;
|
||||
};
|
||||
|
||||
#define MG_QUEUE_EMPTY ((size_t) ~0ul) // Next message size when queue is empty
|
||||
|
||||
void mg_queue_add(struct mg_queue *, size_t len); // Advance head
|
||||
void mg_queue_del(struct mg_queue *); // Advance tail
|
||||
size_t mg_queue_space(struct mg_queue *, char **); // Get free space
|
||||
size_t mg_queue_next(struct mg_queue *, char **); // Get next message size
|
125
test/unit_test.c
125
test/unit_test.c
@ -381,7 +381,8 @@ static void test_mqtt_base() {
|
||||
// Ping the client
|
||||
c = mg_mqtt_connect(&mgr, url, NULL, mqtt_cb, &test_data);
|
||||
mg_mqtt_ping(c);
|
||||
for (i = 0; i < 300 && !(c->is_client && !c->is_connecting); i++) mg_mgr_poll(&mgr, 10);
|
||||
for (i = 0; i < 300 && !(c->is_client && !c->is_connecting); i++)
|
||||
mg_mgr_poll(&mgr, 10);
|
||||
ASSERT(c->is_client && !c->is_connecting);
|
||||
|
||||
mg_mgr_free(&mgr);
|
||||
@ -1308,7 +1309,9 @@ static void test_http_range(void) {
|
||||
ASSERT(mgr.conns == NULL);
|
||||
}
|
||||
|
||||
static void f1(void *arg) { (*(int *) arg)++; }
|
||||
static void f1(void *arg) {
|
||||
(*(int *) arg)++;
|
||||
}
|
||||
|
||||
static void test_timer(void) {
|
||||
int v1 = 0, v2 = 0, v3 = 0;
|
||||
@ -2683,12 +2686,130 @@ static void test_poll(void) {
|
||||
mg_mgr_free(&mgr);
|
||||
}
|
||||
|
||||
#if MG_ENABLE_ATOMIC
|
||||
#define NMESSAGES 49999
|
||||
static uint32_t s_qcrc = 0;
|
||||
static void producer(void *param) {
|
||||
struct mg_queue *q = (struct mg_queue *) param;
|
||||
volatile size_t i, n, len;
|
||||
for (i = 0; i < NMESSAGES; i++) {
|
||||
char buf[100];
|
||||
char *p;
|
||||
mg_random(buf, sizeof(buf));
|
||||
n = ((unsigned char *) buf)[0] % sizeof(buf);
|
||||
while ((len = mg_queue_space(q, &p)) < n) (void) 0;
|
||||
memcpy(p, buf, n);
|
||||
mg_queue_add(q, n);
|
||||
s_qcrc = mg_crc32(s_qcrc, buf, n);
|
||||
}
|
||||
}
|
||||
|
||||
static uint32_t consumer(struct mg_queue *q) {
|
||||
uint32_t i, crc = 0;
|
||||
for (i = 0; i < NMESSAGES; i++) {
|
||||
char *p;
|
||||
volatile size_t len;
|
||||
while ((len = mg_queue_next(q, &p)) == MG_QUEUE_EMPTY) (void) 0;
|
||||
crc = mg_crc32(crc, (char *) p, len);
|
||||
mg_queue_del(q);
|
||||
}
|
||||
return crc;
|
||||
}
|
||||
|
||||
#if MG_ARCH == MG_ARCH_WIN32
|
||||
static void start_thread(void (*f)(void *), void *p) {
|
||||
_beginthread((void(__cdecl *)(void *)) f, 0, p);
|
||||
}
|
||||
#elif MG_ARCH == MG_ARCH_UNIX
|
||||
#include <pthread.h>
|
||||
static void start_thread(void (*f)(void *), void *p) {
|
||||
union {
|
||||
void (*f1)(void *);
|
||||
void *(*f2)(void *);
|
||||
} u = {f};
|
||||
pthread_t thread_id = (pthread_t) 0;
|
||||
pthread_attr_t attr;
|
||||
(void) pthread_attr_init(&attr);
|
||||
(void) pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
|
||||
pthread_create(&thread_id, &attr, u.f2, p);
|
||||
pthread_attr_destroy(&attr);
|
||||
}
|
||||
#else
|
||||
static void start_thread(void (*f)(void *), void *p) {
|
||||
(void) f, (void) p;
|
||||
}
|
||||
#endif
|
||||
#endif
|
||||
|
||||
static void test_queue(void) {
|
||||
char buf[512], *p;
|
||||
size_t size = sizeof(size);
|
||||
struct mg_queue queue, *q = &queue;
|
||||
|
||||
memset(q, 0, sizeof(*q));
|
||||
q->buf = buf;
|
||||
q->len = sizeof(buf);
|
||||
|
||||
ASSERT(mg_queue_next(q, &p) == MG_QUEUE_EMPTY);
|
||||
|
||||
// Write "hi"
|
||||
ASSERT(mg_queue_space(q, &p) == sizeof(buf) - size);
|
||||
ASSERT(mg_queue_printf(q, "hi") == 2);
|
||||
ASSERT(q->head == size + 2);
|
||||
ASSERT(mg_queue_next(q, &p) == 2);
|
||||
|
||||
// Write zero-length message
|
||||
ASSERT(mg_queue_space(q, &p) == sizeof(buf) - 2 - 2 * size);
|
||||
ASSERT(mg_queue_printf(q, "") == 0);
|
||||
ASSERT(q->head == size * 2 + 2);
|
||||
ASSERT(mg_queue_next(q, &p) == 2);
|
||||
|
||||
// Write "dude"
|
||||
ASSERT(mg_queue_space(q, &p) == sizeof(buf) - 2 - 3 * size);
|
||||
ASSERT(mg_queue_printf(q, "dude") == 4);
|
||||
ASSERT(q->head == size * 3 + 2 + 4);
|
||||
ASSERT(mg_queue_next(q, &p) == 2);
|
||||
|
||||
// Read "hi"
|
||||
ASSERT(mg_queue_next(q, &p) == 2);
|
||||
ASSERT(memcmp(p, "hi", 2) == 0);
|
||||
mg_queue_del(q);
|
||||
ASSERT(q->head == size * 3 + 2 + 4);
|
||||
ASSERT(q->tail == size + 2);
|
||||
|
||||
// Read empty message
|
||||
ASSERT(mg_queue_next(q, &p) == 0);
|
||||
mg_queue_del(q);
|
||||
ASSERT(q->tail == size * 2 + 2);
|
||||
|
||||
ASSERT(mg_queue_next(q, &p) == 4);
|
||||
ASSERT(memcmp(p, "dude", 4) == 0);
|
||||
mg_queue_del(q);
|
||||
ASSERT(q->tail == q->head);
|
||||
|
||||
ASSERT(mg_queue_space(q, &p) == sizeof(buf) - size);
|
||||
ASSERT(q->tail == 0 && q->head == 0);
|
||||
q->tail = q->head = 0;
|
||||
|
||||
#if MG_ENABLE_ATOMIC
|
||||
{
|
||||
// Test concurrent queue access
|
||||
uint32_t crc;
|
||||
start_thread(producer, q);
|
||||
crc = consumer(q);
|
||||
MG_INFO(("DONE. %x %x", s_qcrc, crc));
|
||||
ASSERT(s_qcrc == crc);
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
int main(void) {
|
||||
const char *debug_level = getenv("V");
|
||||
if (debug_level == NULL) debug_level = "3";
|
||||
mg_log_set(atoi(debug_level));
|
||||
|
||||
test_json();
|
||||
test_queue();
|
||||
test_rpc();
|
||||
test_str();
|
||||
test_globmatch();
|
||||
|
Loading…
x
Reference in New Issue
Block a user