Fix #1476 - introduce c->is_full to cap the recv buf len for streaming connections

This commit is contained in:
Sergey Lyubka 2022-06-03 11:37:35 +01:00
parent 7da5d42009
commit 0763146254
5 changed files with 56 additions and 32 deletions

View File

@ -183,6 +183,7 @@ struct mg_connection {
unsigned is_hexdumping : 1; // Hexdump in/out traffic
unsigned is_draining : 1; // Send remaining data, then close and free
unsigned is_closing : 1; // Close and free the connection immediately
unsigned is_full : 1; // Stop reads, until cleared
unsigned is_readable : 1; // Connection is ready to read
unsigned is_writable : 1; // Connection is ready to write
};
@ -472,6 +473,7 @@ struct mg_connection {
unsigned is_hexdumping : 1; // Hexdump in/out traffic
unsigned is_draining : 1; // Send remaining data, then close and free
unsigned is_closing : 1; // Close and free the connection immediately
unsigned is_full : 1; // Stop reads, until cleared
unsigned is_readable : 1; // Connection is ready to read
unsigned is_writable : 1; // Connection is ready to write
};

View File

@ -4222,7 +4222,7 @@ static void read_conn(struct mg_connection *c) {
long n = -1;
if (c->recv.len >= MG_MAX_RECV_SIZE) {
mg_error(c, "max_recv_buf_size reached");
} else if (c->recv.size - c->recv.len < MG_IO_SIZE &&
} else if (c->recv.size <= c->recv.len &&
!mg_iobuf_resize(&c->recv, c->recv.size + MG_IO_SIZE)) {
mg_error(c, "oom");
} else {
@ -4411,14 +4411,27 @@ int mg_mkpipe(struct mg_mgr *mgr, mg_event_handler_t fn, void *fn_data,
return (int) sp[0];
}
static bool can_read(const struct mg_connection *c) {
return c->is_full == false;
}
static bool can_write(const struct mg_connection *c) {
return c->is_connecting || (c->send.len > 0 && c->is_tls_hs == 0);
}
static bool skip_iotest(const struct mg_connection *c) {
return (c->is_closing || c->is_resolving || FD(c) == INVALID_SOCKET) ||
(can_read(c) == false && can_write(c) == false);
}
static void mg_iotest(struct mg_mgr *mgr, int ms) {
#if MG_ARCH == MG_ARCH_FREERTOS_TCP
struct mg_connection *c;
for (c = mgr->conns; c != NULL; c = c->next) {
if (c->is_closing || c->is_resolving || FD(c) == INVALID_SOCKET) continue;
FreeRTOS_FD_SET(c->fd, mgr->ss, eSELECT_READ | eSELECT_EXCEPT);
if (c->is_connecting || (c->send.len > 0 && c->is_tls_hs == 0))
FreeRTOS_FD_SET(c->fd, mgr->ss, eSELECT_WRITE);
if (skip_iotest(c)) continue;
if (can_read(c))
FreeRTOS_FD_SET(c->fd, mgr->ss, eSELECT_READ | eSELECT_EXCEPT);
if (can_write(c)) FreeRTOS_FD_SET(c->fd, mgr->ss, eSELECT_WRITE);
}
FreeRTOS_select(mgr->ss, pdMS_TO_TICKS(ms));
for (c = mgr->conns; c != NULL; c = c->next) {
@ -4436,14 +4449,12 @@ static void mg_iotest(struct mg_mgr *mgr, int ms) {
memset(fds, 0, sizeof(fds));
n = 0;
for (struct mg_connection *c = mgr->conns; c != NULL; c = c->next) {
if (c->is_closing || c->is_resolving || FD(c) == INVALID_SOCKET) {
if (skip_iotest(c)) {
// Socket not valid, ignore
} else {
fds[n].fd = FD(c);
fds[n].events = POLLIN;
if (c->is_connecting || (c->send.len > 0 && c->is_tls_hs == 0)) {
fds[n].events |= POLLOUT;
}
if (can_read(c)) fds[n].events = POLLIN;
if (can_write(c)) fds[n].events |= POLLOUT;
n++;
if (mg_tls_pending(c) > 0) ms = 0; // Don't wait if TLS is ready
}
@ -4454,7 +4465,7 @@ static void mg_iotest(struct mg_mgr *mgr, int ms) {
} else {
n = 0;
for (struct mg_connection *c = mgr->conns; c != NULL; c = c->next) {
if (c->is_closing || c->is_resolving || FD(c) == INVALID_SOCKET) {
if (skip_iotest(c)) {
// Socket not valid, ignore
} else {
c->is_readable = (unsigned) (fds[n].revents & POLLIN ? 1 : 0);
@ -4476,12 +4487,11 @@ static void mg_iotest(struct mg_mgr *mgr, int ms) {
FD_ZERO(&wset);
for (c = mgr->conns; c != NULL; c = c->next) {
if (c->is_closing || c->is_resolving || FD(c) == INVALID_SOCKET) continue;
FD_SET(FD(c), &rset);
if (FD(c) > maxfd) maxfd = FD(c);
if (c->is_connecting || (c->send.len > 0 && c->is_tls_hs == 0))
FD_SET(FD(c), &wset);
if (skip_iotest(c)) continue;
if (can_read(c)) FD_SET(FD(c), &rset);
if (can_write(c)) FD_SET(FD(c), &wset);
if (mg_tls_pending(c) > 0) tv = tv_zero;
if (FD(c) > maxfd) maxfd = FD(c);
}
if ((rc = select((int) maxfd + 1, &rset, &wset, NULL, &tv)) < 0) {

View File

@ -1000,6 +1000,7 @@ struct mg_connection {
unsigned is_hexdumping : 1; // Hexdump in/out traffic
unsigned is_draining : 1; // Send remaining data, then close and free
unsigned is_closing : 1; // Close and free the connection immediately
unsigned is_full : 1; // Stop reads, until cleared
unsigned is_readable : 1; // Connection is ready to read
unsigned is_writable : 1; // Connection is ready to write
};

View File

@ -63,6 +63,7 @@ struct mg_connection {
unsigned is_hexdumping : 1; // Hexdump in/out traffic
unsigned is_draining : 1; // Send remaining data, then close and free
unsigned is_closing : 1; // Close and free the connection immediately
unsigned is_full : 1; // Stop reads, until cleared
unsigned is_readable : 1; // Connection is ready to read
unsigned is_writable : 1; // Connection is ready to write
};

View File

@ -280,7 +280,7 @@ static void read_conn(struct mg_connection *c) {
long n = -1;
if (c->recv.len >= MG_MAX_RECV_SIZE) {
mg_error(c, "max_recv_buf_size reached");
} else if (c->recv.size - c->recv.len < MG_IO_SIZE &&
} else if (c->recv.size <= c->recv.len &&
!mg_iobuf_resize(&c->recv, c->recv.size + MG_IO_SIZE)) {
mg_error(c, "oom");
} else {
@ -469,14 +469,27 @@ int mg_mkpipe(struct mg_mgr *mgr, mg_event_handler_t fn, void *fn_data,
return (int) sp[0];
}
static bool can_read(const struct mg_connection *c) {
return c->is_full == false;
}
static bool can_write(const struct mg_connection *c) {
return c->is_connecting || (c->send.len > 0 && c->is_tls_hs == 0);
}
static bool skip_iotest(const struct mg_connection *c) {
return (c->is_closing || c->is_resolving || FD(c) == INVALID_SOCKET) ||
(can_read(c) == false && can_write(c) == false);
}
static void mg_iotest(struct mg_mgr *mgr, int ms) {
#if MG_ARCH == MG_ARCH_FREERTOS_TCP
struct mg_connection *c;
for (c = mgr->conns; c != NULL; c = c->next) {
if (c->is_closing || c->is_resolving || FD(c) == INVALID_SOCKET) continue;
FreeRTOS_FD_SET(c->fd, mgr->ss, eSELECT_READ | eSELECT_EXCEPT);
if (c->is_connecting || (c->send.len > 0 && c->is_tls_hs == 0))
FreeRTOS_FD_SET(c->fd, mgr->ss, eSELECT_WRITE);
if (skip_iotest(c)) continue;
if (can_read(c))
FreeRTOS_FD_SET(c->fd, mgr->ss, eSELECT_READ | eSELECT_EXCEPT);
if (can_write(c)) FreeRTOS_FD_SET(c->fd, mgr->ss, eSELECT_WRITE);
}
FreeRTOS_select(mgr->ss, pdMS_TO_TICKS(ms));
for (c = mgr->conns; c != NULL; c = c->next) {
@ -494,14 +507,12 @@ static void mg_iotest(struct mg_mgr *mgr, int ms) {
memset(fds, 0, sizeof(fds));
n = 0;
for (struct mg_connection *c = mgr->conns; c != NULL; c = c->next) {
if (c->is_closing || c->is_resolving || FD(c) == INVALID_SOCKET) {
if (skip_iotest(c)) {
// Socket not valid, ignore
} else {
fds[n].fd = FD(c);
fds[n].events = POLLIN;
if (c->is_connecting || (c->send.len > 0 && c->is_tls_hs == 0)) {
fds[n].events |= POLLOUT;
}
if (can_read(c)) fds[n].events = POLLIN;
if (can_write(c)) fds[n].events |= POLLOUT;
n++;
if (mg_tls_pending(c) > 0) ms = 0; // Don't wait if TLS is ready
}
@ -512,7 +523,7 @@ static void mg_iotest(struct mg_mgr *mgr, int ms) {
} else {
n = 0;
for (struct mg_connection *c = mgr->conns; c != NULL; c = c->next) {
if (c->is_closing || c->is_resolving || FD(c) == INVALID_SOCKET) {
if (skip_iotest(c)) {
// Socket not valid, ignore
} else {
c->is_readable = (unsigned) (fds[n].revents & POLLIN ? 1 : 0);
@ -534,12 +545,11 @@ static void mg_iotest(struct mg_mgr *mgr, int ms) {
FD_ZERO(&wset);
for (c = mgr->conns; c != NULL; c = c->next) {
if (c->is_closing || c->is_resolving || FD(c) == INVALID_SOCKET) continue;
FD_SET(FD(c), &rset);
if (FD(c) > maxfd) maxfd = FD(c);
if (c->is_connecting || (c->send.len > 0 && c->is_tls_hs == 0))
FD_SET(FD(c), &wset);
if (skip_iotest(c)) continue;
if (can_read(c)) FD_SET(FD(c), &rset);
if (can_write(c)) FD_SET(FD(c), &wset);
if (mg_tls_pending(c) > 0) tv = tv_zero;
if (FD(c) > maxfd) maxfd = FD(c);
}
if ((rc = select((int) maxfd + 1, &rset, &wset, NULL, &tv)) < 0) {