diff --git a/src/google/protobuf-c/protobuf-c-rpc.c b/src/google/protobuf-c/protobuf-c-rpc.c index bb02d5b..78d9b47 100644 --- a/src/google/protobuf-c/protobuf-c-rpc.c +++ b/src/google/protobuf-c/protobuf-c-rpc.c @@ -1,3 +1,6 @@ +/* KNOWN DEFECTS: + - server does not obey max_pending_requests_per_connection + */ #include #include #include @@ -718,7 +721,8 @@ typedef struct _ServerRequest ServerRequest; typedef struct _ServerConnection ServerConnection; struct _ServerRequest { - uint32_t request_id; + uint32_t request_id; /* in little-endian */ + uint32_t method_index; /* in native-endian */ ServerConnection *conn; union { /* if conn != NULL, then the request is alive: */ @@ -739,6 +743,7 @@ struct _ServerConnection ProtobufC_RPC_Server *server; ServerConnection *prev, *next; + unsigned n_pending_requests; ServerRequest *first_pending_request, *last_pending_request; }; @@ -875,7 +880,8 @@ server_connection_failed (ServerConnection *conn, static ServerRequest * create_server_request (ServerConnection *conn, - uint32_t request_id) + uint32_t request_id, + uint32_t method_index) { ServerRequest *rv; if (conn->server->recycled_requests != NULL) @@ -894,12 +900,17 @@ create_server_request (ServerConnection *conn, return rv; } +static void handle_server_connection_events (int fd, + unsigned events, + void *data); static void server_connection_response_closure (const ProtobufCMessage *message, void *closure_data) { ServerRequest *request = closure_data; - if (request->conn == NULL) + ServerConnection *conn = request->conn; + protobuf_c_boolean must_set_output_watch = FALSE; + if (conn == NULL) { /* defunct request */ ProtobufCAllocator *allocator = request->info.defunct.allocator; @@ -908,13 +919,37 @@ server_connection_response_closure (const ProtobufCMessage *message, else if (message == NULL) { /* send failed status */ - ... + uint32_t header[4]; + header[0] = uint32_to_le (PROTOBUF_C_STATUS_CODE_SERVICE_FAILED); + header[1] = uint32_to_le (request->method_index); + header[2] = 0; /* no message */ + header[3] = request->request_id; + must_set_output_watch = (conn->outgoing.size == 0); + protobuf_c_data_buffer_append (&conn->outgoing, header, 16); } else { /* send success response */ - ... + uint32_t header[4]; + uint8_t buffer_slab[512]; + ProtobufCBufferSimple buffer_simple = PROTOBUF_C_BUFFER_SIMPLE_INIT (buffer_slab); + protobuf_c_message_pack_to_buffer (message, &buffer_simple.base); + header[0] = uint32_to_le (PROTOBUF_C_STATUS_CODE_SUCCESS); + header[1] = uint32_to_le (request->method_index); + header[2] = uint32_to_le (buffer_simple.len); + header[3] = request->request_id; + must_set_output_watch = (conn->outgoing.size == 0); + protobuf_c_data_buffer_append (&conn->outgoing, header, 16); + protobuf_c_data_buffer_append (&conn->outgoing, buffer_simple.data, buffer_simple.len); + PROTOBUF_C_BUFFER_SIMPLE_CLEAR (&buffer_simple); } + if (must_set_output_watch) + protobuf_c_dispatch_watch_fd (conn->server->dispatch, + conn->fd, + PROTOBUF_C_EVENT_READABLE|PROTOBUF_C_EVENT_WRITABLE, + handle_server_connection_events, + conn); + } static void @@ -991,7 +1026,7 @@ handle_server_connection_events (int fd, } /* Invoke service (note that it may call back immediately) */ - server_request = create_server_request (conn, request_id); + server_request = create_server_request (conn, request_id, method_index); service->invoke (service, method_index, message, server_connection_response_closure, server_request); protobuf_c_message_free_unpacked (message, allocator); @@ -1005,11 +1040,15 @@ handle_server_connection_events (int fd, { if (!errno_is_ignorable (errno)) { - ... + server_connection_failed (conn, + PROTOBUF_C_ERROR_CODE_CLIENT_TERMINATED, + "writing to file-descriptor: %s", + strerror (errno)); + return; } } if (conn->outgoing.size == 0) - protobuf_c_dispatch_watch_fd (client->dispatch, conn->fd, PROTOBUF_C_EVENT_READABLE, + protobuf_c_dispatch_watch_fd (conn->server->dispatch, conn->fd, PROTOBUF_C_EVENT_READABLE, handle_server_connection_events, conn); } } @@ -1035,7 +1074,6 @@ handle_server_listener_readable (int fd, } conn = allocator->alloc (allocator, sizeof (ServerConnection)); conn->fd = new_fd; - conn->defunct = 0; protobuf_c_data_buffer_init (&conn->incoming, server->allocator); protobuf_c_data_buffer_init (&conn->outgoing, server->allocator); conn->n_pending_requests = 0; diff --git a/src/google/protobuf-c/protobuf-c-rpc.h b/src/google/protobuf-c/protobuf-c-rpc.h index 5c81392..a3be368 100644 --- a/src/google/protobuf-c/protobuf-c-rpc.h +++ b/src/google/protobuf-c/protobuf-c-rpc.h @@ -31,6 +31,7 @@ typedef enum typedef enum { PROTOBUF_C_STATUS_CODE_SUCCESS, + PROTOBUF_C_STATUS_CODE_SERVICE_FAILED, PROTOBUF_C_STATUS_CODE_TOO_MANY_PENDING } ProtobufC_RPC_Status_Code;