Better multithreading support: remove mg_socketpair, add mg_mkpipe() and mg_rmpipe()

This commit is contained in:
Sergey Lyubka 2021-08-07 17:22:47 +01:00
parent 86f43cd8d6
commit b7ce8213d7
12 changed files with 309 additions and 265 deletions

View File

@ -240,8 +240,6 @@ Here is a list of build constants and their default values:
|MG_ENABLE_IPV6 | 0 | Enable IPv6 |
|MG_ENABLE_LOG | 1 | Enable `LOG()` macro |
|MG_ENABLE_MD5 | 0 | Use native MD5 implementation |
|MG_ENABLE_SOCKETPAIR | 0 | Enable `mg_socketpair()` for multi-threading |
|MG_ENABLE_NATIVE_SOCKETPAIR | 0 | Use native `socketpair()` syscall for `mg_socketpair()`|
|MG_ENABLE_SSI | 1 | Enable serving SSI files by `mg_http_serve_dir()` |
|MG_ENABLE_DIRLIST | 0 | Enable directory listing |
|MG_ENABLE_CUSTOM_RANDOM | 0 | Provide custom RNG function `mg_random()` |
@ -524,16 +522,27 @@ int mg_vprintf(struct mg_connection *, const char *fmt, va_list ap);
Same as `mg_printf()`, but takes `va_list` argument as a parameter.
### mg\_socketpair()
### mg\_mkpipe()
```c
bool mg_socketpair(int *blocking, int *non_blocking);
bool mg_mkpipe(struct mg_connection *c, struct mg_connection *pc[2]);
void mg_rmpipe(struct mg_connection *c);
```
Create a socket pair for exchanging data in multi-threaded environment. The
`blocking` socket is blocking - it should be passed to the processing task.
The `non_blocking` socket is non blocking, it should be used by an event
handler function. Return value: true on success, false on error.
Create a pair of connected connections using UDP socketpair.
A sending connection, `pc[0]`, is safe to give to a different task,
and send data to it. A receiving connection `pc[1]`, forwards all received
data to `c`. A `pc[0]` is not added to event manager, therefore it must be
manually cleaned up by calling `mg_rmpipe()` when sending task is done.
A receiving side, `pc[1]` is added to the event manager, therefore it wakes
up a manager each time data gets sent.
NOTE: if there is a limit on the local UDP message size, do not send more
than that limit in one call.
See examples/multi-threaded for a usage example.
Return value: true on success, false on error.
## IO buffers

View File

@ -1,18 +1,17 @@
PROG ?= example
CFLAGS += -DMG_ENABLE_SOCKETPAIR=1
CDIR ?= $(realpath $(CURDIR))
ROOT ?= $(realpath $(CURDIR)/../..)
VC2017 = docker run --rm -e WINEDEBUG=-all -v $(ROOT):$(ROOT) -w $(CDIR) docker.io/mdashnet/vc2017
CWD ?= $(realpath $(CURDIR))
DOCKER ?= docker run --rm -it -e Tmp=. -e WINEDEBUG=-all -v $(ROOT):$(ROOT) -w $(CWD)
all: $(PROG)
$(DEBUGGER) ./$(PROG)
$(RUN) ./$(PROG)
$(PROG):
$(CC) ../../mongoose.c -I../.. -pthread $(CFLAGS) -o $(PROG) main.c
vc2017:
$(VC2017) wine64 cl ../../mongoose.c main.c -I../.. $(CFLAGS) ws2_32.lib /Fe$@.exe
$(VC2017) wine64 $@.exe
$(PROG).exe:
$(DOCKER) mdashnet/vc98 wine cl ../../mongoose.c main.c -I../.. /MD ws2_32.lib /Fe$@
$(DOCKER) mdashnet/vc98 wine $@
clean:
rm -rf $(PROG) *.o *.dSYM *.gcov *.gcno *.gcda *.obj *.exe *.ilk *.pdb log.txt

View File

