0
0
mirror of https://github.com/zeromq/libzmq.git synced 2025-01-14 17:58:01 +08:00

Send identities when connecting pending sockets.

This commit is contained in:
Richard Newton 2013-09-12 21:07:29 +01:00
parent 7c3496a76a
commit 379bcb0881
6 changed files with 102 additions and 22 deletions

View File

@ -392,7 +392,7 @@ zmq::endpoint_t zmq::ctx_t::find_endpoint (const char *addr_)
return endpoint; return endpoint;
} }
void zmq::ctx_t::pend_connection (const char *addr_, const pending_connection_t &pending_connection_) void zmq::ctx_t::pend_connection (const char *addr_, pending_connection_t &pending_connection_)
{ {
endpoints_sync.lock (); endpoints_sync.lock ();
@ -400,18 +400,17 @@ void zmq::ctx_t::pend_connection (const char *addr_, const pending_connection_t
if (it == endpoints.end ()) if (it == endpoints.end ())
{ {
// Still no bind. // Still no bind.
pending_connection_.socket->inc_seqnum (); pending_connection_.endpoint.socket->inc_seqnum ();
pending_connections.insert (pending_connections_t::value_type (std::string (addr_), pending_connection_)); pending_connections.insert (pending_connections_t::value_type (std::string (addr_), pending_connection_));
} }
else else
{ {
// Bind has happened in the mean time, connect directly // Bind has happened in the mean time, connect directly
pending_connection_t copy = pending_connection_;
it->second.socket->inc_seqnum(); it->second.socket->inc_seqnum();
copy.pipe->set_tid(it->second.socket->get_tid()); pending_connection_.bind_pipe->set_tid(it->second.socket->get_tid());
command_t cmd; command_t cmd;
cmd.type = command_t::bind; cmd.type = command_t::bind;
cmd.args.bind.pipe = copy.pipe; cmd.args.bind.pipe = pending_connection_.bind_pipe;
it->second.socket->process_command(cmd); it->second.socket->process_command(cmd);
} }
@ -427,13 +426,37 @@ void zmq::ctx_t::connect_pending (const char *addr_, zmq::socket_base_t *bind_so
for (pending_connections_t::iterator p = pending.first; p != pending.second; ++p) for (pending_connections_t::iterator p = pending.first; p != pending.second; ++p)
{ {
bind_socket_->inc_seqnum(); bind_socket_->inc_seqnum();
p->second.pipe->set_tid(bind_socket_->get_tid()); p->second.bind_pipe->set_tid(bind_socket_->get_tid());
command_t cmd; command_t cmd;
cmd.type = command_t::bind; cmd.type = command_t::bind;
cmd.args.bind.pipe = p->second.pipe; cmd.args.bind.pipe = p->second.bind_pipe;
bind_socket_->process_command(cmd); bind_socket_->process_command(cmd);
bind_socket_->send_inproc_connected(p->second.socket); bind_socket_->send_inproc_connected(p->second.endpoint.socket);
// Send identities
options_t& bind_options = endpoints[addr_].options;
if (bind_options.recv_identity) {
msg_t id;
int rc = id.init_size (p->second.endpoint.options.identity_size);
errno_assert (rc == 0);
memcpy (id.data (), p->second.endpoint.options.identity, p->second.endpoint.options.identity_size);
id.set_flags (msg_t::identity);
bool written = p->second.connect_pipe->write (&id);
zmq_assert (written);
p->second.connect_pipe->flush ();
}
if (p->second.endpoint.options.recv_identity) {
msg_t id;
int rc = id.init_size (bind_options.identity_size);
errno_assert (rc == 0);
memcpy (id.data (), bind_options.identity, bind_options.identity_size);
id.set_flags (msg_t::identity);
bool written = p->second.bind_pipe->write (&id);
zmq_assert (written);
p->second.bind_pipe->flush ();
}
} }
pending_connections.erase(pending.first, pending.second); pending_connections.erase(pending.first, pending.second);

View File

@ -53,8 +53,9 @@ namespace zmq
struct pending_connection_t struct pending_connection_t
{ {
socket_base_t *socket; endpoint_t endpoint;
pipe_t* pipe; pipe_t* connect_pipe;
pipe_t* bind_pipe;
}; };
// Context object encapsulates all the global state associated with // Context object encapsulates all the global state associated with
@ -108,7 +109,7 @@ namespace zmq
int register_endpoint (const char *addr_, endpoint_t &endpoint_); int register_endpoint (const char *addr_, endpoint_t &endpoint_);
void unregister_endpoints (zmq::socket_base_t *socket_); void unregister_endpoints (zmq::socket_base_t *socket_);
endpoint_t find_endpoint (const char *addr_); endpoint_t find_endpoint (const char *addr_);
void pend_connection (const char *addr_, const pending_connection_t &pending_connection_); void pend_connection (const char *addr_, pending_connection_t &pending_connection_);
void connect_pending (const char *addr_, zmq::socket_base_t *bind_socket_); void connect_pending (const char *addr_, zmq::socket_base_t *bind_socket_);
enum { enum {

View File

@ -152,7 +152,7 @@ zmq::endpoint_t zmq::object_t::find_endpoint (const char *addr_)
return ctx->find_endpoint (addr_); return ctx->find_endpoint (addr_);
} }
void zmq::object_t::pend_connection (const char *addr_, const pending_connection_t &pending_connection_) void zmq::object_t::pend_connection (const char *addr_, pending_connection_t &pending_connection_)
{ {
ctx->pend_connection (addr_, pending_connection_); ctx->pend_connection (addr_, pending_connection_);
} }

View File

@ -60,7 +60,7 @@ namespace zmq
int register_endpoint (const char *addr_, zmq::endpoint_t &endpoint_); int register_endpoint (const char *addr_, zmq::endpoint_t &endpoint_);
void unregister_endpoints (zmq::socket_base_t *socket_); void unregister_endpoints (zmq::socket_base_t *socket_);
zmq::endpoint_t find_endpoint (const char *addr_); zmq::endpoint_t find_endpoint (const char *addr_);
void pend_connection (const char *addr_, const pending_connection_t &pending_connection_); void pend_connection (const char *addr_, pending_connection_t &pending_connection_);
void connect_pending (const char *addr_, zmq::socket_base_t *bind_socket_); void connect_pending (const char *addr_, zmq::socket_base_t *bind_socket_);
void destroy_socket (zmq::socket_base_t *socket_); void destroy_socket (zmq::socket_base_t *socket_);

View File

@ -466,7 +466,8 @@ int zmq::socket_base_t::connect (const char *addr_)
if (!peer.socket) if (!peer.socket)
{ {
pending_connection_t pending_connection = {this, new_pipes [1]}; endpoint_t endpoint = {this, options};
pending_connection_t pending_connection = {endpoint, new_pipes [0], new_pipes [1]};
pend_connection (addr_, pending_connection); pend_connection (addr_, pending_connection);
} }
else else

View File

@ -226,7 +226,7 @@ void test_multiple_connects()
void test_multiple_threads() void test_multiple_threads()
{ {
const unsigned int no_of_threads = 10; const unsigned int no_of_threads = 30;
void *ctx = zmq_ctx_new (); void *ctx = zmq_ctx_new ();
assert (ctx); assert (ctx);
@ -239,8 +239,6 @@ void test_multiple_threads()
threads [i] = zmq_threadstart (&pusher, ctx); threads [i] = zmq_threadstart (&pusher, ctx);
} }
//zmq_sleep(1);
// Now bind // Now bind
void *bindSocket = zmq_socket (ctx, ZMQ_PULL); void *bindSocket = zmq_socket (ctx, ZMQ_PULL);
assert (bindSocket); assert (bindSocket);
@ -272,15 +270,72 @@ void test_multiple_threads()
assert (rc == 0); assert (rc == 0);
} }
void test_identity()
{
// Create the infrastructure
void *ctx = zmq_ctx_new ();
assert (ctx);
void *sc = zmq_socket (ctx, ZMQ_DEALER);
assert (sc);
int rc = zmq_connect (sc, "inproc://a");
assert (rc == 0);
void *sb = zmq_socket (ctx, ZMQ_ROUTER);
assert (sb);
rc = zmq_bind (sb, "inproc://a");
assert (rc == 0);
// Send 2-part message.
rc = zmq_send (sc, "A", 1, ZMQ_SNDMORE);
assert (rc == 1);
rc = zmq_send (sc, "B", 1, 0);
assert (rc == 1);
// Identity comes first.
zmq_msg_t msg;
rc = zmq_msg_init (&msg);
assert (rc == 0);
rc = zmq_msg_recv (&msg, sb, 0);
assert (rc >= 0);
int more = zmq_msg_more (&msg);
assert (more == 1);
// Then the first part of the message body.
rc = zmq_msg_recv (&msg, sb, 0);
assert (rc == 1);
more = zmq_msg_more (&msg);
assert (more == 1);
// And finally, the second part of the message body.
rc = zmq_msg_recv (&msg, sb, 0);
assert (rc == 1);
more = zmq_msg_more (&msg);
assert (more == 0);
// Deallocate the infrastructure.
rc = zmq_close (sc);
assert (rc == 0);
rc = zmq_close (sb);
assert (rc == 0);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
}
int main (void) int main (void)
{ {
setup_test_environment(); setup_test_environment();
test_bind_before_connect(); test_bind_before_connect ();
test_connect_before_bind(); test_connect_before_bind ();
test_connect_before_bind_pub_sub(); test_connect_before_bind_pub_sub ();
test_multiple_connects(); test_multiple_connects ();
test_multiple_threads(); test_multiple_threads ();
test_identity ();
return 0; return 0;
} }