mirror of
https://github.com/cesanta/mongoose.git
synced 2025-01-14 01:38:01 +08:00
Add mg_wakeup()
This commit is contained in:
parent
669ffe2c74
commit
61825b2390
@ -4,29 +4,15 @@
|
||||
// Multithreading example.
|
||||
// For each incoming request, we spawn a separate thread, that sleeps for
|
||||
// some time to simulate long processing time, produces an output and
|
||||
// hands over that output to the request handler function.
|
||||
//
|
||||
// We pass POST body to the worker thread, and respond with a calculated CRC
|
||||
// sends that output to the parent connection.
|
||||
|
||||
#include "mongoose.h"
|
||||
|
||||
#define LISTENING_ADDR "http://localhost:8000"
|
||||
|
||||
struct thread_data {
|
||||
struct mg_queue queue; // Worker -> Connection queue
|
||||
struct mg_str body; // Copy of message body
|
||||
};
|
||||
|
||||
// These two helper UDP connections are used to wake up mongoose thread
|
||||
static struct mg_connection *s_wakeup_server;
|
||||
static struct mg_connection *s_wakeup_client;
|
||||
#define WAKEUP_URL "udp://127.0.0.1:40111"
|
||||
|
||||
static void start_thread(void *(*f)(void *), void *p) {
|
||||
#ifdef _WIN32
|
||||
#define usleep(x) Sleep((x) / 1000)
|
||||
_beginthread((void(__cdecl *)(void *)) f, 0, p);
|
||||
#else
|
||||
#define closesocket(x) close(x)
|
||||
#include <pthread.h>
|
||||
pthread_t thread_id = (pthread_t) 0;
|
||||
pthread_attr_t attr;
|
||||
@ -37,72 +23,53 @@ static void start_thread(void *(*f)(void *), void *p) {
|
||||
#endif
|
||||
}
|
||||
|
||||
static void *worker_thread(void *param) {
|
||||
struct thread_data *d = (struct thread_data *) param;
|
||||
char buf[100]; // On-stack buffer for the message queue
|
||||
struct thread_data {
|
||||
struct mg_mgr *mgr;
|
||||
unsigned long conn_id; // Parent connection ID
|
||||
struct mg_str message; // Original HTTP request
|
||||
};
|
||||
|
||||
mg_queue_init(&d->queue, buf, sizeof(buf)); // Init queue
|
||||
usleep(1 * 1000 * 1000); // Simulate long execution time
|
||||
|
||||
// Send a response to the connection
|
||||
if (d->body.len == 0) {
|
||||
mg_queue_printf(&d->queue, "Send me POST data");
|
||||
} else {
|
||||
uint32_t crc = mg_crc32(0, d->body.ptr, d->body.len);
|
||||
mg_queue_printf(&d->queue, "crc32: %#x", crc);
|
||||
free((char *) d->body.ptr);
|
||||
}
|
||||
|
||||
// Wake up Mongoose thread by sending something to one of its connections
|
||||
mg_send(s_wakeup_client, "hi", 2);
|
||||
|
||||
// Wait until connection reads our message, then it is safe to quit
|
||||
while (d->queue.tail != d->queue.head) usleep(1000);
|
||||
MG_INFO(("done, cleaning up..."));
|
||||
free(d);
|
||||
static void *thread_function(void *param) {
|
||||
struct thread_data *p = (struct thread_data *) param;
|
||||
sleep(2); // Simulate long execution
|
||||
mg_wakeup(p->mgr, p->conn_id, "hi!", 3); // Respond to parent
|
||||
free((void *) p->message.ptr); // Free all resources that were
|
||||
free(p); // passed to us
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static void wfn(struct mg_connection *c, int ev, void *ev_data, void *fn_data) {
|
||||
if (ev == MG_EV_READ) {
|
||||
c->recv.len = 0; // Discard received data
|
||||
}
|
||||
(void) ev_data, (void) fn_data;
|
||||
}
|
||||
|
||||
// HTTP request callback
|
||||
static void fn(struct mg_connection *c, int ev, void *ev_data, void *fn_data) {
|
||||
if (ev == MG_EV_HTTP_MSG) {
|
||||
// Received HTTP request. Allocate thread data and spawn a worker thread
|
||||
struct mg_http_message *hm = (struct mg_http_message *) ev_data;
|
||||
struct thread_data *d = (struct thread_data *) calloc(1, sizeof(*d));
|
||||
d->body = mg_strdup(hm->body); // Pass received body to the worker
|
||||
start_thread(worker_thread, d); // Start a thread
|
||||
*(void **) c->data = d; // Memorise data pointer in c->data
|
||||
} else if (ev == MG_EV_POLL) {
|
||||
// Poll event. Delivered to us every mg_mgr_poll interval or faster
|
||||
struct thread_data *d = *(struct thread_data **) c->data;
|
||||
size_t len;
|
||||
char *buf;
|
||||
// Check if we have a message from the worker
|
||||
if (d != NULL && (len = mg_queue_next(&d->queue, &buf)) > 0) {
|
||||
// Got message from worker. Send a response and cleanup
|
||||
mg_http_reply(c, 200, "", "%.*s\n", (int) len, buf);
|
||||
mg_queue_del(&d->queue, len); // Delete message
|
||||
*(void **) c->data = NULL; // Forget about thread data
|
||||
if (mg_http_match_uri(hm, "/fast")) {
|
||||
// Single-threaded code path, for performance comparison
|
||||
// The /fast URI responds immediately
|
||||
mg_http_reply(c, 200, "Host: foo.com\r\n", "hi\n");
|
||||
} else {
|
||||
// Multithreading code path
|
||||
struct thread_data *data = calloc(1, sizeof(*data)); // Worker owns it
|
||||
data->message = mg_strdup(hm->message); // Pass message
|
||||
data->conn_id = c->id;
|
||||
data->mgr = c->mgr;
|
||||
start_thread(thread_function, data); // Start thread and pass data
|
||||
}
|
||||
} else if (ev == MG_EV_WAKEUP) {
|
||||
struct mg_str *data = (struct mg_str *) ev_data;
|
||||
mg_http_reply(c, 200, "", "Result: %.*s\n", data->len, data->ptr);
|
||||
}
|
||||
(void) fn_data;
|
||||
}
|
||||
|
||||
int main(void) {
|
||||
struct mg_mgr mgr;
|
||||
mg_mgr_init(&mgr);
|
||||
mg_mgr_init(&mgr); // Initialise event manager
|
||||
mg_log_set(MG_LL_DEBUG); // Set debug log level
|
||||
mg_http_listen(&mgr, LISTENING_ADDR, fn, NULL);
|
||||
s_wakeup_server = mg_listen(&mgr, WAKEUP_URL, wfn, NULL);
|
||||
s_wakeup_client = mg_connect(&mgr, WAKEUP_URL, wfn, NULL);
|
||||
for (;;) mg_mgr_poll(&mgr, 5000); // Event loop. Use 5s poll interval
|
||||
mg_mgr_free(&mgr); // Cleanup
|
||||
mg_http_listen(&mgr, "http://localhost:8000", fn, NULL); // Create listener
|
||||
mg_wakeup_init(&mgr); // Initialise wakeup socket pair
|
||||
for (;;) { // Event loop
|
||||
mg_mgr_poll(&mgr, 1000);
|
||||
}
|
||||
mg_mgr_free(&mgr);
|
||||
return 0;
|
||||
}
|
||||
|
87
mongoose.c
87
mongoose.c
@ -4466,6 +4466,7 @@ void mg_mgr_init(struct mg_mgr *mgr) {
|
||||
// won't kill the whole process.
|
||||
signal(SIGPIPE, SIG_IGN);
|
||||
#endif
|
||||
mgr->pipe = MG_INVALID_SOCKET;
|
||||
mgr->dnstimeout = 3000;
|
||||
mgr->dns4.url = "udp://8.8.8.8:53";
|
||||
mgr->dns6.url = "udp://[2001:4860:4860::8888]:53";
|
||||
@ -7261,6 +7262,92 @@ static void mg_iotest(struct mg_mgr *mgr, int ms) {
|
||||
#endif
|
||||
}
|
||||
|
||||
static bool mg_socketpair(MG_SOCKET_TYPE sp[2], union usa usa[2]) {
|
||||
socklen_t n = sizeof(usa[0].sin);
|
||||
bool success = false;
|
||||
|
||||
sp[0] = sp[1] = MG_INVALID_SOCKET;
|
||||
(void) memset(&usa[0], 0, sizeof(usa[0]));
|
||||
usa[0].sin.sin_family = AF_INET;
|
||||
*(uint32_t *) &usa->sin.sin_addr = mg_htonl(0x7f000001U); // 127.0.0.1
|
||||
usa[1] = usa[0];
|
||||
|
||||
if ((sp[0] = socket(AF_INET, SOCK_DGRAM, 0)) != MG_INVALID_SOCKET &&
|
||||
(sp[1] = socket(AF_INET, SOCK_DGRAM, 0)) != MG_INVALID_SOCKET &&
|
||||
bind(sp[0], &usa[0].sa, n) == 0 && //
|
||||
bind(sp[1], &usa[1].sa, n) == 0 && //
|
||||
getsockname(sp[0], &usa[0].sa, &n) == 0 && //
|
||||
getsockname(sp[1], &usa[1].sa, &n) == 0 && //
|
||||
connect(sp[0], &usa[1].sa, n) == 0 && //
|
||||
connect(sp[1], &usa[0].sa, n) == 0) { //
|
||||
success = true;
|
||||
}
|
||||
if (!success) {
|
||||
if (sp[0] != MG_INVALID_SOCKET) closesocket(sp[0]);
|
||||
if (sp[1] != MG_INVALID_SOCKET) closesocket(sp[1]);
|
||||
sp[0] = sp[1] = MG_INVALID_SOCKET;
|
||||
}
|
||||
return success;
|
||||
}
|
||||
|
||||
// mg_wakeup() event handler
|
||||
static void wufn(struct mg_connection *c, int ev, void *evd, void *fnd) {
|
||||
if (ev == MG_EV_READ) {
|
||||
unsigned long *id = (unsigned long *) c->recv.buf;
|
||||
// MG_INFO(("Got data"));
|
||||
// mg_hexdump(c->recv.buf, c->recv.len);
|
||||
if (c->recv.len >= sizeof(*id)) {
|
||||
struct mg_connection *t;
|
||||
for (t = c->mgr->conns; t != NULL; t = t->next) {
|
||||
if (t->id == *id) {
|
||||
struct mg_str data = mg_str_n((char *) c->recv.buf + sizeof(*id),
|
||||
c->recv.len - sizeof(*id));
|
||||
mg_call(t, MG_EV_WAKEUP, &data);
|
||||
}
|
||||
}
|
||||
}
|
||||
c->recv.len = 0; // Consume received data
|
||||
} else if (ev == MG_EV_CLOSE) {
|
||||
closesocket(c->mgr->pipe); // When we're closing, close the other
|
||||
c->mgr->pipe = MG_INVALID_SOCKET; // side of the socketpair, too
|
||||
}
|
||||
(void) evd, (void) fnd;
|
||||
}
|
||||
|
||||
bool mg_wakeup_init(struct mg_mgr *mgr) {
|
||||
bool ok = false;
|
||||
if (mgr->pipe == MG_INVALID_SOCKET) {
|
||||
union usa usa[2];
|
||||
MG_SOCKET_TYPE sp[2] = {MG_INVALID_SOCKET, MG_INVALID_SOCKET};
|
||||
struct mg_connection *c = NULL;
|
||||
if (!mg_socketpair(sp, usa)) {
|
||||
MG_ERROR(("Cannot create socket pair"));
|
||||
} else if ((c = mg_wrapfd(mgr, (int) sp[1], wufn, NULL)) == NULL) {
|
||||
closesocket(sp[0]);
|
||||
closesocket(sp[1]);
|
||||
sp[0] = sp[1] = MG_INVALID_SOCKET;
|
||||
} else {
|
||||
tomgaddr(&usa[0], &c->rem, false);
|
||||
MG_DEBUG(("%lu %p pipe %lu", c->id, c->fd, (unsigned long) sp[0]));
|
||||
mgr->pipe = sp[0];
|
||||
ok = true;
|
||||
}
|
||||
}
|
||||
return ok;
|
||||
}
|
||||
|
||||
bool mg_wakeup(struct mg_mgr *mgr, unsigned long conn_id, const void *buf,
|
||||
size_t len) {
|
||||
if (mgr->pipe != MG_INVALID_SOCKET && conn_id > 0) {
|
||||
char *extended_buf = (char *) alloca(len + sizeof(conn_id));
|
||||
memcpy(extended_buf, &conn_id, sizeof(conn_id));
|
||||
memcpy(extended_buf + sizeof(conn_id), buf, len);
|
||||
send(mgr->pipe, extended_buf, len + sizeof(conn_id), MSG_NONBLOCKING);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
void mg_mgr_poll(struct mg_mgr *mgr, int ms) {
|
||||
struct mg_connection *c, *tmp;
|
||||
uint64_t now;
|
||||
|
@ -2119,6 +2119,7 @@ enum {
|
||||
MG_EV_MQTT_MSG, // MQTT PUBLISH received struct mg_mqtt_message *
|
||||
MG_EV_MQTT_OPEN, // MQTT CONNACK received int *connack_status_code
|
||||
MG_EV_SNTP_TIME, // SNTP time received uint64_t *epoch_millis
|
||||
MG_EV_WAKEUP, // mg_wakeup() data received struct mg_str *data
|
||||
MG_EV_USER // Starting ID for user events
|
||||
};
|
||||
|
||||
@ -2158,6 +2159,7 @@ struct mg_mgr {
|
||||
int epoll_fd; // Used when MG_EPOLL_ENABLE=1
|
||||
void *priv; // Used by the MIP stack
|
||||
size_t extraconnsize; // Used by the MIP stack
|
||||
MG_SOCKET_TYPE pipe; // Socketpair end for mg_wakeup()
|
||||
#if MG_ENABLE_FREERTOS_TCP
|
||||
SocketSet_t ss; // NOTE(lsm): referenced from socket struct
|
||||
#endif
|
||||
@ -2222,6 +2224,8 @@ void mg_close_conn(struct mg_connection *c);
|
||||
bool mg_open_listener(struct mg_connection *c, const char *url);
|
||||
|
||||
// Utility functions
|
||||
bool mg_wakeup(struct mg_mgr *, unsigned long id, const void *buf, size_t len);
|
||||
bool mg_wakeup_init(struct mg_mgr *);
|
||||
struct mg_timer *mg_timer_add(struct mg_mgr *mgr, uint64_t milliseconds,
|
||||
unsigned flags, void (*fn)(void *), void *arg);
|
||||
|
||||
|
@ -25,5 +25,6 @@ enum {
|
||||
MG_EV_MQTT_MSG, // MQTT PUBLISH received struct mg_mqtt_message *
|
||||
MG_EV_MQTT_OPEN, // MQTT CONNACK received int *connack_status_code
|
||||
MG_EV_SNTP_TIME, // SNTP time received uint64_t *epoch_millis
|
||||
MG_EV_WAKEUP, // mg_wakeup() data received struct mg_str *data
|
||||
MG_EV_USER // Starting ID for user events
|
||||
};
|
||||
|
@ -271,6 +271,7 @@ void mg_mgr_init(struct mg_mgr *mgr) {
|
||||
// won't kill the whole process.
|
||||
signal(SIGPIPE, SIG_IGN);
|
||||
#endif
|
||||
mgr->pipe = MG_INVALID_SOCKET;
|
||||
mgr->dnstimeout = 3000;
|
||||
mgr->dns4.url = "udp://8.8.8.8:53";
|
||||
mgr->dns6.url = "udp://[2001:4860:4860::8888]:53";
|
||||
|
@ -35,6 +35,7 @@ struct mg_mgr {
|
||||
int epoll_fd; // Used when MG_EPOLL_ENABLE=1
|
||||
void *priv; // Used by the MIP stack
|
||||
size_t extraconnsize; // Used by the MIP stack
|
||||
MG_SOCKET_TYPE pipe; // Socketpair end for mg_wakeup()
|
||||
#if MG_ENABLE_FREERTOS_TCP
|
||||
SocketSet_t ss; // NOTE(lsm): referenced from socket struct
|
||||
#endif
|
||||
@ -99,5 +100,7 @@ void mg_close_conn(struct mg_connection *c);
|
||||
bool mg_open_listener(struct mg_connection *c, const char *url);
|
||||
|
||||
// Utility functions
|
||||
bool mg_wakeup(struct mg_mgr *, unsigned long id, const void *buf, size_t len);
|
||||
bool mg_wakeup_init(struct mg_mgr *);
|
||||
struct mg_timer *mg_timer_add(struct mg_mgr *mgr, uint64_t milliseconds,
|
||||
unsigned flags, void (*fn)(void *), void *arg);
|
||||
|
86
src/sock.c
86
src/sock.c
@ -589,6 +589,92 @@ static void mg_iotest(struct mg_mgr *mgr, int ms) {
|
||||
#endif
|
||||
}
|
||||
|
||||
static bool mg_socketpair(MG_SOCKET_TYPE sp[2], union usa usa[2]) {
|
||||
socklen_t n = sizeof(usa[0].sin);
|
||||
bool success = false;
|
||||
|
||||
sp[0] = sp[1] = MG_INVALID_SOCKET;
|
||||
(void) memset(&usa[0], 0, sizeof(usa[0]));
|
||||
usa[0].sin.sin_family = AF_INET;
|
||||
*(uint32_t *) &usa->sin.sin_addr = mg_htonl(0x7f000001U); // 127.0.0.1
|
||||
usa[1] = usa[0];
|
||||
|
||||
if ((sp[0] = socket(AF_INET, SOCK_DGRAM, 0)) != MG_INVALID_SOCKET &&
|
||||
(sp[1] = socket(AF_INET, SOCK_DGRAM, 0)) != MG_INVALID_SOCKET &&
|
||||
bind(sp[0], &usa[0].sa, n) == 0 && //
|
||||
bind(sp[1], &usa[1].sa, n) == 0 && //
|
||||
getsockname(sp[0], &usa[0].sa, &n) == 0 && //
|
||||
getsockname(sp[1], &usa[1].sa, &n) == 0 && //
|
||||
connect(sp[0], &usa[1].sa, n) == 0 && //
|
||||
connect(sp[1], &usa[0].sa, n) == 0) { //
|
||||
success = true;
|
||||
}
|
||||
if (!success) {
|
||||
if (sp[0] != MG_INVALID_SOCKET) closesocket(sp[0]);
|
||||
if (sp[1] != MG_INVALID_SOCKET) closesocket(sp[1]);
|
||||
sp[0] = sp[1] = MG_INVALID_SOCKET;
|
||||
}
|
||||
return success;
|
||||
}
|
||||
|
||||
// mg_wakeup() event handler
|
||||
static void wufn(struct mg_connection *c, int ev, void *evd, void *fnd) {
|
||||
if (ev == MG_EV_READ) {
|
||||
unsigned long *id = (unsigned long *) c->recv.buf;
|
||||
// MG_INFO(("Got data"));
|
||||
// mg_hexdump(c->recv.buf, c->recv.len);
|
||||
if (c->recv.len >= sizeof(*id)) {
|
||||
struct mg_connection *t;
|
||||
for (t = c->mgr->conns; t != NULL; t = t->next) {
|
||||
if (t->id == *id) {
|
||||
struct mg_str data = mg_str_n((char *) c->recv.buf + sizeof(*id),
|
||||
c->recv.len - sizeof(*id));
|
||||
mg_call(t, MG_EV_WAKEUP, &data);
|
||||
}
|
||||
}
|
||||
}
|
||||
c->recv.len = 0; // Consume received data
|
||||
} else if (ev == MG_EV_CLOSE) {
|
||||
closesocket(c->mgr->pipe); // When we're closing, close the other
|
||||
c->mgr->pipe = MG_INVALID_SOCKET; // side of the socketpair, too
|
||||
}
|
||||
(void) evd, (void) fnd;
|
||||
}
|
||||
|
||||
bool mg_wakeup_init(struct mg_mgr *mgr) {
|
||||
bool ok = false;
|
||||
if (mgr->pipe == MG_INVALID_SOCKET) {
|
||||
union usa usa[2];
|
||||
MG_SOCKET_TYPE sp[2] = {MG_INVALID_SOCKET, MG_INVALID_SOCKET};
|
||||
struct mg_connection *c = NULL;
|
||||
if (!mg_socketpair(sp, usa)) {
|
||||
MG_ERROR(("Cannot create socket pair"));
|
||||
} else if ((c = mg_wrapfd(mgr, (int) sp[1], wufn, NULL)) == NULL) {
|
||||
closesocket(sp[0]);
|
||||
closesocket(sp[1]);
|
||||
sp[0] = sp[1] = MG_INVALID_SOCKET;
|
||||
} else {
|
||||
tomgaddr(&usa[0], &c->rem, false);
|
||||
MG_DEBUG(("%lu %p pipe %lu", c->id, c->fd, (unsigned long) sp[0]));
|
||||
mgr->pipe = sp[0];
|
||||
ok = true;
|
||||
}
|
||||
}
|
||||
return ok;
|
||||
}
|
||||
|
||||
bool mg_wakeup(struct mg_mgr *mgr, unsigned long conn_id, const void *buf,
|
||||
size_t len) {
|
||||
if (mgr->pipe != MG_INVALID_SOCKET && conn_id > 0) {
|
||||
char *extended_buf = (char *) alloca(len + sizeof(conn_id));
|
||||
memcpy(extended_buf, &conn_id, sizeof(conn_id));
|
||||
memcpy(extended_buf + sizeof(conn_id), buf, len);
|
||||
send(mgr->pipe, extended_buf, len + sizeof(conn_id), MSG_NONBLOCKING);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
void mg_mgr_poll(struct mg_mgr *mgr, int ms) {
|
||||
struct mg_connection *c, *tmp;
|
||||
uint64_t now;
|
||||
|
Loading…
x
Reference in New Issue
Block a user