diff --git a/src/google/protobuf-c/protobuf-c-rpc.c b/src/google/protobuf-c/protobuf-c-rpc.c index a13abd9..bb02d5b 100644 --- a/src/google/protobuf-c/protobuf-c-rpc.c +++ b/src/google/protobuf-c/protobuf-c-rpc.c @@ -533,19 +533,20 @@ handle_client_fd_events (int fd, else { /* try processing buffer */ - while (client->incoming.size >= 12) + while (client->incoming.size >= 16) { - uint32_t header[3]; - unsigned service_index, message_length, request_id; + uint32_t header[4]; + unsigned status_code, method_index, message_length, request_id; Closure *closure; uint8_t *packed_data; ProtobufCMessage *msg; protobuf_c_data_buffer_peek (&client->incoming, header, sizeof (header)); - service_index = uint32_from_le (header[0]); - message_length = uint32_from_le (header[1]); - request_id = header[2]; /* already native-endian */ + status_code = uint32_from_le (header[0]); + method_index = uint32_from_le (header[1]); + message_length = uint32_from_le (header[2]); + request_id = header[3]; /* already native-endian */ - if (12 + message_length > client->incoming.size) + if (16 + message_length > client->incoming.size) break; /* lookup request by id */ @@ -559,7 +560,7 @@ handle_client_fd_events (int fd, closure = client->info.connected.closures + (request_id - 1); /* read message and unpack */ - protobuf_c_data_buffer_discard (&client->incoming, 12); + protobuf_c_data_buffer_discard (&client->incoming, 16); packed_data = client->allocator->alloc (client->allocator, message_length); protobuf_c_data_buffer_read (&client->incoming, packed_data, message_length); @@ -719,7 +720,16 @@ struct _ServerRequest { uint32_t request_id; ServerConnection *conn; - ServerRequest *prev, *next; + union { + /* if conn != NULL, then the request is alive: */ + struct { ServerRequest *prev, *next; } alive; + + /* if conn == NULL, then the request is defunct: */ + struct { ProtobufCAllocator *allocator; } defunct; + + /* well, if it is in the recycled list, then it's recycled :/ */ + struct { ServerRequest *next; } recycled; + } info; }; struct _ServerConnection { @@ -740,12 +750,17 @@ struct _ProtobufC_RPC_Server char *bind_name; ServerConnection *first_connection, *last_connection; + ServerRequest *recycled_requests; + + ProtobufC_RPC_Error_Func error_handler; + void *error_handler_data; + /* configuration */ unsigned max_pending_requests_per_connection; }; #define GET_PENDING_REQUEST_LIST(conn) \ - ServerRequest *, conn->first_pending_request, server->last_pending_request, prev, next + ServerRequest *, conn->first_pending_request, conn->last_pending_request, info.alive.prev, info.alive.next #define GET_CONNECTION_LIST(server) \ ServerConnection *, server->first_connection, server->last_connection, prev, next @@ -753,6 +768,7 @@ static void server_connection_close (ServerConnection *conn) { ServerRequest *req; + ProtobufCAllocator *allocator = conn->server->allocator; /* general cleanup */ protobuf_c_dispatch_close_fd (conn->server->dispatch, conn->fd); @@ -764,51 +780,151 @@ server_connection_close (ServerConnection *conn) GSK_LIST_REMOVE (GET_CONNECTION_LIST (conn->server), conn); /* disassocate all the requests from the connection */ - for (req = conn->first_pending_request; req; req = req->next) - req->conn = NULL; + while (conn->first_pending_request != NULL) + { + conn->first_pending_request = req->info.alive.next; + req->conn = NULL; + req->info.defunct.allocator = allocator; + } /* free the connection itself */ - conn->server->allocator->free (conn->server->allocator, conn); + allocator->free (allocator, conn); +} + +static void +server_failed_literal (ProtobufC_RPC_Server *server, + ProtobufC_RPC_Error_Code code, + const char *msg) +{ + if (server->error_handler != NULL) + server->error_handler (code, msg, server->error_handler_data); } static void server_failed (ProtobufC_RPC_Server *server, + ProtobufC_RPC_Error_Code code, const char *format, ...) { - ... + va_list args; + char buf[MAX_FAILED_MSG_LENGTH]; + va_start (args, format); + vsnprintf (buf, sizeof (buf), format, args); + buf[sizeof(buf)-1] = 0; + va_end (args); + + server_failed_literal (server, code, buf); +} + +static protobuf_c_boolean +address_to_name (const struct sockaddr *addr, + unsigned addr_len, + char *name_out, + unsigned name_out_buf_length) +{ + if (addr->sa_family == PF_INET) + { + /* convert to dotted address + port */ + const struct sockaddr_in *addr_in = (const struct sockaddr_in *) addr; + const uint8_t *addr = (const uint8_t *) &(addr_in->sin_addr); + uint16_t port = htons (addr_in->sin_port); + snprintf (name_out, name_out_buf_length, + "%u.%u.%u.%u:%u", + addr[0], addr[1], addr[2], addr[3], port); + return TRUE; + } + return FALSE; } static void server_connection_failed (ServerConnection *conn, + ProtobufC_RPC_Error_Code code, const char *format, ...) { - /* do vsnprintf() */ - ... + char remote_addr_name[64]; + char msg[MAX_FAILED_MSG_LENGTH]; + char *msg_end = msg + sizeof (msg); + char *msg_at; + struct sockaddr addr; + socklen_t addr_len = sizeof (addr); + va_list args; /* if we can, find the remote name of this connection */ - ... + if (getpeername (conn->fd, &addr, &addr_len) == 0 + && address_to_name (&addr, addr_len, remote_addr_name, sizeof (remote_addr_name))) + snprintf (msg, sizeof (msg), "connection to %s from %s: ", + conn->server->bind_name, remote_addr_name); + else + snprintf (msg, sizeof (msg), "connection to %s: ", + conn->server->bind_name); + msg[sizeof(msg)-1] = 0; + msg_at = strchr (msg, 0); + + /* do vsnprintf() */ + va_start (args, format); + vsnprintf(msg_at, msg_end - msg_at, format, args); + va_end (args); + msg[sizeof(msg)-1] = 0; /* invoke server error hook */ - if (remote_addr_name == NULL) - server_failed (conn->server, - "connection to %s: %s", - conn->server->bind_name, err_msg); - else - server_failed (conn->server, - "connection to %s from %s: %s", - conn->server->bind_name, remote_addr_name, err_msg); + server_failed_literal (conn->server, code, msg); server_connection_close (conn); } +static ServerRequest * +create_server_request (ServerConnection *conn, + uint32_t request_id) +{ + ServerRequest *rv; + if (conn->server->recycled_requests != NULL) + { + rv = conn->server->recycled_requests; + conn->server->recycled_requests = rv->info.recycled.next; + } + else + { + ProtobufCAllocator *allocator = conn->server->allocator; + rv = allocator->alloc (allocator, sizeof (ServerRequest)); + } + rv->conn = conn; + rv->request_id = request_id; + GSK_LIST_APPEND (GET_PENDING_REQUEST_LIST (conn), rv); + return rv; +} + +static void +server_connection_response_closure (const ProtobufCMessage *message, + void *closure_data) +{ + ServerRequest *request = closure_data; + if (request->conn == NULL) + { + /* defunct request */ + ProtobufCAllocator *allocator = request->info.defunct.allocator; + allocator->free (allocator, request); + } + else if (message == NULL) + { + /* send failed status */ + ... + } + else + { + /* send success response */ + ... + } +} + static void handle_server_connection_events (int fd, unsigned events, void *data) { ServerConnection *conn = data; + ProtobufCService *service = conn->server->underlying; + ProtobufCAllocator *allocator = conn->server->allocator; if (events & PROTOBUF_C_EVENT_READABLE) { int read_rv = protobuf_c_data_buffer_read_in_fd (&conn->incoming, fd); @@ -816,7 +932,9 @@ handle_server_connection_events (int fd, { if (!errno_is_ignorable (errno)) { - server_connection_failed (conn, "reading from file-descriptor: %s", + server_connection_failed (conn, + PROTOBUF_C_ERROR_CODE_CLIENT_TERMINATED, + "reading from file-descriptor: %s", strerror (errno)); return; } @@ -824,7 +942,9 @@ handle_server_connection_events (int fd, else if (read_rv == 0) { if (conn->first_pending_request != NULL) - server_connection_failed (conn, "closed while calls pending"); + server_connection_failed (conn, + PROTOBUF_C_ERROR_CODE_CLIENT_TERMINATED, + "closed while calls pending"); else server_connection_close (conn); return; @@ -833,31 +953,48 @@ handle_server_connection_events (int fd, while (conn->incoming.size >= 12) { uint32_t header[3]; - uint32_t service_index, message_length, request_id; + uint32_t method_index, message_length, request_id; + uint8_t *packed_data; + ProtobufCMessage *message; + ServerRequest *server_request; protobuf_c_data_buffer_peek (&conn->incoming, header, 12); - service_index = uint32_from_le (header[0]); + method_index = uint32_from_le (header[0]); message_length = uint32_from_le (header[1]); request_id = header[2]; /* store in whatever endianness it comes in */ if (conn->incoming.size < 12 + message_length) break; - if (service_index >= conn->server->service->descriptor->n_methods) + if (method_index >= conn->server->underlying->descriptor->n_methods) { - server_connection_failed (conn, "bad service_index %u", service_index); + server_connection_failed (conn, + PROTOBUF_C_ERROR_CODE_BAD_REQUEST, + "bad method_index %u", method_index); return; } /* Read message */ protobuf_c_data_buffer_discard (&conn->incoming, 12); - ... + packed_data = allocator->alloc (allocator, message_length); + protobuf_c_data_buffer_read (&conn->incoming, packed_data, message_length); /* Unpack message */ - ... + message = protobuf_c_message_unpack (service->descriptor->methods[method_index].input, + allocator, message_length, packed_data); + allocator->free (allocator, packed_data); + if (message == NULL) + { + server_connection_failed (conn, + PROTOBUF_C_ERROR_CODE_BAD_REQUEST, + "error unpacking message"); + return; + } /* Invoke service (note that it may call back immediately) */ - server_request = ...; - ... + server_request = create_server_request (conn, request_id); + service->invoke (service, method_index, message, + server_connection_response_closure, server_request); + protobuf_c_message_free_unpacked (message, allocator); } } if ((events & PROTOBUF_C_EVENT_WRITABLE) != 0 diff --git a/src/google/protobuf-c/protobuf-c-rpc.h b/src/google/protobuf-c/protobuf-c-rpc.h index 2c1d5e2..5c81392 100644 --- a/src/google/protobuf-c/protobuf-c-rpc.h +++ b/src/google/protobuf-c/protobuf-c-rpc.h @@ -3,11 +3,12 @@ /* Protocol is: * client issues request with header: - * service_index 32-bit little-endian + * method_index 32-bit little-endian * message_length 32-bit little-endian * request_id 32-bit any-endian * server responds with header: - * service_index 32-bit little-endian + * status_code 32-bit little-endian + * method_index 32-bit little-endian * message_length 32-bit little-endian * request_id 32-bit any-endian */ @@ -22,9 +23,17 @@ typedef enum typedef enum { PROTOBUF_C_ERROR_CODE_HOST_NOT_FOUND, - PROTOBUF_C_ERROR_CODE_CONNECTION_REFUSED + PROTOBUF_C_ERROR_CODE_CONNECTION_REFUSED, + PROTOBUF_C_ERROR_CODE_CLIENT_TERMINATED, + PROTOBUF_C_ERROR_CODE_BAD_REQUEST, } ProtobufC_RPC_Error_Code; +typedef enum +{ + PROTOBUF_C_STATUS_CODE_SUCCESS, + PROTOBUF_C_STATUS_CODE_TOO_MANY_PENDING +} ProtobufC_RPC_Status_Code; + typedef void (*ProtobufC_RPC_Error_Func) (ProtobufC_RPC_Error_Code code, const char *message, void *error_func_data);