mirror of
https://github.com/cesanta/mongoose.git
synced 2025-01-14 01:38:01 +08:00
Refactor queue
This commit is contained in:
parent
8aa26fb2cd
commit
515e438d4f
2
.github/workflows/test.yml
vendored
2
.github/workflows/test.yml
vendored
@ -71,7 +71,7 @@ jobs:
|
||||
- if: ${{ env.GO == 1 }}
|
||||
run: make test upload-coverage TFLAGS=-DNO_SNTP_CHECK SSL=OPENSSL ASAN_OPTIONS= OPENSSL=`echo /usr/local/Cellar/openssl*/*`
|
||||
- if: ${{ env.GO == 1 }}
|
||||
run: make test SSL=MBEDTLS TFLAGS="-DNO_SNTP_CHECK -DMG_ENABLE_ATOMIC=1" ASAN_OPTIONS= MBEDTLS=`echo /usr/local/Cellar/mbedtls*/*`
|
||||
run: make test SSL=MBEDTLS TFLAGS=-DNO_SNTP_CHECK ASAN_OPTIONS= MBEDTLS=`echo /usr/local/Cellar/mbedtls*/*`
|
||||
- if: ${{ env.GO == 1 }}
|
||||
run: make mg_prefix
|
||||
windows:
|
||||
|
8
Makefile
8
Makefile
@ -1,6 +1,6 @@
|
||||
SRCS = mongoose.c test/unit_test.c test/packed_fs.c
|
||||
HDRS = $(wildcard src/*.h) $(wildcard src/tcpip/*.h)
|
||||
DEFS ?= -DMG_MAX_HTTP_HEADERS=7 -DMG_ENABLE_LINES -DMG_ENABLE_PACKED_FS=1 -DMG_ENABLE_SSI=1
|
||||
DEFS ?= -DMG_MAX_HTTP_HEADERS=7 -DMG_ENABLE_LINES -DMG_ENABLE_PACKED_FS=1 -DMG_ENABLE_SSI=1 -DMG_ENABLE_ASSERT=1
|
||||
WARN ?= -pedantic -W -Wall -Werror -Wshadow -Wdouble-promotion -fno-common -Wconversion -Wundef
|
||||
OPTS ?= -O3 -g3
|
||||
INCS ?= -Isrc -I.
|
||||
@ -15,10 +15,10 @@ ASAN_OPTIONS ?= detect_leaks=1
|
||||
EXAMPLES := $(dir $(wildcard examples/*/Makefile))
|
||||
PREFIX ?= /usr/local
|
||||
VERSION ?= $(shell cut -d'"' -f2 src/version.h)
|
||||
COMMON_CFLAGS ?= $(C_WARN) $(WARN) $(INCS) $(DEFS) -DMG_ENABLE_IPV6=$(IPV6) $(TFLAGS)
|
||||
CFLAGS ?= $(OPTS) $(ASAN) $(COMMON_CFLAGS) -pthread
|
||||
COMMON_CFLAGS ?= $(C_WARN) $(WARN) $(INCS) $(DEFS) -DMG_ENABLE_IPV6=$(IPV6) $(TFLAGS) -pthread
|
||||
CFLAGS ?= $(OPTS) $(ASAN) $(COMMON_CFLAGS)
|
||||
VALGRIND_CFLAGS ?= $(OPTS) $(COMMON_CFLAGS)
|
||||
VALGRIND_RUN ?= valgrind --tool=memcheck --gen-suppressions=all --leak-check=full --show-leak-kinds=all --leak-resolution=high --track-origins=yes --error-exitcode=1 --exit-on-first-error=yes
|
||||
VALGRIND_RUN ?= valgrind --tool=memcheck --gen-suppressions=all --leak-check=full --show-leak-kinds=all --leak-resolution=high --track-origins=yes --error-exitcode=1 --exit-on-first-error=yes --fair-sched=yes
|
||||
.PHONY: examples test valgrind mip_test
|
||||
|
||||
ifeq "$(findstring ++,$(CC))" ""
|
||||
|
@ -2,7 +2,7 @@ SOURCES = main.c mongoose.c # Source code files
|
||||
CFLAGS = -W -Wall -Wextra -g -I. # Build options
|
||||
|
||||
# Mongoose build options. See https://mongoose.ws/documentation/#build-options
|
||||
CFLAGS_MONGOOSE += -DMG_ENABLE_ATOMIC=1
|
||||
CFLAGS_MONGOOSE +=
|
||||
|
||||
ifeq ($(OS),Windows_NT)
|
||||
# Windows settings. Assume MinGW compiler. To use VC: make CC=cl CFLAGS=/MD
|
||||
|
@ -15,7 +15,7 @@ struct thread_data {
|
||||
struct mg_str body; // Copy of message body
|
||||
};
|
||||
|
||||
static void start_thread(void (*f)(void *), void *p) {
|
||||
static void start_thread(void *(*f)(void *), void *p) {
|
||||
#ifdef _WIN32
|
||||
#define usleep(x) Sleep((x) / 1000)
|
||||
_beginthread((void(__cdecl *)(void *)) f, 0, p);
|
||||
@ -25,18 +25,17 @@ static void start_thread(void (*f)(void *), void *p) {
|
||||
pthread_attr_t attr;
|
||||
(void) pthread_attr_init(&attr);
|
||||
(void) pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
|
||||
pthread_create(&thread_id, &attr, (void *(*) (void *) ) f, p);
|
||||
pthread_create(&thread_id, &attr, f, p);
|
||||
pthread_attr_destroy(&attr);
|
||||
#endif
|
||||
}
|
||||
|
||||
static void worker_thread(void *param) {
|
||||
static void *worker_thread(void *param) {
|
||||
struct thread_data *d = (struct thread_data *) param;
|
||||
char buf[100]; // On-stack buffer for the message queue
|
||||
|
||||
d->queue.buf = buf; // Caller passed us an empty queue with NULL
|
||||
d->queue.len = sizeof(buf); // buffer. Initialise it now
|
||||
usleep(1 * 1000 * 1000); // Simulate long execution time
|
||||
mg_queue_init(&d->queue, buf, sizeof(buf)); // Init queue
|
||||
usleep(1 * 1000 * 1000); // Simulate long execution time
|
||||
|
||||
// Send a response to the connection
|
||||
if (d->body.len == 0) {
|
||||
@ -48,9 +47,10 @@ static void worker_thread(void *param) {
|
||||
}
|
||||
|
||||
// Wait until connection reads our message, then it is safe to quit
|
||||
while (mg_queue_next(&d->queue, NULL) != MG_QUEUE_EMPTY) usleep(1000);
|
||||
while (d->queue.tail != d->queue.head) usleep(1000);
|
||||
MG_INFO(("done, cleaning up..."));
|
||||
free(d);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
// HTTP request callback
|
||||
@ -68,10 +68,11 @@ static void fn(struct mg_connection *c, int ev, void *ev_data, void *fn_data) {
|
||||
size_t len;
|
||||
char *buf;
|
||||
// Check if we have a message from the worker
|
||||
if (d != NULL && (len = mg_queue_next(&d->queue, &buf)) != MG_QUEUE_EMPTY) {
|
||||
if (d != NULL && (len = mg_queue_next(&d->queue, &buf)) > 0) {
|
||||
// Got message from worker. Send a response and cleanup
|
||||
mg_http_reply(c, 200, "", "%.*s\n", (int) len, buf);
|
||||
mg_queue_del(&d->queue); // Delete message: signal worker that we're done
|
||||
mg_queue_del(&d->queue, len); // Delete message
|
||||
*(void **) c->data = NULL; // Forget about thread data
|
||||
}
|
||||
}
|
||||
(void) fn_data;
|
||||
|
166
mongoose.c
166
mongoose.c
@ -3497,7 +3497,7 @@ void mg_mgr_init(struct mg_mgr *mgr) {
|
||||
size_t mg_queue_vprintf(struct mg_queue *q, const char *fmt, va_list *ap) {
|
||||
size_t len = mg_snprintf(NULL, 0, fmt, ap);
|
||||
char *buf;
|
||||
if (mg_queue_space(q, &buf) < len + 1) {
|
||||
if (len == 0 || mg_queue_book(q, &buf, len + 1) < len + 1) {
|
||||
len = 0; // Nah. Not enough space
|
||||
} else {
|
||||
len = mg_vsnprintf((char *)buf, len + 1, fmt, ap);
|
||||
@ -3613,43 +3613,85 @@ size_t mg_print_mac(void (*out)(char, void *), void *arg, va_list *ap) {
|
||||
#endif
|
||||
|
||||
|
||||
// Every message in the queue is prepended by the message length (ML)
|
||||
// ML is sizeof(size_t) in size
|
||||
// Tail points to the message data
|
||||
//
|
||||
// |------| ML | message1 | ML | message2 |--- free space ---|
|
||||
// ^ ^ ^ ^
|
||||
// buf tail head len
|
||||
|
||||
size_t mg_queue_space(struct mg_queue *q, char **buf) {
|
||||
size_t ofs;
|
||||
if (q->head > 0 && q->tail >= q->head) { // All messages read?
|
||||
q->head = 0; // Yes. Reset head first
|
||||
q->tail = 0; // Now reset the tail
|
||||
#if defined(__GNUC__) || defined(__clang__)
|
||||
#define MG_MEMORY_BARRIER() __sync_synchronize()
|
||||
#elif defined(_MSC_VER) && _MSC_VER >= 1700
|
||||
#define MG_MEMORY_BARRIER() MemoryBarrier()
|
||||
#elif !defined(MG_MEMORY_BARRIER)
|
||||
#define MG_MEMORY_BARRIER()
|
||||
#endif
|
||||
|
||||
// Every message in a queue is prepended by a 32-bit message length (ML).
|
||||
// If ML is 0, then it is the end, and reader must wrap to the beginning.
|
||||
//
|
||||
// Queue when q->tail <= q->head:
|
||||
// |----- free -----| ML | message1 | ML | message2 | ----- free ------|
|
||||
// ^ ^ ^ ^
|
||||
// buf tail head len
|
||||
//
|
||||
// Queue when q->tail > q->head:
|
||||
// | ML | message2 |----- free ------| ML | message1 | 0 |---- free ----|
|
||||
// ^ ^ ^ ^
|
||||
// buf head tail len
|
||||
|
||||
void mg_queue_init(struct mg_queue *q, char *buf, size_t size) {
|
||||
q->size = size;
|
||||
q->buf = buf;
|
||||
q->head = q->tail = 0;
|
||||
}
|
||||
|
||||
static size_t mg_queue_read_len(struct mg_queue *q) {
|
||||
uint32_t n = 0;
|
||||
MG_MEMORY_BARRIER();
|
||||
memcpy(&n, q->buf + q->tail, sizeof(n));
|
||||
assert(q->tail + n + sizeof(n) <= q->size);
|
||||
return n;
|
||||
}
|
||||
|
||||
static void mg_queue_write_len(struct mg_queue *q, size_t len) {
|
||||
uint32_t n = (uint32_t) len;
|
||||
memcpy(q->buf + q->head, &n, sizeof(n));
|
||||
MG_MEMORY_BARRIER();
|
||||
}
|
||||
|
||||
size_t mg_queue_book(struct mg_queue *q, char **buf, size_t len) {
|
||||
size_t space = 0, hs = sizeof(uint32_t) * 2; // *2 is for the 0 marker
|
||||
if (q->head >= q->tail && q->head + len + hs <= q->size) {
|
||||
space = q->size - q->head - hs; // There is enough space
|
||||
} else if (q->head >= q->tail && q->tail > hs) {
|
||||
mg_queue_write_len(q, 0); // Not enough space ahead
|
||||
q->head = 0; // Wrap head to the beginning
|
||||
}
|
||||
ofs = q->head + sizeof(size_t);
|
||||
if (buf != NULL) *buf = q->buf + ofs;
|
||||
return ofs > q->len ? 0 : q->len - ofs;
|
||||
if (q->head + hs + len < q->tail) space = q->tail - q->head - hs;
|
||||
if (buf != NULL) *buf = q->buf + q->head + sizeof(uint32_t);
|
||||
return space;
|
||||
}
|
||||
|
||||
size_t mg_queue_next(struct mg_queue *q, char **buf) {
|
||||
size_t len = MG_QUEUE_EMPTY;
|
||||
if (q->tail < q->head) memcpy(&len, &q->buf[q->tail], sizeof(len));
|
||||
if (buf != NULL) *buf = &q->buf[q->tail + sizeof(len)];
|
||||
size_t len = 0;
|
||||
if (q->tail != q->head) {
|
||||
len = mg_queue_read_len(q);
|
||||
if (len == 0) { // Zero (head wrapped) ?
|
||||
q->tail = 0; // Reset tail to the start
|
||||
if (q->head > q->tail) len = mg_queue_read_len(q); // Read again
|
||||
}
|
||||
}
|
||||
if (buf != NULL) *buf = q->buf + q->tail + sizeof(uint32_t);
|
||||
assert(q->tail + len <= q->size);
|
||||
return len;
|
||||
}
|
||||
|
||||
void mg_queue_add(struct mg_queue *q, size_t len) {
|
||||
size_t head = q->head + len + (size_t) sizeof(head); // New head
|
||||
if (head <= q->len) { // Have space ?
|
||||
memcpy(q->buf + q->head, &len, sizeof(len)); // Yes. Store ML
|
||||
q->head = head; // Advance head
|
||||
}
|
||||
assert(len > 0);
|
||||
mg_queue_write_len(q, len);
|
||||
assert(q->head + sizeof(uint32_t) * 2 + len <= q->size);
|
||||
q->head += len + sizeof(uint32_t);
|
||||
}
|
||||
|
||||
void mg_queue_del(struct mg_queue *q) {
|
||||
size_t len = mg_queue_next(q, NULL), tail = q->tail + len + sizeof(size_t);
|
||||
if (len != MG_QUEUE_EMPTY) q->tail = tail;
|
||||
void mg_queue_del(struct mg_queue *q, size_t len) {
|
||||
q->tail += len + sizeof(uint32_t);
|
||||
assert(q->tail + sizeof(uint32_t) <= q->size);
|
||||
}
|
||||
|
||||
#ifdef MG_ENABLE_LINES
|
||||
@ -3807,7 +3849,7 @@ static uint32_t blk0(union char64long16 *block, int i) {
|
||||
w = rol(w, 30);
|
||||
|
||||
static void mg_sha1_transform(uint32_t state[5],
|
||||
const unsigned char buffer[64]) {
|
||||
const unsigned char *buffer) {
|
||||
uint32_t a, b, c, d, e;
|
||||
union char64long16 block[1];
|
||||
|
||||
@ -6035,14 +6077,14 @@ volatile uint32_t RESERVED0, EIR, EIMR, RESERVED1, RDAR, TDAR, RESERVED2[3], ECR
|
||||
const uint32_t EIMR_RX_ERR = 0x2400000; // Intr mask RXF+EBERR
|
||||
|
||||
void ETH_IRQHandler(void);
|
||||
static bool mg_tcpip_driver_imxrt1020_init(struct mip_if *ifp);
|
||||
static bool mg_tcpip_driver_imxrt1020_init(struct mg_tcpip_if *ifp);
|
||||
static void wait_phy_complete(void);
|
||||
static struct mip_if *s_ifp; // MIP interface
|
||||
static struct mg_tcpip_if *s_ifp; // MIP interface
|
||||
|
||||
static size_t mg_tcpip_driver_imxrt1020_tx(const void *, size_t , struct mip_if *);
|
||||
static bool mg_tcpip_driver_imxrt1020_up(struct mip_if *ifp);
|
||||
static size_t mg_tcpip_driver_imxrt1020_tx(const void *, size_t , struct mg_tcpip_if *);
|
||||
static bool mg_tcpip_driver_imxrt1020_up(struct mg_tcpip_if *ifp);
|
||||
|
||||
enum { PHY_ADDR = 0x02, PHY_BCR = 0, PHY_BSR = 1 }; // PHY constants
|
||||
enum { IMXRT1020_PHY_ADDR = 0x02, IMXRT1020_PHY_BCR = 0, IMXRT1020_PHY_BSR = 1 }; // PHY constants
|
||||
|
||||
void delay(uint32_t);
|
||||
void delay (uint32_t di) {
|
||||
@ -6061,7 +6103,7 @@ static void wait_phy_complete(void) {
|
||||
ENET->EIR |= BIT(23); // MII interrupt clear
|
||||
}
|
||||
|
||||
static uint32_t eth_read_phy(uint8_t addr, uint8_t reg) {
|
||||
static uint32_t imxrt1020_eth_read_phy(uint8_t addr, uint8_t reg) {
|
||||
ENET->EIR |= BIT(23); // MII interrupt clear
|
||||
uint32_t mask_phy_adr_reg = 0x1f; // 0b00011111: Ensure we write 5 bits (Phy address & register)
|
||||
uint32_t phy_transaction = 0x00;
|
||||
@ -6077,7 +6119,7 @@ static uint32_t eth_read_phy(uint8_t addr, uint8_t reg) {
|
||||
return (ENET->MMFR & 0x0000ffff);
|
||||
}
|
||||
|
||||
static void eth_write_phy(uint8_t addr, uint8_t reg, uint32_t val) {
|
||||
static void imxrt1020_eth_write_phy(uint8_t addr, uint8_t reg, uint32_t val) {
|
||||
ENET->EIR |= BIT(23); // MII interrupt clear
|
||||
uint8_t mask_phy_adr_reg = 0x1f; // 0b00011111: Ensure we write 5 bits (Phy address & register)
|
||||
uint32_t mask_phy_data = 0x0000ffff; // Ensure we write 16 bits (data)
|
||||
@ -6116,7 +6158,7 @@ uint8_t tx_data_buffer[(ENET_TXBD_NUM)][((unsigned int)(((ENET_TXBUFF_SIZE)) + (
|
||||
// Initialise driver imx_rt1020
|
||||
|
||||
// static bool mg_tcpip_driver_imxrt1020_init(uint8_t *mac, void *data) { // VO
|
||||
static bool mg_tcpip_driver_imxrt1020_init(struct mip_if *ifp) {
|
||||
static bool mg_tcpip_driver_imxrt1020_init(struct mg_tcpip_if *ifp) {
|
||||
|
||||
struct mg_tcpip_driver_imxrt1020_data *d = (struct mg_tcpip_driver_imxrt1020_data *) ifp->driver_data;
|
||||
s_ifp = ifp;
|
||||
@ -6131,17 +6173,17 @@ static bool mg_tcpip_driver_imxrt1020_init(struct mip_if *ifp) {
|
||||
|
||||
// Setup MII/RMII MDC clock divider (<= 2.5MHz).
|
||||
ENET->MSCR = 0x130; // HOLDTIME 2 clk, Preamble enable, MDC MII_Speed Div 0x30
|
||||
eth_write_phy(PHY_ADDR, PHY_BCR, 0x8000); // PHY W @0x00 D=0x8000 Soft reset
|
||||
while (eth_read_phy(PHY_ADDR, PHY_BSR) & BIT(15)) {delay(0x5000);} // Wait finished poll 10ms
|
||||
imxrt1020_eth_write_phy(IMXRT1020_PHY_ADDR, IMXRT1020_PHY_BCR, 0x8000); // PHY W @0x00 D=0x8000 Soft reset
|
||||
while (imxrt1020_eth_read_phy(IMXRT1020_PHY_ADDR, IMXRT1020_PHY_BSR) & BIT(15)) {delay(0x5000);} // Wait finished poll 10ms
|
||||
|
||||
// PHY: Start Link
|
||||
{
|
||||
eth_write_phy(PHY_ADDR, PHY_BCR, 0x1200); // PHY W @0x00 D=0x1200 Autonego enable + start
|
||||
eth_write_phy(PHY_ADDR, 0x1f, 0x8180); // PHY W @0x1f D=0x8180 Ref clock 50 MHz at XI input
|
||||
imxrt1020_eth_write_phy(IMXRT1020_PHY_ADDR, IMXRT1020_PHY_BCR, 0x1200); // PHY W @0x00 D=0x1200 Autonego enable + start
|
||||
imxrt1020_eth_write_phy(IMXRT1020_PHY_ADDR, 0x1f, 0x8180); // PHY W @0x1f D=0x8180 Ref clock 50 MHz at XI input
|
||||
|
||||
uint32_t bcr = eth_read_phy(PHY_ADDR, PHY_BCR);
|
||||
uint32_t bcr = imxrt1020_eth_read_phy(IMXRT1020_PHY_ADDR, IMXRT1020_PHY_BCR);
|
||||
bcr &= ~BIT(10); // Isolation -> Normal
|
||||
eth_write_phy(PHY_ADDR, PHY_BCR, bcr);
|
||||
imxrt1020_eth_write_phy(IMXRT1020_PHY_ADDR, IMXRT1020_PHY_BCR, bcr);
|
||||
}
|
||||
|
||||
// Disable ENET
|
||||
@ -6199,23 +6241,23 @@ static bool mg_tcpip_driver_imxrt1020_init(struct mip_if *ifp) {
|
||||
}
|
||||
|
||||
// Transmit frame
|
||||
static uint32_t s_txno;
|
||||
static uint32_t s_rt1020_txno;
|
||||
|
||||
static size_t mg_tcpip_driver_imxrt1020_tx(const void *buf, size_t len, struct mip_if *ifp) {
|
||||
static size_t mg_tcpip_driver_imxrt1020_tx(const void *buf, size_t len, struct mg_tcpip_if *ifp) {
|
||||
|
||||
if (len > sizeof(tx_data_buffer[ENET_TXBD_NUM])) {
|
||||
// MG_ERROR(("Frame too big, %ld", (long) len));
|
||||
len = 0; // Frame is too big
|
||||
} else if ((tx_buffer_descriptor[s_txno].control & BIT(15))) {
|
||||
} else if ((tx_buffer_descriptor[s_rt1020_txno].control & BIT(15))) {
|
||||
MG_ERROR(("No free descriptors"));
|
||||
// printf("D0 %lx SR %lx\n", (long) s_txdesc[0][0], (long) ETH->DMASR);
|
||||
len = 0; // All descriptors are busy, fail
|
||||
} else {
|
||||
memcpy(tx_data_buffer[s_txno], buf, len); // Copy data
|
||||
tx_buffer_descriptor[s_txno].length = (uint16_t) len; // Set data len
|
||||
tx_buffer_descriptor[s_txno].control |= (uint16_t)(BIT(10)); // TC (transmit CRC)
|
||||
// tx_buffer_descriptor[s_txno].control &= (uint16_t)(BIT(14) | BIT(12)); // Own doesn't affect HW
|
||||
tx_buffer_descriptor[s_txno].control |= (uint16_t)(BIT(15) | BIT(11)); // R+L (ready+last)
|
||||
memcpy(tx_data_buffer[s_rt1020_txno], buf, len); // Copy data
|
||||
tx_buffer_descriptor[s_rt1020_txno].length = (uint16_t) len; // Set data len
|
||||
tx_buffer_descriptor[s_rt1020_txno].control |= (uint16_t)(BIT(10)); // TC (transmit CRC)
|
||||
// tx_buffer_descriptor[s_rt1020_txno].control &= (uint16_t)(BIT(14) | BIT(12)); // Own doesn't affect HW
|
||||
tx_buffer_descriptor[s_rt1020_txno].control |= (uint16_t)(BIT(15) | BIT(11)); // R+L (ready+last)
|
||||
ENET->TDAR = BIT(24); // Descriptor updated. Hand over to DMA.
|
||||
// INFO
|
||||
// Relevant Descriptor bits: 15(R) Ready
|
||||
@ -6223,36 +6265,34 @@ static size_t mg_tcpip_driver_imxrt1020_tx(const void *buf, size_t len, struct m
|
||||
// 10(TC) transmis CRC
|
||||
// __DSB(); // ARM errata 838869 Cortex-M4, M4F, M7, M7F: "store immediate overlapping
|
||||
// exception" return might vector to incorrect interrupt.
|
||||
if (++s_txno >= ENET_TXBD_NUM) s_txno = 0;
|
||||
if (++s_rt1020_txno >= ENET_TXBD_NUM) s_rt1020_txno = 0;
|
||||
}
|
||||
(void) ifp;
|
||||
return len;
|
||||
}
|
||||
|
||||
// IRQ (RX)
|
||||
static uint32_t s_rxno;
|
||||
static uint32_t s_rt1020_rxno;
|
||||
|
||||
void ENET_IRQHandler(void) {
|
||||
ENET->EIMR = 0; // Mask interrupts.
|
||||
uint32_t eir = ENET->EIR; // Read EIR
|
||||
ENET->EIR = 0xffffffff; // Clear interrupts
|
||||
|
||||
qp_mark(QP_IRQTRIGGERED, 0);
|
||||
|
||||
if (eir & EIMR_RX_ERR) // Global mask used
|
||||
{
|
||||
if (rx_buffer_descriptor[s_rxno].control & BIT(15)) {
|
||||
if (rx_buffer_descriptor[s_rt1020_rxno].control & BIT(15)) {
|
||||
ENET->EIMR = EIMR_RX_ERR; // Enable interrupts
|
||||
return; // Empty? -> exit.
|
||||
}
|
||||
// Read inframes
|
||||
else { // Frame received, loop
|
||||
for (uint32_t i = 0; i < 10; i++) { // read as they arrive but not forever
|
||||
if (rx_buffer_descriptor[s_rxno].control & BIT(15)) break; // exit when done
|
||||
uint32_t len = (rx_buffer_descriptor[s_rxno].length);
|
||||
mg_tcpip_qwrite(rx_buffer_descriptor[s_rxno].buffer, len > 4 ? len - 4 : len, s_ifp);
|
||||
rx_buffer_descriptor[s_rxno].control |= BIT(15); // Inform DMA RX is empty
|
||||
if (++s_rxno >= ENET_RXBD_NUM) s_rxno = 0;
|
||||
if (rx_buffer_descriptor[s_rt1020_rxno].control & BIT(15)) break; // exit when done
|
||||
uint32_t len = (rx_buffer_descriptor[s_rt1020_rxno].length);
|
||||
mg_tcpip_qwrite(rx_buffer_descriptor[s_rt1020_rxno].buffer, len > 4 ? len - 4 : len, s_ifp);
|
||||
rx_buffer_descriptor[s_rt1020_rxno].control |= BIT(15); // Inform DMA RX is empty
|
||||
if (++s_rt1020_rxno >= ENET_RXBD_NUM) s_rt1020_rxno = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -6260,15 +6300,15 @@ void ENET_IRQHandler(void) {
|
||||
}
|
||||
|
||||
// Up/down status
|
||||
static bool mg_tcpip_driver_imxrt1020_up(struct mip_if *ifp) {
|
||||
uint32_t bsr = eth_read_phy(PHY_ADDR, PHY_BSR);
|
||||
static bool mg_tcpip_driver_imxrt1020_up(struct mg_tcpip_if *ifp) {
|
||||
uint32_t bsr = imxrt1020_eth_read_phy(IMXRT1020_PHY_ADDR, IMXRT1020_PHY_BSR);
|
||||
(void) ifp;
|
||||
return bsr & BIT(2) ? 1 : 0;
|
||||
}
|
||||
|
||||
// API
|
||||
struct mg_tcpip_driver mg_tcpip_driver_imxrt1020 = {
|
||||
mg_tcpip_driver_imxrt1020_init, mg_tcpip_driver_imxrt1020_tx, mip_driver_rx,
|
||||
mg_tcpip_driver_imxrt1020_init, mg_tcpip_driver_imxrt1020_tx, mg_tcpip_driver_rx,
|
||||
mg_tcpip_driver_imxrt1020_up};
|
||||
|
||||
#endif
|
||||
|
56
mongoose.h
56
mongoose.h
@ -223,7 +223,6 @@ static inline int mg_mkdir(const char *path, mode_t mode) {
|
||||
#define MG_PATH_MAX 100
|
||||
#define MG_ENABLE_SOCKET 0
|
||||
#define MG_ENABLE_DIRLIST 0
|
||||
#define MG_ENABLE_ATOMIC 1
|
||||
|
||||
#endif
|
||||
|
||||
@ -240,7 +239,6 @@ static inline int mg_mkdir(const char *path, mode_t mode) {
|
||||
|
||||
#include <pico/stdlib.h>
|
||||
int mkdir(const char *, mode_t);
|
||||
#define MG_ENABLE_ATOMIC 1
|
||||
#endif
|
||||
|
||||
|
||||
@ -599,10 +597,6 @@ struct timeval {
|
||||
#define MG_ENABLE_TCPIP 0 // Mongoose built-in network stack
|
||||
#endif
|
||||
|
||||
#ifndef MG_ENABLE_ATOMIC
|
||||
#define MG_ENABLE_ATOMIC 0 // Required by mg_queue
|
||||
#endif
|
||||
|
||||
#ifndef MG_ENABLE_LWIP
|
||||
#define MG_ENABLE_LWIP 0 // lWIP network stack
|
||||
#endif
|
||||
@ -676,6 +670,10 @@ struct timeval {
|
||||
#define MG_ENABLE_PACKED_FS 0
|
||||
#endif
|
||||
|
||||
#ifndef MG_ENABLE_ASSERT
|
||||
#define MG_ENABLE_ASSERT 0
|
||||
#endif
|
||||
|
||||
#ifndef MG_IO_SIZE
|
||||
#define MG_IO_SIZE 2048 // Granularity of the send/recv IO buffer growth
|
||||
#endif
|
||||
@ -792,40 +790,32 @@ char *mg_remove_double_dots(char *s);
|
||||
|
||||
|
||||
|
||||
|
||||
#if MG_ENABLE_ATOMIC
|
||||
#include <stdatomic.h>
|
||||
#elif !defined(_Atomic)
|
||||
#define _Atomic
|
||||
#endif
|
||||
|
||||
// Single producer, single consumer non-blocking queue
|
||||
//
|
||||
// Producer:
|
||||
// void *buf;
|
||||
// while (mg_queue_space(q, &buf) < len) WAIT(); // Wait for free space
|
||||
// memcpy(buf, data, len); // Copy data to the queue
|
||||
// mg_queue_add(q, len); // Advance q->head
|
||||
// char *buf;
|
||||
// while (mg_queue_book(q, &buf) < len) WAIT(); // Wait for space
|
||||
// memcpy(buf, my_data, len); // Copy data to the queue
|
||||
// mg_queue_add(q, len);
|
||||
//
|
||||
// Consumer:
|
||||
// void *buf;
|
||||
// while ((len = mg_queue_next(q, &buf)) == MG_QUEUE_EMPTY) WAIT();
|
||||
// mg_hexdump(buf, len); // Handle message
|
||||
// mg_queue_del(q); // Delete message
|
||||
// char *buf;
|
||||
// while ((len = mg_queue_get(q, &buf)) == 0) WAIT();
|
||||
// mg_hexdump(buf, len); // Handle message
|
||||
// mg_queue_del(q, len);
|
||||
//
|
||||
struct mg_queue {
|
||||
char *buf;
|
||||
size_t len;
|
||||
volatile _Atomic size_t tail;
|
||||
volatile _Atomic size_t head;
|
||||
size_t size;
|
||||
volatile size_t tail;
|
||||
volatile size_t head;
|
||||
};
|
||||
|
||||
#define MG_QUEUE_EMPTY ((size_t) ~0ul) // Next message size when queue is empty
|
||||
|
||||
void mg_queue_add(struct mg_queue *, size_t len); // Advance head
|
||||
void mg_queue_del(struct mg_queue *); // Advance tail
|
||||
size_t mg_queue_space(struct mg_queue *, char **); // Get free space
|
||||
size_t mg_queue_next(struct mg_queue *, char **); // Get next message size
|
||||
void mg_queue_init(struct mg_queue *, char *, size_t); // Init queue
|
||||
size_t mg_queue_book(struct mg_queue *, char **buf, size_t); // Reserve space
|
||||
void mg_queue_add(struct mg_queue *, size_t); // Add new message
|
||||
size_t mg_queue_next(struct mg_queue *, char **); // Get oldest message
|
||||
void mg_queue_del(struct mg_queue *, size_t); // Delete oldest message
|
||||
|
||||
|
||||
|
||||
@ -958,6 +948,12 @@ bool mg_file_printf(struct mg_fs *fs, const char *path, const char *fmt, ...);
|
||||
|
||||
|
||||
|
||||
#if MG_ENABLE_ASSERT
|
||||
#include <assert.h>
|
||||
#elif !defined(assert)
|
||||
#define assert(x)
|
||||
#endif
|
||||
|
||||
void mg_random(void *buf, size_t len);
|
||||
char *mg_random_str(char *buf, size_t len);
|
||||
uint16_t mg_ntohs(uint16_t net);
|
||||
|
@ -19,6 +19,5 @@
|
||||
#define MG_PATH_MAX 100
|
||||
#define MG_ENABLE_SOCKET 0
|
||||
#define MG_ENABLE_DIRLIST 0
|
||||
#define MG_ENABLE_ATOMIC 1
|
||||
|
||||
#endif
|
||||
|
@ -12,5 +12,4 @@
|
||||
|
||||
#include <pico/stdlib.h>
|
||||
int mkdir(const char *, mode_t);
|
||||
#define MG_ENABLE_ATOMIC 1
|
||||
#endif
|
||||
|
@ -8,10 +8,6 @@
|
||||
#define MG_ENABLE_TCPIP 0 // Mongoose built-in network stack
|
||||
#endif
|
||||
|
||||
#ifndef MG_ENABLE_ATOMIC
|
||||
#define MG_ENABLE_ATOMIC 0 // Required by mg_queue
|
||||
#endif
|
||||
|
||||
#ifndef MG_ENABLE_LWIP
|
||||
#define MG_ENABLE_LWIP 0 // lWIP network stack
|
||||
#endif
|
||||
@ -85,6 +81,10 @@
|
||||
#define MG_ENABLE_PACKED_FS 0
|
||||
#endif
|
||||
|
||||
#ifndef MG_ENABLE_ASSERT
|
||||
#define MG_ENABLE_ASSERT 0
|
||||
#endif
|
||||
|
||||
#ifndef MG_IO_SIZE
|
||||
#define MG_IO_SIZE 2048 // Granularity of the send/recv IO buffer growth
|
||||
#endif
|
||||
|
@ -6,7 +6,7 @@
|
||||
size_t mg_queue_vprintf(struct mg_queue *q, const char *fmt, va_list *ap) {
|
||||
size_t len = mg_snprintf(NULL, 0, fmt, ap);
|
||||
char *buf;
|
||||
if (mg_queue_space(q, &buf) < len + 1) {
|
||||
if (len == 0 || mg_queue_book(q, &buf, len + 1) < len + 1) {
|
||||
len = 0; // Nah. Not enough space
|
||||
} else {
|
||||
len = mg_vsnprintf((char *)buf, len + 1, fmt, ap);
|
||||
|
92
src/queue.c
92
src/queue.c
@ -1,40 +1,82 @@
|
||||
#include "queue.h"
|
||||
#include "util.h"
|
||||
|
||||
// Every message in the queue is prepended by the message length (ML)
|
||||
// ML is sizeof(size_t) in size
|
||||
// Tail points to the message data
|
||||
#if defined(__GNUC__) || defined(__clang__)
|
||||
#define MG_MEMORY_BARRIER() __sync_synchronize()
|
||||
#elif defined(_MSC_VER) && _MSC_VER >= 1700
|
||||
#define MG_MEMORY_BARRIER() MemoryBarrier()
|
||||
#elif !defined(MG_MEMORY_BARRIER)
|
||||
#define MG_MEMORY_BARRIER()
|
||||
#endif
|
||||
|
||||
// Every message in a queue is prepended by a 32-bit message length (ML).
|
||||
// If ML is 0, then it is the end, and reader must wrap to the beginning.
|
||||
//
|
||||
// |------| ML | message1 | ML | message2 |--- free space ---|
|
||||
// ^ ^ ^ ^
|
||||
// buf tail head len
|
||||
// Queue when q->tail <= q->head:
|
||||
// |----- free -----| ML | message1 | ML | message2 | ----- free ------|
|
||||
// ^ ^ ^ ^
|
||||
// buf tail head len
|
||||
//
|
||||
// Queue when q->tail > q->head:
|
||||
// | ML | message2 |----- free ------| ML | message1 | 0 |---- free ----|
|
||||
// ^ ^ ^ ^
|
||||
// buf head tail len
|
||||
|
||||
size_t mg_queue_space(struct mg_queue *q, char **buf) {
|
||||
size_t ofs;
|
||||
if (q->head > 0 && q->tail >= q->head) { // All messages read?
|
||||
q->head = 0; // Yes. Reset head first
|
||||
q->tail = 0; // Now reset the tail
|
||||
void mg_queue_init(struct mg_queue *q, char *buf, size_t size) {
|
||||
q->size = size;
|
||||
q->buf = buf;
|
||||
q->head = q->tail = 0;
|
||||
}
|
||||
|
||||
static size_t mg_queue_read_len(struct mg_queue *q) {
|
||||
uint32_t n = 0;
|
||||
MG_MEMORY_BARRIER();
|
||||
memcpy(&n, q->buf + q->tail, sizeof(n));
|
||||
assert(q->tail + n + sizeof(n) <= q->size);
|
||||
return n;
|
||||
}
|
||||
|
||||
static void mg_queue_write_len(struct mg_queue *q, size_t len) {
|
||||
uint32_t n = (uint32_t) len;
|
||||
memcpy(q->buf + q->head, &n, sizeof(n));
|
||||
MG_MEMORY_BARRIER();
|
||||
}
|
||||
|
||||
size_t mg_queue_book(struct mg_queue *q, char **buf, size_t len) {
|
||||
size_t space = 0, hs = sizeof(uint32_t) * 2; // *2 is for the 0 marker
|
||||
if (q->head >= q->tail && q->head + len + hs <= q->size) {
|
||||
space = q->size - q->head - hs; // There is enough space
|
||||
} else if (q->head >= q->tail && q->tail > hs) {
|
||||
mg_queue_write_len(q, 0); // Not enough space ahead
|
||||
q->head = 0; // Wrap head to the beginning
|
||||
}
|
||||
ofs = q->head + sizeof(size_t);
|
||||
if (buf != NULL) *buf = q->buf + ofs;
|
||||
return ofs > q->len ? 0 : q->len - ofs;
|
||||
if (q->head + hs + len < q->tail) space = q->tail - q->head - hs;
|
||||
if (buf != NULL) *buf = q->buf + q->head + sizeof(uint32_t);
|
||||
return space;
|
||||
}
|
||||
|
||||
size_t mg_queue_next(struct mg_queue *q, char **buf) {
|
||||
size_t len = MG_QUEUE_EMPTY;
|
||||
if (q->tail < q->head) memcpy(&len, &q->buf[q->tail], sizeof(len));
|
||||
if (buf != NULL) *buf = &q->buf[q->tail + sizeof(len)];
|
||||
size_t len = 0;
|
||||
if (q->tail != q->head) {
|
||||
len = mg_queue_read_len(q);
|
||||
if (len == 0) { // Zero (head wrapped) ?
|
||||
q->tail = 0; // Reset tail to the start
|
||||
if (q->head > q->tail) len = mg_queue_read_len(q); // Read again
|
||||
}
|
||||
}
|
||||
if (buf != NULL) *buf = q->buf + q->tail + sizeof(uint32_t);
|
||||
assert(q->tail + len <= q->size);
|
||||
return len;
|
||||
}
|
||||
|
||||
void mg_queue_add(struct mg_queue *q, size_t len) {
|
||||
size_t head = q->head + len + (size_t) sizeof(head); // New head
|
||||
if (head <= q->len) { // Have space ?
|
||||
memcpy(q->buf + q->head, &len, sizeof(len)); // Yes. Store ML
|
||||
q->head = head; // Advance head
|
||||
}
|
||||
assert(len > 0);
|
||||
mg_queue_write_len(q, len);
|
||||
assert(q->head + sizeof(uint32_t) * 2 + len <= q->size);
|
||||
q->head += len + sizeof(uint32_t);
|
||||
}
|
||||
|
||||
void mg_queue_del(struct mg_queue *q) {
|
||||
size_t len = mg_queue_next(q, NULL), tail = q->tail + len + sizeof(size_t);
|
||||
if (len != MG_QUEUE_EMPTY) q->tail = tail;
|
||||
void mg_queue_del(struct mg_queue *q, size_t len) {
|
||||
q->tail += len + sizeof(uint32_t);
|
||||
assert(q->tail + sizeof(uint32_t) <= q->size);
|
||||
}
|
||||
|
42
src/queue.h
42
src/queue.h
@ -1,38 +1,30 @@
|
||||
#pragma once
|
||||
|
||||
#include "arch.h" // For size_t
|
||||
#include "config.h" // For MG_ENABLE_ATOMIC
|
||||
|
||||
#if MG_ENABLE_ATOMIC
|
||||
#include <stdatomic.h>
|
||||
#elif !defined(_Atomic)
|
||||
#define _Atomic
|
||||
#endif
|
||||
#include "arch.h" // For size_t
|
||||
|
||||
// Single producer, single consumer non-blocking queue
|
||||
//
|
||||
// Producer:
|
||||
// void *buf;
|
||||
// while (mg_queue_space(q, &buf) < len) WAIT(); // Wait for free space
|
||||
// memcpy(buf, data, len); // Copy data to the queue
|
||||
// mg_queue_add(q, len); // Advance q->head
|
||||
// char *buf;
|
||||
// while (mg_queue_book(q, &buf) < len) WAIT(); // Wait for space
|
||||
// memcpy(buf, my_data, len); // Copy data to the queue
|
||||
// mg_queue_add(q, len);
|
||||
//
|
||||
// Consumer:
|
||||
// void *buf;
|
||||
// while ((len = mg_queue_next(q, &buf)) == MG_QUEUE_EMPTY) WAIT();
|
||||
// mg_hexdump(buf, len); // Handle message
|
||||
// mg_queue_del(q); // Delete message
|
||||
// char *buf;
|
||||
// while ((len = mg_queue_get(q, &buf)) == 0) WAIT();
|
||||
// mg_hexdump(buf, len); // Handle message
|
||||
// mg_queue_del(q, len);
|
||||
//
|
||||
struct mg_queue {
|
||||
char *buf;
|
||||
size_t len;
|
||||
volatile _Atomic size_t tail;
|
||||
volatile _Atomic size_t head;
|
||||
size_t size;
|
||||
volatile size_t tail;
|
||||
volatile size_t head;
|
||||
};
|
||||
|
||||
#define MG_QUEUE_EMPTY ((size_t) ~0ul) // Next message size when queue is empty
|
||||
|
||||
void mg_queue_add(struct mg_queue *, size_t len); // Advance head
|
||||
void mg_queue_del(struct mg_queue *); // Advance tail
|
||||
size_t mg_queue_space(struct mg_queue *, char **); // Get free space
|
||||
size_t mg_queue_next(struct mg_queue *, char **); // Get next message size
|
||||
void mg_queue_init(struct mg_queue *, char *, size_t); // Init queue
|
||||
size_t mg_queue_book(struct mg_queue *, char **buf, size_t); // Reserve space
|
||||
void mg_queue_add(struct mg_queue *, size_t); // Add new message
|
||||
size_t mg_queue_next(struct mg_queue *, char **); // Get oldest message
|
||||
void mg_queue_del(struct mg_queue *, size_t); // Delete oldest message
|
||||
|
@ -48,7 +48,7 @@ static uint32_t blk0(union char64long16 *block, int i) {
|
||||
w = rol(w, 30);
|
||||
|
||||
static void mg_sha1_transform(uint32_t state[5],
|
||||
const unsigned char buffer[64]) {
|
||||
const unsigned char *buffer) {
|
||||
uint32_t a, b, c, d, e;
|
||||
union char64long16 block[1];
|
||||
|
||||
|
@ -6,6 +6,12 @@
|
||||
#include "net.h"
|
||||
#include "str.h"
|
||||
|
||||
#if MG_ENABLE_ASSERT
|
||||
#include <assert.h>
|
||||
#elif !defined(assert)
|
||||
#define assert(x)
|
||||
#endif
|
||||
|
||||
void mg_random(void *buf, size_t len);
|
||||
char *mg_random_str(char *buf, size_t len);
|
||||
uint16_t mg_ntohs(uint16_t net);
|
||||
|
@ -4,7 +4,6 @@
|
||||
#define MG_ENABLE_PACKED_FS 0
|
||||
#define MG_ENABLE_LINES 1
|
||||
|
||||
#include <assert.h>
|
||||
#include <sys/socket.h>
|
||||
#ifndef __OpenBSD__
|
||||
#include <linux/if.h>
|
||||
|
@ -3,7 +3,6 @@
|
||||
#define MG_ENABLE_TCPIP 1
|
||||
#define MG_ENABLE_PACKED_FS 0
|
||||
|
||||
#include <assert.h>
|
||||
#include "mongoose.c"
|
||||
#include "driver_mock.c"
|
||||
|
||||
|
117
test/unit_test.c
117
test/unit_test.c
@ -2686,32 +2686,41 @@ static void test_poll(void) {
|
||||
mg_mgr_free(&mgr);
|
||||
}
|
||||
|
||||
#if MG_ENABLE_ATOMIC
|
||||
#define NMESSAGES 49999
|
||||
#define NMESSAGES 99999
|
||||
static uint32_t s_qcrc = 0;
|
||||
static int s_out, s_in;
|
||||
static void producer(void *param) {
|
||||
struct mg_queue *q = (struct mg_queue *) param;
|
||||
volatile size_t i, n, len;
|
||||
for (i = 0; i < NMESSAGES; i++) {
|
||||
char buf[100];
|
||||
char *p;
|
||||
mg_random(buf, sizeof(buf));
|
||||
n = ((unsigned char *) buf)[0] % sizeof(buf);
|
||||
while ((len = mg_queue_space(q, &p)) < n) (void) 0;
|
||||
memcpy(p, buf, n);
|
||||
mg_queue_add(q, n);
|
||||
s_qcrc = mg_crc32(s_qcrc, buf, n);
|
||||
char tmp[64 * 1024], *buf;
|
||||
size_t len, ofs = sizeof(tmp);
|
||||
for (s_out = 0; s_out < NMESSAGES; s_out++) {
|
||||
if (ofs >= sizeof(tmp)) mg_random(tmp, sizeof(tmp)), ofs = 0;
|
||||
len = ((uint8_t *) tmp)[ofs] % 55 + 1;
|
||||
if (ofs + len > sizeof(tmp)) len = sizeof(tmp) - ofs;
|
||||
while ((mg_queue_book(q, &buf, len)) < len) (void) 0;
|
||||
memcpy(buf, &tmp[ofs], len);
|
||||
s_qcrc = mg_crc32(s_qcrc, buf, len);
|
||||
ofs += len;
|
||||
#if 0
|
||||
fprintf(stderr, "-->prod %3d %8x %-3lu %zu/%zu/%lu\n", s_out, s_qcrc, len, q->tail,
|
||||
q->head, buf - q->buf);
|
||||
#endif
|
||||
mg_queue_add(q, len);
|
||||
}
|
||||
}
|
||||
|
||||
static uint32_t consumer(struct mg_queue *q) {
|
||||
uint32_t i, crc = 0;
|
||||
for (i = 0; i < NMESSAGES; i++) {
|
||||
char *p;
|
||||
volatile size_t len;
|
||||
while ((len = mg_queue_next(q, &p)) == MG_QUEUE_EMPTY) (void) 0;
|
||||
crc = mg_crc32(crc, (char *) p, len);
|
||||
mg_queue_del(q);
|
||||
uint32_t crc = 0;
|
||||
for (s_in = 0; s_in < NMESSAGES; s_in++) {
|
||||
char *buf;
|
||||
size_t len;
|
||||
while ((len = mg_queue_next(q, &buf)) == 0) (void) 0;
|
||||
crc = mg_crc32(crc, buf, len);
|
||||
#if 0
|
||||
fprintf(stderr, "-->cons %3u %8x %-3lu %zu/%zu/%lu\n", s_in, crc, len, q->tail,
|
||||
q->head, buf - q->buf);
|
||||
#endif
|
||||
mg_queue_del(q, len);
|
||||
}
|
||||
return crc;
|
||||
}
|
||||
@ -2739,68 +2748,18 @@ static void start_thread(void (*f)(void *), void *p) {
|
||||
(void) f, (void) p;
|
||||
}
|
||||
#endif
|
||||
#endif
|
||||
|
||||
static void test_queue(void) {
|
||||
char buf[512], *p;
|
||||
size_t size = sizeof(size);
|
||||
struct mg_queue queue, *q = &queue;
|
||||
|
||||
memset(q, 0, sizeof(*q));
|
||||
q->buf = buf;
|
||||
q->len = sizeof(buf);
|
||||
|
||||
ASSERT(mg_queue_next(q, &p) == MG_QUEUE_EMPTY);
|
||||
|
||||
// Write "hi"
|
||||
ASSERT(mg_queue_space(q, &p) == sizeof(buf) - size);
|
||||
ASSERT(mg_queue_printf(q, "hi") == 2);
|
||||
ASSERT(q->head == size + 2);
|
||||
ASSERT(mg_queue_next(q, &p) == 2);
|
||||
|
||||
// Write zero-length message
|
||||
ASSERT(mg_queue_space(q, &p) == sizeof(buf) - 2 - 2 * size);
|
||||
ASSERT(mg_queue_printf(q, "") == 0);
|
||||
ASSERT(q->head == size * 2 + 2);
|
||||
ASSERT(mg_queue_next(q, &p) == 2);
|
||||
|
||||
// Write "dude"
|
||||
ASSERT(mg_queue_space(q, &p) == sizeof(buf) - 2 - 3 * size);
|
||||
ASSERT(mg_queue_printf(q, "dude") == 4);
|
||||
ASSERT(q->head == size * 3 + 2 + 4);
|
||||
ASSERT(mg_queue_next(q, &p) == 2);
|
||||
|
||||
// Read "hi"
|
||||
ASSERT(mg_queue_next(q, &p) == 2);
|
||||
ASSERT(memcmp(p, "hi", 2) == 0);
|
||||
mg_queue_del(q);
|
||||
ASSERT(q->head == size * 3 + 2 + 4);
|
||||
ASSERT(q->tail == size + 2);
|
||||
|
||||
// Read empty message
|
||||
ASSERT(mg_queue_next(q, &p) == 0);
|
||||
mg_queue_del(q);
|
||||
ASSERT(q->tail == size * 2 + 2);
|
||||
|
||||
ASSERT(mg_queue_next(q, &p) == 4);
|
||||
ASSERT(memcmp(p, "dude", 4) == 0);
|
||||
mg_queue_del(q);
|
||||
ASSERT(q->tail == q->head);
|
||||
|
||||
ASSERT(mg_queue_space(q, &p) == sizeof(buf) - size);
|
||||
ASSERT(q->tail == 0 && q->head == 0);
|
||||
q->tail = q->head = 0;
|
||||
|
||||
#if MG_ENABLE_ATOMIC
|
||||
{
|
||||
// Test concurrent queue access
|
||||
uint32_t crc;
|
||||
start_thread(producer, q);
|
||||
crc = consumer(q);
|
||||
MG_INFO(("DONE. %x %x", s_qcrc, crc));
|
||||
ASSERT(s_qcrc == crc);
|
||||
}
|
||||
#endif
|
||||
char buf[512];
|
||||
struct mg_queue queue;
|
||||
uint32_t crc;
|
||||
memset(buf, 0x55, sizeof(buf));
|
||||
mg_queue_init(&queue, buf, sizeof(buf));
|
||||
start_thread(producer, &queue); // Start producer in a separate thread
|
||||
crc = consumer(&queue); // Consumer eats data in this thread
|
||||
MG_INFO(("CRC1 %8x", s_qcrc)); // Show CRCs
|
||||
MG_INFO(("CRC2 %8x", crc));
|
||||
ASSERT(s_qcrc == crc);
|
||||
}
|
||||
|
||||
int main(void) {
|
||||
|
Loading…
x
Reference in New Issue
Block a user