0
0
mirror of https://github.com/zeromq/libzmq.git synced 2025-01-14 09:47:56 +08:00

First attempt at inproc connect before bind

This commit is contained in:
Richard Newton 2013-09-12 14:44:44 +01:00
parent 8e6b5ad17e
commit 5f20d63665
9 changed files with 270 additions and 40 deletions

View File

@ -609,7 +609,8 @@ set(tests
test_spec_router test_spec_router
test_sub_forward test_sub_forward
test_term_endpoint test_term_endpoint
test_timeo) test_timeo
test_inproc_connect_before_bind)
if(NOT WIN32) if(NOT WIN32)
list(APPEND tests list(APPEND tests
test_monitor test_monitor

View File

@ -392,6 +392,34 @@ zmq::endpoint_t zmq::ctx_t::find_endpoint (const char *addr_)
return endpoint; return endpoint;
} }
void zmq::ctx_t::pend_connection (const char *addr_, const pending_connection_t &pending_connection_)
{
endpoints_sync.lock ();
// Todo, use multimap to support multiple pending connections
pending_connections[addr_] = pending_connection_;
endpoints_sync.unlock ();
}
zmq::pending_connection_t zmq::ctx_t::next_pending_connection(const char *addr_)
{
endpoints_sync.lock ();
pending_connections_t::iterator it = pending_connections.find (addr_);
if (it == pending_connections.end ()) {
endpoints_sync.unlock ();
pending_connection_t empty = {NULL, NULL};
return empty;
}
pending_connection_t pending_connection = it->second;
pending_connections.erase(it);
endpoints_sync.unlock ();
return pending_connection;
}
// The last used socket ID, or 0 if no socket was used so far. Note that this // The last used socket ID, or 0 if no socket was used so far. Note that this
// is a global variable. Thus, even sockets created in different contexts have // is a global variable. Thus, even sockets created in different contexts have
// unique IDs. // unique IDs.

View File

