rpc builds...

git-svn-id: https://protobuf-c.googlecode.com/svn/trunk@123 00440858-1255-0410-a3e6-75ea37f81c3a
This commit is contained in:
lahiker42 2009-01-26 18:09:52 +00:00
parent 4349899e34
commit 70d37e825c
2 changed files with 48 additions and 9 deletions

View File

@ -1,3 +1,6 @@
/* KNOWN DEFECTS:
- server does not obey max_pending_requests_per_connection
*/
#include <string.h> #include <string.h>
#include <assert.h> #include <assert.h>
#include <sys/stat.h> #include <sys/stat.h>
@ -718,7 +721,8 @@ typedef struct _ServerRequest ServerRequest;
typedef struct _ServerConnection ServerConnection; typedef struct _ServerConnection ServerConnection;
struct _ServerRequest struct _ServerRequest
{ {
uint32_t request_id; uint32_t request_id; /* in little-endian */
uint32_t method_index; /* in native-endian */
ServerConnection *conn; ServerConnection *conn;
union { union {
/* if conn != NULL, then the request is alive: */ /* if conn != NULL, then the request is alive: */
@ -739,6 +743,7 @@ struct _ServerConnection
ProtobufC_RPC_Server *server; ProtobufC_RPC_Server *server;
ServerConnection *prev, *next; ServerConnection *prev, *next;
unsigned n_pending_requests;
ServerRequest *first_pending_request, *last_pending_request; ServerRequest *first_pending_request, *last_pending_request;
}; };
@ -875,7 +880,8 @@ server_connection_failed (ServerConnection *conn,
static ServerRequest * static ServerRequest *
create_server_request (ServerConnection *conn, create_server_request (ServerConnection *conn,
uint32_t request_id) uint32_t request_id,
uint32_t method_index)
{ {
ServerRequest *rv; ServerRequest *rv;
if (conn->server->recycled_requests != NULL) if (conn->server->recycled_requests != NULL)
@ -894,12 +900,17 @@ create_server_request (ServerConnection *conn,
return rv; return rv;
} }
static void handle_server_connection_events (int fd,
unsigned events,
void *data);
static void static void
server_connection_response_closure (const ProtobufCMessage *message, server_connection_response_closure (const ProtobufCMessage *message,
void *closure_data) void *closure_data)
{ {
ServerRequest *request = closure_data; ServerRequest *request = closure_data;
if (request->conn == NULL) ServerConnection *conn = request->conn;
protobuf_c_boolean must_set_output_watch = FALSE;
if (conn == NULL)
{ {
/* defunct request */ /* defunct request */
ProtobufCAllocator *allocator = request->info.defunct.allocator; ProtobufCAllocator *allocator = request->info.defunct.allocator;
@ -908,13 +919,37 @@ server_connection_response_closure (const ProtobufCMessage *message,
else if (message == NULL) else if (message == NULL)
{ {
/* send failed status */ /* send failed status */
... uint32_t header[4];
header[0] = uint32_to_le (PROTOBUF_C_STATUS_CODE_SERVICE_FAILED);
header[1] = uint32_to_le (request->method_index);
header[2] = 0; /* no message */
header[3] = request->request_id;
must_set_output_watch = (conn->outgoing.size == 0);
protobuf_c_data_buffer_append (&conn->outgoing, header, 16);
} }
else else
{ {
/* send success response */ /* send success response */
... uint32_t header[4];
uint8_t buffer_slab[512];
ProtobufCBufferSimple buffer_simple = PROTOBUF_C_BUFFER_SIMPLE_INIT (buffer_slab);
protobuf_c_message_pack_to_buffer (message, &buffer_simple.base);
header[0] = uint32_to_le (PROTOBUF_C_STATUS_CODE_SUCCESS);
header[1] = uint32_to_le (request->method_index);
header[2] = uint32_to_le (buffer_simple.len);
header[3] = request->request_id;
must_set_output_watch = (conn->outgoing.size == 0);
protobuf_c_data_buffer_append (&conn->outgoing, header, 16);
protobuf_c_data_buffer_append (&conn->outgoing, buffer_simple.data, buffer_simple.len);
PROTOBUF_C_BUFFER_SIMPLE_CLEAR (&buffer_simple);
} }
if (must_set_output_watch)
protobuf_c_dispatch_watch_fd (conn->server->dispatch,
conn->fd,
PROTOBUF_C_EVENT_READABLE|PROTOBUF_C_EVENT_WRITABLE,
handle_server_connection_events,
conn);
} }
static void static void
@ -991,7 +1026,7 @@ handle_server_connection_events (int fd,
} }
/* Invoke service (note that it may call back immediately) */ /* Invoke service (note that it may call back immediately) */
server_request = create_server_request (conn, request_id); server_request = create_server_request (conn, request_id, method_index);
service->invoke (service, method_index, message, service->invoke (service, method_index, message,
server_connection_response_closure, server_request); server_connection_response_closure, server_request);
protobuf_c_message_free_unpacked (message, allocator); protobuf_c_message_free_unpacked (message, allocator);
@ -1005,11 +1040,15 @@ handle_server_connection_events (int fd,
{ {
if (!errno_is_ignorable (errno)) if (!errno_is_ignorable (errno))
{ {
... server_connection_failed (conn,
PROTOBUF_C_ERROR_CODE_CLIENT_TERMINATED,
"writing to file-descriptor: %s",
strerror (errno));
return;
} }
} }
if (conn->outgoing.size == 0) if (conn->outgoing.size == 0)
protobuf_c_dispatch_watch_fd (client->dispatch, conn->fd, PROTOBUF_C_EVENT_READABLE, protobuf_c_dispatch_watch_fd (conn->server->dispatch, conn->fd, PROTOBUF_C_EVENT_READABLE,
handle_server_connection_events, conn); handle_server_connection_events, conn);
} }
} }
@ -1035,7 +1074,6 @@ handle_server_listener_readable (int fd,
} }
conn = allocator->alloc (allocator, sizeof (ServerConnection)); conn = allocator->alloc (allocator, sizeof (ServerConnection));
conn->fd = new_fd; conn->fd = new_fd;
conn->defunct = 0;
protobuf_c_data_buffer_init (&conn->incoming, server->allocator); protobuf_c_data_buffer_init (&conn->incoming, server->allocator);
protobuf_c_data_buffer_init (&conn->outgoing, server->allocator); protobuf_c_data_buffer_init (&conn->outgoing, server->allocator);
conn->n_pending_requests = 0; conn->n_pending_requests = 0;

View File

@ -31,6 +31,7 @@ typedef enum
typedef enum typedef enum
{ {
PROTOBUF_C_STATUS_CODE_SUCCESS, PROTOBUF_C_STATUS_CODE_SUCCESS,
PROTOBUF_C_STATUS_CODE_SERVICE_FAILED,
PROTOBUF_C_STATUS_CODE_TOO_MANY_PENDING PROTOBUF_C_STATUS_CODE_TOO_MANY_PENDING
} ProtobufC_RPC_Status_Code; } ProtobufC_RPC_Status_Code;