mirror of
https://github.com/protobuf-c/protobuf-c.git
synced 2024-12-28 22:53:05 +08:00
1512 lines
50 KiB
C
1512 lines
50 KiB
C
/* KNOWN DEFECTS:
|
|
- server does not obey max_pending_requests_per_connection
|
|
- no ipv6 support
|
|
*/
|
|
#include <string.h>
|
|
#include <assert.h>
|
|
#include <sys/stat.h>
|
|
#include <sys/types.h>
|
|
#include <sys/socket.h>
|
|
#include <sys/un.h>
|
|
#include <netinet/in.h>
|
|
#include <netdb.h>
|
|
#include <stdarg.h>
|
|
#include <fcntl.h>
|
|
#include <errno.h>
|
|
#include <netinet/in.h>
|
|
#include <stdio.h>
|
|
#include <stdlib.h>
|
|
#include <unistd.h>
|
|
#include "protobuf-c-rpc.h"
|
|
#include "protobuf-c-data-buffer.h"
|
|
#include "gsklistmacros.h"
|
|
|
|
#define protobuf_c_assert(x) assert(x)
|
|
|
|
#undef TRUE
|
|
#define TRUE 1
|
|
#undef FALSE
|
|
#define FALSE 0
|
|
|
|
/* enabled for efficiency, can be useful to disable for debugging */
|
|
#define RECYCLE_REQUESTS 1
|
|
|
|
#define UINT_TO_POINTER(ui) ((void*)(uintptr_t)(ui))
|
|
#define POINTER_TO_UINT(ptr) ((unsigned)(uintptr_t)(ptr))
|
|
|
|
#define MAX_FAILED_MSG_LENGTH 512
|
|
|
|
typedef enum
|
|
{
|
|
PROTOBUF_C_CLIENT_STATE_INIT,
|
|
PROTOBUF_C_CLIENT_STATE_NAME_LOOKUP,
|
|
PROTOBUF_C_CLIENT_STATE_CONNECTING,
|
|
PROTOBUF_C_CLIENT_STATE_CONNECTED,
|
|
PROTOBUF_C_CLIENT_STATE_FAILED_WAITING,
|
|
PROTOBUF_C_CLIENT_STATE_FAILED, /* if no autoreconnect */
|
|
PROTOBUF_C_CLIENT_STATE_DESTROYED
|
|
} ProtobufC_RPC_ClientState;
|
|
|
|
typedef struct _Closure Closure;
|
|
struct _Closure
|
|
{
|
|
/* these will be NULL for unallocated request ids */
|
|
const ProtobufCMessageDescriptor *response_type;
|
|
ProtobufCClosure closure;
|
|
|
|
/* this is the next request id, or 0 for none */
|
|
void *closure_data;
|
|
};
|
|
|
|
static void
|
|
error_handler (ProtobufC_RPC_Error_Code code,
|
|
const char *message,
|
|
void *error_func_data)
|
|
{
|
|
fprintf (stderr, "*** error: %s: %s\n",
|
|
(char*) error_func_data, message);
|
|
}
|
|
|
|
struct _ProtobufC_RPC_Client
|
|
{
|
|
ProtobufCService base_service;
|
|
ProtobufCDataBuffer incoming;
|
|
ProtobufCDataBuffer outgoing;
|
|
ProtobufCAllocator *allocator;
|
|
ProtobufCDispatch *dispatch;
|
|
ProtobufC_RPC_AddressType address_type;
|
|
char *name;
|
|
ProtobufC_FD fd;
|
|
protobuf_c_boolean autoreconnect;
|
|
unsigned autoreconnect_millis;
|
|
ProtobufC_NameLookup_Func resolver;
|
|
ProtobufC_RPC_Error_Func error_handler;
|
|
void *error_handler_data;
|
|
ProtobufC_RPC_ClientState state;
|
|
union {
|
|
struct {
|
|
ProtobufCDispatchIdle *idle;
|
|
} init;
|
|
struct {
|
|
protobuf_c_boolean pending;
|
|
protobuf_c_boolean destroyed_while_pending;
|
|
uint16_t port;
|
|
} name_lookup;
|
|
struct {
|
|
unsigned closures_alloced;
|
|
unsigned first_free_request_id;
|
|
/* indexed by (request_id-1) */
|
|
Closure *closures;
|
|
} connected;
|
|
struct {
|
|
ProtobufCDispatchTimer *timer;
|
|
char *error_message;
|
|
} failed_waiting;
|
|
struct {
|
|
char *error_message;
|
|
} failed;
|
|
} info;
|
|
};
|
|
|
|
static void begin_name_lookup (ProtobufC_RPC_Client *client);
|
|
static void destroy_client_rpc (ProtobufCService *service);
|
|
|
|
|
|
static void
|
|
set_fd_nonblocking(int fd)
|
|
{
|
|
int flags = fcntl (fd, F_GETFL);
|
|
protobuf_c_assert (flags >= 0);
|
|
fcntl (fd, F_SETFL, flags | O_NONBLOCK);
|
|
}
|
|
|
|
static void
|
|
handle_autoreconnect_timeout (ProtobufCDispatch *dispatch,
|
|
void *func_data)
|
|
{
|
|
ProtobufC_RPC_Client *client = func_data;
|
|
protobuf_c_assert (client->state == PROTOBUF_C_CLIENT_STATE_FAILED_WAITING);
|
|
client->allocator->free (client->allocator,
|
|
client->info.failed_waiting.error_message);
|
|
begin_name_lookup (client);
|
|
}
|
|
|
|
static void
|
|
client_failed (ProtobufC_RPC_Client *client,
|
|
const char *format_str,
|
|
...)
|
|
{
|
|
va_list args;
|
|
char buf[MAX_FAILED_MSG_LENGTH];
|
|
size_t msg_len;
|
|
char *msg;
|
|
size_t n_closures = 0;
|
|
Closure *closures = NULL;
|
|
switch (client->state)
|
|
{
|
|
case PROTOBUF_C_CLIENT_STATE_NAME_LOOKUP:
|
|
protobuf_c_assert (!client->info.name_lookup.pending);
|
|
break;
|
|
case PROTOBUF_C_CLIENT_STATE_CONNECTING:
|
|
/* nothing to do */
|
|
break;
|
|
case PROTOBUF_C_CLIENT_STATE_CONNECTED:
|
|
n_closures = client->info.connected.closures_alloced;
|
|
closures = client->info.connected.closures;
|
|
break;
|
|
|
|
/* should not get here */
|
|
case PROTOBUF_C_CLIENT_STATE_INIT:
|
|
case PROTOBUF_C_CLIENT_STATE_FAILED_WAITING:
|
|
case PROTOBUF_C_CLIENT_STATE_FAILED:
|
|
case PROTOBUF_C_CLIENT_STATE_DESTROYED:
|
|
protobuf_c_assert (FALSE);
|
|
break;
|
|
}
|
|
if (client->fd >= 0)
|
|
{
|
|
protobuf_c_dispatch_close_fd (client->dispatch, client->fd);
|
|
client->fd = -1;
|
|
}
|
|
protobuf_c_data_buffer_reset (&client->incoming);
|
|
protobuf_c_data_buffer_reset (&client->outgoing);
|
|
|
|
/* Compute the message */
|
|
va_start (args, format_str);
|
|
vsnprintf (buf, sizeof (buf), format_str, args);
|
|
va_end (args);
|
|
buf[sizeof(buf)-1] = 0;
|
|
msg_len = strlen (buf);
|
|
msg = client->allocator->alloc (client->allocator, msg_len + 1);
|
|
strcpy (msg, buf);
|
|
|
|
/* go to one of the failed states */
|
|
if (client->autoreconnect)
|
|
{
|
|
client->state = PROTOBUF_C_CLIENT_STATE_FAILED_WAITING;
|
|
client->info.failed_waiting.timer
|
|
= protobuf_c_dispatch_add_timer_millis (client->dispatch,
|
|
client->autoreconnect_millis,
|
|
handle_autoreconnect_timeout,
|
|
client);
|
|
client->info.failed_waiting.error_message = msg;
|
|
}
|
|
else
|
|
{
|
|
client->state = PROTOBUF_C_CLIENT_STATE_FAILED;
|
|
client->info.failed.error_message = msg;
|
|
}
|
|
|
|
/* we defer calling the closures to avoid
|
|
any re-entrancy issues (e.g. people further RPC should
|
|
not see a socket in the "connected" state-- at least,
|
|
it shouldn't be accessing the array of closures that we are considering */
|
|
if (closures != NULL)
|
|
{
|
|
unsigned i;
|
|
|
|
for (i = 0; i < n_closures; i++)
|
|
if (closures[i].response_type != NULL)
|
|
closures[i].closure (NULL, closures[i].closure_data);
|
|
client->allocator->free (client->allocator, closures);
|
|
}
|
|
}
|
|
|
|
static inline protobuf_c_boolean
|
|
errno_is_ignorable (int e)
|
|
{
|
|
#ifdef EWOULDBLOCK /* for windows */
|
|
if (e == EWOULDBLOCK)
|
|
return 1;
|
|
#endif
|
|
return e == EINTR || e == EAGAIN;
|
|
}
|
|
|
|
static void
|
|
set_state_connected (ProtobufC_RPC_Client *client)
|
|
{
|
|
client->state = PROTOBUF_C_CLIENT_STATE_CONNECTED;
|
|
|
|
client->info.connected.closures_alloced = 1;
|
|
client->info.connected.first_free_request_id = 1;
|
|
client->info.connected.closures = client->allocator->alloc (client->allocator, sizeof (Closure));
|
|
client->info.connected.closures[0].closure = NULL;
|
|
client->info.connected.closures[0].response_type = NULL;
|
|
client->info.connected.closures[0].closure_data = UINT_TO_POINTER (0);
|
|
}
|
|
|
|
static void
|
|
handle_client_fd_connect_events (int fd,
|
|
unsigned events,
|
|
void *callback_data)
|
|
{
|
|
ProtobufC_RPC_Client *client = callback_data;
|
|
socklen_t size_int = sizeof (int);
|
|
int fd_errno = EINVAL;
|
|
if (getsockopt (fd, SOL_SOCKET, SO_ERROR, &fd_errno, &size_int) < 0)
|
|
{
|
|
/* Note: this behavior is vaguely hypothetically broken,
|
|
* in terms of ignoring getsockopt's error;
|
|
* however, this shouldn't happen, and EINVAL is ok if it does.
|
|
* Furthermore some broken OS's return an error code when
|
|
* fetching SO_ERROR!
|
|
*/
|
|
}
|
|
|
|
if (fd_errno == 0)
|
|
{
|
|
/* goto state CONNECTED */
|
|
protobuf_c_dispatch_watch_fd (client->dispatch,
|
|
client->fd,
|
|
0, NULL, NULL);
|
|
set_state_connected (client);
|
|
}
|
|
else if (errno_is_ignorable (fd_errno))
|
|
{
|
|
/* remain in CONNECTING state */
|
|
return;
|
|
}
|
|
else
|
|
{
|
|
/* Call error handler */
|
|
protobuf_c_dispatch_close_fd (client->dispatch, client->fd);
|
|
client_failed (client,
|
|
"failed connecting to server: %s",
|
|
strerror (fd_errno));
|
|
}
|
|
}
|
|
|
|
static void
|
|
begin_connecting (ProtobufC_RPC_Client *client,
|
|
struct sockaddr *address,
|
|
size_t addr_len)
|
|
{
|
|
protobuf_c_assert (client->state == PROTOBUF_C_CLIENT_STATE_NAME_LOOKUP);
|
|
|
|
client->state = PROTOBUF_C_CLIENT_STATE_CONNECTING;
|
|
client->fd = socket (address->sa_family, SOCK_STREAM, 0);
|
|
if (client->fd < 0)
|
|
{
|
|
client_failed (client, "error creating socket: %s", strerror (errno));
|
|
return;
|
|
}
|
|
set_fd_nonblocking (client->fd);
|
|
if (connect (client->fd, address, addr_len) < 0)
|
|
{
|
|
if (errno == EINPROGRESS)
|
|
{
|
|
/* register interest in fd */
|
|
protobuf_c_dispatch_watch_fd (client->dispatch,
|
|
client->fd,
|
|
PROTOBUF_C_EVENT_READABLE|PROTOBUF_C_EVENT_WRITABLE,
|
|
handle_client_fd_connect_events,
|
|
client);
|
|
return;
|
|
}
|
|
close (client->fd);
|
|
client->fd = -1;
|
|
client_failed (client, "error connecting to remote host: %s", strerror (errno));
|
|
return;
|
|
}
|
|
|
|
set_state_connected (client);
|
|
}
|
|
static void
|
|
handle_name_lookup_success (const uint8_t *address,
|
|
void *callback_data)
|
|
{
|
|
ProtobufC_RPC_Client *client = callback_data;
|
|
struct sockaddr_in addr;
|
|
protobuf_c_assert (client->state == PROTOBUF_C_CLIENT_STATE_NAME_LOOKUP);
|
|
protobuf_c_assert (client->info.name_lookup.pending);
|
|
client->info.name_lookup.pending = 0;
|
|
if (client->info.name_lookup.destroyed_while_pending)
|
|
{
|
|
destroy_client_rpc (&client->base_service);
|
|
return;
|
|
}
|
|
memset (&addr, 0, sizeof (addr));
|
|
addr.sin_family = AF_INET;
|
|
memcpy (&addr.sin_addr, address, 4);
|
|
addr.sin_port = htons (client->info.name_lookup.port);
|
|
begin_connecting (client, (struct sockaddr *) &addr, sizeof (addr));
|
|
}
|
|
|
|
static void
|
|
handle_name_lookup_failure (const char *error_message,
|
|
void *callback_data)
|
|
{
|
|
ProtobufC_RPC_Client *client = callback_data;
|
|
protobuf_c_assert (client->state == PROTOBUF_C_CLIENT_STATE_NAME_LOOKUP);
|
|
protobuf_c_assert (client->info.name_lookup.pending);
|
|
client->info.name_lookup.pending = 0;
|
|
if (client->info.name_lookup.destroyed_while_pending)
|
|
{
|
|
destroy_client_rpc (&client->base_service);
|
|
return;
|
|
}
|
|
client_failed (client, "name lookup failed (for name from %s): %s", client->name, error_message);
|
|
}
|
|
|
|
static void
|
|
begin_name_lookup (ProtobufC_RPC_Client *client)
|
|
{
|
|
protobuf_c_assert (client->state == PROTOBUF_C_CLIENT_STATE_INIT
|
|
|| client->state == PROTOBUF_C_CLIENT_STATE_FAILED_WAITING
|
|
|| client->state == PROTOBUF_C_CLIENT_STATE_FAILED);
|
|
client->state = PROTOBUF_C_CLIENT_STATE_NAME_LOOKUP;
|
|
client->info.name_lookup.pending = 0;
|
|
switch (client->address_type)
|
|
{
|
|
case PROTOBUF_C_RPC_ADDRESS_LOCAL:
|
|
{
|
|
struct sockaddr_un addr;
|
|
addr.sun_family = AF_UNIX;
|
|
strncpy (addr.sun_path, client->name, sizeof (addr.sun_path));
|
|
begin_connecting (client, (struct sockaddr *) &addr,
|
|
sizeof (addr));
|
|
return;
|
|
}
|
|
|
|
case PROTOBUF_C_RPC_ADDRESS_TCP:
|
|
{
|
|
/* parse hostname:port from client->name */
|
|
const char *colon = strchr (client->name, ':');
|
|
char *host;
|
|
unsigned port;
|
|
if (colon == NULL)
|
|
{
|
|
client_failed (client,
|
|
"name '%s' does not have a : in it (supposed to be HOST:PORT)",
|
|
client->name);
|
|
return;
|
|
}
|
|
host = client->allocator->alloc (client->allocator, colon + 1 - client->name);
|
|
memcpy (host, client->name, colon - client->name);
|
|
host[colon - client->name] = 0;
|
|
port = atoi (colon + 1);
|
|
|
|
client->info.name_lookup.pending = 1;
|
|
client->info.name_lookup.destroyed_while_pending = 0;
|
|
client->info.name_lookup.port = port;
|
|
client->resolver (client->dispatch,
|
|
host,
|
|
handle_name_lookup_success,
|
|
handle_name_lookup_failure,
|
|
client);
|
|
|
|
/* cleanup */
|
|
client->allocator->free (client->allocator, host);
|
|
return;
|
|
}
|
|
default:
|
|
assert (0);
|
|
}
|
|
}
|
|
|
|
static void
|
|
handle_init_idle (ProtobufCDispatch *dispatch,
|
|
void *data)
|
|
{
|
|
ProtobufC_RPC_Client *client = data;
|
|
protobuf_c_assert (client->state == PROTOBUF_C_CLIENT_STATE_INIT);
|
|
begin_name_lookup (client);
|
|
}
|
|
|
|
static void
|
|
grow_closure_array (ProtobufC_RPC_Client *client)
|
|
{
|
|
/* resize array */
|
|
unsigned old_size = client->info.connected.closures_alloced;
|
|
unsigned new_size = old_size * 2;
|
|
unsigned i;
|
|
Closure *new_closures = client->allocator->alloc (client->allocator, sizeof (Closure) * new_size);
|
|
memcpy (new_closures,
|
|
client->info.connected.closures,
|
|
sizeof (Closure) * old_size);
|
|
|
|
/* build new free list */
|
|
for (i = old_size; i < new_size - 1; i++)
|
|
{
|
|
new_closures[i].response_type = NULL;
|
|
new_closures[i].closure = NULL;
|
|
new_closures[i].closure_data = UINT_TO_POINTER (i+2);
|
|
}
|
|
new_closures[i].closure_data = UINT_TO_POINTER (client->info.connected.first_free_request_id);
|
|
new_closures[i].response_type = NULL;
|
|
new_closures[i].closure = NULL;
|
|
client->info.connected.first_free_request_id = old_size + 1;
|
|
|
|
client->allocator->free (client->allocator, client->info.connected.closures);
|
|
client->info.connected.closures = new_closures;
|
|
client->info.connected.closures_alloced = new_size;
|
|
}
|
|
static uint32_t
|
|
uint32_to_le (uint32_t le)
|
|
{
|
|
#if IS_LITTLE_ENDIAN
|
|
return le;
|
|
#else
|
|
return (le << 24) | (le >> 24)
|
|
| ((le >> 8) & 0xff00)
|
|
| ((le << 8) & 0xff0000);
|
|
#endif
|
|
}
|
|
#define uint32_from_le uint32_to_le /* make the code more readable, i guess */
|
|
|
|
static void
|
|
enqueue_request (ProtobufC_RPC_Client *client,
|
|
unsigned method_index,
|
|
const ProtobufCMessage *input,
|
|
ProtobufCClosure closure,
|
|
void *closure_data)
|
|
{
|
|
uint32_t request_id;
|
|
struct {
|
|
uint32_t method_index;
|
|
uint32_t packed_size;
|
|
uint32_t request_id;
|
|
} header;
|
|
size_t packed_size;
|
|
uint8_t *packed_data;
|
|
Closure *cl;
|
|
const ProtobufCServiceDescriptor *desc = client->base_service.descriptor;
|
|
const ProtobufCMethodDescriptor *method = desc->methods + method_index;
|
|
|
|
protobuf_c_assert (method_index < desc->n_methods);
|
|
|
|
/* Allocate request_id */
|
|
//protobuf_c_assert (client->state == PROTOBUF_C_CLIENT_STATE_CONNECTED);
|
|
if (client->info.connected.first_free_request_id == 0)
|
|
grow_closure_array (client);
|
|
request_id = client->info.connected.first_free_request_id;
|
|
cl = client->info.connected.closures + (request_id - 1);
|
|
client->info.connected.first_free_request_id = POINTER_TO_UINT (cl->closure_data);
|
|
|
|
/* Pack message */
|
|
packed_size = protobuf_c_message_get_packed_size (input);
|
|
if (packed_size < client->allocator->max_alloca)
|
|
packed_data = alloca (packed_size);
|
|
else
|
|
packed_data = client->allocator->alloc (client->allocator, packed_size);
|
|
protobuf_c_message_pack (input, packed_data);
|
|
|
|
/* Append to buffer */
|
|
protobuf_c_assert (sizeof (header) == 12);
|
|
header.method_index = uint32_to_le (method_index);
|
|
header.packed_size = uint32_to_le (packed_size);
|
|
header.request_id = request_id;
|
|
protobuf_c_data_buffer_append (&client->outgoing, &header, 12);
|
|
protobuf_c_data_buffer_append (&client->outgoing, packed_data, packed_size);
|
|
|
|
/* Clean up if not using alloca() */
|
|
if (packed_size >= client->allocator->max_alloca)
|
|
client->allocator->free (client->allocator, packed_data);
|
|
|
|
/* Add closure to request-tree */
|
|
cl->response_type = method->output;
|
|
cl->closure = closure;
|
|
cl->closure_data = closure_data;
|
|
}
|
|
|
|
static void
|
|
handle_client_fd_events (int fd,
|
|
unsigned events,
|
|
void *func_data)
|
|
{
|
|
ProtobufC_RPC_Client *client = func_data;
|
|
protobuf_c_assert (client->state == PROTOBUF_C_CLIENT_STATE_CONNECTED);
|
|
if (events & PROTOBUF_C_EVENT_WRITABLE)
|
|
{
|
|
int write_rv = protobuf_c_data_buffer_writev (&client->outgoing,
|
|
client->fd);
|
|
if (write_rv < 0 && !errno_is_ignorable (errno))
|
|
{
|
|
client_failed (client,
|
|
"writing to file-descriptor: %s",
|
|
strerror (errno));
|
|
return;
|
|
}
|
|
|
|
if (client->outgoing.size == 0)
|
|
protobuf_c_dispatch_watch_fd (client->dispatch, client->fd,
|
|
PROTOBUF_C_EVENT_READABLE,
|
|
handle_client_fd_events, client);
|
|
}
|
|
if (events & PROTOBUF_C_EVENT_READABLE)
|
|
{
|
|
/* do read */
|
|
int read_rv = protobuf_c_data_buffer_read_in_fd (&client->incoming,
|
|
client->fd);
|
|
if (read_rv < 0)
|
|
{
|
|
if (!errno_is_ignorable (errno))
|
|
{
|
|
client_failed (client,
|
|
"reading from file-descriptor: %s",
|
|
strerror (errno));
|
|
}
|
|
}
|
|
else if (read_rv == 0)
|
|
{
|
|
/* handle eof */
|
|
client_failed (client,
|
|
"got end-of-file from server [%u bytes incoming, %u bytes outgoing]",
|
|
client->incoming.size, client->outgoing.size);
|
|
}
|
|
else
|
|
{
|
|
/* try processing buffer */
|
|
while (client->incoming.size >= 16)
|
|
{
|
|
uint32_t header[4];
|
|
unsigned status_code, method_index, message_length, request_id;
|
|
Closure *closure;
|
|
uint8_t *packed_data;
|
|
ProtobufCMessage *msg;
|
|
protobuf_c_data_buffer_peek (&client->incoming, header, sizeof (header));
|
|
status_code = uint32_from_le (header[0]);
|
|
method_index = uint32_from_le (header[1]);
|
|
message_length = uint32_from_le (header[2]);
|
|
request_id = header[3]; /* already native-endian */
|
|
|
|
if (16 + message_length > client->incoming.size)
|
|
break;
|
|
|
|
/* lookup request by id */
|
|
if (request_id > client->info.connected.closures_alloced
|
|
|| request_id == 0
|
|
|| client->info.connected.closures[request_id-1].response_type == NULL)
|
|
{
|
|
client_failed (client, "bad request-id in response from server");
|
|
return;
|
|
}
|
|
closure = client->info.connected.closures + (request_id - 1);
|
|
|
|
/* read message and unpack */
|
|
protobuf_c_data_buffer_discard (&client->incoming, 16);
|
|
packed_data = client->allocator->alloc (client->allocator, message_length);
|
|
protobuf_c_data_buffer_read (&client->incoming, packed_data, message_length);
|
|
|
|
/* TODO: use fast temporary allocator */
|
|
msg = protobuf_c_message_unpack (closure->response_type,
|
|
client->allocator,
|
|
message_length,
|
|
packed_data);
|
|
if (msg == NULL)
|
|
{
|
|
fprintf(stderr, "unable to unpack msg of length %u", message_length);
|
|
client_failed (client, "failed to unpack message");
|
|
client->allocator->free (client->allocator, packed_data);
|
|
return;
|
|
}
|
|
|
|
/* invoke closure */
|
|
closure->closure (msg, closure->closure_data);
|
|
closure->response_type = NULL;
|
|
closure->closure = NULL;
|
|
closure->closure_data = UINT_TO_POINTER (client->info.connected.first_free_request_id);
|
|
client->info.connected.first_free_request_id = request_id;
|
|
|
|
/* clean up */
|
|
protobuf_c_message_free_unpacked (msg, client->allocator);
|
|
client->allocator->free (client->allocator, packed_data);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
static void
|
|
update_connected_client_watch (ProtobufC_RPC_Client *client)
|
|
{
|
|
unsigned events = PROTOBUF_C_EVENT_READABLE;
|
|
protobuf_c_assert (client->state == PROTOBUF_C_CLIENT_STATE_CONNECTED);
|
|
protobuf_c_assert (client->fd >= 0);
|
|
if (client->outgoing.size > 0)
|
|
events |= PROTOBUF_C_EVENT_WRITABLE;
|
|
protobuf_c_dispatch_watch_fd (client->dispatch,
|
|
client->fd,
|
|
events,
|
|
handle_client_fd_events, client);
|
|
}
|
|
|
|
static void
|
|
invoke_client_rpc (ProtobufCService *service,
|
|
unsigned method_index,
|
|
const ProtobufCMessage *input,
|
|
ProtobufCClosure closure,
|
|
void *closure_data)
|
|
{
|
|
ProtobufC_RPC_Client *client = (ProtobufC_RPC_Client *) service;
|
|
protobuf_c_assert (service->invoke == invoke_client_rpc);
|
|
switch (client->state)
|
|
{
|
|
case PROTOBUF_C_CLIENT_STATE_INIT:
|
|
case PROTOBUF_C_CLIENT_STATE_NAME_LOOKUP:
|
|
case PROTOBUF_C_CLIENT_STATE_CONNECTING:
|
|
enqueue_request (client, method_index, input, closure, closure_data);
|
|
break;
|
|
|
|
case PROTOBUF_C_CLIENT_STATE_CONNECTED:
|
|
{
|
|
int had_outgoing = (client->outgoing.size > 0);
|
|
enqueue_request (client, method_index, input, closure, closure_data);
|
|
if (!had_outgoing)
|
|
update_connected_client_watch (client);
|
|
}
|
|
break;
|
|
|
|
case PROTOBUF_C_CLIENT_STATE_FAILED_WAITING:
|
|
case PROTOBUF_C_CLIENT_STATE_FAILED:
|
|
case PROTOBUF_C_CLIENT_STATE_DESTROYED:
|
|
closure (NULL, closure_data);
|
|
break;
|
|
}
|
|
}
|
|
|
|
static void
|
|
destroy_client_rpc (ProtobufCService *service)
|
|
{
|
|
ProtobufC_RPC_Client *client = (ProtobufC_RPC_Client *) service;
|
|
ProtobufC_RPC_ClientState state = client->state;
|
|
unsigned i;
|
|
unsigned n_closures = 0;
|
|
Closure *closures = NULL;
|
|
switch (state)
|
|
{
|
|
case PROTOBUF_C_CLIENT_STATE_INIT:
|
|
protobuf_c_dispatch_remove_idle (client->info.init.idle);
|
|
break;
|
|
case PROTOBUF_C_CLIENT_STATE_NAME_LOOKUP:
|
|
if (client->info.name_lookup.pending)
|
|
{
|
|
client->info.name_lookup.destroyed_while_pending = 1;
|
|
return;
|
|
}
|
|
break;
|
|
case PROTOBUF_C_CLIENT_STATE_CONNECTING:
|
|
break;
|
|
case PROTOBUF_C_CLIENT_STATE_CONNECTED:
|
|
n_closures = client->info.connected.closures_alloced;
|
|
closures = client->info.connected.closures;
|
|
break;
|
|
case PROTOBUF_C_CLIENT_STATE_FAILED_WAITING:
|
|
protobuf_c_dispatch_remove_timer (client->info.failed_waiting.timer);
|
|
client->allocator->free (client->allocator, client->info.failed_waiting.error_message);
|
|
break;
|
|
case PROTOBUF_C_CLIENT_STATE_FAILED:
|
|
client->allocator->free (client->allocator, client->info.failed.error_message);
|
|
break;
|
|
case PROTOBUF_C_CLIENT_STATE_DESTROYED:
|
|
protobuf_c_assert (0);
|
|
break;
|
|
}
|
|
if (client->fd >= 0)
|
|
{
|
|
protobuf_c_dispatch_close_fd (client->dispatch, client->fd);
|
|
client->fd = -1;
|
|
}
|
|
protobuf_c_data_buffer_clear (&client->incoming);
|
|
protobuf_c_data_buffer_clear (&client->outgoing);
|
|
client->state = PROTOBUF_C_CLIENT_STATE_DESTROYED;
|
|
client->allocator->free (client->allocator, client->name);
|
|
|
|
/* free closures only once we are in the destroyed state */
|
|
for (i = 0; i < n_closures; i++)
|
|
if (closures[i].response_type != NULL)
|
|
closures[i].closure (NULL, closures[i].closure_data);
|
|
if (closures)
|
|
client->allocator->free (client->allocator, closures);
|
|
|
|
client->allocator->free (client->allocator, client);
|
|
}
|
|
|
|
static void
|
|
trivial_sync_libc_resolver (ProtobufCDispatch *dispatch,
|
|
const char *name,
|
|
ProtobufC_NameLookup_Found found_func,
|
|
ProtobufC_NameLookup_Failed failed_func,
|
|
void *callback_data)
|
|
{
|
|
struct hostent *ent;
|
|
ent = gethostbyname (name);
|
|
if (ent == NULL)
|
|
failed_func (hstrerror (h_errno), callback_data);
|
|
else
|
|
found_func ((const uint8_t *) ent->h_addr_list[0], callback_data);
|
|
}
|
|
|
|
ProtobufCService *protobuf_c_rpc_client_new (ProtobufC_RPC_AddressType type,
|
|
const char *name,
|
|
const ProtobufCServiceDescriptor *descriptor,
|
|
ProtobufCDispatch *orig_dispatch)
|
|
{
|
|
ProtobufCDispatch *dispatch = orig_dispatch ? orig_dispatch : protobuf_c_dispatch_default ();
|
|
ProtobufCAllocator *allocator = protobuf_c_dispatch_peek_allocator (dispatch);
|
|
ProtobufC_RPC_Client *rv = allocator->alloc (allocator, sizeof (ProtobufC_RPC_Client));
|
|
rv->base_service.descriptor = descriptor;
|
|
rv->base_service.invoke = invoke_client_rpc;
|
|
rv->base_service.destroy = destroy_client_rpc;
|
|
protobuf_c_data_buffer_init (&rv->incoming, allocator);
|
|
protobuf_c_data_buffer_init (&rv->outgoing, allocator);
|
|
rv->allocator = allocator;
|
|
rv->dispatch = dispatch;
|
|
rv->address_type = type;
|
|
rv->name = strcpy (allocator->alloc (allocator, strlen (name) + 1), name);
|
|
rv->state = PROTOBUF_C_CLIENT_STATE_INIT;
|
|
rv->fd = -1;
|
|
rv->autoreconnect = 1;
|
|
rv->autoreconnect_millis = 2*1000;
|
|
rv->resolver = trivial_sync_libc_resolver;
|
|
rv->error_handler = error_handler;
|
|
rv->error_handler_data = "protobuf-c rpc client";
|
|
rv->info.init.idle = protobuf_c_dispatch_add_idle (dispatch, handle_init_idle, rv);
|
|
return &rv->base_service;
|
|
}
|
|
|
|
protobuf_c_boolean
|
|
protobuf_c_rpc_client_is_connected (ProtobufC_RPC_Client *client)
|
|
{
|
|
return client->state == PROTOBUF_C_CLIENT_STATE_CONNECTED;
|
|
}
|
|
|
|
void
|
|
protobuf_c_rpc_client_set_autoreconnect_period (ProtobufC_RPC_Client *client,
|
|
unsigned millis)
|
|
{
|
|
client->autoreconnect = 1;
|
|
client->autoreconnect_millis = millis;
|
|
}
|
|
|
|
|
|
void
|
|
protobuf_c_rpc_client_set_error_handler (ProtobufC_RPC_Client *client,
|
|
ProtobufC_RPC_Error_Func func,
|
|
void *func_data)
|
|
{
|
|
client->error_handler = func;
|
|
client->error_handler_data = func_data;
|
|
}
|
|
|
|
void
|
|
protobuf_c_rpc_client_disable_autoreconnect (ProtobufC_RPC_Client *client)
|
|
{
|
|
client->autoreconnect = 0;
|
|
}
|
|
|
|
/* === Server === */
|
|
typedef struct _ServerRequest ServerRequest;
|
|
typedef struct _ServerConnection ServerConnection;
|
|
struct _ServerRequest
|
|
{
|
|
uint32_t request_id; /* in little-endian */
|
|
uint32_t method_index; /* in native-endian */
|
|
ServerConnection *conn;
|
|
ProtobufC_RPC_Server *server;
|
|
union {
|
|
/* if conn != NULL, then the request is alive: */
|
|
struct { ServerRequest *prev, *next; } alive;
|
|
|
|
/* if conn == NULL, then the request is defunct: */
|
|
struct { ProtobufCAllocator *allocator; } defunct;
|
|
|
|
/* well, if it is in the recycled list, then it's recycled :/ */
|
|
struct { ServerRequest *next; } recycled;
|
|
} info;
|
|
};
|
|
struct _ServerConnection
|
|
{
|
|
int fd;
|
|
ProtobufCDataBuffer incoming, outgoing;
|
|
|
|
ProtobufC_RPC_Server *server;
|
|
ServerConnection *prev, *next;
|
|
|
|
unsigned n_pending_requests;
|
|
ServerRequest *first_pending_request, *last_pending_request;
|
|
};
|
|
|
|
|
|
/* When we get a response in the wrong thread,
|
|
we proxy it over a system pipe. Actually, we allocate one
|
|
of these structures and pass the pointer over the pipe. */
|
|
typedef struct _ProxyResponse ProxyResponse;
|
|
struct _ProxyResponse
|
|
{
|
|
ServerRequest *request;
|
|
unsigned len;
|
|
/* data follows the structure */
|
|
};
|
|
|
|
struct _ProtobufC_RPC_Server
|
|
{
|
|
ProtobufCDispatch *dispatch;
|
|
ProtobufCAllocator *allocator;
|
|
ProtobufCService *underlying;
|
|
ProtobufC_RPC_AddressType address_type;
|
|
char *bind_name;
|
|
ServerConnection *first_connection, *last_connection;
|
|
ProtobufC_FD listening_fd;
|
|
|
|
ServerRequest *recycled_requests;
|
|
|
|
/* multithreading support */
|
|
ProtobufC_RPC_IsRpcThreadFunc is_rpc_thread_func;
|
|
void * is_rpc_thread_data;
|
|
int proxy_pipe[2];
|
|
unsigned proxy_extra_data_len;
|
|
uint8_t proxy_extra_data[sizeof (void*)];
|
|
|
|
ProtobufC_RPC_Error_Func error_handler;
|
|
void *error_handler_data;
|
|
|
|
/* configuration */
|
|
unsigned max_pending_requests_per_connection;
|
|
};
|
|
|
|
#define GET_PENDING_REQUEST_LIST(conn) \
|
|
ServerRequest *, conn->first_pending_request, conn->last_pending_request, info.alive.prev, info.alive.next
|
|
#define GET_CONNECTION_LIST(server) \
|
|
ServerConnection *, server->first_connection, server->last_connection, prev, next
|
|
|
|
static void
|
|
server_connection_close (ServerConnection *conn)
|
|
{
|
|
ProtobufCAllocator *allocator = conn->server->allocator;
|
|
|
|
/* general cleanup */
|
|
protobuf_c_dispatch_close_fd (conn->server->dispatch, conn->fd);
|
|
conn->fd = -1;
|
|
protobuf_c_data_buffer_clear (&conn->incoming);
|
|
protobuf_c_data_buffer_clear (&conn->outgoing);
|
|
|
|
/* remove this connection from the server's list */
|
|
GSK_LIST_REMOVE (GET_CONNECTION_LIST (conn->server), conn);
|
|
|
|
/* disassocate all the requests from the connection */
|
|
while (conn->first_pending_request != NULL)
|
|
{
|
|
ServerRequest *req = conn->first_pending_request;
|
|
conn->first_pending_request = req->info.alive.next;
|
|
req->conn = NULL;
|
|
req->info.defunct.allocator = allocator;
|
|
}
|
|
|
|
/* free the connection itself */
|
|
allocator->free (allocator, conn);
|
|
}
|
|
|
|
static void
|
|
server_failed_literal (ProtobufC_RPC_Server *server,
|
|
ProtobufC_RPC_Error_Code code,
|
|
const char *msg)
|
|
{
|
|
if (server->error_handler != NULL)
|
|
server->error_handler (code, msg, server->error_handler_data);
|
|
}
|
|
|
|
#if 0
|
|
static void
|
|
server_failed (ProtobufC_RPC_Server *server,
|
|
ProtobufC_RPC_Error_Code code,
|
|
const char *format,
|
|
...)
|
|
{
|
|
va_list args;
|
|
char buf[MAX_FAILED_MSG_LENGTH];
|
|
va_start (args, format);
|
|
vsnprintf (buf, sizeof (buf), format, args);
|
|
buf[sizeof(buf)-1] = 0;
|
|
va_end (args);
|
|
|
|
server_failed_literal (server, code, buf);
|
|
}
|
|
#endif
|
|
|
|
static protobuf_c_boolean
|
|
address_to_name (const struct sockaddr *addr,
|
|
unsigned addr_len,
|
|
char *name_out,
|
|
unsigned name_out_buf_length)
|
|
{
|
|
if (addr->sa_family == PF_INET)
|
|
{
|
|
/* convert to dotted address + port */
|
|
const struct sockaddr_in *addr_in = (const struct sockaddr_in *) addr;
|
|
const uint8_t *addr = (const uint8_t *) &(addr_in->sin_addr);
|
|
uint16_t port = htons (addr_in->sin_port);
|
|
snprintf (name_out, name_out_buf_length,
|
|
"%u.%u.%u.%u:%u",
|
|
addr[0], addr[1], addr[2], addr[3], port);
|
|
return TRUE;
|
|
}
|
|
return FALSE;
|
|
}
|
|
|
|
static void
|
|
server_connection_failed (ServerConnection *conn,
|
|
ProtobufC_RPC_Error_Code code,
|
|
const char *format,
|
|
...)
|
|
{
|
|
char remote_addr_name[64];
|
|
char msg[MAX_FAILED_MSG_LENGTH];
|
|
char *msg_end = msg + sizeof (msg);
|
|
char *msg_at;
|
|
struct sockaddr addr;
|
|
socklen_t addr_len = sizeof (addr);
|
|
va_list args;
|
|
|
|
/* if we can, find the remote name of this connection */
|
|
if (getpeername (conn->fd, &addr, &addr_len) == 0
|
|
&& address_to_name (&addr, addr_len, remote_addr_name, sizeof (remote_addr_name)))
|
|
snprintf (msg, sizeof (msg), "connection to %s from %s: ",
|
|
conn->server->bind_name, remote_addr_name);
|
|
else
|
|
snprintf (msg, sizeof (msg), "connection to %s: ",
|
|
conn->server->bind_name);
|
|
msg[sizeof(msg)-1] = 0;
|
|
msg_at = strchr (msg, 0);
|
|
|
|
/* do vsnprintf() */
|
|
va_start (args, format);
|
|
vsnprintf(msg_at, msg_end - msg_at, format, args);
|
|
va_end (args);
|
|
msg[sizeof(msg)-1] = 0;
|
|
|
|
/* invoke server error hook */
|
|
server_failed_literal (conn->server, code, msg);
|
|
|
|
server_connection_close (conn);
|
|
}
|
|
|
|
static ServerRequest *
|
|
create_server_request (ServerConnection *conn,
|
|
uint32_t request_id,
|
|
uint32_t method_index)
|
|
{
|
|
ServerRequest *rv;
|
|
if (conn->server->recycled_requests != NULL)
|
|
{
|
|
rv = conn->server->recycled_requests;
|
|
conn->server->recycled_requests = rv->info.recycled.next;
|
|
}
|
|
else
|
|
{
|
|
ProtobufCAllocator *allocator = conn->server->allocator;
|
|
rv = allocator->alloc (allocator, sizeof (ServerRequest));
|
|
}
|
|
rv->server = conn->server;
|
|
rv->conn = conn;
|
|
rv->request_id = request_id;
|
|
rv->method_index = method_index;
|
|
conn->n_pending_requests++;
|
|
GSK_LIST_APPEND (GET_PENDING_REQUEST_LIST (conn), rv);
|
|
return rv;
|
|
}
|
|
|
|
static void
|
|
free_server_request (ProtobufC_RPC_Server *server,
|
|
ServerRequest *request)
|
|
{
|
|
#if RECYCLE_REQUESTS
|
|
/* recycle request */
|
|
request->info.recycled.next = server->recycled_requests;
|
|
server->recycled_requests = request;
|
|
#else
|
|
/* free the request immediately */
|
|
server->allocator->free (server->allocator, request);
|
|
#endif
|
|
}
|
|
|
|
|
|
static void handle_server_connection_events (int fd,
|
|
unsigned events,
|
|
void *data);
|
|
static void
|
|
server_connection_response_closure (const ProtobufCMessage *message,
|
|
void *closure_data)
|
|
{
|
|
ServerRequest *request = closure_data;
|
|
ProtobufC_RPC_Server *server = request->server;
|
|
protobuf_c_boolean must_proxy = 0;
|
|
ProtobufCAllocator *allocator = server->allocator;
|
|
if (server->is_rpc_thread_func != NULL)
|
|
{
|
|
must_proxy = !server->is_rpc_thread_func (server,
|
|
server->dispatch,
|
|
server->is_rpc_thread_data);
|
|
}
|
|
|
|
|
|
uint8_t buffer_slab[512];
|
|
ProtobufCBufferSimple buffer_simple = PROTOBUF_C_BUFFER_SIMPLE_INIT (buffer_slab);
|
|
if (message == NULL)
|
|
{
|
|
/* send failed status */
|
|
uint32_t header[4];
|
|
header[0] = uint32_to_le (PROTOBUF_C_STATUS_CODE_SERVICE_FAILED);
|
|
header[1] = uint32_to_le (request->method_index);
|
|
header[2] = 0; /* no message */
|
|
header[3] = request->request_id;
|
|
protobuf_c_buffer_simple_append (&buffer_simple.base,
|
|
16, (uint8_t *) header);
|
|
}
|
|
else
|
|
{
|
|
/* send success response */
|
|
uint32_t header[4];
|
|
header[0] = uint32_to_le (PROTOBUF_C_STATUS_CODE_SUCCESS);
|
|
header[1] = uint32_to_le (request->method_index);
|
|
header[3] = request->request_id;
|
|
protobuf_c_buffer_simple_append (&buffer_simple.base,
|
|
16, (uint8_t *) header);
|
|
protobuf_c_message_pack_to_buffer (message, &buffer_simple.base);
|
|
((uint32_t *)buffer_simple.data)[2] = uint32_to_le (buffer_simple.len - 16);
|
|
}
|
|
|
|
if (must_proxy)
|
|
{
|
|
ProxyResponse *pr = allocator->alloc (allocator, sizeof (ProxyResponse) + buffer_simple.len);
|
|
int rv;
|
|
pr->request = request;
|
|
pr->len = buffer_simple.len;
|
|
memcpy (pr + 1, buffer_simple.data, buffer_simple.len);
|
|
|
|
/* write pointer to proxy pipe */
|
|
retry_write:
|
|
rv = write (server->proxy_pipe[1], &pr, sizeof (void*));
|
|
if (rv < 0)
|
|
{
|
|
if (errno == EINTR || errno == EAGAIN)
|
|
goto retry_write;
|
|
server_failed_literal (server, PROTOBUF_C_ERROR_CODE_PROXY_PROBLEM,
|
|
"error writing to proxy-pipe");
|
|
allocator->free (allocator, pr);
|
|
}
|
|
else if (rv < sizeof (void *))
|
|
{
|
|
server_failed_literal (server, PROTOBUF_C_ERROR_CODE_PROXY_PROBLEM,
|
|
"partial write to proxy-pipe");
|
|
allocator->free (allocator, pr);
|
|
}
|
|
}
|
|
else if (request->conn == NULL)
|
|
{
|
|
/* defunct request */
|
|
allocator->free (allocator, request);
|
|
free_server_request (server, request);
|
|
}
|
|
else
|
|
{
|
|
ServerConnection *conn = request->conn;
|
|
protobuf_c_boolean must_set_output_watch = (conn->outgoing.size == 0);
|
|
protobuf_c_data_buffer_append (&conn->outgoing, buffer_simple.data, buffer_simple.len);
|
|
if (must_set_output_watch)
|
|
protobuf_c_dispatch_watch_fd (conn->server->dispatch,
|
|
conn->fd,
|
|
PROTOBUF_C_EVENT_READABLE|PROTOBUF_C_EVENT_WRITABLE,
|
|
handle_server_connection_events,
|
|
conn);
|
|
GSK_LIST_REMOVE (GET_PENDING_REQUEST_LIST (conn), request);
|
|
conn->n_pending_requests--;
|
|
|
|
free_server_request (server, request);
|
|
}
|
|
PROTOBUF_C_BUFFER_SIMPLE_CLEAR (&buffer_simple);
|
|
}
|
|
|
|
static void
|
|
handle_server_connection_events (int fd,
|
|
unsigned events,
|
|
void *data)
|
|
{
|
|
ServerConnection *conn = data;
|
|
ProtobufCService *service = conn->server->underlying;
|
|
ProtobufCAllocator *allocator = conn->server->allocator;
|
|
if (events & PROTOBUF_C_EVENT_READABLE)
|
|
{
|
|
int read_rv = protobuf_c_data_buffer_read_in_fd (&conn->incoming, fd);
|
|
if (read_rv < 0)
|
|
{
|
|
if (!errno_is_ignorable (errno))
|
|
{
|
|
server_connection_failed (conn,
|
|
PROTOBUF_C_ERROR_CODE_CLIENT_TERMINATED,
|
|
"reading from file-descriptor: %s",
|
|
strerror (errno));
|
|
return;
|
|
}
|
|
}
|
|
else if (read_rv == 0)
|
|
{
|
|
if (conn->first_pending_request != NULL)
|
|
server_connection_failed (conn,
|
|
PROTOBUF_C_ERROR_CODE_CLIENT_TERMINATED,
|
|
"closed while calls pending");
|
|
else
|
|
server_connection_close (conn);
|
|
return;
|
|
}
|
|
else
|
|
while (conn->incoming.size >= 12)
|
|
{
|
|
uint32_t header[3];
|
|
uint32_t method_index, message_length, request_id;
|
|
uint8_t *packed_data;
|
|
ProtobufCMessage *message;
|
|
ServerRequest *server_request;
|
|
protobuf_c_data_buffer_peek (&conn->incoming, header, 12);
|
|
method_index = uint32_from_le (header[0]);
|
|
message_length = uint32_from_le (header[1]);
|
|
request_id = header[2]; /* store in whatever endianness it comes in */
|
|
|
|
if (conn->incoming.size < 12 + message_length)
|
|
break;
|
|
|
|
if (method_index >= conn->server->underlying->descriptor->n_methods)
|
|
{
|
|
server_connection_failed (conn,
|
|
PROTOBUF_C_ERROR_CODE_BAD_REQUEST,
|
|
"bad method_index %u", method_index);
|
|
return;
|
|
}
|
|
|
|
/* Read message */
|
|
protobuf_c_data_buffer_discard (&conn->incoming, 12);
|
|
packed_data = allocator->alloc (allocator, message_length);
|
|
protobuf_c_data_buffer_read (&conn->incoming, packed_data, message_length);
|
|
|
|
/* Unpack message */
|
|
message = protobuf_c_message_unpack (service->descriptor->methods[method_index].input,
|
|
allocator, message_length, packed_data);
|
|
allocator->free (allocator, packed_data);
|
|
if (message == NULL)
|
|
{
|
|
server_connection_failed (conn,
|
|
PROTOBUF_C_ERROR_CODE_BAD_REQUEST,
|
|
"error unpacking message");
|
|
return;
|
|
}
|
|
|
|
/* Invoke service (note that it may call back immediately) */
|
|
server_request = create_server_request (conn, request_id, method_index);
|
|
service->invoke (service, method_index, message,
|
|
server_connection_response_closure, server_request);
|
|
protobuf_c_message_free_unpacked (message, allocator);
|
|
}
|
|
}
|
|
if ((events & PROTOBUF_C_EVENT_WRITABLE) != 0
|
|
&& conn->outgoing.size > 0)
|
|
{
|
|
int write_rv = protobuf_c_data_buffer_writev (&conn->outgoing, fd);
|
|
if (write_rv < 0)
|
|
{
|
|
if (!errno_is_ignorable (errno))
|
|
{
|
|
server_connection_failed (conn,
|
|
PROTOBUF_C_ERROR_CODE_CLIENT_TERMINATED,
|
|
"writing to file-descriptor: %s",
|
|
strerror (errno));
|
|
return;
|
|
}
|
|
}
|
|
if (conn->outgoing.size == 0)
|
|
protobuf_c_dispatch_watch_fd (conn->server->dispatch, conn->fd, PROTOBUF_C_EVENT_READABLE,
|
|
handle_server_connection_events, conn);
|
|
}
|
|
}
|
|
|
|
static void
|
|
handle_server_listener_readable (int fd,
|
|
unsigned events,
|
|
void *data)
|
|
{
|
|
ProtobufC_RPC_Server *server = data;
|
|
struct sockaddr addr;
|
|
socklen_t addr_len = sizeof (addr);
|
|
int new_fd = accept (fd, &addr, &addr_len);
|
|
ServerConnection *conn;
|
|
ProtobufCAllocator *allocator = server->allocator;
|
|
if (new_fd < 0)
|
|
{
|
|
if (errno_is_ignorable (errno))
|
|
return;
|
|
fprintf (stderr, "error accept()ing file descriptor: %s\n",
|
|
strerror (errno));
|
|
return;
|
|
}
|
|
conn = allocator->alloc (allocator, sizeof (ServerConnection));
|
|
conn->fd = new_fd;
|
|
protobuf_c_data_buffer_init (&conn->incoming, server->allocator);
|
|
protobuf_c_data_buffer_init (&conn->outgoing, server->allocator);
|
|
conn->n_pending_requests = 0;
|
|
conn->first_pending_request = conn->last_pending_request = NULL;
|
|
conn->server = server;
|
|
GSK_LIST_APPEND (GET_CONNECTION_LIST (server), conn);
|
|
protobuf_c_dispatch_watch_fd (server->dispatch, conn->fd, PROTOBUF_C_EVENT_READABLE,
|
|
handle_server_connection_events, conn);
|
|
}
|
|
|
|
static ProtobufC_RPC_Server *
|
|
server_new_from_fd (ProtobufC_FD listening_fd,
|
|
ProtobufC_RPC_AddressType address_type,
|
|
const char *bind_name,
|
|
ProtobufCService *service,
|
|
ProtobufCDispatch *orig_dispatch)
|
|
{
|
|
ProtobufCDispatch *dispatch = orig_dispatch ? orig_dispatch : protobuf_c_dispatch_default ();
|
|
ProtobufCAllocator *allocator = protobuf_c_dispatch_peek_allocator (dispatch);
|
|
ProtobufC_RPC_Server *server = allocator->alloc (allocator, sizeof (ProtobufC_RPC_Server));
|
|
server->dispatch = dispatch;
|
|
server->allocator = allocator;
|
|
server->underlying = service;
|
|
server->first_connection = server->last_connection = NULL;
|
|
server->max_pending_requests_per_connection = 32;
|
|
server->address_type = address_type;
|
|
server->bind_name = allocator->alloc (allocator, strlen (bind_name) + 1);
|
|
server->error_handler = error_handler;
|
|
server->error_handler_data = "protobuf-c rpc server";
|
|
server->listening_fd = listening_fd;
|
|
server->recycled_requests = NULL;
|
|
server->is_rpc_thread_func = NULL;
|
|
server->is_rpc_thread_data = NULL;
|
|
server->proxy_pipe[0] = server->proxy_pipe[1] = -1;
|
|
server->proxy_extra_data_len = 0;
|
|
strcpy (server->bind_name, bind_name);
|
|
set_fd_nonblocking (listening_fd);
|
|
protobuf_c_dispatch_watch_fd (dispatch, listening_fd, PROTOBUF_C_EVENT_READABLE,
|
|
handle_server_listener_readable, server);
|
|
return server;
|
|
}
|
|
|
|
/* this function is for handling the common problem
|
|
that we bind over-and-over again to the same
|
|
unix path.
|
|
|
|
ideally, you'd think the OS's SO_REUSEADDR flag would
|
|
cause this to happen, but it doesn't,
|
|
at least on my linux 2.6 box.
|
|
|
|
in fact, we really need a way to test without
|
|
actually connecting to the remote server,
|
|
which might annoy it.
|
|
|
|
XXX: we should survey what others do here... like x-windows...
|
|
*/
|
|
/* NOTE: stolen from gsk, obviously */
|
|
static void
|
|
_gsk_socket_address_local_maybe_delete_stale_socket (const char *path,
|
|
struct sockaddr *addr,
|
|
unsigned addr_len)
|
|
{
|
|
int fd;
|
|
struct stat statbuf;
|
|
if (stat (path, &statbuf) < 0)
|
|
return;
|
|
if (!S_ISSOCK (statbuf.st_mode))
|
|
{
|
|
fprintf (stderr, "%s existed but was not a socket\n", path);
|
|
return;
|
|
}
|
|
|
|
fd = socket (PF_UNIX, SOCK_STREAM, 0);
|
|
if (fd < 0)
|
|
return;
|
|
set_fd_nonblocking (fd);
|
|
if (connect (fd, addr, addr_len) < 0)
|
|
{
|
|
if (errno == EINPROGRESS)
|
|
{
|
|
close (fd);
|
|
return;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
close (fd);
|
|
return;
|
|
}
|
|
|
|
/* ok, we should delete the stale socket */
|
|
close (fd);
|
|
if (unlink (path) < 0)
|
|
fprintf (stderr, "unable to delete %s: %s\n",
|
|
path, strerror(errno));
|
|
}
|
|
ProtobufC_RPC_Server *
|
|
protobuf_c_rpc_server_new (ProtobufC_RPC_AddressType type,
|
|
const char *name,
|
|
ProtobufCService *service,
|
|
ProtobufCDispatch *dispatch)
|
|
{
|
|
int fd = -1;
|
|
int protocol_family;
|
|
struct sockaddr *address;
|
|
socklen_t address_len;
|
|
struct sockaddr_un addr_un;
|
|
struct sockaddr_in addr_in;
|
|
switch (type)
|
|
{
|
|
case PROTOBUF_C_RPC_ADDRESS_LOCAL:
|
|
protocol_family = PF_UNIX;
|
|
memset (&addr_un, 0, sizeof (addr_un));
|
|
addr_un.sun_family = AF_LOCAL;
|
|
strncpy (addr_un.sun_path, name, sizeof (addr_un.sun_path));
|
|
address_len = sizeof (addr_un);
|
|
address = (struct sockaddr *) (&addr_un);
|
|
_gsk_socket_address_local_maybe_delete_stale_socket (name,
|
|
address,
|
|
address_len);
|
|
break;
|
|
case PROTOBUF_C_RPC_ADDRESS_TCP:
|
|
protocol_family = PF_INET;
|
|
memset (&addr_in, 0, sizeof (addr_in));
|
|
addr_in.sin_family = AF_INET;
|
|
{
|
|
unsigned port = atoi (name);
|
|
addr_in.sin_port = htons (port);
|
|
}
|
|
address_len = sizeof (addr_in);
|
|
address = (struct sockaddr *) (&addr_in);
|
|
break;
|
|
default:
|
|
protobuf_c_assert (0);
|
|
}
|
|
|
|
fd = socket (protocol_family, SOCK_STREAM, 0);
|
|
if (fd < 0)
|
|
{
|
|
fprintf (stderr, "protobuf_c_rpc_server_new: socket() failed: %s\n",
|
|
strerror (errno));
|
|
return NULL;
|
|
}
|
|
if (bind (fd, address, address_len) < 0)
|
|
{
|
|
fprintf (stderr, "protobuf_c_rpc_server_new: error binding to port: %s\n",
|
|
strerror (errno));
|
|
return NULL;
|
|
}
|
|
if (listen (fd, 255) < 0)
|
|
{
|
|
fprintf (stderr, "protobuf_c_rpc_server_new: listen() failed: %s\n",
|
|
strerror (errno));
|
|
return NULL;
|
|
}
|
|
return server_new_from_fd (fd, type, name, service, dispatch);
|
|
}
|
|
|
|
ProtobufCService *
|
|
protobuf_c_rpc_server_destroy (ProtobufC_RPC_Server *server,
|
|
protobuf_c_boolean destroy_underlying)
|
|
{
|
|
ProtobufCService *rv = destroy_underlying ? NULL : server->underlying;
|
|
while (server->first_connection != NULL)
|
|
server_connection_close (server->first_connection);
|
|
|
|
if (server->address_type == PROTOBUF_C_RPC_ADDRESS_LOCAL)
|
|
unlink (server->bind_name);
|
|
server->allocator->free (server->allocator, server->bind_name);
|
|
|
|
while (server->recycled_requests != NULL)
|
|
{
|
|
ServerRequest *req = server->recycled_requests;
|
|
server->recycled_requests = req->info.recycled.next;
|
|
server->allocator->free (server->allocator, req);
|
|
}
|
|
|
|
protobuf_c_dispatch_close_fd (server->dispatch, server->listening_fd);
|
|
|
|
if (destroy_underlying)
|
|
protobuf_c_service_destroy (server->underlying);
|
|
|
|
server->allocator->free (server->allocator, server);
|
|
|
|
return rv;
|
|
}
|
|
|
|
/* Number of proxied requests to try to grab in a single read */
|
|
#define PROXY_BUF_SIZE 256
|
|
static void
|
|
handle_proxy_pipe_readable (ProtobufC_FD fd,
|
|
unsigned events,
|
|
void *callback_data)
|
|
{
|
|
int nread;
|
|
ProtobufC_RPC_Server *server = callback_data;
|
|
ProtobufCAllocator *allocator = server->allocator;
|
|
union {
|
|
char buf[sizeof(void*) * PROXY_BUF_SIZE];
|
|
ProxyResponse *responses[PROXY_BUF_SIZE];
|
|
} u;
|
|
unsigned amt, i;
|
|
memcpy (u.buf, server->proxy_extra_data, server->proxy_extra_data_len);
|
|
nread = read (fd, u.buf + server->proxy_extra_data_len, sizeof (u.buf) - server->proxy_extra_data_len);
|
|
if (nread <= 0)
|
|
return; /* TODO: handle 0 and non-retryable errors separately */
|
|
amt = server->proxy_extra_data_len + nread;
|
|
for (i = 0; i < amt / sizeof(void*); i++)
|
|
{
|
|
ProxyResponse *pr = u.responses[i];
|
|
ServerRequest *request = pr->request;
|
|
if (request->conn == NULL)
|
|
{
|
|
/* defunct request */
|
|
allocator->free (allocator, request);
|
|
}
|
|
else
|
|
{
|
|
ServerConnection *conn = request->conn;
|
|
protobuf_c_boolean must_set_output_watch = (conn->outgoing.size == 0);
|
|
protobuf_c_data_buffer_append (&conn->outgoing, (uint8_t*)(pr+1), pr->len);
|
|
if (must_set_output_watch)
|
|
protobuf_c_dispatch_watch_fd (conn->server->dispatch,
|
|
conn->fd,
|
|
PROTOBUF_C_EVENT_READABLE|PROTOBUF_C_EVENT_WRITABLE,
|
|
handle_server_connection_events,
|
|
conn);
|
|
GSK_LIST_REMOVE (GET_PENDING_REQUEST_LIST (conn), request);
|
|
conn->n_pending_requests--;
|
|
|
|
free_server_request (conn->server, request);
|
|
}
|
|
allocator->free (allocator, pr);
|
|
}
|
|
memcpy (server->proxy_extra_data, u.buf + i * sizeof(void*), amt - i * sizeof(void*));
|
|
server->proxy_extra_data_len = amt - i * sizeof(void*);
|
|
}
|
|
|
|
void
|
|
protobuf_c_rpc_server_configure_threading (ProtobufC_RPC_Server *server,
|
|
ProtobufC_RPC_IsRpcThreadFunc func,
|
|
void *is_rpc_data)
|
|
{
|
|
server->is_rpc_thread_func = func;
|
|
server->is_rpc_thread_data = is_rpc_data;
|
|
retry_pipe:
|
|
if (pipe (server->proxy_pipe) < 0)
|
|
{
|
|
if (errno == EINTR || errno == EAGAIN)
|
|
goto retry_pipe;
|
|
server_failed_literal (server, PROTOBUF_C_ERROR_CODE_PROXY_PROBLEM,
|
|
"error creating pipe for thread-proxying");
|
|
return;
|
|
}
|
|
|
|
/* make the read side non-blocking, since we will use it from the main-loop;
|
|
leave the write side blocking, since it will be used from foreign threads */
|
|
set_fd_nonblocking (server->proxy_pipe[0]);
|
|
protobuf_c_dispatch_watch_fd (server->dispatch, server->proxy_pipe[0],
|
|
PROTOBUF_C_EVENT_READABLE,
|
|
handle_proxy_pipe_readable, server);
|
|
}
|
|
|
|
void
|
|
protobuf_c_rpc_server_set_error_handler (ProtobufC_RPC_Server *server,
|
|
ProtobufC_RPC_Error_Func func,
|
|
void *error_func_data)
|
|
{
|
|
server->error_handler = func;
|
|
server->error_handler_data = error_func_data;
|
|
}
|