@ -40,6 +40,7 @@ namespace zmq
class io_thread_t; class io_thread_t;
class socket_base_t; class socket_base_t;
class reaper_t; class reaper_t;
class pipe_t;
// Information associated with inproc endpoint. Note that endpoint options // Information associated with inproc endpoint. Note that endpoint options
// are registered as well so that the peer can access them without a need // are registered as well so that the peer can access them without a need
@ -50,6 +51,12 @@ namespace zmq
options_t options; options_t options;
}; };
struct pending_connection_t
{
socket_base_t *socket;
pipe_t* pipe;
};
// Context object encapsulates all the global state associated with // Context object encapsulates all the global state associated with
// the library. // the library.
@ -101,6 +108,8 @@ namespace zmq
int register_endpoint (const char *addr_, endpoint_t &endpoint_); int register_endpoint (const char *addr_, endpoint_t &endpoint_);
void unregister_endpoints (zmq::socket_base_t *socket_); void unregister_endpoints (zmq::socket_base_t *socket_);
endpoint_t find_endpoint (const char *addr_); endpoint_t find_endpoint (const char *addr_);
void pend_connection (const char *addr_, const pending_connection_t &pending_connection_);
pending_connection_t next_pending_connection (const char *addr_);
enum { enum {
term_tid = 0, term_tid = 0,
@ -156,6 +165,10 @@ namespace zmq
typedef std::map <std::string, endpoint_t> endpoints_t; typedef std::map <std::string, endpoint_t> endpoints_t;
endpoints_t endpoints; endpoints_t endpoints;
// List of inproc connection endpoints pending a bind
typedef std::map <std::string, pending_connection_t> pending_connections_t;
pending_connections_t pending_connections;
// Synchronisation of access to the list of inproc endpoints. // Synchronisation of access to the list of inproc endpoints.
mutex_t endpoints_sync; mutex_t endpoints_sync;

View File

@ -49,6 +49,11 @@ uint32_t zmq::object_t::get_tid ()
return tid; return tid;
} }
void zmq::object_t::set_tid(uint32_t id)
{
tid = id;
}
zmq::ctx_t *zmq::object_t::get_ctx () zmq::ctx_t *zmq::object_t::get_ctx ()
{ {
return ctx; return ctx;
@ -143,6 +148,16 @@ zmq::endpoint_t zmq::object_t::find_endpoint (const char *addr_)
return ctx->find_endpoint (addr_); return ctx->find_endpoint (addr_);
} }
void zmq::object_t::pend_connection (const char *addr_, const pending_connection_t &pending_connection_)
{
ctx->pend_connection (addr_, pending_connection_);
}
zmq::pending_connection_t zmq::object_t::next_pending_connection (const char *addr_)
{
return ctx->next_pending_connection(addr_);
}
void zmq::object_t::destroy_socket (socket_base_t *socket_) void zmq::object_t::destroy_socket (socket_base_t *socket_)
{ {
ctx->destroy_socket (socket_); ctx->destroy_socket (socket_);

View File

@ -27,6 +27,7 @@ namespace zmq
struct i_engine; struct i_engine;
struct endpoint_t; struct endpoint_t;
struct pending_connection_t;
struct command_t; struct command_t;
class ctx_t; class ctx_t;
class pipe_t; class pipe_t;
@ -47,6 +48,7 @@ namespace zmq
virtual ~object_t (); virtual ~object_t ();
uint32_t get_tid (); uint32_t get_tid ();
void set_tid(uint32_t id);
ctx_t *get_ctx (); ctx_t *get_ctx ();
void process_command (zmq::command_t &cmd_); void process_command (zmq::command_t &cmd_);
@ -57,6 +59,9 @@ namespace zmq
int register_endpoint (const char *addr_, zmq::endpoint_t &endpoint_); int register_endpoint (const char *addr_, zmq::endpoint_t &endpoint_);
void unregister_endpoints (zmq::socket_base_t *socket_); void unregister_endpoints (zmq::socket_base_t *socket_);
zmq::endpoint_t find_endpoint (const char *addr_); zmq::endpoint_t find_endpoint (const char *addr_);
void pend_connection (const char *addr_, const pending_connection_t &pending_connection_);
zmq::pending_connection_t next_pending_connection (const char *addr_);
void destroy_socket (zmq::socket_base_t *socket_); void destroy_socket (zmq::socket_base_t *socket_);
// Logs an message. // Logs an message.

View File

@ -344,6 +344,50 @@ int zmq::socket_base_t::bind (const char *addr_)
if (rc == 0) { if (rc == 0) {
// Save last endpoint URI // Save last endpoint URI
last_endpoint.assign (addr_); last_endpoint.assign (addr_);
pending_connection_t pending_connection = next_pending_connection(addr_);
while (pending_connection.pipe != NULL)
{
inc_seqnum();
//// If required, send the identity of the local socket to the peer.
//if (peer.options.recv_identity) {
// msg_t id;
// rc = id.init_size (options.identity_size);
// errno_assert (rc == 0);
// memcpy (id.data (), options.identity, options.identity_size);
// id.set_flags (msg_t::identity);
// bool written = new_pipes [0]->write (&id);
// zmq_assert (written);
// new_pipes [0]->flush ();
//}
//// If required, send the identity of the peer to the local socket.
//if (options.recv_identity) {
// msg_t id;
// rc = id.init_size (peer.options.identity_size);
// errno_assert (rc == 0);
// memcpy (id.data (), peer.options.identity, peer.options.identity_size);
// id.set_flags (msg_t::identity);
// bool written = new_pipes [1]->write (&id);
// zmq_assert (written);
// new_pipes [1]->flush ();
//}
//// Attach remote end of the pipe to the peer socket. Note that peer's
//// seqnum was incremented in find_endpoint function. We don't need it
//// increased here.
//send_bind (peer.socket, new_pipes [1], false);
pending_connection.pipe->set_tid(get_tid());
command_t cmd;
cmd.type = command_t::bind;
cmd.args.bind.pipe = pending_connection.pipe;
process_command(cmd);
pending_connection = next_pending_connection(addr_);
}
} }
return rc; return rc;
} }
@ -435,8 +479,6 @@ int zmq::socket_base_t::connect (const char *addr_)
// Find the peer endpoint. // Find the peer endpoint.
endpoint_t peer = find_endpoint (addr_); endpoint_t peer = find_endpoint (addr_);
if (!peer.socket)
return -1;
// The total HWM for an inproc connection should be the sum of // The total HWM for an inproc connection should be the sum of
// the binder's HWM and the connector's HWM. // the binder's HWM and the connector's HWM.
@ -448,7 +490,7 @@ int zmq::socket_base_t::connect (const char *addr_)
rcvhwm = options.rcvhwm + peer.options.sndhwm; rcvhwm = options.rcvhwm + peer.options.sndhwm;
// Create a bi-directional pipe to connect the peers. // Create a bi-directional pipe to connect the peers.
object_t *parents [2] = {this, peer.socket}; object_t *parents [2] = {this, peer.socket == NULL ? this : peer.socket};
pipe_t *new_pipes [2] = {NULL, NULL}; pipe_t *new_pipes [2] = {NULL, NULL};
bool conflate = options.conflate && bool conflate = options.conflate &&
@ -466,35 +508,44 @@ int zmq::socket_base_t::connect (const char *addr_)
// Attach local end of the pipe to this socket object. // Attach local end of the pipe to this socket object.
attach_pipe (new_pipes [0]); attach_pipe (new_pipes [0]);
// If required, send the identity of the local socket to the peer. if (!peer.socket)
if (peer.options.recv_identity) { {
msg_t id; pending_connection_t pending_connection = {this, new_pipes [1]};
rc = id.init_size (options.identity_size); pend_connection (addr_, pending_connection);
errno_assert (rc == 0);
memcpy (id.data (), options.identity, options.identity_size);
id.set_flags (msg_t::identity);
bool written = new_pipes [0]->write (&id);
zmq_assert (written);
new_pipes [0]->flush ();
} }
else
{
// If required, send the identity of the local socket to the peer.
if (peer.options.recv_identity) {
msg_t id;
rc = id.init_size (options.identity_size);
errno_assert (rc == 0);
memcpy (id.data (), options.identity, options.identity_size);
id.set_flags (msg_t::identity);
bool written = new_pipes [0]->write (&id);
zmq_assert (written);
new_pipes [0]->flush ();
}
// If required, send the identity of the peer to the local socket. // If required, send the identity of the peer to the local socket.
if (options.recv_identity) { if (options.recv_identity) {
msg_t id; msg_t id;
rc = id.init_size (peer.options.identity_size); rc = id.init_size (peer.options.identity_size);
errno_assert (rc == 0); errno_assert (rc == 0);
memcpy (id.data (), peer.options.identity, peer.options.identity_size); memcpy (id.data (), peer.options.identity, peer.options.identity_size);
id.set_flags (msg_t::identity); id.set_flags (msg_t::identity);
bool written = new_pipes [1]->write (&id); bool written = new_pipes [1]->write (&id);
zmq_assert (written); zmq_assert (written);
new_pipes [1]->flush (); new_pipes [1]->flush ();
}
// Attach remote end of the pipe to the peer socket. Note that peer's
// seqnum was incremented in find_endpoint function. We don't need it
// increased here.
send_bind (peer.socket, new_pipes [1], false);
} }
// Attach remote end of the pipe to the peer socket. Note that peer's
// seqnum was incremented in find_endpoint function. We don't need it
// increased here.
send_bind (peer.socket, new_pipes [1], false);
// Save last endpoint URI // Save last endpoint URI
last_endpoint.assign (addr_); last_endpoint.assign (addr_);
@ -636,7 +687,7 @@ int zmq::socket_base_t::term_endpoint (const char *addr_)
errno = ENOENT; errno = ENOENT;
return -1; return -1;
} }
for (inprocs_t::iterator it = range.first; it != range.second; ++it) for (inprocs_t::iterator it = range.first; it != range.second; ++it)
it->second->terminate(true); it->second->terminate(true);
inprocs.erase (range.first, range.second); inprocs.erase (range.first, range.second);
@ -655,7 +706,7 @@ int zmq::socket_base_t::term_endpoint (const char *addr_)
if (it->second.second != NULL) if (it->second.second != NULL)
it->second.second->terminate(false); it->second.second->terminate(false);
term_child (it->second.first); term_child (it->second.first);
} }
endpoints.erase (range.first, range.second); endpoints.erase (range.first, range.second);
return 0; return 0;
} }
@ -1223,20 +1274,20 @@ void zmq::socket_base_t::event_disconnected (std::string &addr_, int fd_)
void zmq::socket_base_t::monitor_event (zmq_event_t event_, const std::string& addr_) void zmq::socket_base_t::monitor_event (zmq_event_t event_, const std::string& addr_)
{ {
if (monitor_socket) { if (monitor_socket) {
const uint16_t eid = (uint16_t)event_.event ; const uint16_t eid = (uint16_t)event_.event ;
const uint32_t value = (uint32_t)event_.value ; const uint32_t value = (uint32_t)event_.value ;
// prepare and send first message frame // prepare and send first message frame
// containing event id and value // containing event id and value
zmq_msg_t msg; zmq_msg_t msg;
zmq_msg_init_size (&msg, sizeof(eid) + sizeof(value)); zmq_msg_init_size (&msg, sizeof(eid) + sizeof(value));
char* data1 = (char*)zmq_msg_data(&msg); char* data1 = (char*)zmq_msg_data(&msg);
memcpy (data1, &eid, sizeof(eid)); memcpy (data1, &eid, sizeof(eid));
memcpy (data1+sizeof(eid), &value, sizeof(value)); memcpy (data1+sizeof(eid), &value, sizeof(value));
zmq_sendmsg (monitor_socket, &msg, ZMQ_SNDMORE); zmq_sendmsg (monitor_socket, &msg, ZMQ_SNDMORE);
// prepare and send second message frame // prepare and send second message frame
// containing the address (endpoint) // containing the address (endpoint)
zmq_msg_init_size (&msg, addr_.size()); zmq_msg_init_size (&msg, addr_.size());
memcpy(zmq_msg_data(&msg), addr_.c_str(), addr_.size()); memcpy(zmq_msg_data(&msg), addr_.c_str(), addr_.size());
zmq_sendmsg (monitor_socket, &msg, 0); zmq_sendmsg (monitor_socket, &msg, 0);
} }
} }

