From f19eec7fae0279b98e00f9e9b6843e136ae58a46 Mon Sep 17 00:00:00 2001 From: Sergey Lyubka Date: Fri, 22 Apr 2022 14:42:07 +0100 Subject: [PATCH] Refactor multithreading API --- docs/README.md | 51 +++++++++++--------- examples/multi-threaded/main.c | 54 +++++++++++++-------- mongoose.c | 86 +++++++++++++++++----------------- mongoose.h | 6 +-- src/net.c | 13 +++++ src/net.h | 6 +-- src/sock.c | 73 ++++++++++++----------------- test/mongoose_custom.c | 12 ++--- test/unit_test.c | 15 ++++-- 9 files changed, 167 insertions(+), 149 deletions(-) diff --git a/docs/README.md b/docs/README.md index 7ebd5984..5bbec5ff 100644 --- a/docs/README.md +++ b/docs/README.md @@ -677,38 +677,45 @@ char buf[100]; LOG(LL_INFO, ("%s", mg_straddr(&c->peer, buf, sizeof(buf)))); ``` -### mg\_mkpipe() +### mg\_wrapfd() ```c -struct mg_connection *mg_mkpipe(struct mg_mgr *, mg_event_handler_t, void *); +struct mg_connection *mg_wrapfd(struct mg_mgr *mgr, int fd, + mg_event_handler_t fn, void *fn_data); ``` -Create a "pipe" connection which is safe to pass to a different task/thread -and used to wake up event manager from a different task. These -functions are designed to implement multi-threaded support, to handle two -common use cases: - -- There are multiple consumer connections, e.g. connected websocket clients. - A server constantly pushes some data to all of them. In this case, a data - producer task should call `mg_mgr_wakeup()` as soon as more data is produced. - A pipe's event handler should push data to all client connection. - Use `c->label` to mark client connections. -- In order to serve a request, a long blocking operation should be performed. - In this case, request handler assigns some marker to `c->label` and then - spawns a handler task and gives a pipe to a - handler task. A handler does its job, and when data is ready, wakes up a - manager. A pipe's event handler pushes data to a marked connection. - -Another task can wake up a sleeping event manager (in `mg_mgr_poll()` call) -using `mg_mgr_wakeup()`. When an event manager is woken up, a pipe -connection event handler function receives `MG_EV_READ` event. +Wrap a given file descriptor `fd` into a connection, and add that connection +to the event manager. An `fd` descriptor must suport `send()`, `recv()`, +`select()` syscalls, and be non-blocking. Mongoose will treat it as a TCP +socket. The `c->rem` and `c->loc` addresses will be empty. Parameters: +- `fd` - A file descriptor to wrap - `mgr` - An event manager - `fn` - A pointer to event handler function - `ud` - A user data pointer. It will be passed to `fn` as `fn_data` parameter -Return value: Pointer to created connection or `NULL` in case of error +Return value: Pointer to the created connection or `NULL` in case of error + +### mg\_mkpipe() + +```c +int mg_mkpipe(struct mg_mgr *mgr, mg_event_handler_t fn, void *fn_data); +``` + +Create two interconnected sockets for inter-thread communication. One socket +is wrapped into a Mongoose connection and is added to the event manager. +Another socket is returned, and supposed to be passed to a worker thread. +When a worker thread `send()`s to socket any data, that wakes up `mgr` and +`fn` event handler reveives `MG_EV_READ` event. Also, `fn` can send any +data to a worker thread, which can be `recv()`ed by a worker thread. + +Parameters: +- `mgr` - An event manager +- `fn` - A pointer to event handler function +- `fn_data` - A user data pointer. It will be passed to `fn` as `fn_data` parameter + +Return value: created socket, or `-1` on error Usage example: see [examples/multi-threaded](https://github.com/cesanta/mongoose/tree/master/examples/multi-threaded). diff --git a/examples/multi-threaded/main.c b/examples/multi-threaded/main.c index d81faa9a..0daeb43e 100644 --- a/examples/multi-threaded/main.c +++ b/examples/multi-threaded/main.c @@ -35,9 +35,36 @@ static void start_thread(void (*f)(void *), void *p) { } static void thread_function(void *param) { - struct mg_connection *c = param; // Pipe connection + int sock = (int) (size_t) param; // Paired socket. We own it sleep(2); // Simulate long execution - mg_mgr_wakeup(c, "hi", 2); // Wakeup event manager + send(sock, "hi", 2, 2); // Wakeup event manager + close(sock); // Close the connection +} + +static void link_conns(struct mg_connection *c1, struct mg_connection *c2) { + c1->fn_data = c2; + c2->fn_data = c1; +} + +static void unlink_conns(struct mg_connection *c1, struct mg_connection *c2) { + c1->fn_data = c2->fn_data = NULL; +} + +// Pipe event handler +static void pcb(struct mg_connection *c, int ev, void *ev_data, void *fn_data) { + struct mg_connection *parent = (struct mg_connection *) fn_data; + MG_INFO(("%lu %p %d %p", c->id, c->fd, ev, parent)); + if (parent == NULL) { // If parent connection closed, close too + c->is_closing = 1; + } else if (ev == MG_EV_READ) { // Got data from the worker thread + mg_http_reply(parent, 200, "Host: foo.com\r\n", "%.*s\n", c->recv.len, + c->recv.buf); // Respond! + c->recv.len = 0; // Tell Mongoose we've consumed data + } else if (ev == MG_EV_OPEN) { + link_conns(c, parent); + } else if (ev == MG_EV_CLOSE) { + unlink_conns(c, parent); + } } // HTTP request callback @@ -50,32 +77,19 @@ static void fn(struct mg_connection *c, int ev, void *ev_data, void *fn_data) { mg_http_reply(c, 200, "Host: foo.com\r\n", "hi\n"); } else { // Multithreading code path - c->label[0] = 'W'; // Mark us as waiting for data - start_thread(thread_function, fn_data); // Start handling thread - } - } -} - -// Pipe event handler -static void pcb(struct mg_connection *c, int ev, void *ev_data, void *fn_data) { - if (ev == MG_EV_READ) { - struct mg_connection *t; - for (t = c->mgr->conns; t != NULL; t = t->next) { - if (t->label[0] != 'W') continue; // Ignore un-marked connections - mg_http_reply(t, 200, "Host: foo.com\r\n", "%.*s\n", c->recv.len, - c->recv.buf); // Respond! - t->label[0] = 0; // Clear mark + int sock = mg_mkpipe(c->mgr, pcb, c); // Create pipe + start_thread(thread_function, (void *) (size_t) sock); // Start thread } + } else if (ev == MG_EV_CLOSE) { + if (c->fn_data != NULL) unlink_conns(c, c->fn_data); } } int main(void) { struct mg_mgr mgr; - struct mg_connection *pipe; // Used to wake up event manager mg_mgr_init(&mgr); mg_log_set("3"); - pipe = mg_mkpipe(&mgr, pcb, NULL); // Create pipe - mg_http_listen(&mgr, "http://localhost:8000", fn, pipe); // Create listener + mg_http_listen(&mgr, "http://localhost:8000", fn, NULL); // Create listener for (;;) mg_mgr_poll(&mgr, 1000); // Event loop mg_mgr_free(&mgr); // Cleanup return 0; diff --git a/mongoose.c b/mongoose.c index 5c6ddef9..baefd399 100644 --- a/mongoose.c +++ b/mongoose.c @@ -2753,6 +2753,19 @@ struct mg_connection *mg_listen(struct mg_mgr *mgr, const char *url, return c; } +struct mg_connection *mg_wrapfd(struct mg_mgr *mgr, int fd, + mg_event_handler_t fn, void *fn_data) { + struct mg_connection *c = mg_alloc_conn(mgr); + if (c != NULL) { + c->fd = (void *) (size_t) fd; + c->fn = fn; + c->fn_data = fn_data; + mg_call(c, MG_EV_OPEN, NULL); + LIST_ADD_HEAD(struct mg_connection, &mgr->conns, c); + } + return c; +} + struct mg_timer *mg_timer_add(struct mg_mgr *mgr, uint64_t milliseconds, unsigned flags, void (*fn)(void *), void *arg) { struct mg_timer *t = (struct mg_timer *) calloc(1, sizeof(*t)); @@ -3446,11 +3459,19 @@ void mg_connect_resolved(struct mg_connection *c) { MG_DEBUG(("%lu %p", c->id, c->fd)); } +static SOCKET raccept(SOCKET sock, union usa *usa, socklen_t len) { + SOCKET s = INVALID_SOCKET; + do { + s = accept(sock, &usa->sa, &len); + } while (s == INVALID_SOCKET && errno == EINTR); + return s; +} + static void accept_conn(struct mg_mgr *mgr, struct mg_connection *lsn) { struct mg_connection *c = NULL; union usa usa; socklen_t sa_len = sizeof(usa); - SOCKET fd = accept(FD(lsn), &usa.sa, &sa_len); + SOCKET fd = raccept(FD(lsn), &usa, sa_len); if (fd == INVALID_SOCKET) { #if MG_ARCH == MG_ARCH_AZURERTOS // AzureRTOS, in non-block socket mode can mark listening socket readable @@ -3489,71 +3510,48 @@ static void accept_conn(struct mg_mgr *mgr, struct mg_connection *lsn) { } static bool mg_socketpair(SOCKET sp[2], union usa usa[2]) { - socklen_t n = sizeof(usa[0].sin); - bool result = false; + SOCKET sock; + socklen_t len = sizeof(usa[0].sin); + bool success = false; + sock = sp[0] = sp[1] = INVALID_SOCKET; (void) memset(&usa[0], 0, sizeof(usa[0])); usa[0].sin.sin_family = AF_INET; - *(uint32_t *) &usa->sin.sin_addr = mg_htonl(0x7f000001); // 127.0.0.1 + *(uint32_t *) &usa->sin.sin_addr = mg_htonl(0x7f000001U); // 127.0.0.1 usa[1] = usa[0]; - if ((sp[0] = socket(AF_INET, SOCK_DGRAM, 0)) != INVALID_SOCKET && - (sp[1] = socket(AF_INET, SOCK_DGRAM, 0)) != INVALID_SOCKET && - bind(sp[0], &usa[0].sa, n) == 0 && bind(sp[1], &usa[1].sa, n) == 0 && - getsockname(sp[0], &usa[0].sa, &n) == 0 && - getsockname(sp[1], &usa[1].sa, &n) == 0 && - connect(sp[0], &usa[1].sa, n) == 0 && - connect(sp[1], &usa[0].sa, n) == 0) { - mg_set_non_blocking_mode(sp[1]); // Set close-on-exec - result = true; + if ((sock = socket(AF_INET, SOCK_STREAM, 0)) != INVALID_SOCKET && + bind(sock, &usa[0].sa, len) == 0 && listen(sock, 3) == 0 && + getsockname(sock, &usa[0].sa, &len) == 0 && + (sp[0] = socket(AF_INET, SOCK_STREAM, 0)) != INVALID_SOCKET && + connect(sp[0], &usa[0].sa, len) == 0 && + (sp[1] = raccept(sock, &usa[1], len)) != INVALID_SOCKET) { + mg_set_non_blocking_mode(sp[1]); + success = true; } else { if (sp[0] != INVALID_SOCKET) closesocket(sp[0]); if (sp[1] != INVALID_SOCKET) closesocket(sp[1]); sp[0] = sp[1] = INVALID_SOCKET; } - - return result; + if (sock != INVALID_SOCKET) closesocket(sock); + return success; } -bool mg_mgr_wakeup(struct mg_connection *c, const void *buf, size_t len) { - if (buf == NULL || len == 0) buf = (void *) "", len = 1; - return (size_t) send((SOCKET) (size_t) c->pfn_data, (const char *) buf, len, - MSG_NONBLOCKING) == len; -} - -static void pf1(struct mg_connection *c, int ev, void *ev_data, void *fn_data) { - if (ev == MG_EV_READ) { - mg_iobuf_free(&c->recv); - } else if (ev == MG_EV_CLOSE) { - closesocket((SOCKET) (size_t) c->pfn_data); - } - (void) ev_data, (void) fn_data; -} - -struct mg_connection *mg_mkpipe(struct mg_mgr *mgr, mg_event_handler_t fn, - void *fn_data) { +int mg_mkpipe(struct mg_mgr *mgr, mg_event_handler_t fn, void *fn_data) { union usa usa[2]; SOCKET sp[2] = {INVALID_SOCKET, INVALID_SOCKET}; struct mg_connection *c = NULL; if (!mg_socketpair(sp, usa)) { MG_ERROR(("Cannot create socket pair")); - } else if ((c = mg_alloc_conn(mgr)) == NULL) { + } else if ((c = mg_wrapfd(mgr, (int) sp[1], fn, fn_data)) == NULL) { closesocket(sp[0]); closesocket(sp[1]); - MG_ERROR(("OOM")); + sp[0] = sp[1] = INVALID_SOCKET; } else { - MG_DEBUG(("pipe %lu", (unsigned long) sp[0])); tomgaddr(&usa[0], &c->rem, false); - c->fd = S2PTR(sp[1]); - c->is_udp = 1; - c->pfn = pf1; - c->pfn_data = (void *) (size_t) sp[0]; - c->fn = fn; - c->fn_data = fn_data; - mg_call(c, MG_EV_OPEN, NULL); - LIST_ADD_HEAD(struct mg_connection, &mgr->conns, c); + MG_DEBUG(("%lu %p pipe %lu", c->id, c->fd, (unsigned long) sp[0])); } - return c; + return (int) sp[0]; } static void mg_iotest(struct mg_mgr *mgr, int ms) { diff --git a/mongoose.h b/mongoose.h index fd45defb..47349aa7 100644 --- a/mongoose.h +++ b/mongoose.h @@ -948,6 +948,8 @@ struct mg_connection *mg_listen(struct mg_mgr *, const char *url, mg_event_handler_t fn, void *fn_data); struct mg_connection *mg_connect(struct mg_mgr *, const char *url, mg_event_handler_t fn, void *fn_data); +struct mg_connection *mg_wrapfd(struct mg_mgr *mgr, int fd, + mg_event_handler_t fn, void *fn_data); void mg_connect_resolved(struct mg_connection *); bool mg_send(struct mg_connection *, const void *, size_t); size_t mg_printf(struct mg_connection *, const char *fmt, ...); @@ -955,9 +957,7 @@ size_t mg_vprintf(struct mg_connection *, const char *fmt, va_list ap); char *mg_straddr(struct mg_addr *, char *, size_t); bool mg_aton(struct mg_str str, struct mg_addr *addr); char *mg_ntoa(const struct mg_addr *addr, char *buf, size_t len); - -struct mg_connection *mg_mkpipe(struct mg_mgr *, mg_event_handler_t, void *); -bool mg_mgr_wakeup(struct mg_connection *pipe, const void *buf, size_t len); +int mg_mkpipe(struct mg_mgr *, mg_event_handler_t, void *); // These functions are used to integrate with custom network stacks struct mg_connection *mg_alloc_conn(struct mg_mgr *); diff --git a/src/net.c b/src/net.c index 41679bb2..93d81087 100644 --- a/src/net.c +++ b/src/net.c @@ -205,6 +205,19 @@ struct mg_connection *mg_listen(struct mg_mgr *mgr, const char *url, return c; } +struct mg_connection *mg_wrapfd(struct mg_mgr *mgr, int fd, + mg_event_handler_t fn, void *fn_data) { + struct mg_connection *c = mg_alloc_conn(mgr); + if (c != NULL) { + c->fd = (void *) (size_t) fd; + c->fn = fn; + c->fn_data = fn_data; + mg_call(c, MG_EV_OPEN, NULL); + LIST_ADD_HEAD(struct mg_connection, &mgr->conns, c); + } + return c; +} + struct mg_timer *mg_timer_add(struct mg_mgr *mgr, uint64_t milliseconds, unsigned flags, void (*fn)(void *), void *arg) { struct mg_timer *t = (struct mg_timer *) calloc(1, sizeof(*t)); diff --git a/src/net.h b/src/net.h index bdb23535..e1b46964 100644 --- a/src/net.h +++ b/src/net.h @@ -73,6 +73,8 @@ struct mg_connection *mg_listen(struct mg_mgr *, const char *url, mg_event_handler_t fn, void *fn_data); struct mg_connection *mg_connect(struct mg_mgr *, const char *url, mg_event_handler_t fn, void *fn_data); +struct mg_connection *mg_wrapfd(struct mg_mgr *mgr, int fd, + mg_event_handler_t fn, void *fn_data); void mg_connect_resolved(struct mg_connection *); bool mg_send(struct mg_connection *, const void *, size_t); size_t mg_printf(struct mg_connection *, const char *fmt, ...); @@ -80,9 +82,7 @@ size_t mg_vprintf(struct mg_connection *, const char *fmt, va_list ap); char *mg_straddr(struct mg_addr *, char *, size_t); bool mg_aton(struct mg_str str, struct mg_addr *addr); char *mg_ntoa(const struct mg_addr *addr, char *buf, size_t len); - -struct mg_connection *mg_mkpipe(struct mg_mgr *, mg_event_handler_t, void *); -bool mg_mgr_wakeup(struct mg_connection *pipe, const void *buf, size_t len); +int mg_mkpipe(struct mg_mgr *, mg_event_handler_t, void *); // These functions are used to integrate with custom network stacks struct mg_connection *mg_alloc_conn(struct mg_mgr *); diff --git a/src/sock.c b/src/sock.c index 287555f2..a4bec3e2 100644 --- a/src/sock.c +++ b/src/sock.c @@ -355,11 +355,19 @@ void mg_connect_resolved(struct mg_connection *c) { MG_DEBUG(("%lu %p", c->id, c->fd)); } +static SOCKET raccept(SOCKET sock, union usa *usa, socklen_t len) { + SOCKET s = INVALID_SOCKET; + do { + s = accept(sock, &usa->sa, &len); + } while (s == INVALID_SOCKET && errno == EINTR); + return s; +} + static void accept_conn(struct mg_mgr *mgr, struct mg_connection *lsn) { struct mg_connection *c = NULL; union usa usa; socklen_t sa_len = sizeof(usa); - SOCKET fd = accept(FD(lsn), &usa.sa, &sa_len); + SOCKET fd = raccept(FD(lsn), &usa, sa_len); if (fd == INVALID_SOCKET) { #if MG_ARCH == MG_ARCH_AZURERTOS // AzureRTOS, in non-block socket mode can mark listening socket readable @@ -398,71 +406,48 @@ static void accept_conn(struct mg_mgr *mgr, struct mg_connection *lsn) { } static bool mg_socketpair(SOCKET sp[2], union usa usa[2]) { - socklen_t n = sizeof(usa[0].sin); - bool result = false; + SOCKET sock; + socklen_t len = sizeof(usa[0].sin); + bool success = false; + sock = sp[0] = sp[1] = INVALID_SOCKET; (void) memset(&usa[0], 0, sizeof(usa[0])); usa[0].sin.sin_family = AF_INET; - *(uint32_t *) &usa->sin.sin_addr = mg_htonl(0x7f000001); // 127.0.0.1 + *(uint32_t *) &usa->sin.sin_addr = mg_htonl(0x7f000001U); // 127.0.0.1 usa[1] = usa[0]; - if ((sp[0] = socket(AF_INET, SOCK_DGRAM, 0)) != INVALID_SOCKET && - (sp[1] = socket(AF_INET, SOCK_DGRAM, 0)) != INVALID_SOCKET && - bind(sp[0], &usa[0].sa, n) == 0 && bind(sp[1], &usa[1].sa, n) == 0 && - getsockname(sp[0], &usa[0].sa, &n) == 0 && - getsockname(sp[1], &usa[1].sa, &n) == 0 && - connect(sp[0], &usa[1].sa, n) == 0 && - connect(sp[1], &usa[0].sa, n) == 0) { - mg_set_non_blocking_mode(sp[1]); // Set close-on-exec - result = true; + if ((sock = socket(AF_INET, SOCK_STREAM, 0)) != INVALID_SOCKET && + bind(sock, &usa[0].sa, len) == 0 && listen(sock, 3) == 0 && + getsockname(sock, &usa[0].sa, &len) == 0 && + (sp[0] = socket(AF_INET, SOCK_STREAM, 0)) != INVALID_SOCKET && + connect(sp[0], &usa[0].sa, len) == 0 && + (sp[1] = raccept(sock, &usa[1], len)) != INVALID_SOCKET) { + mg_set_non_blocking_mode(sp[1]); + success = true; } else { if (sp[0] != INVALID_SOCKET) closesocket(sp[0]); if (sp[1] != INVALID_SOCKET) closesocket(sp[1]); sp[0] = sp[1] = INVALID_SOCKET; } - - return result; + if (sock != INVALID_SOCKET) closesocket(sock); + return success; } -bool mg_mgr_wakeup(struct mg_connection *c, const void *buf, size_t len) { - if (buf == NULL || len == 0) buf = (void *) "", len = 1; - return (size_t) send((SOCKET) (size_t) c->pfn_data, (const char *) buf, len, - MSG_NONBLOCKING) == len; -} - -static void pf1(struct mg_connection *c, int ev, void *ev_data, void *fn_data) { - if (ev == MG_EV_READ) { - mg_iobuf_free(&c->recv); - } else if (ev == MG_EV_CLOSE) { - closesocket((SOCKET) (size_t) c->pfn_data); - } - (void) ev_data, (void) fn_data; -} - -struct mg_connection *mg_mkpipe(struct mg_mgr *mgr, mg_event_handler_t fn, - void *fn_data) { +int mg_mkpipe(struct mg_mgr *mgr, mg_event_handler_t fn, void *fn_data) { union usa usa[2]; SOCKET sp[2] = {INVALID_SOCKET, INVALID_SOCKET}; struct mg_connection *c = NULL; if (!mg_socketpair(sp, usa)) { MG_ERROR(("Cannot create socket pair")); - } else if ((c = mg_alloc_conn(mgr)) == NULL) { + } else if ((c = mg_wrapfd(mgr, (int) sp[1], fn, fn_data)) == NULL) { closesocket(sp[0]); closesocket(sp[1]); - MG_ERROR(("OOM")); + sp[0] = sp[1] = INVALID_SOCKET; } else { - MG_DEBUG(("pipe %lu", (unsigned long) sp[0])); tomgaddr(&usa[0], &c->rem, false); - c->fd = S2PTR(sp[1]); - c->is_udp = 1; - c->pfn = pf1; - c->pfn_data = (void *) (size_t) sp[0]; - c->fn = fn; - c->fn_data = fn_data; - mg_call(c, MG_EV_OPEN, NULL); - LIST_ADD_HEAD(struct mg_connection, &mgr->conns, c); + MG_DEBUG(("%lu %p pipe %lu", c->id, c->fd, (unsigned long) sp[0])); } - return c; + return (int) sp[0]; } static void mg_iotest(struct mg_mgr *mgr, int ms) { diff --git a/test/mongoose_custom.c b/test/mongoose_custom.c index b72e9ae8..0fdecb3f 100644 --- a/test/mongoose_custom.c +++ b/test/mongoose_custom.c @@ -20,18 +20,12 @@ void mg_mgr_poll(struct mg_mgr *mgr, int ms) { bool mg_send(struct mg_connection *c, const void *buf, size_t len) { (void) c, (void) buf, (void) len; - return 0; + return false; } -bool mg_mgr_wakeup(struct mg_connection *c, const void *buf, size_t len) { - (void) c, (void) buf, (void) len; - return 0; -} - -struct mg_connection *mg_mkpipe(struct mg_mgr *mgr, mg_event_handler_t fn, - void *fn_data) { +int mg_mkpipe(struct mg_mgr *mgr, mg_event_handler_t fn, void *fn_data) { (void) mgr, (void) fn, (void) fn_data; - return NULL; + return -1; } void _fini(void); diff --git a/test/unit_test.c b/test/unit_test.c index 2c425555..9a7e0bdb 100644 --- a/test/unit_test.c +++ b/test/unit_test.c @@ -1720,13 +1720,20 @@ static void eh6(struct mg_connection *c, int ev, void *ev_data, void *fn_data) { (void) c, (void) ev_data; } +#ifdef __arm__ +int send(int sock, const void *buf, size_t len, int flags); +int send(int sock, const void *buf, size_t len, int flags) { + (void) sock, (void) buf, (void) len, (void) flags; + return -1; +} +#endif + static void test_pipe(void) { struct mg_mgr mgr; - struct mg_connection *c; - int i, done = 0; + int i, sock, done = 0; mg_mgr_init(&mgr); - ASSERT((c = mg_mkpipe(&mgr, eh6, (void *) &done)) != NULL); - mg_mgr_wakeup(c, "", 1); + ASSERT((sock = mg_mkpipe(&mgr, eh6, (void *) &done)) >= 0); + ASSERT(send(sock, "hi", 2, 0) == 2); for (i = 0; i < 10 && done == 0; i++) mg_mgr_poll(&mgr, 1); ASSERT(done == 1); mg_mgr_free(&mgr);