diff --git a/examples/multi-threaded/main.c b/examples/multi-threaded/main.c index 8f469d9a..83ba42d0 100644 --- a/examples/multi-threaded/main.c +++ b/examples/multi-threaded/main.c @@ -4,29 +4,15 @@ // Multithreading example. // For each incoming request, we spawn a separate thread, that sleeps for // some time to simulate long processing time, produces an output and -// hands over that output to the request handler function. -// -// We pass POST body to the worker thread, and respond with a calculated CRC +// sends that output to the parent connection. #include "mongoose.h" -#define LISTENING_ADDR "http://localhost:8000" - -struct thread_data { - struct mg_queue queue; // Worker -> Connection queue - struct mg_str body; // Copy of message body -}; - -// These two helper UDP connections are used to wake up mongoose thread -static struct mg_connection *s_wakeup_server; -static struct mg_connection *s_wakeup_client; -#define WAKEUP_URL "udp://127.0.0.1:40111" - static void start_thread(void *(*f)(void *), void *p) { #ifdef _WIN32 -#define usleep(x) Sleep((x) / 1000) _beginthread((void(__cdecl *)(void *)) f, 0, p); #else +#define closesocket(x) close(x) #include pthread_t thread_id = (pthread_t) 0; pthread_attr_t attr; @@ -37,72 +23,53 @@ static void start_thread(void *(*f)(void *), void *p) { #endif } -static void *worker_thread(void *param) { - struct thread_data *d = (struct thread_data *) param; - char buf[100]; // On-stack buffer for the message queue +struct thread_data { + struct mg_mgr *mgr; + unsigned long conn_id; // Parent connection ID + struct mg_str message; // Original HTTP request +}; - mg_queue_init(&d->queue, buf, sizeof(buf)); // Init queue - usleep(1 * 1000 * 1000); // Simulate long execution time - - // Send a response to the connection - if (d->body.len == 0) { - mg_queue_printf(&d->queue, "Send me POST data"); - } else { - uint32_t crc = mg_crc32(0, d->body.ptr, d->body.len); - mg_queue_printf(&d->queue, "crc32: %#x", crc); - free((char *) d->body.ptr); - } - - // Wake up Mongoose thread by sending something to one of its connections - mg_send(s_wakeup_client, "hi", 2); - - // Wait until connection reads our message, then it is safe to quit - while (d->queue.tail != d->queue.head) usleep(1000); - MG_INFO(("done, cleaning up...")); - free(d); +static void *thread_function(void *param) { + struct thread_data *p = (struct thread_data *) param; + sleep(2); // Simulate long execution + mg_wakeup(p->mgr, p->conn_id, "hi!", 3); // Respond to parent + free((void *) p->message.ptr); // Free all resources that were + free(p); // passed to us return NULL; } -static void wfn(struct mg_connection *c, int ev, void *ev_data, void *fn_data) { - if (ev == MG_EV_READ) { - c->recv.len = 0; // Discard received data - } - (void) ev_data, (void) fn_data; -} - // HTTP request callback static void fn(struct mg_connection *c, int ev, void *ev_data, void *fn_data) { if (ev == MG_EV_HTTP_MSG) { - // Received HTTP request. Allocate thread data and spawn a worker thread struct mg_http_message *hm = (struct mg_http_message *) ev_data; - struct thread_data *d = (struct thread_data *) calloc(1, sizeof(*d)); - d->body = mg_strdup(hm->body); // Pass received body to the worker - start_thread(worker_thread, d); // Start a thread - *(void **) c->data = d; // Memorise data pointer in c->data - } else if (ev == MG_EV_POLL) { - // Poll event. Delivered to us every mg_mgr_poll interval or faster - struct thread_data *d = *(struct thread_data **) c->data; - size_t len; - char *buf; - // Check if we have a message from the worker - if (d != NULL && (len = mg_queue_next(&d->queue, &buf)) > 0) { - // Got message from worker. Send a response and cleanup - mg_http_reply(c, 200, "", "%.*s\n", (int) len, buf); - mg_queue_del(&d->queue, len); // Delete message - *(void **) c->data = NULL; // Forget about thread data + if (mg_http_match_uri(hm, "/fast")) { + // Single-threaded code path, for performance comparison + // The /fast URI responds immediately + mg_http_reply(c, 200, "Host: foo.com\r\n", "hi\n"); + } else { + // Multithreading code path + struct thread_data *data = calloc(1, sizeof(*data)); // Worker owns it + data->message = mg_strdup(hm->message); // Pass message + data->conn_id = c->id; + data->mgr = c->mgr; + start_thread(thread_function, data); // Start thread and pass data } + } else if (ev == MG_EV_WAKEUP) { + struct mg_str *data = (struct mg_str *) ev_data; + mg_http_reply(c, 200, "", "Result: %.*s\n", data->len, data->ptr); } (void) fn_data; } int main(void) { struct mg_mgr mgr; - mg_mgr_init(&mgr); + mg_mgr_init(&mgr); // Initialise event manager mg_log_set(MG_LL_DEBUG); // Set debug log level - mg_http_listen(&mgr, LISTENING_ADDR, fn, NULL); - s_wakeup_server = mg_listen(&mgr, WAKEUP_URL, wfn, NULL); - s_wakeup_client = mg_connect(&mgr, WAKEUP_URL, wfn, NULL); - for (;;) mg_mgr_poll(&mgr, 5000); // Event loop. Use 5s poll interval - mg_mgr_free(&mgr); // Cleanup + mg_http_listen(&mgr, "http://localhost:8000", fn, NULL); // Create listener + mg_wakeup_init(&mgr); // Initialise wakeup socket pair + for (;;) { // Event loop + mg_mgr_poll(&mgr, 1000); + } + mg_mgr_free(&mgr); return 0; } diff --git a/mongoose.c b/mongoose.c index 9a5e97e4..64d4906d 100644 --- a/mongoose.c +++ b/mongoose.c @@ -4466,6 +4466,7 @@ void mg_mgr_init(struct mg_mgr *mgr) { // won't kill the whole process. signal(SIGPIPE, SIG_IGN); #endif + mgr->pipe = MG_INVALID_SOCKET; mgr->dnstimeout = 3000; mgr->dns4.url = "udp://8.8.8.8:53"; mgr->dns6.url = "udp://[2001:4860:4860::8888]:53"; @@ -7261,6 +7262,92 @@ static void mg_iotest(struct mg_mgr *mgr, int ms) { #endif } +static bool mg_socketpair(MG_SOCKET_TYPE sp[2], union usa usa[2]) { + socklen_t n = sizeof(usa[0].sin); + bool success = false; + + sp[0] = sp[1] = MG_INVALID_SOCKET; + (void) memset(&usa[0], 0, sizeof(usa[0])); + usa[0].sin.sin_family = AF_INET; + *(uint32_t *) &usa->sin.sin_addr = mg_htonl(0x7f000001U); // 127.0.0.1 + usa[1] = usa[0]; + + if ((sp[0] = socket(AF_INET, SOCK_DGRAM, 0)) != MG_INVALID_SOCKET && + (sp[1] = socket(AF_INET, SOCK_DGRAM, 0)) != MG_INVALID_SOCKET && + bind(sp[0], &usa[0].sa, n) == 0 && // + bind(sp[1], &usa[1].sa, n) == 0 && // + getsockname(sp[0], &usa[0].sa, &n) == 0 && // + getsockname(sp[1], &usa[1].sa, &n) == 0 && // + connect(sp[0], &usa[1].sa, n) == 0 && // + connect(sp[1], &usa[0].sa, n) == 0) { // + success = true; + } + if (!success) { + if (sp[0] != MG_INVALID_SOCKET) closesocket(sp[0]); + if (sp[1] != MG_INVALID_SOCKET) closesocket(sp[1]); + sp[0] = sp[1] = MG_INVALID_SOCKET; + } + return success; +} + +// mg_wakeup() event handler +static void wufn(struct mg_connection *c, int ev, void *evd, void *fnd) { + if (ev == MG_EV_READ) { + unsigned long *id = (unsigned long *) c->recv.buf; + // MG_INFO(("Got data")); + // mg_hexdump(c->recv.buf, c->recv.len); + if (c->recv.len >= sizeof(*id)) { + struct mg_connection *t; + for (t = c->mgr->conns; t != NULL; t = t->next) { + if (t->id == *id) { + struct mg_str data = mg_str_n((char *) c->recv.buf + sizeof(*id), + c->recv.len - sizeof(*id)); + mg_call(t, MG_EV_WAKEUP, &data); + } + } + } + c->recv.len = 0; // Consume received data + } else if (ev == MG_EV_CLOSE) { + closesocket(c->mgr->pipe); // When we're closing, close the other + c->mgr->pipe = MG_INVALID_SOCKET; // side of the socketpair, too + } + (void) evd, (void) fnd; +} + +bool mg_wakeup_init(struct mg_mgr *mgr) { + bool ok = false; + if (mgr->pipe == MG_INVALID_SOCKET) { + union usa usa[2]; + MG_SOCKET_TYPE sp[2] = {MG_INVALID_SOCKET, MG_INVALID_SOCKET}; + struct mg_connection *c = NULL; + if (!mg_socketpair(sp, usa)) { + MG_ERROR(("Cannot create socket pair")); + } else if ((c = mg_wrapfd(mgr, (int) sp[1], wufn, NULL)) == NULL) { + closesocket(sp[0]); + closesocket(sp[1]); + sp[0] = sp[1] = MG_INVALID_SOCKET; + } else { + tomgaddr(&usa[0], &c->rem, false); + MG_DEBUG(("%lu %p pipe %lu", c->id, c->fd, (unsigned long) sp[0])); + mgr->pipe = sp[0]; + ok = true; + } + } + return ok; +} + +bool mg_wakeup(struct mg_mgr *mgr, unsigned long conn_id, const void *buf, + size_t len) { + if (mgr->pipe != MG_INVALID_SOCKET && conn_id > 0) { + char *extended_buf = (char *) alloca(len + sizeof(conn_id)); + memcpy(extended_buf, &conn_id, sizeof(conn_id)); + memcpy(extended_buf + sizeof(conn_id), buf, len); + send(mgr->pipe, extended_buf, len + sizeof(conn_id), MSG_NONBLOCKING); + return true; + } + return false; +} + void mg_mgr_poll(struct mg_mgr *mgr, int ms) { struct mg_connection *c, *tmp; uint64_t now; diff --git a/mongoose.h b/mongoose.h index dda6e79d..ba25a50f 100644 --- a/mongoose.h +++ b/mongoose.h @@ -2119,6 +2119,7 @@ enum { MG_EV_MQTT_MSG, // MQTT PUBLISH received struct mg_mqtt_message * MG_EV_MQTT_OPEN, // MQTT CONNACK received int *connack_status_code MG_EV_SNTP_TIME, // SNTP time received uint64_t *epoch_millis + MG_EV_WAKEUP, // mg_wakeup() data received struct mg_str *data MG_EV_USER // Starting ID for user events }; @@ -2158,6 +2159,7 @@ struct mg_mgr { int epoll_fd; // Used when MG_EPOLL_ENABLE=1 void *priv; // Used by the MIP stack size_t extraconnsize; // Used by the MIP stack + MG_SOCKET_TYPE pipe; // Socketpair end for mg_wakeup() #if MG_ENABLE_FREERTOS_TCP SocketSet_t ss; // NOTE(lsm): referenced from socket struct #endif @@ -2222,6 +2224,8 @@ void mg_close_conn(struct mg_connection *c); bool mg_open_listener(struct mg_connection *c, const char *url); // Utility functions +bool mg_wakeup(struct mg_mgr *, unsigned long id, const void *buf, size_t len); +bool mg_wakeup_init(struct mg_mgr *); struct mg_timer *mg_timer_add(struct mg_mgr *mgr, uint64_t milliseconds, unsigned flags, void (*fn)(void *), void *arg); diff --git a/src/event.h b/src/event.h index da7a938a..8fb521d5 100644 --- a/src/event.h +++ b/src/event.h @@ -25,5 +25,6 @@ enum { MG_EV_MQTT_MSG, // MQTT PUBLISH received struct mg_mqtt_message * MG_EV_MQTT_OPEN, // MQTT CONNACK received int *connack_status_code MG_EV_SNTP_TIME, // SNTP time received uint64_t *epoch_millis + MG_EV_WAKEUP, // mg_wakeup() data received struct mg_str *data MG_EV_USER // Starting ID for user events }; diff --git a/src/net.c b/src/net.c index 36d3132b..f291bd30 100644 --- a/src/net.c +++ b/src/net.c @@ -271,6 +271,7 @@ void mg_mgr_init(struct mg_mgr *mgr) { // won't kill the whole process. signal(SIGPIPE, SIG_IGN); #endif + mgr->pipe = MG_INVALID_SOCKET; mgr->dnstimeout = 3000; mgr->dns4.url = "udp://8.8.8.8:53"; mgr->dns6.url = "udp://[2001:4860:4860::8888]:53"; diff --git a/src/net.h b/src/net.h index 2d2117db..679dc5b2 100644 --- a/src/net.h +++ b/src/net.h @@ -35,6 +35,7 @@ struct mg_mgr { int epoll_fd; // Used when MG_EPOLL_ENABLE=1 void *priv; // Used by the MIP stack size_t extraconnsize; // Used by the MIP stack + MG_SOCKET_TYPE pipe; // Socketpair end for mg_wakeup() #if MG_ENABLE_FREERTOS_TCP SocketSet_t ss; // NOTE(lsm): referenced from socket struct #endif @@ -99,5 +100,7 @@ void mg_close_conn(struct mg_connection *c); bool mg_open_listener(struct mg_connection *c, const char *url); // Utility functions +bool mg_wakeup(struct mg_mgr *, unsigned long id, const void *buf, size_t len); +bool mg_wakeup_init(struct mg_mgr *); struct mg_timer *mg_timer_add(struct mg_mgr *mgr, uint64_t milliseconds, unsigned flags, void (*fn)(void *), void *arg); diff --git a/src/sock.c b/src/sock.c index a9e22a60..76755d17 100644 --- a/src/sock.c +++ b/src/sock.c @@ -589,6 +589,92 @@ static void mg_iotest(struct mg_mgr *mgr, int ms) { #endif } +static bool mg_socketpair(MG_SOCKET_TYPE sp[2], union usa usa[2]) { + socklen_t n = sizeof(usa[0].sin); + bool success = false; + + sp[0] = sp[1] = MG_INVALID_SOCKET; + (void) memset(&usa[0], 0, sizeof(usa[0])); + usa[0].sin.sin_family = AF_INET; + *(uint32_t *) &usa->sin.sin_addr = mg_htonl(0x7f000001U); // 127.0.0.1 + usa[1] = usa[0]; + + if ((sp[0] = socket(AF_INET, SOCK_DGRAM, 0)) != MG_INVALID_SOCKET && + (sp[1] = socket(AF_INET, SOCK_DGRAM, 0)) != MG_INVALID_SOCKET && + bind(sp[0], &usa[0].sa, n) == 0 && // + bind(sp[1], &usa[1].sa, n) == 0 && // + getsockname(sp[0], &usa[0].sa, &n) == 0 && // + getsockname(sp[1], &usa[1].sa, &n) == 0 && // + connect(sp[0], &usa[1].sa, n) == 0 && // + connect(sp[1], &usa[0].sa, n) == 0) { // + success = true; + } + if (!success) { + if (sp[0] != MG_INVALID_SOCKET) closesocket(sp[0]); + if (sp[1] != MG_INVALID_SOCKET) closesocket(sp[1]); + sp[0] = sp[1] = MG_INVALID_SOCKET; + } + return success; +} + +// mg_wakeup() event handler +static void wufn(struct mg_connection *c, int ev, void *evd, void *fnd) { + if (ev == MG_EV_READ) { + unsigned long *id = (unsigned long *) c->recv.buf; + // MG_INFO(("Got data")); + // mg_hexdump(c->recv.buf, c->recv.len); + if (c->recv.len >= sizeof(*id)) { + struct mg_connection *t; + for (t = c->mgr->conns; t != NULL; t = t->next) { + if (t->id == *id) { + struct mg_str data = mg_str_n((char *) c->recv.buf + sizeof(*id), + c->recv.len - sizeof(*id)); + mg_call(t, MG_EV_WAKEUP, &data); + } + } + } + c->recv.len = 0; // Consume received data + } else if (ev == MG_EV_CLOSE) { + closesocket(c->mgr->pipe); // When we're closing, close the other + c->mgr->pipe = MG_INVALID_SOCKET; // side of the socketpair, too + } + (void) evd, (void) fnd; +} + +bool mg_wakeup_init(struct mg_mgr *mgr) { + bool ok = false; + if (mgr->pipe == MG_INVALID_SOCKET) { + union usa usa[2]; + MG_SOCKET_TYPE sp[2] = {MG_INVALID_SOCKET, MG_INVALID_SOCKET}; + struct mg_connection *c = NULL; + if (!mg_socketpair(sp, usa)) { + MG_ERROR(("Cannot create socket pair")); + } else if ((c = mg_wrapfd(mgr, (int) sp[1], wufn, NULL)) == NULL) { + closesocket(sp[0]); + closesocket(sp[1]); + sp[0] = sp[1] = MG_INVALID_SOCKET; + } else { + tomgaddr(&usa[0], &c->rem, false); + MG_DEBUG(("%lu %p pipe %lu", c->id, c->fd, (unsigned long) sp[0])); + mgr->pipe = sp[0]; + ok = true; + } + } + return ok; +} + +bool mg_wakeup(struct mg_mgr *mgr, unsigned long conn_id, const void *buf, + size_t len) { + if (mgr->pipe != MG_INVALID_SOCKET && conn_id > 0) { + char *extended_buf = (char *) alloca(len + sizeof(conn_id)); + memcpy(extended_buf, &conn_id, sizeof(conn_id)); + memcpy(extended_buf + sizeof(conn_id), buf, len); + send(mgr->pipe, extended_buf, len + sizeof(conn_id), MSG_NONBLOCKING); + return true; + } + return false; +} + void mg_mgr_poll(struct mg_mgr *mgr, int ms) { struct mg_connection *c, *tmp; uint64_t now;