From aca0512173ba2194a987ce42cb1ab976a3f280ff Mon Sep 17 00:00:00 2001 From: Sergey Lyubka Date: Mon, 23 Aug 2010 17:42:25 +0200 Subject: [PATCH] optimized DEBUG_TRACE(). Fixed thread sync on exit. --- mongoose.c | 143 ++++++++++++++++++++++++----------------------------- 1 file changed, 65 insertions(+), 78 deletions(-) diff --git a/mongoose.c b/mongoose.c index 56309ffc..1b91122c 100644 --- a/mongoose.c +++ b/mongoose.c @@ -196,10 +196,15 @@ typedef int SOCKET; #define MAX_CGI_ENVIR_VARS 64 #define MAX_REQUEST_SIZE 8192 #define ARRAY_SIZE(array) (sizeof(array) / sizeof(array[0])) -#define DEBUG_MGS_PREFIX "*** Mongoose debug *** " #if defined(DEBUG) -#define DEBUG_TRACE(x) do {printf x; putchar('\n'); fflush(stdout);} while (0) +#define DEBUG_TRACE(x) do { \ + printf("*** [%lu] thread %p: %s: ", \ + (unsigned long) time(NULL), (void *) pthread_self(), __func__); \ + printf x; \ + putchar('\n'); \ + fflush(stdout); \ +} while (0) #else #define DEBUG_TRACE(x) #endif // DEBUG @@ -344,13 +349,13 @@ struct mg_context { int num_threads; // Number of threads pthread_mutex_t mutex; // Protects (max|num)_threads - pthread_cond_t thr_cond; // Condvar for thread sync + pthread_cond_t cond; // Condvar for tracking workers terminations struct socket queue[20]; // Accepted sockets int sq_head; // Head of the socket queue int sq_tail; // Tail of the socket queue - pthread_cond_t empty_cond; // Socket queue empty condvar - pthread_cond_t full_cond; // Socket queue full condvar + pthread_cond_t sq_full; // Singaled when socket is produced + pthread_cond_t sq_empty; // Signaled when socket is consumed }; struct mg_connection { @@ -622,7 +627,7 @@ static void send_http_error(struct mg_connection *conn, int status, mg_callback_t error_handler; bool_t handled; - DEBUG_TRACE((DEBUG_MGS_PREFIX "%s: %d %s", __func__, status, reason)); + DEBUG_TRACE(("%d %s", status, reason)); conn->request_info.status_code = status; error_handler = conn->ctx->config->http_error_handler; @@ -1040,7 +1045,7 @@ static pid_t spawn_process(struct mg_connection *conn, const char *prog, (void) mg_snprintf(conn, line, sizeof(line), "%s", dir); change_slashes_to_backslashes(line); - DEBUG_TRACE((DEBUG_MGS_PREFIX "%s: Running [%s]", __func__, cmdline)); + DEBUG_TRACE(("Running [%s]", cmdline)); if (CreateProcessA(NULL, cmdline, NULL, NULL, MG_TRUE, CREATE_NEW_PROCESS_GROUP, envblk, line, &si, &pi) == 0) { cry(conn, "%s: CreateProcess(%s): %d", @@ -1093,7 +1098,8 @@ static int start_thread(struct mg_context *ctx, mg_thread_func_t func, (void) pthread_attr_init(&attr); (void) pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); - (void) pthread_attr_setstacksize(&attr, sizeof(struct mg_connection) * 2); + // TODO(lsm): figure out why mongoose dies on Linux if next line is enabled + // (void) pthread_attr_setstacksize(&attr, sizeof(struct mg_connection) * 5); if ((retval = pthread_create(&thread_id, &attr, func, param)) != 0) { cry(fc(ctx), "%s: %s", __func__, strerror(retval)); @@ -1119,9 +1125,9 @@ static pid_t spawn_process(struct mg_connection *conn, const char *prog, if (chdir(dir) != 0) { cry(conn, "%s: chdir(%s): %s", __func__, dir, strerror(ERRNO)); } else if (dup2(fd_stdin, 0) == -1) { - cry(conn, "%s: dup2(stdin, %d): %s", __func__, fd_stdin, strerror(ERRNO)); + cry(conn, "%s: dup2(%d, 1): %s", __func__, fd_stdin, strerror(ERRNO)); } else if (dup2(fd_stdout, 1) == -1) { - cry(conn, "%s: dup2(stdout, %d): %s", __func__, fd_stdout, strerror(ERRNO)); + cry(conn, "%s: dup2(%d, 1): %s", __func__, fd_stdout, strerror(ERRNO)); } else { (void) dup2(fd_stdout, 2); (void) close(fd_stdin); @@ -1387,7 +1393,7 @@ static void convert_uri_to_file_name(struct mg_connection *conn, change_slashes_to_backslashes(buf); #endif /* _WIN32 */ - DEBUG_TRACE((DEBUG_MGS_PREFIX "%s: [%s] -> [%s]", __func__, uri, buf)); + DEBUG_TRACE(("[%s] -> [%s]", uri, buf)); } // Setup listening socket on given address, return socket. @@ -2521,6 +2527,7 @@ static bool_t handle_request_body(struct mg_connection *conn, FILE *fp) { if (conn->content_len <= (int64_t) data_len) { #if 0 + TODO(lsm): sort out embedded mode. ri->post_data_len = (int) content_len; #endif // If fp is NULL, this is embedded mode, and we do not @@ -3043,7 +3050,7 @@ static void handle_request(struct mg_connection *conn) { remove_double_dots_and_double_slashes(ri->uri); convert_uri_to_file_name(conn, ri->uri, path, sizeof(path)); - DEBUG_TRACE((DEBUG_MGS_PREFIX "%s: %s", __func__, ri->uri)); + DEBUG_TRACE(("%s", ri->uri)); if (new_request_callback && new_request_callback(conn, ri) == MG_TRUE) { // Do nothing, callback has served the request } else if (!check_authorization(conn, path)) { @@ -3107,7 +3114,6 @@ static void close_all_listening_sockets(struct mg_context *ctx) { (void) closesocket(sp->sock); free(sp); } - ctx->listening_sockets = NULL; } static enum mg_error_t set_ports_option(struct mg_context *ctx) { @@ -3442,8 +3448,6 @@ static void close_socket_gracefully(SOCKET sock) { } static void close_connection(struct mg_connection *conn) { - reset_per_request_attributes(conn); - if (conn->ssl) { SSL_free(conn->ssl); conn->ssl = NULL; @@ -3511,37 +3515,26 @@ static void process_new_connection(struct mg_connection *conn) { } // Worker threads take accepted socket from the queue -static bool_t get_socket(struct mg_context *ctx, struct socket *sp) { -#if 0 - struct timespec ts; -#endif - +static bool_t consume_socket(struct mg_context *ctx, struct socket *sp) { (void) pthread_mutex_lock(&ctx->mutex); - DEBUG_TRACE((DEBUG_MGS_PREFIX "%s: thread %p: going idle", - __func__, (void *) pthread_self())); + DEBUG_TRACE(("going idle")); // If the queue is empty, wait. We're idle at this point. - while (ctx->sq_head == ctx->sq_tail) { -#if 0 - ts.tv_nsec = 0; - ts.tv_sec = time(NULL) + 5; - if (pthread_cond_wait(&ctx->empty_cond, &ctx->mutex, &ts) != 0) { - // Timeout! release the mutex and return - (void) pthread_mutex_unlock(&ctx->mutex); - return MG_FALSE; - } -#else - (void) pthread_cond_wait(&ctx->empty_cond, &ctx->mutex); -#endif + while (ctx->sq_head == ctx->sq_tail && ctx->stop_flag == 0) { + pthread_cond_wait(&ctx->sq_full, &ctx->mutex); + } + // Master thread could wake us up without putting a socket. + // If this happens, it is time to exit. + if (ctx->stop_flag) { + (void) pthread_mutex_unlock(&ctx->mutex); + return MG_FALSE; } assert(ctx->sq_head > ctx->sq_tail); // Copy socket from the queue and increment tail *sp = ctx->queue[ctx->sq_tail % ARRAY_SIZE(ctx->queue)]; ctx->sq_tail++; - DEBUG_TRACE((DEBUG_MGS_PREFIX - "%s: thread %p grabbed socket %d, going busy", - __func__, (void *) pthread_self(), sp->sock)); + DEBUG_TRACE(("grabbed socket %d, going busy", sp->sock)); // Wrap pointers if needed while (ctx->sq_tail > (int) ARRAY_SIZE(ctx->queue)) { @@ -3549,7 +3542,7 @@ static bool_t get_socket(struct mg_context *ctx, struct socket *sp) { ctx->sq_head -= ARRAY_SIZE(ctx->queue); } - (void) pthread_cond_signal(&ctx->full_cond); + (void) pthread_cond_signal(&ctx->sq_empty); (void) pthread_mutex_unlock(&ctx->mutex); return MG_TRUE; @@ -3558,12 +3551,9 @@ static bool_t get_socket(struct mg_context *ctx, struct socket *sp) { static void worker_thread(struct mg_context *ctx) { struct mg_connection conn; - DEBUG_TRACE((DEBUG_MGS_PREFIX "%s: thread %p starting", - __func__, (void *) pthread_self())); - (void) memset(&conn, 0, sizeof(conn)); - while (ctx->stop_flag == 0 && get_socket(ctx, &conn.client) == MG_TRUE) { + while (ctx->stop_flag == 0 && consume_socket(ctx, &conn.client)) { conn.birth_time = time(NULL); conn.ctx = ctx; @@ -3593,30 +3583,29 @@ static void worker_thread(struct mg_context *ctx) { // Signal master that we're done with connection and exiting (void) pthread_mutex_lock(&ctx->mutex); ctx->num_threads--; - (void) pthread_cond_signal(&ctx->thr_cond); + (void) pthread_cond_signal(&ctx->cond); assert(ctx->num_threads >= 0); (void) pthread_mutex_unlock(&ctx->mutex); - DEBUG_TRACE((DEBUG_MGS_PREFIX "%s: thread %p exiting", - __func__, (void *) pthread_self())); + DEBUG_TRACE(("exiting")); } // Master thread adds accepted socket to a queue -static void put_socket(struct mg_context *ctx, const struct socket *sp) { +static void produce_socket(struct mg_context *ctx, const struct socket *sp) { (void) pthread_mutex_lock(&ctx->mutex); // If the queue is full, wait while (ctx->sq_head - ctx->sq_tail >= (int) ARRAY_SIZE(ctx->queue)) { - (void) pthread_cond_wait(&ctx->full_cond, &ctx->mutex); + (void) pthread_cond_wait(&ctx->sq_empty, &ctx->mutex); } assert(ctx->sq_head - ctx->sq_tail < (int) ARRAY_SIZE(ctx->queue)); // Copy socket to the queue and increment head ctx->queue[ctx->sq_head % ARRAY_SIZE(ctx->queue)] = *sp; ctx->sq_head++; - DEBUG_TRACE((DEBUG_MGS_PREFIX "%s: queued socket %d", __func__, sp->sock)); + DEBUG_TRACE(("queued socket %d", sp->sock)); - (void) pthread_cond_signal(&ctx->empty_cond); + (void) pthread_cond_signal(&ctx->sq_full); (void) pthread_mutex_unlock(&ctx->mutex); } @@ -3627,23 +3616,19 @@ static void accept_new_connection(const struct socket *listener, accepted.rsa.len = sizeof(accepted.rsa.u.sin); accepted.lsa = listener->lsa; - if ((accepted.sock = accept(listener->sock, &accepted.rsa.u.sa, - &accepted.rsa.len)) == INVALID_SOCKET) { - return; - } - - allowed = check_acl(ctx, &accepted.rsa) == MG_SUCCESS; - - if (allowed == MG_SUCCESS) { - // Put accepted socket structure into the queue - DEBUG_TRACE((DEBUG_MGS_PREFIX "%s: accepted socket %d", - __func__, accepted.sock)); - accepted.is_ssl = listener->is_ssl; - put_socket(ctx, &accepted); - } else { - cry(fc(ctx), "%s: %s is not allowed to connect", - __func__, inet_ntoa(accepted.rsa.u.sin.sin_addr)); - (void) closesocket(accepted.sock); + accepted.sock = accept(listener->sock, &accepted.rsa.u.sa, &accepted.rsa.len); + if (accepted.sock != INVALID_SOCKET) { + allowed = check_acl(ctx, &accepted.rsa) == MG_SUCCESS; + if (allowed == MG_SUCCESS) { + // Put accepted socket structure into the queue + DEBUG_TRACE(("accepted socket %d", accepted.sock)); + accepted.is_ssl = listener->is_ssl; + produce_socket(ctx, &accepted); + } else { + cry(fc(ctx), "%s: %s is not allowed to connect", + __func__, inet_ntoa(accepted.rsa.u.sin.sin_addr)); + (void) closesocket(accepted.sock); + } } } @@ -3680,15 +3665,18 @@ static void master_thread(struct mg_context *ctx) { } } } - DEBUG_TRACE((DEBUG_MGS_PREFIX "%s: master: stopping workers", __func__)); + DEBUG_TRACE(("stopping workers")); // Stop signal received: somebody called mg_stop. Quit. close_all_listening_sockets(ctx); + // Wakeup workers that are waiting for connections to handle. + pthread_cond_broadcast(&ctx->sq_full); + // Wait until all threads finish (void) pthread_mutex_lock(&ctx->mutex); while (ctx->num_threads > 0) { - (void) pthread_cond_wait(&ctx->thr_cond, &ctx->mutex); + (void) pthread_cond_wait(&ctx->cond, &ctx->mutex); } (void) pthread_mutex_unlock(&ctx->mutex); @@ -3697,15 +3685,16 @@ static void master_thread(struct mg_context *ctx) { SSL_CTX_free(ctx->ssl_ctx); } + // All threads exited, no sync is needed. Destroy mutex and condvars (void) pthread_mutex_destroy(&ctx->mutex); - (void) pthread_cond_destroy(&ctx->thr_cond); - (void) pthread_cond_destroy(&ctx->empty_cond); - (void) pthread_cond_destroy(&ctx->full_cond); + (void) pthread_cond_destroy(&ctx->cond); + (void) pthread_cond_destroy(&ctx->sq_empty); + (void) pthread_cond_destroy(&ctx->sq_full); // Signal mg_stop() that we're done ctx->stop_flag = 2; - DEBUG_TRACE((DEBUG_MGS_PREFIX "%s: master: exiting", __func__)); + DEBUG_TRACE(("exiting")); } void mg_stop(struct mg_context *ctx) { @@ -3713,10 +3702,8 @@ void mg_stop(struct mg_context *ctx) { // Wait until mg_fini() stops while (ctx->stop_flag != 2) { - (void) sleep(1); + (void) sleep(0); } - - assert(ctx->num_threads == 0); free(ctx); #if defined(_WIN32) @@ -3772,9 +3759,9 @@ struct mg_context * mg_start(const struct mg_config *config) { #endif // !_WIN32 (void) pthread_mutex_init(&ctx->mutex, NULL); - (void) pthread_cond_init(&ctx->thr_cond, NULL); - (void) pthread_cond_init(&ctx->empty_cond, NULL); - (void) pthread_cond_init(&ctx->full_cond, NULL); + (void) pthread_cond_init(&ctx->cond, NULL); + (void) pthread_cond_init(&ctx->sq_empty, NULL); + (void) pthread_cond_init(&ctx->sq_full, NULL); // Start master (listening) thread start_thread(ctx, (mg_thread_func_t) master_thread, ctx);