From 0763146254c53a950faaefe4547635cf26aa1cf8 Mon Sep 17 00:00:00 2001 From: Sergey Lyubka Date: Fri, 3 Jun 2022 11:37:35 +0100 Subject: [PATCH] Fix #1476 - introduce c->is_full to cap the recv buf len for streaming connections --- docs/README.md | 2 ++ mongoose.c | 42 ++++++++++++++++++++++++++---------------- mongoose.h | 1 + src/net.h | 1 + src/sock.c | 42 ++++++++++++++++++++++++++---------------- 5 files changed, 56 insertions(+), 32 deletions(-) diff --git a/docs/README.md b/docs/README.md index 78818275..a1d5da96 100644 --- a/docs/README.md +++ b/docs/README.md @@ -183,6 +183,7 @@ struct mg_connection { unsigned is_hexdumping : 1; // Hexdump in/out traffic unsigned is_draining : 1; // Send remaining data, then close and free unsigned is_closing : 1; // Close and free the connection immediately + unsigned is_full : 1; // Stop reads, until cleared unsigned is_readable : 1; // Connection is ready to read unsigned is_writable : 1; // Connection is ready to write }; @@ -472,6 +473,7 @@ struct mg_connection { unsigned is_hexdumping : 1; // Hexdump in/out traffic unsigned is_draining : 1; // Send remaining data, then close and free unsigned is_closing : 1; // Close and free the connection immediately + unsigned is_full : 1; // Stop reads, until cleared unsigned is_readable : 1; // Connection is ready to read unsigned is_writable : 1; // Connection is ready to write }; diff --git a/mongoose.c b/mongoose.c index c10c9423..dfcac0e6 100644 --- a/mongoose.c +++ b/mongoose.c @@ -4222,7 +4222,7 @@ static void read_conn(struct mg_connection *c) { long n = -1; if (c->recv.len >= MG_MAX_RECV_SIZE) { mg_error(c, "max_recv_buf_size reached"); - } else if (c->recv.size - c->recv.len < MG_IO_SIZE && + } else if (c->recv.size <= c->recv.len && !mg_iobuf_resize(&c->recv, c->recv.size + MG_IO_SIZE)) { mg_error(c, "oom"); } else { @@ -4411,14 +4411,27 @@ int mg_mkpipe(struct mg_mgr *mgr, mg_event_handler_t fn, void *fn_data, return (int) sp[0]; } +static bool can_read(const struct mg_connection *c) { + return c->is_full == false; +} + +static bool can_write(const struct mg_connection *c) { + return c->is_connecting || (c->send.len > 0 && c->is_tls_hs == 0); +} + +static bool skip_iotest(const struct mg_connection *c) { + return (c->is_closing || c->is_resolving || FD(c) == INVALID_SOCKET) || + (can_read(c) == false && can_write(c) == false); +} + static void mg_iotest(struct mg_mgr *mgr, int ms) { #if MG_ARCH == MG_ARCH_FREERTOS_TCP struct mg_connection *c; for (c = mgr->conns; c != NULL; c = c->next) { - if (c->is_closing || c->is_resolving || FD(c) == INVALID_SOCKET) continue; - FreeRTOS_FD_SET(c->fd, mgr->ss, eSELECT_READ | eSELECT_EXCEPT); - if (c->is_connecting || (c->send.len > 0 && c->is_tls_hs == 0)) - FreeRTOS_FD_SET(c->fd, mgr->ss, eSELECT_WRITE); + if (skip_iotest(c)) continue; + if (can_read(c)) + FreeRTOS_FD_SET(c->fd, mgr->ss, eSELECT_READ | eSELECT_EXCEPT); + if (can_write(c)) FreeRTOS_FD_SET(c->fd, mgr->ss, eSELECT_WRITE); } FreeRTOS_select(mgr->ss, pdMS_TO_TICKS(ms)); for (c = mgr->conns; c != NULL; c = c->next) { @@ -4436,14 +4449,12 @@ static void mg_iotest(struct mg_mgr *mgr, int ms) { memset(fds, 0, sizeof(fds)); n = 0; for (struct mg_connection *c = mgr->conns; c != NULL; c = c->next) { - if (c->is_closing || c->is_resolving || FD(c) == INVALID_SOCKET) { + if (skip_iotest(c)) { // Socket not valid, ignore } else { fds[n].fd = FD(c); - fds[n].events = POLLIN; - if (c->is_connecting || (c->send.len > 0 && c->is_tls_hs == 0)) { - fds[n].events |= POLLOUT; - } + if (can_read(c)) fds[n].events = POLLIN; + if (can_write(c)) fds[n].events |= POLLOUT; n++; if (mg_tls_pending(c) > 0) ms = 0; // Don't wait if TLS is ready } @@ -4454,7 +4465,7 @@ static void mg_iotest(struct mg_mgr *mgr, int ms) { } else { n = 0; for (struct mg_connection *c = mgr->conns; c != NULL; c = c->next) { - if (c->is_closing || c->is_resolving || FD(c) == INVALID_SOCKET) { + if (skip_iotest(c)) { // Socket not valid, ignore } else { c->is_readable = (unsigned) (fds[n].revents & POLLIN ? 1 : 0); @@ -4476,12 +4487,11 @@ static void mg_iotest(struct mg_mgr *mgr, int ms) { FD_ZERO(&wset); for (c = mgr->conns; c != NULL; c = c->next) { - if (c->is_closing || c->is_resolving || FD(c) == INVALID_SOCKET) continue; - FD_SET(FD(c), &rset); - if (FD(c) > maxfd) maxfd = FD(c); - if (c->is_connecting || (c->send.len > 0 && c->is_tls_hs == 0)) - FD_SET(FD(c), &wset); + if (skip_iotest(c)) continue; + if (can_read(c)) FD_SET(FD(c), &rset); + if (can_write(c)) FD_SET(FD(c), &wset); if (mg_tls_pending(c) > 0) tv = tv_zero; + if (FD(c) > maxfd) maxfd = FD(c); } if ((rc = select((int) maxfd + 1, &rset, &wset, NULL, &tv)) < 0) { diff --git a/mongoose.h b/mongoose.h index 5418d2e8..02910e45 100644 --- a/mongoose.h +++ b/mongoose.h @@ -1000,6 +1000,7 @@ struct mg_connection { unsigned is_hexdumping : 1; // Hexdump in/out traffic unsigned is_draining : 1; // Send remaining data, then close and free unsigned is_closing : 1; // Close and free the connection immediately + unsigned is_full : 1; // Stop reads, until cleared unsigned is_readable : 1; // Connection is ready to read unsigned is_writable : 1; // Connection is ready to write }; diff --git a/src/net.h b/src/net.h index 044c7350..f6c17083 100644 --- a/src/net.h +++ b/src/net.h @@ -63,6 +63,7 @@ struct mg_connection { unsigned is_hexdumping : 1; // Hexdump in/out traffic unsigned is_draining : 1; // Send remaining data, then close and free unsigned is_closing : 1; // Close and free the connection immediately + unsigned is_full : 1; // Stop reads, until cleared unsigned is_readable : 1; // Connection is ready to read unsigned is_writable : 1; // Connection is ready to write }; diff --git a/src/sock.c b/src/sock.c index 26a37edb..a862d57b 100644 --- a/src/sock.c +++ b/src/sock.c @@ -280,7 +280,7 @@ static void read_conn(struct mg_connection *c) { long n = -1; if (c->recv.len >= MG_MAX_RECV_SIZE) { mg_error(c, "max_recv_buf_size reached"); - } else if (c->recv.size - c->recv.len < MG_IO_SIZE && + } else if (c->recv.size <= c->recv.len && !mg_iobuf_resize(&c->recv, c->recv.size + MG_IO_SIZE)) { mg_error(c, "oom"); } else { @@ -469,14 +469,27 @@ int mg_mkpipe(struct mg_mgr *mgr, mg_event_handler_t fn, void *fn_data, return (int) sp[0]; } +static bool can_read(const struct mg_connection *c) { + return c->is_full == false; +} + +static bool can_write(const struct mg_connection *c) { + return c->is_connecting || (c->send.len > 0 && c->is_tls_hs == 0); +} + +static bool skip_iotest(const struct mg_connection *c) { + return (c->is_closing || c->is_resolving || FD(c) == INVALID_SOCKET) || + (can_read(c) == false && can_write(c) == false); +} + static void mg_iotest(struct mg_mgr *mgr, int ms) { #if MG_ARCH == MG_ARCH_FREERTOS_TCP struct mg_connection *c; for (c = mgr->conns; c != NULL; c = c->next) { - if (c->is_closing || c->is_resolving || FD(c) == INVALID_SOCKET) continue; - FreeRTOS_FD_SET(c->fd, mgr->ss, eSELECT_READ | eSELECT_EXCEPT); - if (c->is_connecting || (c->send.len > 0 && c->is_tls_hs == 0)) - FreeRTOS_FD_SET(c->fd, mgr->ss, eSELECT_WRITE); + if (skip_iotest(c)) continue; + if (can_read(c)) + FreeRTOS_FD_SET(c->fd, mgr->ss, eSELECT_READ | eSELECT_EXCEPT); + if (can_write(c)) FreeRTOS_FD_SET(c->fd, mgr->ss, eSELECT_WRITE); } FreeRTOS_select(mgr->ss, pdMS_TO_TICKS(ms)); for (c = mgr->conns; c != NULL; c = c->next) { @@ -494,14 +507,12 @@ static void mg_iotest(struct mg_mgr *mgr, int ms) { memset(fds, 0, sizeof(fds)); n = 0; for (struct mg_connection *c = mgr->conns; c != NULL; c = c->next) { - if (c->is_closing || c->is_resolving || FD(c) == INVALID_SOCKET) { + if (skip_iotest(c)) { // Socket not valid, ignore } else { fds[n].fd = FD(c); - fds[n].events = POLLIN; - if (c->is_connecting || (c->send.len > 0 && c->is_tls_hs == 0)) { - fds[n].events |= POLLOUT; - } + if (can_read(c)) fds[n].events = POLLIN; + if (can_write(c)) fds[n].events |= POLLOUT; n++; if (mg_tls_pending(c) > 0) ms = 0; // Don't wait if TLS is ready } @@ -512,7 +523,7 @@ static void mg_iotest(struct mg_mgr *mgr, int ms) { } else { n = 0; for (struct mg_connection *c = mgr->conns; c != NULL; c = c->next) { - if (c->is_closing || c->is_resolving || FD(c) == INVALID_SOCKET) { + if (skip_iotest(c)) { // Socket not valid, ignore } else { c->is_readable = (unsigned) (fds[n].revents & POLLIN ? 1 : 0); @@ -534,12 +545,11 @@ static void mg_iotest(struct mg_mgr *mgr, int ms) { FD_ZERO(&wset); for (c = mgr->conns; c != NULL; c = c->next) { - if (c->is_closing || c->is_resolving || FD(c) == INVALID_SOCKET) continue; - FD_SET(FD(c), &rset); - if (FD(c) > maxfd) maxfd = FD(c); - if (c->is_connecting || (c->send.len > 0 && c->is_tls_hs == 0)) - FD_SET(FD(c), &wset); + if (skip_iotest(c)) continue; + if (can_read(c)) FD_SET(FD(c), &rset); + if (can_write(c)) FD_SET(FD(c), &wset); if (mg_tls_pending(c) > 0) tv = tv_zero; + if (FD(c) > maxfd) maxfd = FD(c); } if ((rc = select((int) maxfd + 1, &rset, &wset, NULL, &tv)) < 0) {