diff --git a/docs/README.md b/docs/README.md index ffadd3a3..49b19933 100644 --- a/docs/README.md +++ b/docs/README.md @@ -522,33 +522,23 @@ int mg_vprintf(struct mg_connection *, const char *fmt, va_list ap); Same as `mg_printf()`, but takes `va_list` argument as a parameter. -### mg\_mkpipe() +### mg\_mkpipe(), mg\_mgr\_wakeup() ```c -bool mg_mkpipe(struct mg_connection *c, struct mg_connection *pc[2]); -void mg_rmpipe(struct mg_connection *c); +struct mg_connection *mg_mkpipe(struct mg_mgr *, mg_event_handler_t, void *); +void mg_mgr_wakeup(struct mg_connection *pipe); ``` -Create a pair of connected connections using UDP socketpair. A sending side of -the pair is `pc[0]`, a receiving side is `pc[1]`. This API is indended for a -multi-threaded usage only. +Create a "pipe" connection which is safe to pass to a different task/thread, +and which is used to wake up event manager from a different task. These two +functions are designed to implement multi-threaded support, to handle -A sending connection `pc[0]` should be passed to another task (thread) - in -fact, it is the only data structure that is safe to pass to another task. -`pc[0]` is not added to an event manager, therefore its callback function and -flags are ignored. A task function must write to `pc[0]` using any of the -existing `mg_*` API, then call `mg_rmpipe()` when done. Writing to `pc[0]` -wakes up an event manager, because the receiving side of the pipe `pc[1]` is -added to an event manager. - -A receiving side `pc[1]` forwards all received data to `c`. NOTE: if -there is a limit on the local UDP message size, do not send more than that -limit in one call. +Another task can wake up a sleeping event manager (in `mg_mgr_poll()` call) +using `mg_mgr_wakeup()`. When an event manager is woken up, a pipe +connection event handler function receives `MG_EV_READ` event. See [examples/multi-threaded](../examples/multi-threaded) for a usage example. -Return value: true on success, false on error. - ## IO buffers diff --git a/examples/multi-threaded/main.c b/examples/multi-threaded/main.c index e2d26f84..433ec1ff 100644 --- a/examples/multi-threaded/main.c +++ b/examples/multi-threaded/main.c @@ -19,10 +19,6 @@ #include "mongoose.h" -#ifndef SLEEP_TIME -#define SLEEP_TIME 2 // Seconds to sleep to simulate calculation -#endif - static void start_thread(void (*f)(void *), void *p) { #ifdef _WIN32 _beginthread((void(__cdecl *)(void *)) f, 0, p); @@ -40,48 +36,46 @@ static void start_thread(void (*f)(void *), void *p) { static void thread_function(void *param) { struct mg_connection *c = param; // Pipe connection - LOG(LL_INFO, ("Thread started, pipe %lu/%ld", c->id, (long) (size_t) c->fd)); - LOG(LL_INFO, ("Sleeping for %d sec...", SLEEP_TIME)); - mg_usleep(SLEEP_TIME * 1000000); // Simulate long execution - LOG(LL_INFO, ("Sending data...")); - mg_http_reply(c, 200, "Host: foo.com\r\n", "hi\n"); - mg_rmpipe(c); + mg_usleep(2 * 1000000); // Simulate long execution + mg_mgr_wakeup(c); // Wakeup event manager } // HTTP request callback -static void cb(struct mg_connection *c, int ev, void *ev_data, void *fn_data) { +static void fn(struct mg_connection *c, int ev, void *ev_data, void *fn_data) { if (ev == MG_EV_HTTP_MSG) { - // Incoming request. Create socket pair. - // Pass blocking socket to the thread, and keep the non-blocking socket. struct mg_http_message *hm = (struct mg_http_message *) ev_data; if (mg_http_match_uri(hm, "/fast")) { - // The /fast URI responds immediately without hitting a multi-threaded - // codepath. It is for measuing performance impact + // Single-threaded code path, for performance comparison + // The /fast URI responds immediately mg_http_reply(c, 200, "Host: foo.com\r\n", "hi\n"); } else { - // Multithreading code path. Create "pipe" connection. - // Pipe connection is safe to pass to a different task/thread. - // Spawn a thread and pass created pipe connection to it. - // Save a receiving end of the pipe into c->fn_data, in order to - // close it when this (client) connection closes. - struct mg_connection *pc[2]; // pc[0]: send, pc[1]: recv - mg_mkpipe(c, pc); // Create pipe - start_thread(thread_function, pc[0]); // Spawn a task, pass pc[0] there - c->fn_data = pc[1]; // And save recv end for later + // Multithreading code path + c->label[0] = 'W'; // Mark us as waiting for data + start_thread(thread_function, fn_data); // Start handling thread + } + } +} + +// Pipe event handler +static void pcb(struct mg_connection *c, int ev, void *ev_data, void *fn_data) { + if (ev == MG_EV_READ) { + struct mg_connection *t; + for (t = c->mgr->conns; t != NULL; t = t->next) { + if (t->label[0] != 'W') continue; // Ignore un-marked connections + mg_http_reply(t, 200, "Host: foo.com\r\n", "hi\n"); // Respond! + t->label[0] = 0; // Clear mark } - } else if (ev == MG_EV_CLOSE) { - // Tell the receiving end of the pipe to close - struct mg_connection *pc = (struct mg_connection *) fn_data; - if (pc) pc->is_closing = 1, pc->fn_data = NULL; } } int main(void) { struct mg_mgr mgr; + struct mg_connection *pipe; // Used to wake up event manager mg_mgr_init(&mgr); mg_log_set("3"); - mg_http_listen(&mgr, "http://localhost:8000", cb, NULL); - for (;;) mg_mgr_poll(&mgr, 1000); - mg_mgr_free(&mgr); + pipe = mg_mkpipe(&mgr, pcb, NULL); // Create pipe + mg_http_listen(&mgr, "http://localhost:8000", fn, pipe); // Create listener + for (;;) mg_mgr_poll(&mgr, 1000); // Event loop + mg_mgr_free(&mgr); // Cleanup return 0; } diff --git a/mongoose.c b/mongoose.c index 937618c6..485d2a25 100644 --- a/mongoose.c +++ b/mongoose.c @@ -3233,51 +3233,38 @@ static bool mg_socketpair(SOCKET sp[2], union usa usa[2]) { return result; } -// Event handler function for the receiving end of the pipe connection -// Read data from another task and send it to the remote peer +void mg_mgr_wakeup(struct mg_connection *c) { + LOG(LL_INFO, ("skt: %p", c->pfn_data)); + send((SOCKET) (size_t) c->pfn_data, "\x01", 1, MSG_NONBLOCKING); +} + static void pf1(struct mg_connection *c, int ev, void *ev_data, void *fn_data) { - struct mg_connection *cc = (struct mg_connection *) fn_data; - if (ev == MG_EV_READ) { - struct mg_str *s = (struct mg_str *) ev_data; - // LOG(LL_INFO, ("got %d [%.*s]", (int) s->len, (int) s->len, s->ptr)); - mg_send(cc, s->ptr, s->len); - c->recv.len = 0; - } else if (ev == MG_EV_CLOSE) { - if (cc) cc->is_draining = 1, cc->fn_data = NULL; - } + if (ev == MG_EV_READ) mg_iobuf_free(&c->recv); + (void) ev_data, (void) fn_data; } -void mg_rmpipe(struct mg_connection *c) { - LOG(LL_DEBUG, ("%lu send pipe closed", c->id)); - mg_send(c, "", 0); // Signal receiver, 0-length packet means we're done - closesocket(FD(c)); // We're not managed by c->mgr, close manually - free(c); // No buffers to clear - just free up -} - -bool mg_mkpipe(struct mg_connection *c, struct mg_connection *pc[2]) { +struct mg_connection *mg_mkpipe(struct mg_mgr *mgr, mg_event_handler_t fn, + void *fn_data) { union usa usa[2]; SOCKET sp[2] = {INVALID_SOCKET, INVALID_SOCKET}; - if (!mg_socketpair(sp, usa)) goto fail; - if ((pc[0] = alloc_conn(c->mgr, true, sp[0])) == NULL) goto fail; - if ((pc[1] = alloc_conn(c->mgr, false, sp[1])) == NULL) goto fail; - tomgaddr(&usa[0], &pc[1]->peer, false); - tomgaddr(&usa[1], &pc[0]->peer, false); - pc[0]->is_udp = 1; - pc[1]->is_udp = 1; - pc[1]->is_accepted = 1; - pc[1]->fn = pf1; - pc[1]->fn_data = c; - LIST_ADD_HEAD(struct mg_connection, &c->mgr->conns, pc[1]); - // LOG(LL_DEBUG, ("%lu/%ld %lu/%ld", pc[0]->id, (long) (size_t) pc[0]->fd, - // pc[1]->id, (long) (size_t) pc[1]->fd)); - return true; -fail: - free(pc[0]); - free(pc[1]); - if (sp[0] != INVALID_SOCKET) closesocket(sp[0]); - if (sp[1] != INVALID_SOCKET) closesocket(sp[1]); - pc[0] = pc[1] = NULL; - return false; + struct mg_connection *c = NULL; + if (!mg_socketpair(sp, usa)) { + LOG(LL_ERROR, ("Cannot create socket pair")); + } else if ((c = alloc_conn(mgr, false, sp[1])) == NULL) { + closesocket(sp[0]); + closesocket(sp[1]); + LOG(LL_ERROR, ("OOM")); + } else { + LOG(LL_INFO, ("pipe %lu", (unsigned long) sp[0])); + tomgaddr(&usa[0], &c->peer, false); + c->is_udp = 1; + c->pfn = pf1; + c->pfn_data = (void *) (size_t) sp[0]; + c->fn = fn; + c->fn_data = fn_data; + LIST_ADD_HEAD(struct mg_connection, &mgr->conns, c); + } + return c; } struct mg_connection *mg_listen(struct mg_mgr *mgr, const char *url, diff --git a/mongoose.h b/mongoose.h index 0dfc51f6..ba48ced2 100644 --- a/mongoose.h +++ b/mongoose.h @@ -762,8 +762,8 @@ char *mg_straddr(struct mg_connection *, char *, size_t); bool mg_aton(struct mg_str str, struct mg_addr *addr); char *mg_ntoa(const struct mg_addr *addr, char *buf, size_t len); -bool mg_mkpipe(struct mg_connection *c, struct mg_connection *pc[2]); -void mg_rmpipe(struct mg_connection *c); +struct mg_connection *mg_mkpipe(struct mg_mgr *, mg_event_handler_t, void *); +void mg_mgr_wakeup(struct mg_connection *pipe); diff --git a/src/net.h b/src/net.h index 71d656c3..0950db9c 100644 --- a/src/net.h +++ b/src/net.h @@ -74,5 +74,5 @@ char *mg_straddr(struct mg_connection *, char *, size_t); bool mg_aton(struct mg_str str, struct mg_addr *addr); char *mg_ntoa(const struct mg_addr *addr, char *buf, size_t len); -bool mg_mkpipe(struct mg_connection *c, struct mg_connection *pc[2]); -void mg_rmpipe(struct mg_connection *c); +struct mg_connection *mg_mkpipe(struct mg_mgr *, mg_event_handler_t, void *); +void mg_mgr_wakeup(struct mg_connection *pipe); diff --git a/src/sock.c b/src/sock.c index 545d053c..05d87594 100644 --- a/src/sock.c +++ b/src/sock.c @@ -407,51 +407,38 @@ static bool mg_socketpair(SOCKET sp[2], union usa usa[2]) { return result; } -// Event handler function for the receiving end of the pipe connection -// Read data from another task and send it to the remote peer +void mg_mgr_wakeup(struct mg_connection *c) { + LOG(LL_INFO, ("skt: %p", c->pfn_data)); + send((SOCKET) (size_t) c->pfn_data, "\x01", 1, MSG_NONBLOCKING); +} + static void pf1(struct mg_connection *c, int ev, void *ev_data, void *fn_data) { - struct mg_connection *cc = (struct mg_connection *) fn_data; - if (ev == MG_EV_READ) { - struct mg_str *s = (struct mg_str *) ev_data; - // LOG(LL_INFO, ("got %d [%.*s]", (int) s->len, (int) s->len, s->ptr)); - mg_send(cc, s->ptr, s->len); - c->recv.len = 0; - } else if (ev == MG_EV_CLOSE) { - if (cc) cc->is_draining = 1, cc->fn_data = NULL; - } + if (ev == MG_EV_READ) mg_iobuf_free(&c->recv); + (void) ev_data, (void) fn_data; } -void mg_rmpipe(struct mg_connection *c) { - LOG(LL_DEBUG, ("%lu send pipe closed", c->id)); - mg_send(c, "", 0); // Signal receiver, 0-length packet means we're done - closesocket(FD(c)); // We're not managed by c->mgr, close manually - free(c); // No buffers to clear - just free up -} - -bool mg_mkpipe(struct mg_connection *c, struct mg_connection *pc[2]) { +struct mg_connection *mg_mkpipe(struct mg_mgr *mgr, mg_event_handler_t fn, + void *fn_data) { union usa usa[2]; SOCKET sp[2] = {INVALID_SOCKET, INVALID_SOCKET}; - if (!mg_socketpair(sp, usa)) goto fail; - if ((pc[0] = alloc_conn(c->mgr, true, sp[0])) == NULL) goto fail; - if ((pc[1] = alloc_conn(c->mgr, false, sp[1])) == NULL) goto fail; - tomgaddr(&usa[0], &pc[1]->peer, false); - tomgaddr(&usa[1], &pc[0]->peer, false); - pc[0]->is_udp = 1; - pc[1]->is_udp = 1; - pc[1]->is_accepted = 1; - pc[1]->fn = pf1; - pc[1]->fn_data = c; - LIST_ADD_HEAD(struct mg_connection, &c->mgr->conns, pc[1]); - // LOG(LL_DEBUG, ("%lu/%ld %lu/%ld", pc[0]->id, (long) (size_t) pc[0]->fd, - // pc[1]->id, (long) (size_t) pc[1]->fd)); - return true; -fail: - free(pc[0]); - free(pc[1]); - if (sp[0] != INVALID_SOCKET) closesocket(sp[0]); - if (sp[1] != INVALID_SOCKET) closesocket(sp[1]); - pc[0] = pc[1] = NULL; - return false; + struct mg_connection *c = NULL; + if (!mg_socketpair(sp, usa)) { + LOG(LL_ERROR, ("Cannot create socket pair")); + } else if ((c = alloc_conn(mgr, false, sp[1])) == NULL) { + closesocket(sp[0]); + closesocket(sp[1]); + LOG(LL_ERROR, ("OOM")); + } else { + LOG(LL_INFO, ("pipe %lu", (unsigned long) sp[0])); + tomgaddr(&usa[0], &c->peer, false); + c->is_udp = 1; + c->pfn = pf1; + c->pfn_data = (void *) (size_t) sp[0]; + c->fn = fn; + c->fn_data = fn_data; + LIST_ADD_HEAD(struct mg_connection, &mgr->conns, c); + } + return c; } struct mg_connection *mg_listen(struct mg_mgr *mgr, const char *url, diff --git a/test/mongoose_custom.c b/test/mongoose_custom.c index dbc7f1d8..3d882d7f 100644 --- a/test/mongoose_custom.c +++ b/test/mongoose_custom.c @@ -42,13 +42,14 @@ bool mg_send(struct mg_connection *c, const void *buf, size_t len) { return 0; } -void mg_rmpipe(struct mg_connection *c) { +void mg_mgr_wakeup(struct mg_connection *c) { (void) c; } -bool mg_mkpipe(struct mg_connection *c, struct mg_connection *pc[2]) { - (void) c, (void) pc; - return false; +struct mg_connection *mg_mkpipe(struct mg_mgr *mgr, mg_event_handler_t fn, + void *fn_data) { + (void) mgr, (void) fn, (void) fn_data; + return NULL; } void _fini(void) { diff --git a/test/unit_test.c b/test/unit_test.c index 5dc18571..e179851e 100644 --- a/test/unit_test.c +++ b/test/unit_test.c @@ -1415,27 +1415,19 @@ static void test_packed(void) { } static void eh6(struct mg_connection *c, int ev, void *ev_data, void *fn_data) { - if (ev == MG_EV_HTTP_MSG) { - struct mg_http_message *hm = (struct mg_http_message *) ev_data; - struct mg_connection *pc[2]; - mg_mkpipe(c, pc); - mg_http_reply(pc[0], 200, "", "hi, %.*s", (int) hm->uri.len, hm->uri.ptr); - mg_rmpipe(pc[0]); - c->fn_data = pc[1]; - } else if (ev == MG_EV_CLOSE) { - struct mg_connection *pc = (struct mg_connection *) fn_data; - if (pc) pc->is_closing = 1, pc->fn_data = NULL; - } + if (ev == MG_EV_READ) *(int *) fn_data = 1; + (void) c, (void) ev_data; } static void test_pipe(void) { struct mg_mgr mgr; - const char *url = "http://127.0.0.1:12352"; - char buf[FETCH_BUF_SIZE]; + struct mg_connection *c; + int i, done = 0; mg_mgr_init(&mgr); - mg_http_listen(&mgr, url, eh6, NULL); - ASSERT(fetch(&mgr, buf, url, "GET /foo HTTP/1.0\n\n") == 200); - ASSERT(cmpbody(buf, "hi, /foo") == 0); + ASSERT((c = mg_mkpipe(&mgr, eh6, (void *) &done)) != NULL); + mg_mgr_wakeup(c); + for (i = 0; i < 10 && done == 0; i++) mg_mgr_poll(&mgr, 1); + ASSERT(done == 1); mg_mgr_free(&mgr); ASSERT(mgr.conns == NULL); }