git-svn-id: https://protobuf-c.googlecode.com/svn/trunk@113 00440858-1255-0410-a3e6-75ea37f81c3a

This commit is contained in:
lahiker42 2009-01-21 17:16:45 +00:00
parent 73cb6e3023
commit db15bf48eb
2 changed files with 196 additions and 2 deletions

View File

@ -1,3 +1,4 @@
#include "protobuf-c-rpc.h"
#include "protobuf-c-data-buffer.h" #include "protobuf-c-data-buffer.h"
typedef struct _ProtobufC_RPC_Client ProtobufC_RPC_Client; typedef struct _ProtobufC_RPC_Client ProtobufC_RPC_Client;
@ -25,18 +26,211 @@ struct _ProtobufC_RPC_Client
ProtobufC_FD fd; ProtobufC_FD fd;
protobuf_c_boolean autoretry; protobuf_c_boolean autoretry;
unsigned autoretry_millis; unsigned autoretry_millis;
ProtobufC_NameLookup_Func resolver;
union { union {
struct { struct {
ProtobufCDispatch_Idle *idle; ProtobufCDispatch_Idle *idle;
} init; } init;
struct { struct {
ProtobufCDispatch_Timer *timer; ProtobufCDispatch_Timer *timer;
char *error_message;
} failed_waiting; } failed_waiting;
struct {
char *error_message;
} failed;
}; };
}; };
static void
set_fd_nonblocking(int fd)
{
int flags = fcntl (fd, F_GETFL);
protobuf_c_assert (flags >= 0);
fcntl (fd, F_SETFL, flags | O_NONBLOCK);
}
static void
client_failed (ProtobufC_RPC_Client *client,
const char *format_str,
...)
{
...
}
static void
begin_connecting (ProtobufC_RPC_Client *client,
struct sockaddr_t *address,
size_t addr_len)
{
protobuf_c_assert (client->state == PROTOBUF_C_CLIENT_STATE_NAME_LOOKUP);
client->state = PROTOBUF_C_CLIENT_STATE_CONNECTING;
client->fd = socket (PF_UNIX, SOCK_STREAM, 0);
if (client->fd < 0)
{
client_failed (client, "error creating socket: %s", strerror (errno));
return;
}
set_fd_nonblocking (client->fd);
if (connect (client->fd, address, addr_len) < 0)
{
if (errno == EINPROGRESS)
{
/* register interest in fd */
protobuf_c_dispatch_watch_fd (client->dispatch,
client->fd,
PROTOBUF_C_EVENT_READABLE|PROTOBUF_C_EVENT_WRITABLE,
handle_client_fd_connect_events,
client);
return;
}
close (client->fd);
client->fd = -1;
client_failed (client, "error connecting to remote host: %s", strerror (errno));
return;
}
client->state = PROTOBUF_C_CLIENT_STATE_CONNECTED;
if (client->first_outgoing_request != NULL)
{
/* register interest in writing to fd (there can be no pending requests, of course,
since we just connected) */
protobuf_c_dispatch_watch_fd (client->dispatch,
client->fd,
PROTOBUF_C_EVENT_WRITABLE,
handle_connected_client_fd_events,
client);
}
}
static void
handle_name_lookup_success (const uint8_t *address,
void *callback_data)
{
ProtobufC_RPC_Client *client = callback_data;
struct sockaddr_in address;
protobuf_c_assert (client->state == PROTOBUF_C_CLIENT_STATE_NAME_LOOKUP);
protobuf_c_assert (client->info.name_lookup.pending);
client->info.name_lookup.pending = 0;
address.sin_family = PF_INET;
memcpy (address.sin_addr, address, 4);
address.sin_port = htons (client->info.name_lookup.port);
begin_connecting (client, (struct sockaddr *) &address, sizeof (address));
}
static void
handle_name_lookup_failure (const char *error_message,
void *callback_data)
{
ProtobufC_RPC_Client *client = callback_data;
protobuf_c_assert (client->state == PROTOBUF_C_CLIENT_STATE_NAME_LOOKUP);
protobuf_c_assert (client->info.name_lookup.pending);
client->info.name_lookup.pending = 0;
client_failed ("name lookup failed (for name from %s): %s", client->name, error_message);
}
static void
begin_name_lookup (ProtobufC_RPC_Client *client)
{
protobuf_c_assert (client->state == PROTOBUF_C_CLIENT_STATE_INIT
|| client->state == PROTOBUF_C_CLIENT_STATE_FAILED_WAITING
|| client->state == PROTOBUF_C_CLIENT_STATE_FAILED);
client->state = PROTOBUF_C_CLIENT_STATE_NAME_LOOKUP;
client->info.name_lookup.pending = 0;
switch (client->address_type)
{
case PROTOBUF_C_RPC_ADDRESS_LOCAL:
{
struct sockaddr_un addr;
addr.sun_family = AF_UNIX;
strncpy (addr.sun_path, client->name, sizeof (addr.sun_path));
begin_connecting (client, (struct sockaddr *) &addr);
return;
}
case PROTOBUF_C_RPC_ADDRESS_TCP:
{
/* parse hostname:port from client->name */
const char *colon = strchr (client->name, ':');
char *host;
unsigned port;
if (colon == NULL)
{
client_failed (client,
"name '%s' does not have a : in it (supposed to be HOST:PORT)",
client->name);
return;
}
host = client->allocator->alloc (client->allocator, colon + 1 - client->name);
memcpy (host, client->name, colon - client->name);
host[colon - client->name] = 0;
port = atoi (colon + 1);
client->info.name_lookup.pending = 1;
client->info.name_lookup.port = port;
client->resolver (client->dispatch,
hostname,
handle_name_lookup_success,
handle_name_lookup_failure,
client);
/* cleanup */
client->allocator->free (client->allocator, host);
return;
}
default:
assert (0);
}
}
void void
handle_init_idle (ProtobufCDispatch * handle_init_idle (ProtobufCDispatch *dispatch,
void *data)
{
ProtobufC_RPC_Client *client = data;
}
static void
enqueue_request (ProtobufC_RPC_Client *client,
unsigned method_index,
const ProtobufCMessage *input,
ProtobufCClosure closure,
void *closure_data)
{
...
}
static void
invoke_client_rpc (ProtobufCService *service,
unsigned method_index,
const ProtobufCMessage *input,
ProtobufCClosure closure,
void *closure_data)
{
ProtobufC_RPC_Client *client = (ProtobufC_RPC_Client *) service;
protobuf_c_assert (service->invoke == invoke_client_rpc);
switch (client->state)
{
case PROTOBUF_C_CLIENT_STATE_INIT:
case PROTOBUF_C_CLIENT_STATE_NAME_LOOKUP:
case PROTOBUF_C_CLIENT_STATE_CONNECTING:
enqueue_request (client, method_index, input, closure, closure_data);
break;
case PROTOBUF_C_CLIENT_STATE_CONNECTED:
{
int had_outgoing = (client->first_outgoing_request != NULL);
enqueue_request (client, method_index, input, closure, closure_data);
if (!had_outgoing)
update_connected_client_watch (client);
}
break;
case PROTOBUF_C_CLIENT_STATE_FAILED_WAITING:
case PROTOBUF_C_CLIENT_STATE_FAILED: /* if no autoretry */
closure (NULL, closure_data);
break;
}
}
ProtobufCService *protobuf_c_rpc_client_new (ProtobufC_RPC_AddressType type, ProtobufCService *protobuf_c_rpc_client_new (ProtobufC_RPC_AddressType type,
const char *name, const char *name,
@ -60,4 +254,3 @@ ProtobufCService *protobuf_c_rpc_client_new (ProtobufC_RPC_AddressType type,
rv->info.init = protobuf_c_dispatch_add_idle (dispatch, handle_init_idle, rv); rv->info.init = protobuf_c_dispatch_add_idle (dispatch, handle_init_idle, rv);
return &rv->base; return &rv->base;
} }
-

View File

@ -42,6 +42,7 @@ typedef void (*ProtobufC_NameLookup_Found) (const uint8_t *address,
typedef void (*ProtobufC_NameLookup_Failed)(const char *error_message, typedef void (*ProtobufC_NameLookup_Failed)(const char *error_message,
void *callback_data); void *callback_data);
typedef void (*ProtobufC_NameLookup_Func) (ProtobufCDispatch *dispatch, typedef void (*ProtobufC_NameLookup_Func) (ProtobufCDispatch *dispatch,
const char *name,
ProtobufC_NameLookup_Found found_func, ProtobufC_NameLookup_Found found_func,
ProtobufC_NameLookup_Failed failed_func, ProtobufC_NameLookup_Failed failed_func,
gpointer callback_data); gpointer callback_data);