@ -6,8 +6,6 @@
// some time to simulate long processing time, produces an output and
// hands over that output to the request handler function.
//
// IMPORTANT: this program must be compiled with -DMG_ENABLE_SOCKETPAIR=1
//
// The following procedure is used to benchmark the multi-threaded codepath
// against the single-threaded codepath on MacOS:
// $ make clean all CFLAGS="-DSLEEP_TIME=0 -DMG_ENABLE_SOCKETPAIR=1"
@ -21,14 +19,8 @@
#include "mongoose.h"
// thread_function() sends this structure back to the request handler
struct response {
char *data;
int len;
};
#ifndef SLEEP_TIME
#define SLEEP_TIME 3 // Seconds to sleep to simulate calculation
#define SLEEP_TIME 2 // Seconds to sleep to simulate calculation
#endif
static void start_thread(void (*f)(void *), void *p) {
@ -47,12 +39,13 @@ static void start_thread(void (*f)(void *), void *p) {
}
static void thread_function(void *param) {
int sock = (long) param; // Grab our blocking socket
struct response r = {strdup("hello\n"), 6}; // Create response
struct mg_connection *c = param; // Pipe connection
LOG(LL_INFO, ("Thread started, pipe %lu/%ld", c->id, (long) (size_t) c->fd));
LOG(LL_INFO, ("Sleeping for %d sec...", SLEEP_TIME));
mg_usleep(SLEEP_TIME * 1000000); // Simulate long execution
LOG(LL_INFO, ("got sock %d", sock)); // For debugging
send(sock, (void *) &r, sizeof(r), 0); // Send to request handler
closesocket(sock); // Done, close socket, end thread
LOG(LL_INFO, ("Sending data..."));
mg_http_reply(c, 200, "Host: foo.com\r\n", "hi\n");
mg_rmpipe(c);
}
// HTTP request callback
@ -61,46 +54,34 @@ static void cb(struct mg_connection *c, int ev, void *ev_data, void *fn_data) {
// Incoming request. Create socket pair.
// Pass blocking socket to the thread, and keep the non-blocking socket.
struct mg_http_message *hm = (struct mg_http_message *) ev_data;
if (mg_http_match_uri(hm, "/fast")) {
// The /fast URI is for performance impact of the multithreading codepath
mg_printf(c,
"HTTP/1.1 200 OK\r\n" // Reply success
"Host: foo\r\n" // Mandatory header
"Content-Length: 3\r\n\r\n" // Set to allow keep-alive
"hi\n");
// The /fast URI responds immediately without hitting a multi-threaded
// codepath. It is for measuing performance impact
mg_http_reply(c, 200, "Host: foo.com\r\n", "hi\n");
} else {
int blocking = -1, non_blocking = -1;
mg_socketpair(&blocking, &non_blocking); // Create connected pair
// Pass blocking socket to the thread_function.
start_thread(thread_function, (void *) (long) blocking);
// Non-blocking is ours. Store it in the fn_data, in
// order to use it in the subsequent invocations
c->fn_data = (void *) (long) non_blocking;
}
} else if (ev == MG_EV_POLL && c->fn_data != NULL) {
// On each poll iteration, try to receive response data
int sock = (int) (long) c->fn_data;
struct response response = {NULL, 0};
if (recv(sock, (void *) &response, sizeof(response), 0) ==
sizeof(response)) {
// Yeah! Got the response.
mg_printf(c, "HTTP/1.0 200 OK\r\nContent-Length: %d\r\n\r\n%.*s",
response.len, response.len, response.data);
free(response.data); // We can free produced data now
closesocket(sock); // And close our end of the socket pair
c->fn_data = NULL;
// Multithreading code path. Create "pipe" connection.
// Pipe connection is safe to pass to a different task/thread.
// Spawn a thread and pass created pipe connection to it.
// Save a receiving end of the pipe into c->fn_data, in order to
// close it when this (client) connection closes.
struct mg_connection *pc[2]; // pc[0]: send, pc[1]: recv
mg_mkpipe(c, pc); // Create pipe
start_thread(thread_function, pc[0]); // Spawn a task, pass pc[0] there
c->fn_data = pc[1]; // And save recv end for later
}
} else if (ev == MG_EV_CLOSE) {
// Tell the receiving end of the pipe to close
struct mg_connection *pc = (struct mg_connection *) fn_data;
if (pc) pc->is_closing = 1, pc->fn_data = NULL;
}
}
int main(void) {
struct mg_mgr mgr;
mg_mgr_init(&mgr);
mg_log_set("3");
mg_http_listen(&mgr, "http://localhost:8000", cb, NULL);
for (;;) mg_mgr_poll(&mgr, 50);
for (;;) mg_mgr_poll(&mgr, 1000);
mg_mgr_free(&mgr);
return 0;
}

View File

