From b7ce8213d78311e172f0a8ded0a5ddbce4a86b1d Mon Sep 17 00:00:00 2001 From: Sergey Lyubka Date: Sat, 7 Aug 2021 17:22:47 +0100 Subject: [PATCH] Better multithreading support: remove mg_socketpair, add mg_mkpipe() and mg_rmpipe() --- docs/README.md | 25 ++-- examples/multi-threaded/Makefile | 13 +- examples/multi-threaded/main.c | 71 ++++------- mongoose.c | 198 +++++++++++++++++-------------- mongoose.h | 12 +- src/config.h | 8 -- src/http.c | 20 ++-- src/iobuf.c | 2 +- src/net.h | 4 +- src/sock.c | 176 ++++++++++++++------------- test/mongoose_custom.c | 9 ++ test/unit_test.c | 36 +++++- 12 files changed, 309 insertions(+), 265 deletions(-) diff --git a/docs/README.md b/docs/README.md index c708c5b7..1bb72aa9 100644 --- a/docs/README.md +++ b/docs/README.md @@ -240,8 +240,6 @@ Here is a list of build constants and their default values: |MG_ENABLE_IPV6 | 0 | Enable IPv6 | |MG_ENABLE_LOG | 1 | Enable `LOG()` macro | |MG_ENABLE_MD5 | 0 | Use native MD5 implementation | -|MG_ENABLE_SOCKETPAIR | 0 | Enable `mg_socketpair()` for multi-threading | -|MG_ENABLE_NATIVE_SOCKETPAIR | 0 | Use native `socketpair()` syscall for `mg_socketpair()`| |MG_ENABLE_SSI | 1 | Enable serving SSI files by `mg_http_serve_dir()` | |MG_ENABLE_DIRLIST | 0 | Enable directory listing | |MG_ENABLE_CUSTOM_RANDOM | 0 | Provide custom RNG function `mg_random()` | @@ -524,16 +522,27 @@ int mg_vprintf(struct mg_connection *, const char *fmt, va_list ap); Same as `mg_printf()`, but takes `va_list` argument as a parameter. -### mg\_socketpair() +### mg\_mkpipe() ```c -bool mg_socketpair(int *blocking, int *non_blocking); +bool mg_mkpipe(struct mg_connection *c, struct mg_connection *pc[2]); +void mg_rmpipe(struct mg_connection *c); ``` -Create a socket pair for exchanging data in multi-threaded environment. The -`blocking` socket is blocking - it should be passed to the processing task. -The `non_blocking` socket is non blocking, it should be used by an event -handler function. Return value: true on success, false on error. +Create a pair of connected connections using UDP socketpair. +A sending connection, `pc[0]`, is safe to give to a different task, +and send data to it. A receiving connection `pc[1]`, forwards all received +data to `c`. A `pc[0]` is not added to event manager, therefore it must be +manually cleaned up by calling `mg_rmpipe()` when sending task is done. +A receiving side, `pc[1]` is added to the event manager, therefore it wakes +up a manager each time data gets sent. + +NOTE: if there is a limit on the local UDP message size, do not send more +than that limit in one call. + +See examples/multi-threaded for a usage example. + +Return value: true on success, false on error. ## IO buffers diff --git a/examples/multi-threaded/Makefile b/examples/multi-threaded/Makefile index dd9844fc..6b999313 100644 --- a/examples/multi-threaded/Makefile +++ b/examples/multi-threaded/Makefile @@ -1,18 +1,17 @@ PROG ?= example -CFLAGS += -DMG_ENABLE_SOCKETPAIR=1 -CDIR ?= $(realpath $(CURDIR)) ROOT ?= $(realpath $(CURDIR)/../..) -VC2017 = docker run --rm -e WINEDEBUG=-all -v $(ROOT):$(ROOT) -w $(CDIR) docker.io/mdashnet/vc2017 +CWD ?= $(realpath $(CURDIR)) +DOCKER ?= docker run --rm -it -e Tmp=. -e WINEDEBUG=-all -v $(ROOT):$(ROOT) -w $(CWD) all: $(PROG) - $(DEBUGGER) ./$(PROG) + $(RUN) ./$(PROG) $(PROG): $(CC) ../../mongoose.c -I../.. -pthread $(CFLAGS) -o $(PROG) main.c -vc2017: - $(VC2017) wine64 cl ../../mongoose.c main.c -I../.. $(CFLAGS) ws2_32.lib /Fe$@.exe - $(VC2017) wine64 $@.exe +$(PROG).exe: + $(DOCKER) mdashnet/vc98 wine cl ../../mongoose.c main.c -I../.. /MD ws2_32.lib /Fe$@ + $(DOCKER) mdashnet/vc98 wine $@ clean: rm -rf $(PROG) *.o *.dSYM *.gcov *.gcno *.gcda *.obj *.exe *.ilk *.pdb log.txt diff --git a/examples/multi-threaded/main.c b/examples/multi-threaded/main.c index d48a4c68..e2d26f84 100644 --- a/examples/multi-threaded/main.c +++ b/examples/multi-threaded/main.c @@ -6,8 +6,6 @@ // some time to simulate long processing time, produces an output and // hands over that output to the request handler function. // -// IMPORTANT: this program must be compiled with -DMG_ENABLE_SOCKETPAIR=1 -// // The following procedure is used to benchmark the multi-threaded codepath // against the single-threaded codepath on MacOS: // $ make clean all CFLAGS="-DSLEEP_TIME=0 -DMG_ENABLE_SOCKETPAIR=1" @@ -21,14 +19,8 @@ #include "mongoose.h" -// thread_function() sends this structure back to the request handler -struct response { - char *data; - int len; -}; - #ifndef SLEEP_TIME -#define SLEEP_TIME 3 // Seconds to sleep to simulate calculation +#define SLEEP_TIME 2 // Seconds to sleep to simulate calculation #endif static void start_thread(void (*f)(void *), void *p) { @@ -47,12 +39,13 @@ static void start_thread(void (*f)(void *), void *p) { } static void thread_function(void *param) { - int sock = (long) param; // Grab our blocking socket - struct response r = {strdup("hello\n"), 6}; // Create response - mg_usleep(SLEEP_TIME * 1000000); // Simulate long execution - LOG(LL_INFO, ("got sock %d", sock)); // For debugging - send(sock, (void *) &r, sizeof(r), 0); // Send to request handler - closesocket(sock); // Done, close socket, end thread + struct mg_connection *c = param; // Pipe connection + LOG(LL_INFO, ("Thread started, pipe %lu/%ld", c->id, (long) (size_t) c->fd)); + LOG(LL_INFO, ("Sleeping for %d sec...", SLEEP_TIME)); + mg_usleep(SLEEP_TIME * 1000000); // Simulate long execution + LOG(LL_INFO, ("Sending data...")); + mg_http_reply(c, 200, "Host: foo.com\r\n", "hi\n"); + mg_rmpipe(c); } // HTTP request callback @@ -61,46 +54,34 @@ static void cb(struct mg_connection *c, int ev, void *ev_data, void *fn_data) { // Incoming request. Create socket pair. // Pass blocking socket to the thread, and keep the non-blocking socket. struct mg_http_message *hm = (struct mg_http_message *) ev_data; - if (mg_http_match_uri(hm, "/fast")) { - // The /fast URI is for performance impact of the multithreading codepath - mg_printf(c, - "HTTP/1.1 200 OK\r\n" // Reply success - "Host: foo\r\n" // Mandatory header - "Content-Length: 3\r\n\r\n" // Set to allow keep-alive - "hi\n"); + // The /fast URI responds immediately without hitting a multi-threaded + // codepath. It is for measuing performance impact + mg_http_reply(c, 200, "Host: foo.com\r\n", "hi\n"); } else { - int blocking = -1, non_blocking = -1; - mg_socketpair(&blocking, &non_blocking); // Create connected pair - - // Pass blocking socket to the thread_function. - start_thread(thread_function, (void *) (long) blocking); - - // Non-blocking is ours. Store it in the fn_data, in - // order to use it in the subsequent invocations - c->fn_data = (void *) (long) non_blocking; - } - } else if (ev == MG_EV_POLL && c->fn_data != NULL) { - // On each poll iteration, try to receive response data - int sock = (int) (long) c->fn_data; - struct response response = {NULL, 0}; - if (recv(sock, (void *) &response, sizeof(response), 0) == - sizeof(response)) { - // Yeah! Got the response. - mg_printf(c, "HTTP/1.0 200 OK\r\nContent-Length: %d\r\n\r\n%.*s", - response.len, response.len, response.data); - free(response.data); // We can free produced data now - closesocket(sock); // And close our end of the socket pair - c->fn_data = NULL; + // Multithreading code path. Create "pipe" connection. + // Pipe connection is safe to pass to a different task/thread. + // Spawn a thread and pass created pipe connection to it. + // Save a receiving end of the pipe into c->fn_data, in order to + // close it when this (client) connection closes. + struct mg_connection *pc[2]; // pc[0]: send, pc[1]: recv + mg_mkpipe(c, pc); // Create pipe + start_thread(thread_function, pc[0]); // Spawn a task, pass pc[0] there + c->fn_data = pc[1]; // And save recv end for later } + } else if (ev == MG_EV_CLOSE) { + // Tell the receiving end of the pipe to close + struct mg_connection *pc = (struct mg_connection *) fn_data; + if (pc) pc->is_closing = 1, pc->fn_data = NULL; } } int main(void) { struct mg_mgr mgr; mg_mgr_init(&mgr); + mg_log_set("3"); mg_http_listen(&mgr, "http://localhost:8000", cb, NULL); - for (;;) mg_mgr_poll(&mgr, 50); + for (;;) mg_mgr_poll(&mgr, 1000); mg_mgr_free(&mgr); return 0; } diff --git a/mongoose.c b/mongoose.c index 84deb1ad..7181b9f5 100644 --- a/mongoose.c +++ b/mongoose.c @@ -879,9 +879,9 @@ int mg_http_get_var(const struct mg_str *buf, const char *name, char *dst, if ((p == buf->ptr || p[-1] == '&') && p[name_len] == '=' && !mg_ncasecmp(name, p, name_len)) { p += name_len + 1; - s = (const char *) memchr(p, '&', (size_t)(e - p)); + s = (const char *) memchr(p, '&', (size_t) (e - p)); if (s == NULL) s = e; - len = mg_url_decode(p, (size_t)(s - p), dst, dst_len, 1); + len = mg_url_decode(p, (size_t) (s - p), dst, dst_len, 1); if (len < 0) len = -3; // Failed to decode break; } @@ -929,7 +929,7 @@ static const char *skip(const char *s, const char *e, const char *d, struct mg_str *v) { v->ptr = s; while (s < e && *s != '\n' && strchr(d, *s) == NULL) s++; - v->len = (size_t)(s - v->ptr); + v->len = (size_t) (s - v->ptr); while (s < e && strchr(d, *s) != NULL) s++; return s; } @@ -986,8 +986,8 @@ int mg_http_parse(const char *s, size_t len, struct mg_http_message *hm) { // If URI contains '?' character, setup query string if ((qs = (const char *) memchr(hm->uri.ptr, '?', hm->uri.len)) != NULL) { hm->query.ptr = qs + 1; - hm->query.len = (size_t)(&hm->uri.ptr[hm->uri.len] - (qs + 1)); - hm->uri.len = (size_t)(qs - hm->uri.ptr); + hm->query.len = (size_t) (&hm->uri.ptr[hm->uri.len] - (qs + 1)); + hm->uri.len = (size_t) (qs - hm->uri.ptr); } mg_http_parse_headers(s, end, hm->headers, @@ -1123,7 +1123,7 @@ static const char *mg_http_status_code_str(int status_code) { void mg_http_reply(struct mg_connection *c, int code, const char *headers, const char *fmt, ...) { - char mem[100], *buf = mem; + char mem[256], *buf = mem; va_list ap; int len; va_start(ap, fmt); @@ -1582,7 +1582,7 @@ struct mg_str mg_http_get_header_var(struct mg_str s, struct mg_str v) { while (p < x && (q ? p == b || *p != '"' : *p != ';' && *p != ' ')) p++; // LOG(LL_INFO, ("[%.*s] [%.*s] [%.*s]", (int) s.len, s.ptr, (int) v.len, // v.ptr, (int) (p - b), b)); - return stripquotes(mg_str_n(b, (size_t)(p - b + q))); + return stripquotes(mg_str_n(b, (size_t) (p - b + q))); } } return mg_str_n(NULL, 0); @@ -1657,9 +1657,9 @@ void mg_http_delete_chunk(struct mg_connection *c, struct mg_http_message *hm) { } { const char *end = &ch.ptr[ch.len]; - size_t n = (size_t)(end - (char *) c->recv.buf); + size_t n = (size_t) (end - (char *) c->recv.buf); if (c->recv.len > n) { - memmove((char *) ch.ptr, end, (size_t)(c->recv.len - n)); + memmove((char *) ch.ptr, end, (size_t) (c->recv.len - n)); } // LOG(LL_INFO, ("DELETING CHUNK: %zu %zu %zu\n%.*s", c->recv.len, n, // ch.len, (int) ch.len, ch.ptr)); @@ -1675,7 +1675,7 @@ static void http_cb(struct mg_connection *c, int ev, void *evd, void *fnd) { bool is_chunked = n > 0 && mg_is_chunked(&hm); if (ev == MG_EV_CLOSE) { hm.message.len = c->recv.len; - hm.body.len = hm.message.len - (size_t)(hm.body.ptr - hm.message.ptr); + hm.body.len = hm.message.len - (size_t) (hm.body.ptr - hm.message.ptr); } else if (is_chunked && n > 0) { walkchunks(c, &hm, (size_t) n); } @@ -1777,7 +1777,7 @@ size_t mg_iobuf_append(struct mg_iobuf *io, const void *buf, size_t len, } size_t mg_iobuf_delete(struct mg_iobuf *io, size_t len) { - if (len > io->len) len = 0; + if (len > io->len) len = io->len; memmove(io->buf, io->buf + len, io->len - len); zeromem(io->buf + io->len - len, len); io->len -= len; @@ -2870,20 +2870,33 @@ union usa { #endif }; -static union usa tousa(struct mg_addr *a) { - union usa usa; - memset(&usa, 0, sizeof(usa)); - usa.sin.sin_family = AF_INET; - usa.sin.sin_port = a->port; - *(uint32_t *) &usa.sin.sin_addr = a->ip; +static socklen_t tousa(struct mg_addr *a, union usa *usa) { + socklen_t len = sizeof(usa->sin); + memset(usa, 0, sizeof(*usa)); + usa->sin.sin_family = AF_INET; + usa->sin.sin_port = a->port; + *(uint32_t *) &usa->sin.sin_addr = a->ip; #if MG_ENABLE_IPV6 if (a->is_ip6) { - usa.sin.sin_family = AF_INET6; - usa.sin6.sin6_port = a->port; - memcpy(&usa.sin6.sin6_addr, a->ip6, sizeof(a->ip6)); + usa->sin.sin_family = AF_INET6; + usa->sin6.sin6_port = a->port; + memcpy(&usa->sin6.sin6_addr, a->ip6, sizeof(a->ip6)); + len = sizeof(usa->sin6); + } +#endif + return len; +} + +static void tomgaddr(union usa *usa, struct mg_addr *a, bool is_ip6) { + a->is_ip6 = is_ip6; + a->port = usa->sin.sin_port; + memcpy(&a->ip, &usa->sin.sin_addr, sizeof(a->ip)); +#if MG_ENABLE_IPV6 + if (is_ip6) { + memcpy(a->ip6, &usa->sin6.sin6_addr, sizeof(a->ip6)); + a->port = usa->sin6.sin6_port; } #endif - return usa; } static bool mg_sock_would_block(void) { @@ -2913,11 +2926,8 @@ static struct mg_connection *alloc_conn(struct mg_mgr *mgr, bool is_client, static long mg_sock_send(struct mg_connection *c, const void *buf, size_t len) { long n = 0; if (c->is_udp) { - union usa usa = tousa(&c->peer); - socklen_t slen = sizeof(usa.sin); -#if MG_ENABLE_IPV6 - if (c->peer.is_ip6) slen = sizeof(usa.sin6); -#endif + union usa usa; + socklen_t slen = tousa(&c->peer, &usa); n = sendto(FD(c), (char *) buf, len, 0, &usa.sa, slen); } else { n = send(FD(c), (char *) buf, len, MSG_NONBLOCKING); @@ -2950,14 +2960,11 @@ SOCKET mg_open_listener(const char *url, struct mg_addr *addr) { if (!mg_aton(mg_url_host(url), addr)) { LOG(LL_ERROR, ("invalid listening URL: %s", url)); } else { - union usa usa = tousa(addr); - int on = 1, af = AF_INET; + union usa usa; + socklen_t slen = tousa(addr, &usa); + int on = 1, af = addr->is_ip6 ? AF_INET6 : AF_INET; int type = strncmp(url, "udp:", 4) == 0 ? SOCK_DGRAM : SOCK_STREAM; int proto = type == SOCK_DGRAM ? IPPROTO_UDP : IPPROTO_TCP; - socklen_t slen = sizeof(usa.sin); -#if MG_ENABLE_IPV6 - if (addr->is_ip6) af = AF_INET6, slen = sizeof(usa.sin6); -#endif if ((fd = socket(af, type, proto)) != INVALID_SOCKET && #if !defined(_WIN32) || !defined(SO_EXCLUSIVEADDRUSE) @@ -3004,22 +3011,9 @@ static long mg_sock_recv(struct mg_connection *c, void *buf, size_t len) { long n = 0; if (c->is_udp) { union usa usa; - socklen_t slen = sizeof(usa.sin); -#if MG_ENABLE_IPV6 - if (c->peer.is_ip6) slen = sizeof(usa.sin6); -#endif + socklen_t slen = tousa(&c->peer, &usa); n = recvfrom(FD(c), (char *) buf, len, 0, &usa.sa, &slen); - if (n > 0) { - if (c->peer.is_ip6) { -#if MG_ENABLE_IPV6 - memcpy(c->peer.ip6, &usa.sin6.sin6_addr, sizeof(c->peer.ip6)); - c->peer.port = usa.sin6.sin6_port; -#endif - } else { - c->peer.ip = *(uint32_t *) &usa.sin.sin_addr; - c->peer.port = usa.sin.sin_port; - } - } + if (n > 0) tomgaddr(&usa, &c->peer, slen != sizeof(usa.sin)); } else { n = recv(FD(c), (char *) buf, len, MSG_NONBLOCKING); } @@ -3158,11 +3152,8 @@ void mg_connect_resolved(struct mg_connection *c) { mg_set_non_blocking_mode(FD(c)); mg_call(c, MG_EV_RESOLVE, NULL); if (type == SOCK_STREAM) { - union usa usa = tousa(&c->peer); - socklen_t slen = sizeof(usa.sin); -#if MG_ENABLE_IPV6 - if (c->peer.is_ip6) slen = sizeof(usa.sin6); -#endif + union usa usa; + socklen_t slen = tousa(&c->peer, &usa); if ((rc = connect(FD(c), &usa.sa, slen)) == 0 || mg_sock_would_block()) { setsockopts(c); if (rc != 0) c->is_connecting = 1; @@ -3207,15 +3198,7 @@ static void accept_conn(struct mg_mgr *mgr, struct mg_connection *lsn) { closesocket(fd); } else { char buf[40]; - c->peer.port = usa.sin.sin_port; - memcpy(&c->peer.ip, &usa.sin.sin_addr, sizeof(c->peer.ip)); -#if MG_ENABLE_IPV6 - if (sa_len == sizeof(usa.sin6)) { - memcpy(c->peer.ip6, &usa.sin6.sin6_addr, sizeof(c->peer.ip6)); - c->peer.port = usa.sin6.sin6_port; - c->peer.is_ip6 = 1; - } -#endif + tomgaddr(&usa, &c->peer, sa_len != sizeof(usa.sin)); mg_straddr(c, buf, sizeof(buf)); LOG(LL_DEBUG, ("%lu accepted %s", c->id, buf)); mg_set_non_blocking_mode(FD(c)); @@ -3231,47 +3214,78 @@ static void accept_conn(struct mg_mgr *mgr, struct mg_connection *lsn) { } } -bool mg_socketpair(int *s1, int *s2) { -#if MG_ENABLE_NATIVE_SOCKETPAIR - // For some reason, native socketpair() call fails on Macos - // Enable this codepath only when MG_ENABLE_NATIVE_SOCKETPAIR is defined - int sp[2], ret = 0; - if (socketpair(AF_INET, SOCK_DGRAM, IPPROTO_UDP, sp) == 0) { - *s1 = sp[0], *s2 = sp[1], ret = 1; - } - LOG(LL_INFO, ("errno %d", errno)); - return ret; -#elif MG_ENABLE_SOCKETPAIR - union usa sa, sa2; - SOCKET sp[2] = {INVALID_SOCKET, INVALID_SOCKET}; - socklen_t len = sizeof(sa.sin); - int ret = 0; +static bool mg_socketpair(SOCKET sp[2], union usa usa[2]) { + socklen_t n = sizeof(usa[0].sin); + bool result = false; - (void) memset(&sa, 0, sizeof(sa)); - sa.sin.sin_family = AF_INET; - sa.sin.sin_addr.s_addr = htonl(0x7f000001); /* 127.0.0.1 */ - sa2 = sa; + (void) memset(&usa[0], 0, sizeof(usa[0])); + usa[0].sin.sin_family = AF_INET; + usa[0].sin.sin_addr.s_addr = htonl(0x7f000001); // 127.0.0.1 + usa[1] = usa[0]; if ((sp[0] = socket(AF_INET, SOCK_DGRAM, 0)) != INVALID_SOCKET && (sp[1] = socket(AF_INET, SOCK_DGRAM, 0)) != INVALID_SOCKET && - bind(sp[0], &sa.sa, len) == 0 && bind(sp[1], &sa2.sa, len) == 0 && - getsockname(sp[0], &sa.sa, &len) == 0 && - getsockname(sp[1], &sa2.sa, &len) == 0 && - connect(sp[0], &sa2.sa, len) == 0 && connect(sp[1], &sa.sa, len) == 0) { - mg_set_non_blocking_mode(sp[1]); - *s1 = sp[0]; - *s2 = sp[1]; - ret = 1; + bind(sp[0], &usa[0].sa, n) == 0 && bind(sp[1], &usa[1].sa, n) == 0 && + getsockname(sp[0], &usa[0].sa, &n) == 0 && + getsockname(sp[1], &usa[1].sa, &n) == 0 && + connect(sp[0], &usa[1].sa, n) == 0 && + connect(sp[1], &usa[0].sa, n) == 0) { + mg_set_non_blocking_mode(sp[1]); // Set close-on-exec + result = true; } else { if (sp[0] != INVALID_SOCKET) closesocket(sp[0]); if (sp[1] != INVALID_SOCKET) closesocket(sp[1]); + sp[0] = sp[1] = INVALID_SOCKET; } - return ret; -#else - *s1 = *s2 = INVALID_SOCKET; + return result; +} + +// Event handler function for the receiving end of the pipe connection +// Read data from another task and send it to the remote peer +static void pf1(struct mg_connection *c, int ev, void *ev_data, void *fn_data) { + struct mg_connection *cc = (struct mg_connection *) fn_data; + if (ev == MG_EV_READ) { + struct mg_str *s = (struct mg_str *) ev_data; + // LOG(LL_INFO, ("got %d [%.*s]", (int) s->len, (int) s->len, s->ptr)); + mg_send(cc, s->ptr, s->len); + c->recv.len = 0; + } else if (ev == MG_EV_CLOSE) { + if (cc) cc->is_draining = 1, cc->fn_data = NULL; + } +} + +void mg_rmpipe(struct mg_connection *c) { + LOG(LL_DEBUG, ("%lu send pipe closed", c->id)); + mg_send(c, "", 0); // Signal receiver, 0-length packet means we're done + closesocket(FD(c)); // We're not managed by c->mgr, close manually + free(c); // No buffers to clear - just free up +} + +bool mg_mkpipe(struct mg_connection *c, struct mg_connection *pc[2]) { + union usa usa[2]; + SOCKET sp[2] = {INVALID_SOCKET, INVALID_SOCKET}; + if (!mg_socketpair(sp, usa)) goto fail; + if ((pc[0] = alloc_conn(c->mgr, true, sp[0])) == NULL) goto fail; + if ((pc[1] = alloc_conn(c->mgr, false, sp[1])) == NULL) goto fail; + tomgaddr(&usa[0], &pc[1]->peer, false); + tomgaddr(&usa[1], &pc[0]->peer, false); + pc[0]->is_udp = 1; + pc[1]->is_udp = 1; + pc[1]->is_accepted = 1; + pc[1]->fn = pf1; + pc[1]->fn_data = c; + LIST_ADD_HEAD(struct mg_connection, &c->mgr->conns, pc[1]); + // LOG(LL_DEBUG, ("%lu/%ld %lu/%ld", pc[0]->id, (long) (size_t) pc[0]->fd, + // pc[1]->id, (long) (size_t) pc[1]->fd)); + return true; +fail: + free(pc[0]); + free(pc[1]); + if (sp[0] != INVALID_SOCKET) closesocket(sp[0]); + if (sp[1] != INVALID_SOCKET) closesocket(sp[1]); + pc[0] = pc[1] = NULL; return false; -#endif } struct mg_connection *mg_listen(struct mg_mgr *mgr, const char *url, diff --git a/mongoose.h b/mongoose.h index e25c10b5..0dfc51f6 100644 --- a/mongoose.h +++ b/mongoose.h @@ -62,14 +62,6 @@ extern "C" { #define MG_ENABLE_DIRLIST 0 #endif -#ifndef MG_ENABLE_SOCKETPAIR -#define MG_ENABLE_SOCKETPAIR 0 -#endif - -#ifndef MG_ENABLE_NATIVE_SOCKETPAIR -#define MG_ENABLE_NATIVE_SOCKETPAIR 0 -#endif - #ifndef MG_ENABLE_CUSTOM_RANDOM #define MG_ENABLE_CUSTOM_RANDOM 0 #endif @@ -767,10 +759,12 @@ bool mg_send(struct mg_connection *, const void *, size_t); int mg_printf(struct mg_connection *, const char *fmt, ...); int mg_vprintf(struct mg_connection *, const char *fmt, va_list ap); char *mg_straddr(struct mg_connection *, char *, size_t); -bool mg_socketpair(int *s1, int *s2); bool mg_aton(struct mg_str str, struct mg_addr *addr); char *mg_ntoa(const struct mg_addr *addr, char *buf, size_t len); +bool mg_mkpipe(struct mg_connection *c, struct mg_connection *pc[2]); +void mg_rmpipe(struct mg_connection *c); + diff --git a/src/config.h b/src/config.h index 32f06f82..a7c54913 100644 --- a/src/config.h +++ b/src/config.h @@ -37,14 +37,6 @@ #define MG_ENABLE_DIRLIST 0 #endif -#ifndef MG_ENABLE_SOCKETPAIR -#define MG_ENABLE_SOCKETPAIR 0 -#endif - -#ifndef MG_ENABLE_NATIVE_SOCKETPAIR -#define MG_ENABLE_NATIVE_SOCKETPAIR 0 -#endif - #ifndef MG_ENABLE_CUSTOM_RANDOM #define MG_ENABLE_CUSTOM_RANDOM 0 #endif diff --git a/src/http.c b/src/http.c index c9be9ef4..3aa784bb 100644 --- a/src/http.c +++ b/src/http.c @@ -108,9 +108,9 @@ int mg_http_get_var(const struct mg_str *buf, const char *name, char *dst, if ((p == buf->ptr || p[-1] == '&') && p[name_len] == '=' && !mg_ncasecmp(name, p, name_len)) { p += name_len + 1; - s = (const char *) memchr(p, '&', (size_t)(e - p)); + s = (const char *) memchr(p, '&', (size_t) (e - p)); if (s == NULL) s = e; - len = mg_url_decode(p, (size_t)(s - p), dst, dst_len, 1); + len = mg_url_decode(p, (size_t) (s - p), dst, dst_len, 1); if (len < 0) len = -3; // Failed to decode break; } @@ -158,7 +158,7 @@ static const char *skip(const char *s, const char *e, const char *d, struct mg_str *v) { v->ptr = s; while (s < e && *s != '\n' && strchr(d, *s) == NULL) s++; - v->len = (size_t)(s - v->ptr); + v->len = (size_t) (s - v->ptr); while (s < e && strchr(d, *s) != NULL) s++; return s; } @@ -215,8 +215,8 @@ int mg_http_parse(const char *s, size_t len, struct mg_http_message *hm) { // If URI contains '?' character, setup query string if ((qs = (const char *) memchr(hm->uri.ptr, '?', hm->uri.len)) != NULL) { hm->query.ptr = qs + 1; - hm->query.len = (size_t)(&hm->uri.ptr[hm->uri.len] - (qs + 1)); - hm->uri.len = (size_t)(qs - hm->uri.ptr); + hm->query.len = (size_t) (&hm->uri.ptr[hm->uri.len] - (qs + 1)); + hm->uri.len = (size_t) (qs - hm->uri.ptr); } mg_http_parse_headers(s, end, hm->headers, @@ -352,7 +352,7 @@ static const char *mg_http_status_code_str(int status_code) { void mg_http_reply(struct mg_connection *c, int code, const char *headers, const char *fmt, ...) { - char mem[100], *buf = mem; + char mem[256], *buf = mem; va_list ap; int len; va_start(ap, fmt); @@ -811,7 +811,7 @@ struct mg_str mg_http_get_header_var(struct mg_str s, struct mg_str v) { while (p < x && (q ? p == b || *p != '"' : *p != ';' && *p != ' ')) p++; // LOG(LL_INFO, ("[%.*s] [%.*s] [%.*s]", (int) s.len, s.ptr, (int) v.len, // v.ptr, (int) (p - b), b)); - return stripquotes(mg_str_n(b, (size_t)(p - b + q))); + return stripquotes(mg_str_n(b, (size_t) (p - b + q))); } } return mg_str_n(NULL, 0); @@ -886,9 +886,9 @@ void mg_http_delete_chunk(struct mg_connection *c, struct mg_http_message *hm) { } { const char *end = &ch.ptr[ch.len]; - size_t n = (size_t)(end - (char *) c->recv.buf); + size_t n = (size_t) (end - (char *) c->recv.buf); if (c->recv.len > n) { - memmove((char *) ch.ptr, end, (size_t)(c->recv.len - n)); + memmove((char *) ch.ptr, end, (size_t) (c->recv.len - n)); } // LOG(LL_INFO, ("DELETING CHUNK: %zu %zu %zu\n%.*s", c->recv.len, n, // ch.len, (int) ch.len, ch.ptr)); @@ -904,7 +904,7 @@ static void http_cb(struct mg_connection *c, int ev, void *evd, void *fnd) { bool is_chunked = n > 0 && mg_is_chunked(&hm); if (ev == MG_EV_CLOSE) { hm.message.len = c->recv.len; - hm.body.len = hm.message.len - (size_t)(hm.body.ptr - hm.message.ptr); + hm.body.len = hm.message.len - (size_t) (hm.body.ptr - hm.message.ptr); } else if (is_chunked && n > 0) { walkchunks(c, &hm, (size_t) n); } diff --git a/src/iobuf.c b/src/iobuf.c index 28d5ea64..141a8e32 100644 --- a/src/iobuf.c +++ b/src/iobuf.c @@ -57,7 +57,7 @@ size_t mg_iobuf_append(struct mg_iobuf *io, const void *buf, size_t len, } size_t mg_iobuf_delete(struct mg_iobuf *io, size_t len) { - if (len > io->len) len = 0; + if (len > io->len) len = io->len; memmove(io->buf, io->buf + len, io->len - len); zeromem(io->buf + io->len - len, len); io->len -= len; diff --git a/src/net.h b/src/net.h index c071d312..71d656c3 100644 --- a/src/net.h +++ b/src/net.h @@ -71,6 +71,8 @@ bool mg_send(struct mg_connection *, const void *, size_t); int mg_printf(struct mg_connection *, const char *fmt, ...); int mg_vprintf(struct mg_connection *, const char *fmt, va_list ap); char *mg_straddr(struct mg_connection *, char *, size_t); -bool mg_socketpair(int *s1, int *s2); bool mg_aton(struct mg_str str, struct mg_addr *addr); char *mg_ntoa(const struct mg_addr *addr, char *buf, size_t len); + +bool mg_mkpipe(struct mg_connection *c, struct mg_connection *pc[2]); +void mg_rmpipe(struct mg_connection *c); diff --git a/src/sock.c b/src/sock.c index 678e3a2c..bec31b8c 100644 --- a/src/sock.c +++ b/src/sock.c @@ -43,20 +43,33 @@ union usa { #endif }; -static union usa tousa(struct mg_addr *a) { - union usa usa; - memset(&usa, 0, sizeof(usa)); - usa.sin.sin_family = AF_INET; - usa.sin.sin_port = a->port; - *(uint32_t *) &usa.sin.sin_addr = a->ip; +static socklen_t tousa(struct mg_addr *a, union usa *usa) { + socklen_t len = sizeof(usa->sin); + memset(usa, 0, sizeof(*usa)); + usa->sin.sin_family = AF_INET; + usa->sin.sin_port = a->port; + *(uint32_t *) &usa->sin.sin_addr = a->ip; #if MG_ENABLE_IPV6 if (a->is_ip6) { - usa.sin.sin_family = AF_INET6; - usa.sin6.sin6_port = a->port; - memcpy(&usa.sin6.sin6_addr, a->ip6, sizeof(a->ip6)); + usa->sin.sin_family = AF_INET6; + usa->sin6.sin6_port = a->port; + memcpy(&usa->sin6.sin6_addr, a->ip6, sizeof(a->ip6)); + len = sizeof(usa->sin6); + } +#endif + return len; +} + +static void tomgaddr(union usa *usa, struct mg_addr *a, bool is_ip6) { + a->is_ip6 = is_ip6; + a->port = usa->sin.sin_port; + memcpy(&a->ip, &usa->sin.sin_addr, sizeof(a->ip)); +#if MG_ENABLE_IPV6 + if (is_ip6) { + memcpy(a->ip6, &usa->sin6.sin6_addr, sizeof(a->ip6)); + a->port = usa->sin6.sin6_port; } #endif - return usa; } static bool mg_sock_would_block(void) { @@ -86,11 +99,8 @@ static struct mg_connection *alloc_conn(struct mg_mgr *mgr, bool is_client, static long mg_sock_send(struct mg_connection *c, const void *buf, size_t len) { long n = 0; if (c->is_udp) { - union usa usa = tousa(&c->peer); - socklen_t slen = sizeof(usa.sin); -#if MG_ENABLE_IPV6 - if (c->peer.is_ip6) slen = sizeof(usa.sin6); -#endif + union usa usa; + socklen_t slen = tousa(&c->peer, &usa); n = sendto(FD(c), (char *) buf, len, 0, &usa.sa, slen); } else { n = send(FD(c), (char *) buf, len, MSG_NONBLOCKING); @@ -123,14 +133,11 @@ SOCKET mg_open_listener(const char *url, struct mg_addr *addr) { if (!mg_aton(mg_url_host(url), addr)) { LOG(LL_ERROR, ("invalid listening URL: %s", url)); } else { - union usa usa = tousa(addr); - int on = 1, af = AF_INET; + union usa usa; + socklen_t slen = tousa(addr, &usa); + int on = 1, af = addr->is_ip6 ? AF_INET6 : AF_INET; int type = strncmp(url, "udp:", 4) == 0 ? SOCK_DGRAM : SOCK_STREAM; int proto = type == SOCK_DGRAM ? IPPROTO_UDP : IPPROTO_TCP; - socklen_t slen = sizeof(usa.sin); -#if MG_ENABLE_IPV6 - if (addr->is_ip6) af = AF_INET6, slen = sizeof(usa.sin6); -#endif if ((fd = socket(af, type, proto)) != INVALID_SOCKET && #if !defined(_WIN32) || !defined(SO_EXCLUSIVEADDRUSE) @@ -177,22 +184,9 @@ static long mg_sock_recv(struct mg_connection *c, void *buf, size_t len) { long n = 0; if (c->is_udp) { union usa usa; - socklen_t slen = sizeof(usa.sin); -#if MG_ENABLE_IPV6 - if (c->peer.is_ip6) slen = sizeof(usa.sin6); -#endif + socklen_t slen = tousa(&c->peer, &usa); n = recvfrom(FD(c), (char *) buf, len, 0, &usa.sa, &slen); - if (n > 0) { - if (c->peer.is_ip6) { -#if MG_ENABLE_IPV6 - memcpy(c->peer.ip6, &usa.sin6.sin6_addr, sizeof(c->peer.ip6)); - c->peer.port = usa.sin6.sin6_port; -#endif - } else { - c->peer.ip = *(uint32_t *) &usa.sin.sin_addr; - c->peer.port = usa.sin.sin_port; - } - } + if (n > 0) tomgaddr(&usa, &c->peer, slen != sizeof(usa.sin)); } else { n = recv(FD(c), (char *) buf, len, MSG_NONBLOCKING); } @@ -331,11 +325,8 @@ void mg_connect_resolved(struct mg_connection *c) { mg_set_non_blocking_mode(FD(c)); mg_call(c, MG_EV_RESOLVE, NULL); if (type == SOCK_STREAM) { - union usa usa = tousa(&c->peer); - socklen_t slen = sizeof(usa.sin); -#if MG_ENABLE_IPV6 - if (c->peer.is_ip6) slen = sizeof(usa.sin6); -#endif + union usa usa; + socklen_t slen = tousa(&c->peer, &usa); if ((rc = connect(FD(c), &usa.sa, slen)) == 0 || mg_sock_would_block()) { setsockopts(c); if (rc != 0) c->is_connecting = 1; @@ -380,15 +371,7 @@ static void accept_conn(struct mg_mgr *mgr, struct mg_connection *lsn) { closesocket(fd); } else { char buf[40]; - c->peer.port = usa.sin.sin_port; - memcpy(&c->peer.ip, &usa.sin.sin_addr, sizeof(c->peer.ip)); -#if MG_ENABLE_IPV6 - if (sa_len == sizeof(usa.sin6)) { - memcpy(c->peer.ip6, &usa.sin6.sin6_addr, sizeof(c->peer.ip6)); - c->peer.port = usa.sin6.sin6_port; - c->peer.is_ip6 = 1; - } -#endif + tomgaddr(&usa, &c->peer, sa_len != sizeof(usa.sin)); mg_straddr(c, buf, sizeof(buf)); LOG(LL_DEBUG, ("%lu accepted %s", c->id, buf)); mg_set_non_blocking_mode(FD(c)); @@ -404,47 +387,78 @@ static void accept_conn(struct mg_mgr *mgr, struct mg_connection *lsn) { } } -bool mg_socketpair(int *s1, int *s2) { -#if MG_ENABLE_NATIVE_SOCKETPAIR - // For some reason, native socketpair() call fails on Macos - // Enable this codepath only when MG_ENABLE_NATIVE_SOCKETPAIR is defined - int sp[2], ret = 0; - if (socketpair(AF_INET, SOCK_DGRAM, IPPROTO_UDP, sp) == 0) { - *s1 = sp[0], *s2 = sp[1], ret = 1; - } - LOG(LL_INFO, ("errno %d", errno)); - return ret; -#elif MG_ENABLE_SOCKETPAIR - union usa sa, sa2; - SOCKET sp[2] = {INVALID_SOCKET, INVALID_SOCKET}; - socklen_t len = sizeof(sa.sin); - int ret = 0; +static bool mg_socketpair(SOCKET sp[2], union usa usa[2]) { + socklen_t n = sizeof(usa[0].sin); + bool result = false; - (void) memset(&sa, 0, sizeof(sa)); - sa.sin.sin_family = AF_INET; - sa.sin.sin_addr.s_addr = htonl(0x7f000001); /* 127.0.0.1 */ - sa2 = sa; + (void) memset(&usa[0], 0, sizeof(usa[0])); + usa[0].sin.sin_family = AF_INET; + usa[0].sin.sin_addr.s_addr = htonl(0x7f000001); // 127.0.0.1 + usa[1] = usa[0]; if ((sp[0] = socket(AF_INET, SOCK_DGRAM, 0)) != INVALID_SOCKET && (sp[1] = socket(AF_INET, SOCK_DGRAM, 0)) != INVALID_SOCKET && - bind(sp[0], &sa.sa, len) == 0 && bind(sp[1], &sa2.sa, len) == 0 && - getsockname(sp[0], &sa.sa, &len) == 0 && - getsockname(sp[1], &sa2.sa, &len) == 0 && - connect(sp[0], &sa2.sa, len) == 0 && connect(sp[1], &sa.sa, len) == 0) { - mg_set_non_blocking_mode(sp[1]); - *s1 = sp[0]; - *s2 = sp[1]; - ret = 1; + bind(sp[0], &usa[0].sa, n) == 0 && bind(sp[1], &usa[1].sa, n) == 0 && + getsockname(sp[0], &usa[0].sa, &n) == 0 && + getsockname(sp[1], &usa[1].sa, &n) == 0 && + connect(sp[0], &usa[1].sa, n) == 0 && + connect(sp[1], &usa[0].sa, n) == 0) { + mg_set_non_blocking_mode(sp[1]); // Set close-on-exec + result = true; } else { if (sp[0] != INVALID_SOCKET) closesocket(sp[0]); if (sp[1] != INVALID_SOCKET) closesocket(sp[1]); + sp[0] = sp[1] = INVALID_SOCKET; } - return ret; -#else - *s1 = *s2 = INVALID_SOCKET; + return result; +} + +// Event handler function for the receiving end of the pipe connection +// Read data from another task and send it to the remote peer +static void pf1(struct mg_connection *c, int ev, void *ev_data, void *fn_data) { + struct mg_connection *cc = (struct mg_connection *) fn_data; + if (ev == MG_EV_READ) { + struct mg_str *s = (struct mg_str *) ev_data; + // LOG(LL_INFO, ("got %d [%.*s]", (int) s->len, (int) s->len, s->ptr)); + mg_send(cc, s->ptr, s->len); + c->recv.len = 0; + } else if (ev == MG_EV_CLOSE) { + if (cc) cc->is_draining = 1, cc->fn_data = NULL; + } +} + +void mg_rmpipe(struct mg_connection *c) { + LOG(LL_DEBUG, ("%lu send pipe closed", c->id)); + mg_send(c, "", 0); // Signal receiver, 0-length packet means we're done + closesocket(FD(c)); // We're not managed by c->mgr, close manually + free(c); // No buffers to clear - just free up +} + +bool mg_mkpipe(struct mg_connection *c, struct mg_connection *pc[2]) { + union usa usa[2]; + SOCKET sp[2] = {INVALID_SOCKET, INVALID_SOCKET}; + if (!mg_socketpair(sp, usa)) goto fail; + if ((pc[0] = alloc_conn(c->mgr, true, sp[0])) == NULL) goto fail; + if ((pc[1] = alloc_conn(c->mgr, false, sp[1])) == NULL) goto fail; + tomgaddr(&usa[0], &pc[1]->peer, false); + tomgaddr(&usa[1], &pc[0]->peer, false); + pc[0]->is_udp = 1; + pc[1]->is_udp = 1; + pc[1]->is_accepted = 1; + pc[1]->fn = pf1; + pc[1]->fn_data = c; + LIST_ADD_HEAD(struct mg_connection, &c->mgr->conns, pc[1]); + // LOG(LL_DEBUG, ("%lu/%ld %lu/%ld", pc[0]->id, (long) (size_t) pc[0]->fd, + // pc[1]->id, (long) (size_t) pc[1]->fd)); + return true; +fail: + free(pc[0]); + free(pc[1]); + if (sp[0] != INVALID_SOCKET) closesocket(sp[0]); + if (sp[1] != INVALID_SOCKET) closesocket(sp[1]); + pc[0] = pc[1] = NULL; return false; -#endif } struct mg_connection *mg_listen(struct mg_mgr *mgr, const char *url, diff --git a/test/mongoose_custom.c b/test/mongoose_custom.c index 326f9b74..dbc7f1d8 100644 --- a/test/mongoose_custom.c +++ b/test/mongoose_custom.c @@ -42,5 +42,14 @@ bool mg_send(struct mg_connection *c, const void *buf, size_t len) { return 0; } +void mg_rmpipe(struct mg_connection *c) { + (void) c; +} + +bool mg_mkpipe(struct mg_connection *c, struct mg_connection *pc[2]) { + (void) c, (void) pc; + return false; +} + void _fini(void) { } diff --git a/test/unit_test.c b/test/unit_test.c index 0474b5b2..040c6232 100644 --- a/test/unit_test.c +++ b/test/unit_test.c @@ -437,8 +437,11 @@ static int fetch(struct mg_mgr *mgr, char *buf, const char *url, static int cmpbody(const char *buf, const char *str) { struct mg_http_message hm; - mg_http_parse(buf, strlen(buf), &hm); - return strncmp(hm.body.ptr, str, hm.body.len); + struct mg_str s = mg_str(str); + size_t len = strlen(buf); + mg_http_parse(buf, len, &hm); + if (hm.body.len > len) hm.body.len = len - (size_t) (hm.body.ptr - buf); + return mg_strcmp(hm.body, s); } static void wcb(struct mg_connection *c, int ev, void *ev_data, void *fn_data) { @@ -570,7 +573,7 @@ static void test_http_server(void) { { char *data = mg_file_read("./test/data/ca.pem", NULL); ASSERT(fetch(&mgr, buf, url, "GET /ca.pem HTTP/1.0\r\n\n") == 200); - ASSERT(cmpbody(data, buf) == 0); + ASSERT(cmpbody(buf, data) == 0); free(data); } @@ -1407,8 +1410,35 @@ static void test_packed(void) { ASSERT(mgr.conns == NULL); } +static void eh6(struct mg_connection *c, int ev, void *ev_data, void *fn_data) { + if (ev == MG_EV_HTTP_MSG) { + struct mg_http_message *hm = (struct mg_http_message *) ev_data; + struct mg_connection *pc[2]; + mg_mkpipe(c, pc); + mg_http_reply(pc[0], 200, "", "hi, %.*s", (int) hm->uri.len, hm->uri.ptr); + mg_rmpipe(pc[0]); + c->fn_data = pc[1]; + } else if (ev == MG_EV_CLOSE) { + struct mg_connection *pc = (struct mg_connection *) fn_data; + if (pc) pc->is_closing = 1, pc->fn_data = NULL; + } +} + +static void test_pipe(void) { + struct mg_mgr mgr; + const char *url = "http://127.0.0.1:12352"; + char buf[FETCH_BUF_SIZE]; + mg_mgr_init(&mgr); + mg_http_listen(&mgr, url, eh6, NULL); + ASSERT(fetch(&mgr, buf, url, "GET /foo HTTP/1.0\n\n") == 200); + ASSERT(cmpbody(buf, "hi, /foo") == 0); + mg_mgr_free(&mgr); + ASSERT(mgr.conns == NULL); +} + int main(void) { mg_log_set("3"); + test_pipe(); test_packed(); test_crc32(); test_multipart();