View File

@ -63,6 +63,9 @@ void test_ctx_shutdown()
// Spawn a thread to receive on socket // Spawn a thread to receive on socket
void *receiver_thread = zmq_threadstart (&receiver, socket); void *receiver_thread = zmq_threadstart (&receiver, socket);
// Wait for thread to start up and block
zmq_sleep (1);
// Shutdown context, if we used destroy here we would deadlock. // Shutdown context, if we used destroy here we would deadlock.
rc = zmq_ctx_shutdown (ctx); rc = zmq_ctx_shutdown (ctx);
assert (rc == 0); assert (rc == 0);

View File

@ -0,0 +1,114 @@
/*
Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdio.h>
#include "testutil.hpp"
void test_bind_before_connect()
{
void *ctx = zmq_ctx_new ();
assert (ctx);
// Bind first
void *bindSocket = zmq_socket (ctx, ZMQ_PAIR);
assert (bindSocket);
int rc = zmq_bind (bindSocket, "inproc://a");
assert (rc == 0);
// Now connect
void *connectSocket = zmq_socket (ctx, ZMQ_PAIR);
assert (connectSocket);
rc = zmq_connect (connectSocket, "inproc://a");
assert (rc == 0);
// Queue up some data
rc = zmq_send_const (connectSocket, "foobar", 6, 0);
assert (rc == 6);
// Read pending message
zmq_msg_t msg;
rc = zmq_msg_init (&msg);
assert (rc == 0);
rc = zmq_msg_recv (&msg, bindSocket, ZMQ_NOBLOCK);
assert (rc == 6);
void *data = zmq_msg_data (&msg);
assert (memcmp ("foobar", data, 6) == 0);
// Cleanup
rc = zmq_close (connectSocket);
assert (rc == 0);
rc = zmq_close (bindSocket);
assert (rc == 0);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
}
void test_connect_before_bind()
{
void *ctx = zmq_ctx_new ();
assert (ctx);
// Connect first
void *connectSocket = zmq_socket (ctx, ZMQ_PAIR);
assert (connectSocket);
int rc = zmq_connect (connectSocket, "inproc://a");
assert (rc == 0);
// Queue up some data
rc = zmq_send_const (connectSocket, "foobar", 6, 0);
assert (rc == 6);
// Now bind
void *bindSocket = zmq_socket (ctx, ZMQ_PAIR);
assert (bindSocket);
rc = zmq_bind (bindSocket, "inproc://a");
assert (rc == 0);
// Read pending message
zmq_msg_t msg;
rc = zmq_msg_init (&msg);
assert (rc == 0);
rc = zmq_msg_recv (&msg, bindSocket, ZMQ_NOBLOCK);
assert (rc == 6);
void *data = zmq_msg_data (&msg);
assert (memcmp ("foobar", data, 6) == 0);
// Cleanup
rc = zmq_close (connectSocket);
assert (rc == 0);
rc = zmq_close (bindSocket);
assert (rc == 0);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
}
int main (void)
{
setup_test_environment();
test_bind_before_connect();
test_connect_before_bind();
return 0 ;
}

View File

@ -55,7 +55,7 @@ int main (void)
rc = zmq_msg_recv (&msg, sc, 0); rc = zmq_msg_recv (&msg, sc, 0);
assert (rc == 6); assert (rc == 6);
data = zmq_msg_data (&msg); data = zmq_msg_data (&msg);
assert (memcmp ("foobar", data, 3) == 0); assert (memcmp ("foobar", data, 6) == 0);
// Cleanup // Cleanup