@ -1123,7 +1123,7 @@ static const char *mg_http_status_code_str(int status_code) {
void mg_http_reply(struct mg_connection *c, int code, const char *headers,
const char *fmt, ...) {
char mem[100], *buf = mem;
char mem[256], *buf = mem;
va_list ap;
int len;
va_start(ap, fmt);
@ -1777,7 +1777,7 @@ size_t mg_iobuf_append(struct mg_iobuf *io, const void *buf, size_t len,
}
size_t mg_iobuf_delete(struct mg_iobuf *io, size_t len) {
if (len > io->len) len = 0;
if (len > io->len) len = io->len;
memmove(io->buf, io->buf + len, io->len - len);
zeromem(io->buf + io->len - len, len);
io->len -= len;
@ -2870,20 +2870,33 @@ union usa {
#endif
};
static union usa tousa(struct mg_addr *a) {
union usa usa;
memset(&usa, 0, sizeof(usa));
usa.sin.sin_family = AF_INET;
usa.sin.sin_port = a->port;
*(uint32_t *) &usa.sin.sin_addr = a->ip;
static socklen_t tousa(struct mg_addr *a, union usa *usa) {
socklen_t len = sizeof(usa->sin);
memset(usa, 0, sizeof(*usa));
usa->sin.sin_family = AF_INET;
usa->sin.sin_port = a->port;
*(uint32_t *) &usa->sin.sin_addr = a->ip;
#if MG_ENABLE_IPV6
if (a->is_ip6) {
usa.sin.sin_family = AF_INET6;
usa.sin6.sin6_port = a->port;
memcpy(&usa.sin6.sin6_addr, a->ip6, sizeof(a->ip6));
usa->sin.sin_family = AF_INET6;
usa->sin6.sin6_port = a->port;
memcpy(&usa->sin6.sin6_addr, a->ip6, sizeof(a->ip6));
len = sizeof(usa->sin6);
}
#endif
return len;
}
static void tomgaddr(union usa *usa, struct mg_addr *a, bool is_ip6) {
a->is_ip6 = is_ip6;
a->port = usa->sin.sin_port;
memcpy(&a->ip, &usa->sin.sin_addr, sizeof(a->ip));
#if MG_ENABLE_IPV6
if (is_ip6) {
memcpy(a->ip6, &usa->sin6.sin6_addr, sizeof(a->ip6));
a->port = usa->sin6.sin6_port;
}
#endif
return usa;
}
static bool mg_sock_would_block(void) {
@ -2913,11 +2926,8 @@ static struct mg_connection *alloc_conn(struct mg_mgr *mgr, bool is_client,
static long mg_sock_send(struct mg_connection *c, const void *buf, size_t len) {
long n = 0;
if (c->is_udp) {
union usa usa = tousa(&c->peer);
socklen_t slen = sizeof(usa.sin);
#if MG_ENABLE_IPV6
if (c->peer.is_ip6) slen = sizeof(usa.sin6);
#endif
union usa usa;
socklen_t slen = tousa(&c->peer, &usa);
n = sendto(FD(c), (char *) buf, len, 0, &usa.sa, slen);
} else {
n = send(FD(c), (char *) buf, len, MSG_NONBLOCKING);
@ -2950,14 +2960,11 @@ SOCKET mg_open_listener(const char *url, struct mg_addr *addr) {
if (!mg_aton(mg_url_host(url), addr)) {
LOG(LL_ERROR, ("invalid listening URL: %s", url));
} else {
union usa usa = tousa(addr);
int on = 1, af = AF_INET;
union usa usa;
socklen_t slen = tousa(addr, &usa);
int on = 1, af = addr->is_ip6 ? AF_INET6 : AF_INET;
int type = strncmp(url, "udp:", 4) == 0 ? SOCK_DGRAM : SOCK_STREAM;
int proto = type == SOCK_DGRAM ? IPPROTO_UDP : IPPROTO_TCP;
socklen_t slen = sizeof(usa.sin);
#if MG_ENABLE_IPV6
if (addr->is_ip6) af = AF_INET6, slen = sizeof(usa.sin6);
#endif
if ((fd = socket(af, type, proto)) != INVALID_SOCKET &&
#if !defined(_WIN32) || !defined(SO_EXCLUSIVEADDRUSE)
@ -3004,22 +3011,9 @@ static long mg_sock_recv(struct mg_connection *c, void *buf, size_t len) {
long n = 0;
if (c->is_udp) {
union usa usa;
socklen_t slen = sizeof(usa.sin);
#if MG_ENABLE_IPV6
if (c->peer.is_ip6) slen = sizeof(usa.sin6);
#endif
socklen_t slen = tousa(&c->peer, &usa);
n = recvfrom(FD(c), (char *) buf, len, 0, &usa.sa, &slen);
if (n > 0) {
if (c->peer.is_ip6) {
#if MG_ENABLE_IPV6
memcpy(c->peer.ip6, &usa.sin6.sin6_addr, sizeof(c->peer.ip6));
c->peer.port = usa.sin6.sin6_port;
#endif
} else {
c->peer.ip = *(uint32_t *) &usa.sin.sin_addr;
c->peer.port = usa.sin.sin_port;
}
}
if (n > 0) tomgaddr(&usa, &c->peer, slen != sizeof(usa.sin));
} else {
n = recv(FD(c), (char *) buf, len, MSG_NONBLOCKING);
}
@ -3158,11 +3152,8 @@ void mg_connect_resolved(struct mg_connection *c) {
mg_set_non_blocking_mode(FD(c));
mg_call(c, MG_EV_RESOLVE, NULL);
if (type == SOCK_STREAM) {
union usa usa = tousa(&c->peer);
socklen_t slen = sizeof(usa.sin);
#if MG_ENABLE_IPV6
if (c->peer.is_ip6) slen = sizeof(usa.sin6);
#endif
union usa usa;
socklen_t slen = tousa(&c->peer, &usa);
if ((rc = connect(FD(c), &usa.sa, slen)) == 0 || mg_sock_would_block()) {
setsockopts(c);
if (rc != 0) c->is_connecting = 1;
@ -3207,15 +3198,7 @@ static void accept_conn(struct mg_mgr *mgr, struct mg_connection *lsn) {
closesocket(fd);
} else {
char buf[40];
c->peer.port = usa.sin.sin_port;
memcpy(&c->peer.ip, &usa.sin.sin_addr, sizeof(c->peer.ip));
#if MG_ENABLE_IPV6
if (sa_len == sizeof(usa.sin6)) {
memcpy(c->peer.ip6, &usa.sin6.sin6_addr, sizeof(c->peer.ip6));
c->peer.port = usa.sin6.sin6_port;
c->peer.is_ip6 = 1;
}
#endif
tomgaddr(&usa, &c->peer, sa_len != sizeof(usa.sin));
mg_straddr(c, buf, sizeof(buf));
LOG(LL_DEBUG, ("%lu accepted %s", c->id, buf));
mg_set_non_blocking_mode(FD(c));
@ -3231,47 +3214,78 @@ static void accept_conn(struct mg_mgr *mgr, struct mg_connection *lsn) {
}
}
bool mg_socketpair(int *s1, int *s2) {
#if MG_ENABLE_NATIVE_SOCKETPAIR
// For some reason, native socketpair() call fails on Macos
// Enable this codepath only when MG_ENABLE_NATIVE_SOCKETPAIR is defined
int sp[2], ret = 0;
if (socketpair(AF_INET, SOCK_DGRAM, IPPROTO_UDP, sp) == 0) {
*s1 = sp[0], *s2 = sp[1], ret = 1;
}
LOG(LL_INFO, ("errno %d", errno));
return ret;
#elif MG_ENABLE_SOCKETPAIR
union usa sa, sa2;
SOCKET sp[2] = {INVALID_SOCKET, INVALID_SOCKET};
socklen_t len = sizeof(sa.sin);
int ret = 0;
static bool mg_socketpair(SOCKET sp[2], union usa usa[2]) {
socklen_t n = sizeof(usa[0].sin);
bool result = false;
(void) memset(&sa, 0, sizeof(sa));
sa.sin.sin_family = AF_INET;
sa.sin.sin_addr.s_addr = htonl(0x7f000001); /* 127.0.0.1 */
sa2 = sa;
(void) memset(&usa[0], 0, sizeof(usa[0]));
usa[0].sin.sin_family = AF_INET;
usa[0].sin.sin_addr.s_addr = htonl(0x7f000001); // 127.0.0.1
usa[1] = usa[0];
if ((sp[0] = socket(AF_INET, SOCK_DGRAM, 0)) != INVALID_SOCKET &&
(sp[1] = socket(AF_INET, SOCK_DGRAM, 0)) != INVALID_SOCKET &&
bind(sp[0], &sa.sa, len) == 0 && bind(sp[1], &sa2.sa, len) == 0 &&
getsockname(sp[0], &sa.sa, &len) == 0 &&
getsockname(sp[1], &sa2.sa, &len) == 0 &&
connect(sp[0], &sa2.sa, len) == 0 && connect(sp[1], &sa.sa, len) == 0) {
mg_set_non_blocking_mode(sp[1]);
*s1 = sp[0];
*s2 = sp[1];
ret = 1;
bind(sp[0], &usa[0].sa, n) == 0 && bind(sp[1], &usa[1].sa, n) == 0 &&
getsockname(sp[0], &usa[0].sa, &n) == 0 &&
getsockname(sp[1], &usa[1].sa, &n) == 0 &&
connect(sp[0], &usa[1].sa, n) == 0 &&
connect(sp[1], &usa[0].sa, n) == 0) {
mg_set_non_blocking_mode(sp[1]); // Set close-on-exec
result = true;
} else {
if (sp[0] != INVALID_SOCKET) closesocket(sp[0]);
if (sp[1] != INVALID_SOCKET) closesocket(sp[1]);
sp[0] = sp[1] = INVALID_SOCKET;
}
return ret;
#else
*s1 = *s2 = INVALID_SOCKET;
return result;
}
// Event handler function for the receiving end of the pipe connection
// Read data from another task and send it to the remote peer
static void pf1(struct mg_connection *c, int ev, void *ev_data, void *fn_data) {
struct mg_connection *cc = (struct mg_connection *) fn_data;
if (ev == MG_EV_READ) {
struct mg_str *s = (struct mg_str *) ev_data;
// LOG(LL_INFO, ("got %d [%.*s]", (int) s->len, (int) s->len, s->ptr));
mg_send(cc, s->ptr, s->len);
c->recv.len = 0;
} else if (ev == MG_EV_CLOSE) {
if (cc) cc->is_draining = 1, cc->fn_data = NULL;
}
}
void mg_rmpipe(struct mg_connection *c) {
LOG(LL_DEBUG, ("%lu send pipe closed", c->id));
mg_send(c, "", 0); // Signal receiver, 0-length packet means we're done
closesocket(FD(c)); // We're not managed by c->mgr, close manually
free(c); // No buffers to clear - just free up
}
bool mg_mkpipe(struct mg_connection *c, struct mg_connection *pc[2]) {
union usa usa[2];
SOCKET sp[2] = {INVALID_SOCKET, INVALID_SOCKET};
if (!mg_socketpair(sp, usa)) goto fail;
if ((pc[0] = alloc_conn(c->mgr, true, sp[0])) == NULL) goto fail;
if ((pc[1] = alloc_conn(c->mgr, false, sp[1])) == NULL) goto fail;
tomgaddr(&usa[0], &pc[1]->peer, false);
tomgaddr(&usa[1], &pc[0]->peer, false);
pc[0]->is_udp = 1;
pc[1]->is_udp = 1;
pc[1]->is_accepted = 1;
pc[1]->fn = pf1;
pc[1]->fn_data = c;
LIST_ADD_HEAD(struct mg_connection, &c->mgr->conns, pc[1]);
// LOG(LL_DEBUG, ("%lu/%ld %lu/%ld", pc[0]->id, (long) (size_t) pc[0]->fd,
// pc[1]->id, (long) (size_t) pc[1]->fd));
return true;
fail:
free(pc[0]);
free(pc[1]);
if (sp[0] != INVALID_SOCKET) closesocket(sp[0]);
if (sp[1] != INVALID_SOCKET) closesocket(sp[1]);
pc[0] = pc[1] = NULL;
return false;
#endif
}
struct mg_connection *mg_listen(struct mg_mgr *mgr, const char *url,

View File

@ -62,14 +62,6 @@ extern "C" {
#define MG_ENABLE_DIRLIST 0
#endif
#ifndef MG_ENABLE_SOCKETPAIR
#define MG_ENABLE_SOCKETPAIR 0
#endif
#ifndef MG_ENABLE_NATIVE_SOCKETPAIR
#define MG_ENABLE_NATIVE_SOCKETPAIR 0
#endif
#ifndef MG_ENABLE_CUSTOM_RANDOM
#define MG_ENABLE_CUSTOM_RANDOM 0
#endif
@ -767,10 +759,12 @@ bool mg_send(struct mg_connection *, const void *, size_t);
int mg_printf(struct mg_connection *, const char *fmt, ...);
int mg_vprintf(struct mg_connection *, const char *fmt, va_list ap);
char *mg_straddr(struct mg_connection *, char *, size_t);
bool mg_socketpair(int *s1, int *s2);
bool mg_aton(struct mg_str str, struct mg_addr *addr);
char *mg_ntoa(const struct mg_addr *addr, char *buf, size_t len);
bool mg_mkpipe(struct mg_connection *c, struct mg_connection *pc[2]);
void mg_rmpipe(struct mg_connection *c);

View File

@ -37,14 +37,6 @@
#define MG_ENABLE_DIRLIST 0
#endif
#ifndef MG_ENABLE_SOCKETPAIR
#define MG_ENABLE_SOCKETPAIR 0
#endif
#ifndef MG_ENABLE_NATIVE_SOCKETPAIR
#define MG_ENABLE_NATIVE_SOCKETPAIR 0
#endif
#ifndef MG_ENABLE_CUSTOM_RANDOM
#define MG_ENABLE_CUSTOM_RANDOM 0
#endif

View File

@ -352,7 +352,7 @@ static const char *mg_http_status_code_str(int status_code) {
void mg_http_reply(struct mg_connection *c, int code, const char *headers,
const char *fmt, ...) {
char mem[100], *buf = mem;
char mem[256], *buf = mem;
va_list ap;
int len;
va_start(ap, fmt);

View File

@ -57,7 +57,7 @@ size_t mg_iobuf_append(struct mg_iobuf *io, const void *buf, size_t len,
}
size_t mg_iobuf_delete(struct mg_iobuf *io, size_t len) {
if (len > io->len) len = 0;
if (len > io->len) len = io->len;
memmove(io->buf, io->buf + len, io->len - len);
zeromem(io->buf + io->len - len, len);
io->len -= len;

View File

@ -71,6 +71,8 @@ bool mg_send(struct mg_connection *, const void *, size_t);
int mg_printf(struct mg_connection *, const char *fmt, ...);
int mg_vprintf(struct mg_connection *, const char *fmt, va_list ap);
char *mg_straddr(struct mg_connection *, char *, size_t);
bool mg_socketpair(int *s1, int *s2);
bool mg_aton(struct mg_str str, struct mg_addr *addr);
char *mg_ntoa(const struct mg_addr *addr, char *buf, size_t len);
bool mg_mkpipe(struct mg_connection *c, struct mg_connection *pc[2]);
void mg_rmpipe(struct mg_connection *c);

View File

@ -43,20 +43,33 @@ union usa {
#endif
};
static union usa tousa(struct mg_addr *a) {
union usa usa;
memset(&usa, 0, sizeof(usa));
usa.sin.sin_family = AF_INET;
usa.sin.sin_port = a->port;
*(uint32_t *) &usa.sin.sin_addr = a->ip;
static socklen_t tousa(struct mg_addr *a, union usa *usa) {
socklen_t len = sizeof(usa->sin);
memset(usa, 0, sizeof(*usa));
usa->sin.sin_family = AF_INET;
usa->sin.sin_port = a->port;
*(uint32_t *) &usa->sin.sin_addr = a->ip;
#if MG_ENABLE_IPV6
if (a->is_ip6) {
usa.sin.sin_family = AF_INET6;
usa.sin6.sin6_port = a->port;
memcpy(&usa.sin6.sin6_addr, a->ip6, sizeof(a->ip6));
usa->sin.sin_family = AF_INET6;
usa->sin6.sin6_port = a->port;
memcpy(&usa->sin6.sin6_addr, a->ip6, sizeof(a->ip6));
len = sizeof(usa->sin6);
}
#endif
return len;
}
static void tomgaddr(union usa *usa, struct mg_addr *a, bool is_ip6) {
a->is_ip6 = is_ip6;
a->port = usa->sin.sin_port;
memcpy(&a->ip, &usa->sin.sin_addr, sizeof(a->ip));
#if MG_ENABLE_IPV6
if (is_ip6) {
memcpy(a->ip6, &usa->sin6.sin6_addr, sizeof(a->ip6));
a->port = usa->sin6.sin6_port;
}
#endif
return usa;
}
static bool mg_sock_would_block(void) {
@ -86,11 +99,8 @@ static struct mg_connection *alloc_conn(struct mg_mgr *mgr, bool is_client,
static long mg_sock_send(struct mg_connection *c, const void *buf, size_t len) {
long n = 0;
if (c->is_udp) {
union usa usa = tousa(&c->peer);
socklen_t slen = sizeof(usa.sin);
#if MG_ENABLE_IPV6
if (c->peer.is_ip6) slen = sizeof(usa.sin6);
#endif
union usa usa;
socklen_t slen = tousa(&c->peer, &usa);
n = sendto(FD(c), (char *) buf, len, 0, &usa.sa, slen);
} else {
n = send(FD(c), (char *) buf, len, MSG_NONBLOCKING);
@ -123,14 +133,11 @@ SOCKET mg_open_listener(const char *url, struct mg_addr *addr) {
if (!mg_aton(mg_url_host(url), addr)) {
LOG(LL_ERROR, ("invalid listening URL: %s", url));
} else {
union usa usa = tousa(addr);
int on = 1, af = AF_INET;
union usa usa;
socklen_t slen = tousa(addr, &usa);
int on = 1, af = addr->is_ip6 ? AF_INET6 : AF_INET;
int type = strncmp(url, "udp:", 4) == 0 ? SOCK_DGRAM : SOCK_STREAM;
int proto = type == SOCK_DGRAM ? IPPROTO_UDP : IPPROTO_TCP;
socklen_t slen = sizeof(usa.sin);
#if MG_ENABLE_IPV6
if (addr->is_ip6) af = AF_INET6, slen = sizeof(usa.sin6);
#endif
if ((fd = socket(af, type, proto)) != INVALID_SOCKET &&
#if !defined(_WIN32) || !defined(SO_EXCLUSIVEADDRUSE)
@ -177,22 +184,9 @@ static long mg_sock_recv(struct mg_connection *c, void *buf, size_t len) {
long n = 0;
if (c->is_udp) {
union usa usa;
socklen_t slen = sizeof(usa.sin);
#if MG_ENABLE_IPV6
if (c->peer.is_ip6) slen = sizeof(usa.sin6);
#endif
socklen_t slen = tousa(&c->peer, &usa);
n = recvfrom(FD(c), (char *) buf, len, 0, &usa.sa, &slen);
if (n > 0) {
if (c->peer.is_ip6) {
#if MG_ENABLE_IPV6
memcpy(c->peer.ip6, &usa.sin6.sin6_addr, sizeof(c->peer.ip6));
c->peer.port = usa.sin6.sin6_port;
#endif
} else {
c->peer.ip = *(uint32_t *) &usa.sin.sin_addr;
c->peer.port = usa.sin.sin_port;
}
}
if (n > 0) tomgaddr(&usa, &c->peer, slen != sizeof(usa.sin));
} else {
n = recv(FD(c), (char *) buf, len, MSG_NONBLOCKING);
}
@ -331,11 +325,8 @@ void mg_connect_resolved(struct mg_connection *c) {
mg_set_non_blocking_mode(FD(c));
mg_call(c, MG_EV_RESOLVE, NULL);
if (type == SOCK_STREAM) {
union usa usa = tousa(&c->peer);
socklen_t slen = sizeof(usa.sin);
#if MG_ENABLE_IPV6
if (c->peer.is_ip6) slen = sizeof(usa.sin6);
#endif
union usa usa;
socklen_t slen = tousa(&c->peer, &usa);
if ((rc = connect(FD(c), &usa.sa, slen)) == 0 || mg_sock_would_block()) {
setsockopts(c);
if (rc != 0) c->is_connecting = 1;
@ -380,15 +371,7 @@ static void accept_conn(struct mg_mgr *mgr, struct mg_connection *lsn) {
closesocket(fd);
} else {
char buf[40];
c->peer.port = usa.sin.sin_port;
memcpy(&c->peer.ip, &usa.sin.sin_addr, sizeof(c->peer.ip));
#if MG_ENABLE_IPV6
if (sa_len == sizeof(usa.sin6)) {
memcpy(c->peer.ip6, &usa.sin6.sin6_addr, sizeof(c->peer.ip6));
c->peer.port = usa.sin6.sin6_port;
c->peer.is_ip6 = 1;
}
#endif
tomgaddr(&usa, &c->peer, sa_len != sizeof(usa.sin));
mg_straddr(c, buf, sizeof(buf));
LOG(LL_DEBUG, ("%lu accepted %s", c->id, buf));
mg_set_non_blocking_mode(FD(c));
@ -404,47 +387,78 @@ static void accept_conn(struct mg_mgr *mgr, struct mg_connection *lsn) {
}
}
bool mg_socketpair(int *s1, int *s2) {
#if MG_ENABLE_NATIVE_SOCKETPAIR
// For some reason, native socketpair() call fails on Macos
// Enable this codepath only when MG_ENABLE_NATIVE_SOCKETPAIR is defined
int sp[2], ret = 0;
if (socketpair(AF_INET, SOCK_DGRAM, IPPROTO_UDP, sp) == 0) {
*s1 = sp[0], *s2 = sp[1], ret = 1;
}
LOG(LL_INFO, ("errno %d", errno));
return ret;
#elif MG_ENABLE_SOCKETPAIR
union usa sa, sa2;
SOCKET sp[2] = {INVALID_SOCKET, INVALID_SOCKET};
socklen_t len = sizeof(sa.sin);
int ret = 0;
static bool mg_socketpair(SOCKET sp[2], union usa usa[2]) {
socklen_t n = sizeof(usa[0].sin);
bool result = false;
(void) memset(&sa, 0, sizeof(sa));
sa.sin.sin_family = AF_INET;
sa.sin.sin_addr.s_addr = htonl(0x7f000001); /* 127.0.0.1 */
sa2 = sa;
(void) memset(&usa[0], 0, sizeof(usa[0]));
usa[0].sin.sin_family = AF_INET;
usa[0].sin.sin_addr.s_addr = htonl(0x7f000001); // 127.0.0.1
usa[1] = usa[0];
if ((sp[0] = socket(AF_INET, SOCK_DGRAM, 0)) != INVALID_SOCKET &&
(sp[1] = socket(AF_INET, SOCK_DGRAM, 0)) != INVALID_SOCKET &&
bind(sp[0], &sa.sa, len) == 0 && bind(sp[1], &sa2.sa, len) == 0 &&
getsockname(sp[0], &sa.sa, &len) == 0 &&
getsockname(sp[1], &sa2.sa, &len) == 0 &&
connect(sp[0], &sa2.sa, len) == 0 && connect(sp[1], &sa.sa, len) == 0) {
mg_set_non_blocking_mode(sp[1]);
*s1 = sp[0];
*s2 = sp[1];
ret = 1;
bind(sp[0], &usa[0].sa, n) == 0 && bind(sp[1], &usa[1].sa, n) == 0 &&
getsockname(sp[0], &usa[0].sa, &n) == 0 &&
getsockname(sp[1], &usa[1].sa, &n) == 0 &&
connect(sp[0], &usa[1].sa, n) == 0 &&
connect(sp[1], &usa[0].sa, n) == 0) {
mg_set_non_blocking_mode(sp[1]); // Set close-on-exec
result = true;
} else {
if (sp[0] != INVALID_SOCKET) closesocket(sp[0]);
if (sp[1] != INVALID_SOCKET) closesocket(sp[1]);
sp[0] = sp[1] = INVALID_SOCKET;
}
return ret;
#else
*s1 = *s2 = INVALID_SOCKET;
return result;
}
// Event handler function for the receiving end of the pipe connection
// Read data from another task and send it to the remote peer
static void pf1(struct mg_connection *c, int ev, void *ev_data, void *fn_data) {
struct mg_connection *cc = (struct mg_connection *) fn_data;
if (ev == MG_EV_READ) {
struct mg_str *s = (struct mg_str *) ev_data;
// LOG(LL_INFO, ("got %d [%.*s]", (int) s->len, (int) s->len, s->ptr));
mg_send(cc, s->ptr, s->len);
c->recv.len = 0;
} else if (ev == MG_EV_CLOSE) {
if (cc) cc->is_draining = 1, cc->fn_data = NULL;
}
}
void mg_rmpipe(struct mg_connection *c) {
LOG(LL_DEBUG, ("%lu send pipe closed", c->id));
mg_send(c, "", 0); // Signal receiver, 0-length packet means we're done
closesocket(FD(c)); // We're not managed by c->mgr, close manually
free(c); // No buffers to clear - just free up
}
bool mg_mkpipe(struct mg_connection *c, struct mg_connection *pc[2]) {
union usa usa[2];
SOCKET sp[2] = {INVALID_SOCKET, INVALID_SOCKET};
if (!mg_socketpair(sp, usa)) goto fail;
if ((pc[0] = alloc_conn(c->mgr, true, sp[0])) == NULL) goto fail;
if ((pc[1] = alloc_conn(c->mgr, false, sp[1])) == NULL) goto fail;
tomgaddr(&usa[0], &pc[1]->peer, false);
tomgaddr(&usa[1], &pc[0]->peer, false);
pc[0]->is_udp = 1;
pc[1]->is_udp = 1;
pc[1]->is_accepted = 1;
pc[1]->fn = pf1;
pc[1]->fn_data = c;
LIST_ADD_HEAD(struct mg_connection, &c->mgr->conns, pc[1]);
// LOG(LL_DEBUG, ("%lu/%ld %lu/%ld", pc[0]->id, (long) (size_t) pc[0]->fd,
// pc[1]->id, (long) (size_t) pc[1]->fd));
return true;
fail:
free(pc[0]);
free(pc[1]);
if (sp[0] != INVALID_SOCKET) closesocket(sp[0]);
if (sp[1] != INVALID_SOCKET) closesocket(sp[1]);
pc[0] = pc[1] = NULL;
return false;
#endif
}
struct mg_connection *mg_listen(struct mg_mgr *mgr, const char *url,

View File

@ -42,5 +42,14 @@ bool mg_send(struct mg_connection *c, const void *buf, size_t len) {
return 0;
}
void mg_rmpipe(struct mg_connection *c) {
(void) c;
}
bool mg_mkpipe(struct mg_connection *c, struct mg_connection *pc[2]) {
(void) c, (void) pc;
return false;
}
void _fini(void) {
}

View File

@ -437,8 +437,11 @@ static int fetch(struct mg_mgr *mgr, char *buf, const char *url,
static int cmpbody(const char *buf, const char *str) {
struct mg_http_message hm;
mg_http_parse(buf, strlen(buf), &hm);
return strncmp(hm.body.ptr, str, hm.body.len);
struct mg_str s = mg_str(str);
size_t len = strlen(buf);
mg_http_parse(buf, len, &hm);
if (hm.body.len > len) hm.body.len = len - (size_t) (hm.body.ptr - buf);
return mg_strcmp(hm.body, s);
}
static void wcb(struct mg_connection *c, int ev, void *ev_data, void *fn_data) {
@ -570,7 +573,7 @@ static void test_http_server(void) {
{
char *data = mg_file_read("./test/data/ca.pem", NULL);
ASSERT(fetch(&mgr, buf, url, "GET /ca.pem HTTP/1.0\r\n\n") == 200);
ASSERT(cmpbody(data, buf) == 0);
ASSERT(cmpbody(buf, data) == 0);
free(data);
}
@ -1407,8 +1410,35 @@ static void test_packed(void) {
ASSERT(mgr.conns == NULL);
}
static void eh6(struct mg_connection *c, int ev, void *ev_data, void *fn_data) {
if (ev == MG_EV_HTTP_MSG) {
struct mg_http_message *hm = (struct mg_http_message *) ev_data;
struct mg_connection *pc[2];
mg_mkpipe(c, pc);
mg_http_reply(pc[0], 200, "", "hi, %.*s", (int) hm->uri.len, hm->uri.ptr);
mg_rmpipe(pc[0]);
c->fn_data = pc[1];
} else if (ev == MG_EV_CLOSE) {
struct mg_connection *pc = (struct mg_connection *) fn_data;
if (pc) pc->is_closing = 1, pc->fn_data = NULL;
}
}
static void test_pipe(void) {
struct mg_mgr mgr;
const char *url = "http://127.0.0.1:12352";
char buf[FETCH_BUF_SIZE];
mg_mgr_init(&mgr);
mg_http_listen(&mgr, url, eh6, NULL);
ASSERT(fetch(&mgr, buf, url, "GET /foo HTTP/1.0\n\n") == 200);
ASSERT(cmpbody(buf, "hi, /foo") == 0);
mg_mgr_free(&mgr);
ASSERT(mgr.conns == NULL);
}
int main(void) {
mg_log_set("3");
test_pipe();
test_packed();
test_crc32();
test_multipart();