Add a way to support multiple threads.

git-svn-id: https://protobuf-c.googlecode.com/svn/trunk@289 00440858-1255-0410-a3e6-75ea37f81c3a
This commit is contained in:
lahiker42 2011-03-21 22:25:28 +00:00
parent 75fa6665de
commit 5787693b48
6 changed files with 212 additions and 33 deletions

View File

@ -3,6 +3,7 @@
as packed-repeated whenever it makes sense (for all types
other than messages, strings, and bytes).
- switch to New BSD license.
- add protobuf_c_rpc_server_configure_threading()
0.15:
- make protobuf_c_message_init() into a function (Issue #49, daveb)

View File

@ -802,6 +802,7 @@ struct _ServerRequest
uint32_t request_id; /* in little-endian */
uint32_t method_index; /* in native-endian */
ServerConnection *conn;
ProtobufC_RPC_Server *server;
union {
/* if conn != NULL, then the request is alive: */
struct { ServerRequest *prev, *next; } alive;
@ -825,6 +826,18 @@ struct _ServerConnection
ServerRequest *first_pending_request, *last_pending_request;
};
/* When we get a response in the wrong thread,
we proxy it over a system pipe. Actually, we allocate one
of these structures and pass the pointer over the pipe. */
typedef struct _ProxyResponse ProxyResponse;
struct _ProxyResponse
{
ServerRequest *request;
unsigned len;
/* data follows the structure */
};
struct _ProtobufC_RPC_Server
{
ProtobufCDispatch *dispatch;
@ -837,6 +850,13 @@ struct _ProtobufC_RPC_Server
ServerRequest *recycled_requests;
/* multithreading support */
ProtobufC_RPC_IsRpcThreadFunc is_rpc_thread_func;
void * is_rpc_thread_data;
int proxy_pipe[2];
unsigned proxy_extra_data_len;
uint8_t proxy_extra_data[sizeof (void*)];
ProtobufC_RPC_Error_Func error_handler;
void *error_handler_data;
@ -976,6 +996,7 @@ create_server_request (ServerConnection *conn,
ProtobufCAllocator *allocator = conn->server->allocator;
rv = allocator->alloc (allocator, sizeof (ServerRequest));
}
rv->server = conn->server;
rv->conn = conn;
rv->request_id = request_id;
rv->method_index = method_index;
@ -984,6 +1005,21 @@ create_server_request (ServerConnection *conn,
return rv;
}
static void
free_server_request (ProtobufC_RPC_Server *server,
ServerRequest *request)
{
#if RECYCLE_REQUESTS
/* recycle request */
request->info.recycled.next = server->recycled_requests;
server->recycled_requests = request;
#else
/* free the request immediately */
server->allocator->free (server->allocator, request);
#endif
}
static void handle_server_connection_events (int fd,
unsigned events,
void *data);
@ -992,16 +1028,19 @@ server_connection_response_closure (const ProtobufCMessage *message,
void *closure_data)
{
ServerRequest *request = closure_data;
ServerConnection *conn = request->conn;
protobuf_c_boolean must_set_output_watch = FALSE;
if (conn == NULL)
ProtobufC_RPC_Server *server = request->server;
protobuf_c_boolean must_proxy = 0;
ProtobufCAllocator *allocator = server->allocator;
if (server->is_rpc_thread_func != NULL)
{
/* defunct request */
ProtobufCAllocator *allocator = request->info.defunct.allocator;
allocator->free (allocator, request);
return;
must_proxy = !server->is_rpc_thread_func (server,
server->dispatch,
server->is_rpc_thread_data);
}
uint8_t buffer_slab[512];
ProtobufCBufferSimple buffer_simple = PROTOBUF_C_BUFFER_SIMPLE_INIT (buffer_slab);
if (message == NULL)
{
/* send failed status */
@ -1010,44 +1049,71 @@ server_connection_response_closure (const ProtobufCMessage *message,
header[1] = uint32_to_le (request->method_index);
header[2] = 0; /* no message */
header[3] = request->request_id;
must_set_output_watch = (conn->outgoing.size == 0);
protobuf_c_data_buffer_append (&conn->outgoing, header, 16);
protobuf_c_buffer_simple_append (&buffer_simple.base,
16, (uint8_t *) header);
}
else
{
/* send success response */
uint32_t header[4];
uint8_t buffer_slab[512];
ProtobufCBufferSimple buffer_simple = PROTOBUF_C_BUFFER_SIMPLE_INIT (buffer_slab);
protobuf_c_message_pack_to_buffer (message, &buffer_simple.base);
header[0] = uint32_to_le (PROTOBUF_C_STATUS_CODE_SUCCESS);
header[1] = uint32_to_le (request->method_index);
header[2] = uint32_to_le (buffer_simple.len);
header[3] = request->request_id;
must_set_output_watch = (conn->outgoing.size == 0);
protobuf_c_data_buffer_append (&conn->outgoing, header, 16);
protobuf_c_data_buffer_append (&conn->outgoing, buffer_simple.data, buffer_simple.len);
PROTOBUF_C_BUFFER_SIMPLE_CLEAR (&buffer_simple);
protobuf_c_buffer_simple_append (&buffer_simple.base,
16, (uint8_t *) header);
protobuf_c_message_pack_to_buffer (message, &buffer_simple.base);
((uint32_t *)buffer_simple.data)[2] = uint32_to_le (buffer_simple.len - 16);
}
if (must_set_output_watch)
protobuf_c_dispatch_watch_fd (conn->server->dispatch,
conn->fd,
PROTOBUF_C_EVENT_READABLE|PROTOBUF_C_EVENT_WRITABLE,
handle_server_connection_events,
conn);
GSK_LIST_REMOVE (GET_PENDING_REQUEST_LIST (conn), request);
conn->n_pending_requests--;
if (must_proxy)
{
ProxyResponse *pr = allocator->alloc (allocator, sizeof (ProxyResponse) + buffer_simple.len);
int rv;
pr->request = request;
pr->len = buffer_simple.len;
memcpy (pr + 1, buffer_simple.data, buffer_simple.len);
#if RECYCLE_REQUESTS
/* recycle request */
request->info.recycled.next = conn->server->recycled_requests;
conn->server->recycled_requests = request;
#else
/* free the request immediately */
conn->server->allocator->free (conn->server->allocator, request);
#endif
/* write pointer to proxy pipe */
retry_write:
rv = write (server->proxy_pipe[1], &pr, sizeof (void*));
if (rv < 0)
{
if (errno == EINTR || errno == EAGAIN)
goto retry_write;
server_failed_literal (server, PROTOBUF_C_ERROR_CODE_PROXY_PROBLEM,
"error writing to proxy-pipe");
allocator->free (allocator, pr);
}
else if (rv < sizeof (void *))
{
server_failed_literal (server, PROTOBUF_C_ERROR_CODE_PROXY_PROBLEM,
"partial write to proxy-pipe");
allocator->free (allocator, pr);
}
}
else if (request->conn == NULL)
{
/* defunct request */
allocator->free (allocator, request);
free_server_request (server, request);
}
else
{
ServerConnection *conn = request->conn;
protobuf_c_boolean must_set_output_watch = (conn->outgoing.size == 0);
protobuf_c_data_buffer_append (&conn->outgoing, buffer_simple.data, buffer_simple.len);
if (must_set_output_watch)
protobuf_c_dispatch_watch_fd (conn->server->dispatch,
conn->fd,
PROTOBUF_C_EVENT_READABLE|PROTOBUF_C_EVENT_WRITABLE,
handle_server_connection_events,
conn);
GSK_LIST_REMOVE (GET_PENDING_REQUEST_LIST (conn), request);
conn->n_pending_requests--;
free_server_request (server, request);
}
PROTOBUF_C_BUFFER_SIMPLE_CLEAR (&buffer_simple);
}
static void
@ -1203,6 +1269,10 @@ server_new_from_fd (ProtobufC_FD listening_fd,
server->error_handler_data = "protobuf-c rpc server";
server->listening_fd = listening_fd;
server->recycled_requests = NULL;
server->is_rpc_thread_func = NULL;
server->is_rpc_thread_data = NULL;
server->proxy_pipe[0] = server->proxy_pipe[1] = -1;
server->proxy_extra_data_len = 0;
strcpy (server->bind_name, bind_name);
set_fd_nonblocking (listening_fd);
protobuf_c_dispatch_watch_fd (dispatch, listening_fd, PROTOBUF_C_EVENT_READABLE,
@ -1355,6 +1425,82 @@ protobuf_c_rpc_server_destroy (ProtobufC_RPC_Server *server,
return rv;
}
/* Number of proxied requests to try to grab in a single read */
#define PROXY_BUF_SIZE 256
static void
handle_proxy_pipe_readable (ProtobufC_FD fd,
unsigned events,
void *callback_data)
{
int nread;
ProtobufC_RPC_Server *server = callback_data;
ProtobufCAllocator *allocator = server->allocator;
union {
char buf[sizeof(void*) * PROXY_BUF_SIZE];
ProxyResponse *responses[PROXY_BUF_SIZE];
} u;
unsigned amt, i;
memcpy (u.buf, server->proxy_extra_data, server->proxy_extra_data_len);
nread = read (fd, u.buf + server->proxy_extra_data_len, sizeof (u.buf) - server->proxy_extra_data_len);
if (nread <= 0)
return; /* TODO: handle 0 and non-retryable errors separately */
amt = server->proxy_extra_data_len + nread;
for (i = 0; i < amt / sizeof(void*); i++)
{
ProxyResponse *pr = u.responses[i];
ServerRequest *request = pr->request;
if (request->conn == NULL)
{
/* defunct request */
allocator->free (allocator, request);
}
else
{
ServerConnection *conn = request->conn;
protobuf_c_boolean must_set_output_watch = (conn->outgoing.size == 0);
protobuf_c_data_buffer_append (&conn->outgoing, (uint8_t*)(pr+1), pr->len);
if (must_set_output_watch)
protobuf_c_dispatch_watch_fd (conn->server->dispatch,
conn->fd,
PROTOBUF_C_EVENT_READABLE|PROTOBUF_C_EVENT_WRITABLE,
handle_server_connection_events,
conn);
GSK_LIST_REMOVE (GET_PENDING_REQUEST_LIST (conn), request);
conn->n_pending_requests--;
free_server_request (conn->server, request);
}
allocator->free (allocator, pr);
}
memcpy (server->proxy_extra_data, u.buf + i * sizeof(void*), amt - i * sizeof(void*));
server->proxy_extra_data_len = amt - i * sizeof(void*);
}
void
protobuf_c_rpc_server_configure_threading (ProtobufC_RPC_Server *server,
ProtobufC_RPC_IsRpcThreadFunc func,
void *is_rpc_data)
{
server->is_rpc_thread_func = func;
server->is_rpc_thread_data = is_rpc_data;
retry_pipe:
if (pipe (server->proxy_pipe) < 0)
{
if (errno == EINTR || errno == EAGAIN)
goto retry_pipe;
server_failed_literal (server, PROTOBUF_C_ERROR_CODE_PROXY_PROBLEM,
"error creating pipe for thread-proxying");
return;
}
/* make the read side non-blocking, since we will use it from the main-loop;
leave the write side blocking, since it will be used from foreign threads */
set_fd_nonblocking (server->proxy_pipe[0]);
protobuf_c_dispatch_watch_fd (server->dispatch, server->proxy_pipe[0],
PROTOBUF_C_EVENT_READABLE,
handle_proxy_pipe_readable, server);
}
void
protobuf_c_rpc_server_set_error_handler (ProtobufC_RPC_Server *server,
ProtobufC_RPC_Error_Func func,

View File

@ -26,6 +26,7 @@ typedef enum
PROTOBUF_C_ERROR_CODE_CONNECTION_REFUSED,
PROTOBUF_C_ERROR_CODE_CLIENT_TERMINATED,
PROTOBUF_C_ERROR_CODE_BAD_REQUEST,
PROTOBUF_C_ERROR_CODE_PROXY_PROBLEM
} ProtobufC_RPC_Error_Code;
typedef enum
@ -104,6 +105,7 @@ ProtobufC_RPC_Server *
ProtobufCService *service,
ProtobufCDispatch *dispatch /* or NULL */
);
ProtobufCService *
protobuf_c_rpc_server_destroy (ProtobufC_RPC_Server *server,
protobuf_c_boolean free_underlying_service);
@ -118,6 +120,15 @@ void protobuf_c_rpc_server_disable_autotimeout(ProtobufC_RPC_Server *server);
void protobuf_c_rpc_server_set_autotimeout (ProtobufC_RPC_Server *server,
unsigned timeout_millis);
typedef protobuf_c_boolean
(*ProtobufC_RPC_IsRpcThreadFunc) (ProtobufC_RPC_Server *server,
ProtobufCDispatch *dispatch,
void *is_rpc_data);
void protobuf_c_rpc_server_configure_threading (ProtobufC_RPC_Server *server,
ProtobufC_RPC_IsRpcThreadFunc func,
void *is_rpc_data);
/* Error handling */
void protobuf_c_rpc_server_set_error_handler (ProtobufC_RPC_Server *server,
ProtobufC_RPC_Error_Func func,

View File

@ -4,6 +4,7 @@
#include <string.h>
#include <errno.h>
#include <stdlib.h>
#include <signal.h>
#include "generated-code/test.pb-c.h"
#include <google/protobuf-c/protobuf-c-rpc.h>
@ -144,6 +145,8 @@ int main(int argc, char**argv)
if (name == NULL)
die ("missing --tcp=HOST:PORT or --unix=PATH");
signal (SIGPIPE, SIG_IGN);
service = protobuf_c_rpc_client_new (address_type, name, &foo__dir_lookup__descriptor, NULL);
if (service == NULL)

View File

@ -3,6 +3,7 @@
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <signal.h>
#include <stdlib.h>
#include "generated-code/test.pb-c.h"
#include <google/protobuf-c/protobuf-c-rpc.h>
@ -254,6 +255,8 @@ int main(int argc, char**argv)
if (name == NULL)
die ("missing --port=NUM or --unix=PATH");
signal (SIGPIPE, SIG_IGN);
server = protobuf_c_rpc_server_new (address_type, name, (ProtobufCService *) &the_dir_lookup_service, NULL);
for (;;)

View File

@ -2,6 +2,7 @@
#include <string.h>
#include <stdarg.h>
#include <stdio.h>
#include <signal.h>
#include "generated-code/test.pb-c.h"
#include <google/protobuf-c/protobuf-c-rpc.h>
@ -163,6 +164,14 @@ set_boolean_true (ProtobufCDispatch *dispatch,
* (protobuf_c_boolean *) func_data = 1;
}
static protobuf_c_boolean
pretend_we_are_in_another_thread (ProtobufC_RPC_Server *server,
ProtobufCDispatch *dispatch,
void *data)
{
return 0; /* indicate we are NOT in RPC thread */
}
int main()
{
protobuf_c_boolean is_done;
@ -171,6 +180,8 @@ int main()
ProtobufC_RPC_Client *client;
ProtobufC_RPC_Server *server;
signal (SIGPIPE, SIG_IGN);
message ("testing local service");
test_service (local_service);
@ -250,6 +261,10 @@ int main()
message ("testing client again");
test_service (remote_service);
message ("testing client again (simulating threaded environment)");
protobuf_c_rpc_server_configure_threading (server, pretend_we_are_in_another_thread, NULL);
test_service (remote_service);
/* Destroy the client */
message ("destroying client");
protobuf_c_service_destroy (remote_service);