diff --git a/src/ctx.cpp b/src/ctx.cpp index e6271a86..74bbef05 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -392,7 +392,7 @@ zmq::endpoint_t zmq::ctx_t::find_endpoint (const char *addr_) return endpoint; } -void zmq::ctx_t::pend_connection (const char *addr_, const pending_connection_t &pending_connection_) +void zmq::ctx_t::pend_connection (const char *addr_, pending_connection_t &pending_connection_) { endpoints_sync.lock (); @@ -400,18 +400,17 @@ void zmq::ctx_t::pend_connection (const char *addr_, const pending_connection_t if (it == endpoints.end ()) { // Still no bind. - pending_connection_.socket->inc_seqnum (); + pending_connection_.endpoint.socket->inc_seqnum (); pending_connections.insert (pending_connections_t::value_type (std::string (addr_), pending_connection_)); } else { // Bind has happened in the mean time, connect directly - pending_connection_t copy = pending_connection_; it->second.socket->inc_seqnum(); - copy.pipe->set_tid(it->second.socket->get_tid()); + pending_connection_.bind_pipe->set_tid(it->second.socket->get_tid()); command_t cmd; cmd.type = command_t::bind; - cmd.args.bind.pipe = copy.pipe; + cmd.args.bind.pipe = pending_connection_.bind_pipe; it->second.socket->process_command(cmd); } @@ -427,13 +426,37 @@ void zmq::ctx_t::connect_pending (const char *addr_, zmq::socket_base_t *bind_so for (pending_connections_t::iterator p = pending.first; p != pending.second; ++p) { bind_socket_->inc_seqnum(); - p->second.pipe->set_tid(bind_socket_->get_tid()); + p->second.bind_pipe->set_tid(bind_socket_->get_tid()); command_t cmd; cmd.type = command_t::bind; - cmd.args.bind.pipe = p->second.pipe; + cmd.args.bind.pipe = p->second.bind_pipe; bind_socket_->process_command(cmd); - bind_socket_->send_inproc_connected(p->second.socket); + bind_socket_->send_inproc_connected(p->second.endpoint.socket); + + // Send identities + options_t& bind_options = endpoints[addr_].options; + if (bind_options.recv_identity) { + + msg_t id; + int rc = id.init_size (p->second.endpoint.options.identity_size); + errno_assert (rc == 0); + memcpy (id.data (), p->second.endpoint.options.identity, p->second.endpoint.options.identity_size); + id.set_flags (msg_t::identity); + bool written = p->second.connect_pipe->write (&id); + zmq_assert (written); + p->second.connect_pipe->flush (); + } + if (p->second.endpoint.options.recv_identity) { + msg_t id; + int rc = id.init_size (bind_options.identity_size); + errno_assert (rc == 0); + memcpy (id.data (), bind_options.identity, bind_options.identity_size); + id.set_flags (msg_t::identity); + bool written = p->second.bind_pipe->write (&id); + zmq_assert (written); + p->second.bind_pipe->flush (); + } } pending_connections.erase(pending.first, pending.second); diff --git a/src/ctx.hpp b/src/ctx.hpp index 1a5cc430..6fd9e369 100644 --- a/src/ctx.hpp +++ b/src/ctx.hpp @@ -53,8 +53,9 @@ namespace zmq struct pending_connection_t { - socket_base_t *socket; - pipe_t* pipe; + endpoint_t endpoint; + pipe_t* connect_pipe; + pipe_t* bind_pipe; }; // Context object encapsulates all the global state associated with @@ -108,7 +109,7 @@ namespace zmq int register_endpoint (const char *addr_, endpoint_t &endpoint_); void unregister_endpoints (zmq::socket_base_t *socket_); endpoint_t find_endpoint (const char *addr_); - void pend_connection (const char *addr_, const pending_connection_t &pending_connection_); + void pend_connection (const char *addr_, pending_connection_t &pending_connection_); void connect_pending (const char *addr_, zmq::socket_base_t *bind_socket_); enum { diff --git a/src/object.cpp b/src/object.cpp index ba20b8b0..e658b287 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -152,7 +152,7 @@ zmq::endpoint_t zmq::object_t::find_endpoint (const char *addr_) return ctx->find_endpoint (addr_); } -void zmq::object_t::pend_connection (const char *addr_, const pending_connection_t &pending_connection_) +void zmq::object_t::pend_connection (const char *addr_, pending_connection_t &pending_connection_) { ctx->pend_connection (addr_, pending_connection_); } diff --git a/src/object.hpp b/src/object.hpp index ab171e78..6bc52e4a 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -60,7 +60,7 @@ namespace zmq int register_endpoint (const char *addr_, zmq::endpoint_t &endpoint_); void unregister_endpoints (zmq::socket_base_t *socket_); zmq::endpoint_t find_endpoint (const char *addr_); - void pend_connection (const char *addr_, const pending_connection_t &pending_connection_); + void pend_connection (const char *addr_, pending_connection_t &pending_connection_); void connect_pending (const char *addr_, zmq::socket_base_t *bind_socket_); void destroy_socket (zmq::socket_base_t *socket_); diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 8709b3c4..b4b8df1c 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -466,7 +466,8 @@ int zmq::socket_base_t::connect (const char *addr_) if (!peer.socket) { - pending_connection_t pending_connection = {this, new_pipes [1]}; + endpoint_t endpoint = {this, options}; + pending_connection_t pending_connection = {endpoint, new_pipes [0], new_pipes [1]}; pend_connection (addr_, pending_connection); } else diff --git a/tests/test_inproc_connect_before_bind.cpp b/tests/test_inproc_connect_before_bind.cpp index 90004e80..71e228c7 100644 --- a/tests/test_inproc_connect_before_bind.cpp +++ b/tests/test_inproc_connect_before_bind.cpp @@ -226,7 +226,7 @@ void test_multiple_connects() void test_multiple_threads() { - const unsigned int no_of_threads = 10; + const unsigned int no_of_threads = 30; void *ctx = zmq_ctx_new (); assert (ctx); @@ -239,8 +239,6 @@ void test_multiple_threads() threads [i] = zmq_threadstart (&pusher, ctx); } - //zmq_sleep(1); - // Now bind void *bindSocket = zmq_socket (ctx, ZMQ_PULL); assert (bindSocket); @@ -272,15 +270,72 @@ void test_multiple_threads() assert (rc == 0); } +void test_identity() +{ + // Create the infrastructure + void *ctx = zmq_ctx_new (); + assert (ctx); + + void *sc = zmq_socket (ctx, ZMQ_DEALER); + assert (sc); + + int rc = zmq_connect (sc, "inproc://a"); + assert (rc == 0); + + void *sb = zmq_socket (ctx, ZMQ_ROUTER); + assert (sb); + + rc = zmq_bind (sb, "inproc://a"); + assert (rc == 0); + + // Send 2-part message. + rc = zmq_send (sc, "A", 1, ZMQ_SNDMORE); + assert (rc == 1); + rc = zmq_send (sc, "B", 1, 0); + assert (rc == 1); + + // Identity comes first. + zmq_msg_t msg; + rc = zmq_msg_init (&msg); + assert (rc == 0); + rc = zmq_msg_recv (&msg, sb, 0); + assert (rc >= 0); + int more = zmq_msg_more (&msg); + assert (more == 1); + + // Then the first part of the message body. + rc = zmq_msg_recv (&msg, sb, 0); + assert (rc == 1); + more = zmq_msg_more (&msg); + assert (more == 1); + + // And finally, the second part of the message body. + rc = zmq_msg_recv (&msg, sb, 0); + assert (rc == 1); + more = zmq_msg_more (&msg); + assert (more == 0); + + // Deallocate the infrastructure. + rc = zmq_close (sc); + assert (rc == 0); + + rc = zmq_close (sb); + assert (rc == 0); + + rc = zmq_ctx_term (ctx); + assert (rc == 0); +} + int main (void) { setup_test_environment(); - test_bind_before_connect(); - test_connect_before_bind(); - test_connect_before_bind_pub_sub(); - test_multiple_connects(); - test_multiple_threads(); + test_bind_before_connect (); + test_connect_before_bind (); + test_connect_before_bind_pub_sub (); + test_multiple_connects (); + test_multiple_threads (); + test_identity (); return 0; }