Enable epoll on linux

This commit is contained in:
Sergey Lyubka 2022-08-03 15:07:16 +01:00
parent 4702073458
commit 51cb40acd2
8 changed files with 148 additions and 6 deletions

View File

@ -3511,6 +3511,7 @@ struct mg_connection *mg_wrapfd(struct mg_mgr *mgr, int fd,
c->fd = (void *) (size_t) fd;
c->fn = fn;
c->fn_data = fn_data;
MG_EPOLL_ADD(c);
mg_call(c, MG_EV_OPEN, NULL);
LIST_ADD_HEAD(struct mg_connection, &mgr->conns, c);
}
@ -3536,10 +3537,18 @@ void mg_mgr_free(struct mg_mgr *mgr) {
FreeRTOS_DeleteSocketSet(mgr->ss);
#endif
MG_DEBUG(("All connections closed"));
#if MG_ENABLE_EPOLL
if (mgr->epoll_fd >= 0) close(mgr->epoll_fd), mgr->epoll_fd = -1;
#endif
}
void mg_mgr_init(struct mg_mgr *mgr) {
memset(mgr, 0, sizeof(*mgr));
#if MG_ENABLE_EPOLL
if ((mgr->epoll_fd = epoll_create1(0)) < 0) MG_ERROR(("epoll: %d", errno));
#else
mgr->epoll_fd = -1;
#endif
#if MG_ARCH == MG_ARCH_WIN32 && MG_ENABLE_WINSOCK
// clang-format off
{ WSADATA data; WSAStartup(MAKEWORD(2, 2), &data); }
@ -4091,6 +4100,9 @@ static void iolog(struct mg_connection *c, char *buf, long n, bool r) {
} else {
mg_iobuf_del(&c->send, 0, (size_t) n);
// if (c->send.len == 0) mg_iobuf_resize(&c->send, 0);
if (c->send.len == 0) {
MG_EPOLL_MOD(c, 0);
}
mg_call(c, MG_EV_WRITE, &n);
}
}
@ -4207,6 +4219,7 @@ bool mg_open_listener(struct mg_connection *c, const char *url) {
setlocaddr(fd, &c->loc);
mg_set_non_blocking_mode(fd);
c->fd = S2PTR(fd);
MG_EPOLL_ADD(c);
success = true;
}
}
@ -4259,6 +4272,9 @@ static void write_conn(struct mg_connection *c) {
static void close_conn(struct mg_connection *c) {
if (FD(c) != INVALID_SOCKET) {
#if MG_ENABLE_EPOLL
epoll_ctl(c->mgr->epoll_fd, EPOLL_CTL_DEL, FD(c), NULL);
#endif
closesocket(FD(c));
#if MG_ARCH == MG_ARCH_FREERTOS_TCP
FreeRTOS_FD_CLR(c->fd, c->mgr->ss, eSELECT_ALL);
@ -4275,6 +4291,7 @@ static void connect_conn(struct mg_connection *c) {
if (getpeername(FD(c), &usa.sa, &n) == 0) {
c->is_connecting = 0;
mg_call(c, MG_EV_CONNECT, NULL);
MG_EPOLL_MOD(c, 0);
if (c->is_tls_hs) mg_tls_handshake(c);
} else {
mg_error(c, "socket error");
@ -4308,6 +4325,7 @@ void mg_connect_resolved(struct mg_connection *c) {
if (FD(c) == INVALID_SOCKET) {
mg_error(c, "socket(): %d", MG_SOCK_ERRNO);
} else if (c->is_udp) {
MG_EPOLL_ADD(c);
mg_call(c, MG_EV_RESOLVE, NULL);
mg_call(c, MG_EV_CONNECT, NULL);
} else {
@ -4315,6 +4333,7 @@ void mg_connect_resolved(struct mg_connection *c) {
socklen_t slen = tousa(&c->rem, &usa);
mg_set_non_blocking_mode(FD(c));
setsockopts(c);
MG_EPOLL_ADD(c);
mg_call(c, MG_EV_RESOLVE, NULL);
if ((rc = connect(FD(c), &usa.sa, slen)) == 0) {
mg_call(c, MG_EV_CONNECT, NULL);
@ -4366,6 +4385,7 @@ static void accept_conn(struct mg_mgr *mgr, struct mg_connection *lsn) {
MG_DEBUG(("%lu accepted %s", c->id, buf));
LIST_ADD_HEAD(struct mg_connection, &mgr->conns, c);
c->fd = S2PTR(fd);
MG_EPOLL_ADD(c);
mg_set_non_blocking_mode(FD(c));
setsockopts(c);
c->is_accepted = 1;
@ -4469,6 +4489,28 @@ static void mg_iotest(struct mg_mgr *mgr, int ms) {
FreeRTOS_FD_CLR(c->fd, mgr->ss,
eSELECT_READ | eSELECT_EXCEPT | eSELECT_WRITE);
}
#elif MG_ENABLE_EPOLL
size_t max = 1;
for (struct mg_connection *c = mgr->conns; c != NULL; c = c->next) {
c->is_readable = c->is_writable = 0;
if (mg_tls_pending(c) > 0) ms = 1, c->is_readable = 1;
if (can_write(c)) MG_EPOLL_MOD(c, 1);
max++;
}
struct epoll_event *evs = (struct epoll_event *) alloca(max * sizeof(evs[0]));
int n = epoll_wait(mgr->epoll_fd, evs, (int) max, ms);
for (int i = 0; i < n; i++) {
struct mg_connection *c = (struct mg_connection *) evs[i].data.ptr;
if (evs[i].events & EPOLLERR) {
mg_error(c, "socket error");
} else if (c->is_readable == 0) {
bool rd = evs[i].events & (EPOLLIN | EPOLLHUP);
bool wr = evs[i].events & EPOLLOUT;
c->is_readable = can_read(c) && rd ? 1U : 0;
c->is_writable = can_write(c) && wr ? 1U : 0;
}
}
(void) skip_iotest;
#elif MG_ENABLE_POLL
nfds_t n = 0;
for (struct mg_connection *c = mgr->conns; c != NULL; c = c->next) n++;

View File

@ -435,7 +435,9 @@ extern int SockSet(SOCKET hSock, int Type, int Prop, void *pbuf, int size);
#include <mach/mach_time.h>
#endif
#if !defined(MG_ENABLE_POLL) && (defined(__linux__) || defined(__APPLE__))
#if !defined(MG_ENABLE_EPOLL) && defined(__linux__)
#define MG_ENABLE_EPOLL 1
#elif !defined(MG_ENABLE_POLL)
#define MG_ENABLE_POLL 1
#endif
@ -457,11 +459,15 @@ extern int SockSet(SOCKET hSock, int Type, int Prop, void *pbuf, int size);
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#if defined(MG_ENABLE_POLL) && MG_ENABLE_POLL
#if defined(MG_ENABLE_EPOLL) && MG_ENABLE_EPOLL
#include <sys/epoll.h>
#elif defined(MG_ENABLE_POLL) && MG_ENABLE_POLL
#include <poll.h>
#else
#include <sys/select.h>
#endif
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/time.h>
@ -611,6 +617,10 @@ int sscanf(const char *, const char *, ...);
#define MG_ENABLE_POLL 0
#endif
#ifndef MG_ENABLE_EPOLL
#define MG_ENABLE_EPOLL 0
#endif
#ifndef MG_ENABLE_FATFS
#define MG_ENABLE_FATFS 0
#endif
@ -706,6 +716,23 @@ int sscanf(const char *, const char *, ...);
#endif
#endif
#if MG_ENABLE_EPOLL
#define MG_EPOLL_ADD(c) \
do { \
struct epoll_event ev = {EPOLLIN | EPOLLERR | EPOLLHUP, {c}}; \
epoll_ctl(c->mgr->epoll_fd, EPOLL_CTL_ADD, (int) (size_t) c->fd, &ev); \
} while (0)
#define MG_EPOLL_MOD(c, wr) \
do { \
struct epoll_event ev = {EPOLLIN | EPOLLERR | EPOLLHUP, {c}}; \
if (wr) ev.events |= EPOLLOUT; \
epoll_ctl(c->mgr->epoll_fd, EPOLL_CTL_MOD, (int) (size_t) c->fd, &ev); \
} while (0)
#else
#define MG_EPOLL_ADD(c)
#define MG_EPOLL_MOD(c, wr)
#endif
@ -1013,6 +1040,7 @@ struct mg_mgr {
uint16_t mqtt_id; // MQTT IDs for pub/sub
void *active_dns_requests; // DNS requests in progress
struct mg_timer *timers; // Active timers
int epoll_fd; // Used when MG_EPOLL_ENABLE=1
void *priv; // Used by the MIP stack
size_t extraconnsize; // Used by the MIP stack
#if MG_ARCH == MG_ARCH_FREERTOS_TCP

View File

@ -8,7 +8,9 @@
#include <mach/mach_time.h>
#endif
#if !defined(MG_ENABLE_POLL) && (defined(__linux__) || defined(__APPLE__))
#if !defined(MG_ENABLE_EPOLL) && defined(__linux__)
#define MG_ENABLE_EPOLL 1
#elif !defined(MG_ENABLE_POLL)
#define MG_ENABLE_POLL 1
#endif
@ -30,11 +32,15 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#if defined(MG_ENABLE_POLL) && MG_ENABLE_POLL
#if defined(MG_ENABLE_EPOLL) && MG_ENABLE_EPOLL
#include <sys/epoll.h>
#elif defined(MG_ENABLE_POLL) && MG_ENABLE_POLL
#include <poll.h>
#else
#include <sys/select.h>
#endif
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/time.h>

View File

@ -12,6 +12,10 @@
#define MG_ENABLE_POLL 0
#endif
#ifndef MG_ENABLE_EPOLL
#define MG_ENABLE_EPOLL 0
#endif
#ifndef MG_ENABLE_FATFS
#define MG_ENABLE_FATFS 0
#endif
@ -106,3 +110,20 @@
#define MG_ENABLE_FILE 0
#endif
#endif
#if MG_ENABLE_EPOLL
#define MG_EPOLL_ADD(c) \
do { \
struct epoll_event ev = {EPOLLIN | EPOLLERR | EPOLLHUP, {c}}; \
epoll_ctl(c->mgr->epoll_fd, EPOLL_CTL_ADD, (int) (size_t) c->fd, &ev); \
} while (0)
#define MG_EPOLL_MOD(c, wr) \
do { \
struct epoll_event ev = {EPOLLIN | EPOLLERR | EPOLLHUP, {c}}; \
if (wr) ev.events |= EPOLLOUT; \
epoll_ctl(c->mgr->epoll_fd, EPOLL_CTL_MOD, (int) (size_t) c->fd, &ev); \
} while (0)
#else
#define MG_EPOLL_ADD(c)
#define MG_EPOLL_MOD(c, wr)
#endif

View File

@ -215,6 +215,7 @@ struct mg_connection *mg_wrapfd(struct mg_mgr *mgr, int fd,
c->fd = (void *) (size_t) fd;
c->fn = fn;
c->fn_data = fn_data;
MG_EPOLL_ADD(c);
mg_call(c, MG_EV_OPEN, NULL);
LIST_ADD_HEAD(struct mg_connection, &mgr->conns, c);
}
@ -240,10 +241,18 @@ void mg_mgr_free(struct mg_mgr *mgr) {
FreeRTOS_DeleteSocketSet(mgr->ss);
#endif
MG_DEBUG(("All connections closed"));
#if MG_ENABLE_EPOLL
if (mgr->epoll_fd >= 0) close(mgr->epoll_fd), mgr->epoll_fd = -1;
#endif
}
void mg_mgr_init(struct mg_mgr *mgr) {
memset(mgr, 0, sizeof(*mgr));
#if MG_ENABLE_EPOLL
if ((mgr->epoll_fd = epoll_create1(0)) < 0) MG_ERROR(("epoll: %d", errno));
#else
mgr->epoll_fd = -1;
#endif
#if MG_ARCH == MG_ARCH_WIN32 && MG_ENABLE_WINSOCK
// clang-format off
{ WSADATA data; WSAStartup(MAKEWORD(2, 2), &data); }

View File

@ -30,6 +30,7 @@ struct mg_mgr {
uint16_t mqtt_id; // MQTT IDs for pub/sub
void *active_dns_requests; // DNS requests in progress
struct mg_timer *timers; // Active timers
int epoll_fd; // Used when MG_EPOLL_ENABLE=1
void *priv; // Used by the MIP stack
size_t extraconnsize; // Used by the MIP stack
#if MG_ARCH == MG_ARCH_FREERTOS_TCP

View File

@ -143,6 +143,9 @@ static void iolog(struct mg_connection *c, char *buf, long n, bool r) {
} else {
mg_iobuf_del(&c->send, 0, (size_t) n);
// if (c->send.len == 0) mg_iobuf_resize(&c->send, 0);
if (c->send.len == 0) {
MG_EPOLL_MOD(c, 0);
}
mg_call(c, MG_EV_WRITE, &n);
}
}
@ -259,6 +262,7 @@ bool mg_open_listener(struct mg_connection *c, const char *url) {
setlocaddr(fd, &c->loc);
mg_set_non_blocking_mode(fd);
c->fd = S2PTR(fd);
MG_EPOLL_ADD(c);
success = true;
}
}
@ -311,6 +315,9 @@ static void write_conn(struct mg_connection *c) {
static void close_conn(struct mg_connection *c) {
if (FD(c) != INVALID_SOCKET) {
#if MG_ENABLE_EPOLL
epoll_ctl(c->mgr->epoll_fd, EPOLL_CTL_DEL, FD(c), NULL);
#endif
closesocket(FD(c));
#if MG_ARCH == MG_ARCH_FREERTOS_TCP
FreeRTOS_FD_CLR(c->fd, c->mgr->ss, eSELECT_ALL);
@ -327,6 +334,7 @@ static void connect_conn(struct mg_connection *c) {
if (getpeername(FD(c), &usa.sa, &n) == 0) {
c->is_connecting = 0;
mg_call(c, MG_EV_CONNECT, NULL);
MG_EPOLL_MOD(c, 0);
if (c->is_tls_hs) mg_tls_handshake(c);
} else {
mg_error(c, "socket error");
@ -360,6 +368,7 @@ void mg_connect_resolved(struct mg_connection *c) {
if (FD(c) == INVALID_SOCKET) {
mg_error(c, "socket(): %d", MG_SOCK_ERRNO);
} else if (c->is_udp) {
MG_EPOLL_ADD(c);
mg_call(c, MG_EV_RESOLVE, NULL);
mg_call(c, MG_EV_CONNECT, NULL);
} else {
@ -367,6 +376,7 @@ void mg_connect_resolved(struct mg_connection *c) {
socklen_t slen = tousa(&c->rem, &usa);
mg_set_non_blocking_mode(FD(c));
setsockopts(c);
MG_EPOLL_ADD(c);
mg_call(c, MG_EV_RESOLVE, NULL);
if ((rc = connect(FD(c), &usa.sa, slen)) == 0) {
mg_call(c, MG_EV_CONNECT, NULL);
@ -418,6 +428,7 @@ static void accept_conn(struct mg_mgr *mgr, struct mg_connection *lsn) {
MG_DEBUG(("%lu accepted %s", c->id, buf));
LIST_ADD_HEAD(struct mg_connection, &mgr->conns, c);
c->fd = S2PTR(fd);
MG_EPOLL_ADD(c);
mg_set_non_blocking_mode(FD(c));
setsockopts(c);
c->is_accepted = 1;
@ -521,6 +532,28 @@ static void mg_iotest(struct mg_mgr *mgr, int ms) {
FreeRTOS_FD_CLR(c->fd, mgr->ss,
eSELECT_READ | eSELECT_EXCEPT | eSELECT_WRITE);
}
#elif MG_ENABLE_EPOLL
size_t max = 1;
for (struct mg_connection *c = mgr->conns; c != NULL; c = c->next) {
c->is_readable = c->is_writable = 0;
if (mg_tls_pending(c) > 0) ms = 1, c->is_readable = 1;
if (can_write(c)) MG_EPOLL_MOD(c, 1);
max++;
}
struct epoll_event *evs = (struct epoll_event *) alloca(max * sizeof(evs[0]));
int n = epoll_wait(mgr->epoll_fd, evs, (int) max, ms);
for (int i = 0; i < n; i++) {
struct mg_connection *c = (struct mg_connection *) evs[i].data.ptr;
if (evs[i].events & EPOLLERR) {
mg_error(c, "socket error");
} else if (c->is_readable == 0) {
bool rd = evs[i].events & (EPOLLIN | EPOLLHUP);
bool wr = evs[i].events & EPOLLOUT;
c->is_readable = can_read(c) && rd ? 1U : 0;
c->is_writable = can_write(c) && wr ? 1U : 0;
}
}
(void) skip_iotest;
#elif MG_ENABLE_POLL
nfds_t n = 0;
for (struct mg_connection *c = mgr->conns; c != NULL; c = c->next) n++;

View File

@ -521,6 +521,8 @@ static void fcb(struct mg_connection *c, int ev, void *ev_data, void *fn_data) {
fd->closed = 1;
c->is_closing = 1;
(void) c;
} else if (ev == MG_EV_CLOSE) {
fd->closed = 1;
}
}
@ -545,13 +547,13 @@ static int fetch(struct mg_mgr *mgr, char *buf, const char *url,
}
mg_tls_init(c, &opts);
if (c->tls == NULL) fd.closed = 1;
// c->is_hexdumping = 1;
}
// c->is_hexdumping = 1;
va_start(ap, fmt);
mg_vprintf(c, fmt, ap);
va_end(ap);
buf[0] = '\0';
for (i = 0; i < 250 && buf[0] == '\0'; i++) mg_mgr_poll(mgr, 1);
for (i = 0; i < 50 && buf[0] == '\0'; i++) mg_mgr_poll(mgr, 1);
if (!fd.closed) c->is_closing = 1;
mg_mgr_poll(mgr, 1);
return fd.code;