Refactor mg_mkpipe()

This commit is contained in:
Sergey Lyubka 2021-08-11 19:17:04 +01:00
parent 8b770d2a0d
commit fd5eb2d3f9
8 changed files with 105 additions and 154 deletions

View File

@ -522,33 +522,23 @@ int mg_vprintf(struct mg_connection *, const char *fmt, va_list ap);
Same as `mg_printf()`, but takes `va_list` argument as a parameter.
### mg\_mkpipe()
### mg\_mkpipe(), mg\_mgr\_wakeup()
```c
bool mg_mkpipe(struct mg_connection *c, struct mg_connection *pc[2]);
void mg_rmpipe(struct mg_connection *c);
struct mg_connection *mg_mkpipe(struct mg_mgr *, mg_event_handler_t, void *);
void mg_mgr_wakeup(struct mg_connection *pipe);
```
Create a pair of connected connections using UDP socketpair. A sending side of
the pair is `pc[0]`, a receiving side is `pc[1]`. This API is indended for a
multi-threaded usage only.
Create a "pipe" connection which is safe to pass to a different task/thread,
and which is used to wake up event manager from a different task. These two
functions are designed to implement multi-threaded support, to handle
A sending connection `pc[0]` should be passed to another task (thread) - in
fact, it is the only data structure that is safe to pass to another task.
`pc[0]` is not added to an event manager, therefore its callback function and
flags are ignored. A task function must write to `pc[0]` using any of the
existing `mg_*` API, then call `mg_rmpipe()` when done. Writing to `pc[0]`
wakes up an event manager, because the receiving side of the pipe `pc[1]` is
added to an event manager.
A receiving side `pc[1]` forwards all received data to `c`. NOTE: if
there is a limit on the local UDP message size, do not send more than that
limit in one call.
Another task can wake up a sleeping event manager (in `mg_mgr_poll()` call)
using `mg_mgr_wakeup()`. When an event manager is woken up, a pipe
connection event handler function receives `MG_EV_READ` event.
See [examples/multi-threaded](../examples/multi-threaded) for a usage example.
Return value: true on success, false on error.
## IO buffers

View File

@ -19,10 +19,6 @@
#include "mongoose.h"
#ifndef SLEEP_TIME
#define SLEEP_TIME 2 // Seconds to sleep to simulate calculation
#endif
static void start_thread(void (*f)(void *), void *p) {
#ifdef _WIN32
_beginthread((void(__cdecl *)(void *)) f, 0, p);
@ -40,48 +36,46 @@ static void start_thread(void (*f)(void *), void *p) {
static void thread_function(void *param) {
struct mg_connection *c = param; // Pipe connection
LOG(LL_INFO, ("Thread started, pipe %lu/%ld", c->id, (long) (size_t) c->fd));
LOG(LL_INFO, ("Sleeping for %d sec...", SLEEP_TIME));
mg_usleep(SLEEP_TIME * 1000000); // Simulate long execution
LOG(LL_INFO, ("Sending data..."));
mg_http_reply(c, 200, "Host: foo.com\r\n", "hi\n");
mg_rmpipe(c);
mg_usleep(2 * 1000000); // Simulate long execution
mg_mgr_wakeup(c); // Wakeup event manager
}
// HTTP request callback
static void cb(struct mg_connection *c, int ev, void *ev_data, void *fn_data) {
static void fn(struct mg_connection *c, int ev, void *ev_data, void *fn_data) {
if (ev == MG_EV_HTTP_MSG) {
// Incoming request. Create socket pair.
// Pass blocking socket to the thread, and keep the non-blocking socket.
struct mg_http_message *hm = (struct mg_http_message *) ev_data;
if (mg_http_match_uri(hm, "/fast")) {
// The /fast URI responds immediately without hitting a multi-threaded
// codepath. It is for measuing performance impact
// Single-threaded code path, for performance comparison
// The /fast URI responds immediately
mg_http_reply(c, 200, "Host: foo.com\r\n", "hi\n");
} else {
// Multithreading code path. Create "pipe" connection.
// Pipe connection is safe to pass to a different task/thread.
// Spawn a thread and pass created pipe connection to it.
// Save a receiving end of the pipe into c->fn_data, in order to
// close it when this (client) connection closes.
struct mg_connection *pc[2]; // pc[0]: send, pc[1]: recv
mg_mkpipe(c, pc); // Create pipe
start_thread(thread_function, pc[0]); // Spawn a task, pass pc[0] there
c->fn_data = pc[1]; // And save recv end for later
// Multithreading code path
c->label[0] = 'W'; // Mark us as waiting for data
start_thread(thread_function, fn_data); // Start handling thread
}
}
}
// Pipe event handler
static void pcb(struct mg_connection *c, int ev, void *ev_data, void *fn_data) {
if (ev == MG_EV_READ) {
struct mg_connection *t;
for (t = c->mgr->conns; t != NULL; t = t->next) {
if (t->label[0] != 'W') continue; // Ignore un-marked connections
mg_http_reply(t, 200, "Host: foo.com\r\n", "hi\n"); // Respond!
t->label[0] = 0; // Clear mark
}
} else if (ev == MG_EV_CLOSE) {
// Tell the receiving end of the pipe to close
struct mg_connection *pc = (struct mg_connection *) fn_data;
if (pc) pc->is_closing = 1, pc->fn_data = NULL;
}
}
int main(void) {
struct mg_mgr mgr;
struct mg_connection *pipe; // Used to wake up event manager
mg_mgr_init(&mgr);
mg_log_set("3");
mg_http_listen(&mgr, "http://localhost:8000", cb, NULL);
for (;;) mg_mgr_poll(&mgr, 1000);
mg_mgr_free(&mgr);
pipe = mg_mkpipe(&mgr, pcb, NULL); // Create pipe
mg_http_listen(&mgr, "http://localhost:8000", fn, pipe); // Create listener
for (;;) mg_mgr_poll(&mgr, 1000); // Event loop
mg_mgr_free(&mgr); // Cleanup
return 0;
}

View File

@ -3233,51 +3233,38 @@ static bool mg_socketpair(SOCKET sp[2], union usa usa[2]) {
return result;
}
// Event handler function for the receiving end of the pipe connection
// Read data from another task and send it to the remote peer
void mg_mgr_wakeup(struct mg_connection *c) {
LOG(LL_INFO, ("skt: %p", c->pfn_data));
send((SOCKET) (size_t) c->pfn_data, "\x01", 1, MSG_NONBLOCKING);
}
static void pf1(struct mg_connection *c, int ev, void *ev_data, void *fn_data) {
struct mg_connection *cc = (struct mg_connection *) fn_data;
if (ev == MG_EV_READ) {
struct mg_str *s = (struct mg_str *) ev_data;
// LOG(LL_INFO, ("got %d [%.*s]", (int) s->len, (int) s->len, s->ptr));
mg_send(cc, s->ptr, s->len);
c->recv.len = 0;
} else if (ev == MG_EV_CLOSE) {
if (cc) cc->is_draining = 1, cc->fn_data = NULL;
}
if (ev == MG_EV_READ) mg_iobuf_free(&c->recv);
(void) ev_data, (void) fn_data;
}
void mg_rmpipe(struct mg_connection *c) {
LOG(LL_DEBUG, ("%lu send pipe closed", c->id));
mg_send(c, "", 0); // Signal receiver, 0-length packet means we're done
closesocket(FD(c)); // We're not managed by c->mgr, close manually
free(c); // No buffers to clear - just free up
}
bool mg_mkpipe(struct mg_connection *c, struct mg_connection *pc[2]) {
struct mg_connection *mg_mkpipe(struct mg_mgr *mgr, mg_event_handler_t fn,
void *fn_data) {
union usa usa[2];
SOCKET sp[2] = {INVALID_SOCKET, INVALID_SOCKET};
if (!mg_socketpair(sp, usa)) goto fail;
if ((pc[0] = alloc_conn(c->mgr, true, sp[0])) == NULL) goto fail;
if ((pc[1] = alloc_conn(c->mgr, false, sp[1])) == NULL) goto fail;
tomgaddr(&usa[0], &pc[1]->peer, false);
tomgaddr(&usa[1], &pc[0]->peer, false);
pc[0]->is_udp = 1;
pc[1]->is_udp = 1;
pc[1]->is_accepted = 1;
pc[1]->fn = pf1;
pc[1]->fn_data = c;
LIST_ADD_HEAD(struct mg_connection, &c->mgr->conns, pc[1]);
// LOG(LL_DEBUG, ("%lu/%ld %lu/%ld", pc[0]->id, (long) (size_t) pc[0]->fd,
// pc[1]->id, (long) (size_t) pc[1]->fd));
return true;
fail:
free(pc[0]);
free(pc[1]);
if (sp[0] != INVALID_SOCKET) closesocket(sp[0]);
if (sp[1] != INVALID_SOCKET) closesocket(sp[1]);
pc[0] = pc[1] = NULL;
return false;
struct mg_connection *c = NULL;
if (!mg_socketpair(sp, usa)) {
LOG(LL_ERROR, ("Cannot create socket pair"));
} else if ((c = alloc_conn(mgr, false, sp[1])) == NULL) {
closesocket(sp[0]);
closesocket(sp[1]);
LOG(LL_ERROR, ("OOM"));
} else {
LOG(LL_INFO, ("pipe %lu", (unsigned long) sp[0]));
tomgaddr(&usa[0], &c->peer, false);
c->is_udp = 1;
c->pfn = pf1;
c->pfn_data = (void *) (size_t) sp[0];
c->fn = fn;
c->fn_data = fn_data;
LIST_ADD_HEAD(struct mg_connection, &mgr->conns, c);
}
return c;
}
struct mg_connection *mg_listen(struct mg_mgr *mgr, const char *url,

View File

@ -762,8 +762,8 @@ char *mg_straddr(struct mg_connection *, char *, size_t);
bool mg_aton(struct mg_str str, struct mg_addr *addr);
char *mg_ntoa(const struct mg_addr *addr, char *buf, size_t len);
bool mg_mkpipe(struct mg_connection *c, struct mg_connection *pc[2]);
void mg_rmpipe(struct mg_connection *c);
struct mg_connection *mg_mkpipe(struct mg_mgr *, mg_event_handler_t, void *);
void mg_mgr_wakeup(struct mg_connection *pipe);

View File

@ -74,5 +74,5 @@ char *mg_straddr(struct mg_connection *, char *, size_t);
bool mg_aton(struct mg_str str, struct mg_addr *addr);
char *mg_ntoa(const struct mg_addr *addr, char *buf, size_t len);
bool mg_mkpipe(struct mg_connection *c, struct mg_connection *pc[2]);
void mg_rmpipe(struct mg_connection *c);
struct mg_connection *mg_mkpipe(struct mg_mgr *, mg_event_handler_t, void *);
void mg_mgr_wakeup(struct mg_connection *pipe);

View File

@ -407,51 +407,38 @@ static bool mg_socketpair(SOCKET sp[2], union usa usa[2]) {
return result;
}
// Event handler function for the receiving end of the pipe connection
// Read data from another task and send it to the remote peer
void mg_mgr_wakeup(struct mg_connection *c) {
LOG(LL_INFO, ("skt: %p", c->pfn_data));
send((SOCKET) (size_t) c->pfn_data, "\x01", 1, MSG_NONBLOCKING);
}
static void pf1(struct mg_connection *c, int ev, void *ev_data, void *fn_data) {
struct mg_connection *cc = (struct mg_connection *) fn_data;
if (ev == MG_EV_READ) {
struct mg_str *s = (struct mg_str *) ev_data;
// LOG(LL_INFO, ("got %d [%.*s]", (int) s->len, (int) s->len, s->ptr));
mg_send(cc, s->ptr, s->len);
c->recv.len = 0;
} else if (ev == MG_EV_CLOSE) {
if (cc) cc->is_draining = 1, cc->fn_data = NULL;
}
if (ev == MG_EV_READ) mg_iobuf_free(&c->recv);
(void) ev_data, (void) fn_data;
}
void mg_rmpipe(struct mg_connection *c) {
LOG(LL_DEBUG, ("%lu send pipe closed", c->id));
mg_send(c, "", 0); // Signal receiver, 0-length packet means we're done
closesocket(FD(c)); // We're not managed by c->mgr, close manually
free(c); // No buffers to clear - just free up
}
bool mg_mkpipe(struct mg_connection *c, struct mg_connection *pc[2]) {
struct mg_connection *mg_mkpipe(struct mg_mgr *mgr, mg_event_handler_t fn,
void *fn_data) {
union usa usa[2];
SOCKET sp[2] = {INVALID_SOCKET, INVALID_SOCKET};
if (!mg_socketpair(sp, usa)) goto fail;
if ((pc[0] = alloc_conn(c->mgr, true, sp[0])) == NULL) goto fail;
if ((pc[1] = alloc_conn(c->mgr, false, sp[1])) == NULL) goto fail;
tomgaddr(&usa[0], &pc[1]->peer, false);
tomgaddr(&usa[1], &pc[0]->peer, false);
pc[0]->is_udp = 1;
pc[1]->is_udp = 1;
pc[1]->is_accepted = 1;
pc[1]->fn = pf1;
pc[1]->fn_data = c;
LIST_ADD_HEAD(struct mg_connection, &c->mgr->conns, pc[1]);
// LOG(LL_DEBUG, ("%lu/%ld %lu/%ld", pc[0]->id, (long) (size_t) pc[0]->fd,
// pc[1]->id, (long) (size_t) pc[1]->fd));
return true;
fail:
free(pc[0]);
free(pc[1]);
if (sp[0] != INVALID_SOCKET) closesocket(sp[0]);
if (sp[1] != INVALID_SOCKET) closesocket(sp[1]);
pc[0] = pc[1] = NULL;
return false;
struct mg_connection *c = NULL;
if (!mg_socketpair(sp, usa)) {
LOG(LL_ERROR, ("Cannot create socket pair"));
} else if ((c = alloc_conn(mgr, false, sp[1])) == NULL) {
closesocket(sp[0]);
closesocket(sp[1]);
LOG(LL_ERROR, ("OOM"));
} else {
LOG(LL_INFO, ("pipe %lu", (unsigned long) sp[0]));
tomgaddr(&usa[0], &c->peer, false);
c->is_udp = 1;
c->pfn = pf1;
c->pfn_data = (void *) (size_t) sp[0];
c->fn = fn;
c->fn_data = fn_data;
LIST_ADD_HEAD(struct mg_connection, &mgr->conns, c);
}
return c;
}
struct mg_connection *mg_listen(struct mg_mgr *mgr, const char *url,

View File

@ -42,13 +42,14 @@ bool mg_send(struct mg_connection *c, const void *buf, size_t len) {
return 0;
}
void mg_rmpipe(struct mg_connection *c) {
void mg_mgr_wakeup(struct mg_connection *c) {
(void) c;
}
bool mg_mkpipe(struct mg_connection *c, struct mg_connection *pc[2]) {
(void) c, (void) pc;
return false;
struct mg_connection *mg_mkpipe(struct mg_mgr *mgr, mg_event_handler_t fn,
void *fn_data) {
(void) mgr, (void) fn, (void) fn_data;
return NULL;
}
void _fini(void) {

View File

@ -1415,27 +1415,19 @@ static void test_packed(void) {
}
static void eh6(struct mg_connection *c, int ev, void *ev_data, void *fn_data) {
if (ev == MG_EV_HTTP_MSG) {
struct mg_http_message *hm = (struct mg_http_message *) ev_data;
struct mg_connection *pc[2];
mg_mkpipe(c, pc);
mg_http_reply(pc[0], 200, "", "hi, %.*s", (int) hm->uri.len, hm->uri.ptr);
mg_rmpipe(pc[0]);
c->fn_data = pc[1];
} else if (ev == MG_EV_CLOSE) {
struct mg_connection *pc = (struct mg_connection *) fn_data;
if (pc) pc->is_closing = 1, pc->fn_data = NULL;
}
if (ev == MG_EV_READ) *(int *) fn_data = 1;
(void) c, (void) ev_data;
}
static void test_pipe(void) {
struct mg_mgr mgr;
const char *url = "http://127.0.0.1:12352";
char buf[FETCH_BUF_SIZE];
struct mg_connection *c;
int i, done = 0;
mg_mgr_init(&mgr);
mg_http_listen(&mgr, url, eh6, NULL);
ASSERT(fetch(&mgr, buf, url, "GET /foo HTTP/1.0\n\n") == 200);
ASSERT(cmpbody(buf, "hi, /foo") == 0);
ASSERT((c = mg_mkpipe(&mgr, eh6, (void *) &done)) != NULL);
mg_mgr_wakeup(c);
for (i = 0; i < 10 && done == 0; i++) mg_mgr_poll(&mgr, 1);
ASSERT(done == 1);
mg_mgr_free(&mgr);
ASSERT(mgr.conns == NULL);
}