mirror of
https://github.com/zeromq/libzmq.git
synced 2025-03-09 15:26:04 +00:00
Merge pull request #656 from ricnewton/inproc_connect_before_bind
Allow inproc sockets to connect before bind
This commit is contained in:
commit
3298e5a206
1
.gitignore
vendored
1
.gitignore
vendored
@ -64,6 +64,7 @@ tests/test_req_request_ids
|
||||
tests/test_req_strict
|
||||
tests/test_fork
|
||||
tests/test_conflate
|
||||
tests/test_inproc_connect_before_bind
|
||||
tests/test_linger
|
||||
tests/test_security_null
|
||||
tests/test_security_null.opp
|
||||
|
@ -609,7 +609,8 @@ set(tests
|
||||
test_spec_router
|
||||
test_sub_forward
|
||||
test_term_endpoint
|
||||
test_timeo)
|
||||
test_timeo
|
||||
test_inproc_connect_before_bind)
|
||||
if(NOT WIN32)
|
||||
list(APPEND tests
|
||||
test_monitor
|
||||
|
@ -55,6 +55,7 @@ namespace zmq
|
||||
term_ack,
|
||||
reap,
|
||||
reaped,
|
||||
inproc_connected,
|
||||
done
|
||||
} type;
|
||||
|
||||
|
72
src/ctx.cpp
72
src/ctx.cpp
@ -392,6 +392,78 @@ zmq::endpoint_t zmq::ctx_t::find_endpoint (const char *addr_)
|
||||
return endpoint;
|
||||
}
|
||||
|
||||
void zmq::ctx_t::pend_connection (const char *addr_, pending_connection_t &pending_connection_)
|
||||
{
|
||||
endpoints_sync.lock ();
|
||||
|
||||
endpoints_t::iterator it = endpoints.find (addr_);
|
||||
if (it == endpoints.end ())
|
||||
{
|
||||
// Still no bind.
|
||||
pending_connection_.endpoint.socket->inc_seqnum ();
|
||||
pending_connections.insert (pending_connections_t::value_type (std::string (addr_), pending_connection_));
|
||||
}
|
||||
else
|
||||
{
|
||||
// Bind has happened in the mean time, connect directly
|
||||
it->second.socket->inc_seqnum();
|
||||
pending_connection_.bind_pipe->set_tid(it->second.socket->get_tid());
|
||||
command_t cmd;
|
||||
cmd.type = command_t::bind;
|
||||
cmd.args.bind.pipe = pending_connection_.bind_pipe;
|
||||
it->second.socket->process_command(cmd);
|
||||
}
|
||||
|
||||
endpoints_sync.unlock ();
|
||||
}
|
||||
|
||||
void zmq::ctx_t::connect_pending (const char *addr_, zmq::socket_base_t *bind_socket_)
|
||||
{
|
||||
endpoints_sync.lock ();
|
||||
|
||||
std::pair<pending_connections_t::iterator, pending_connections_t::iterator> pending = pending_connections.equal_range(addr_);
|
||||
|
||||
for (pending_connections_t::iterator p = pending.first; p != pending.second; ++p)
|
||||
{
|
||||
bind_socket_->inc_seqnum();
|
||||
p->second.bind_pipe->set_tid(bind_socket_->get_tid());
|
||||
command_t cmd;
|
||||
cmd.type = command_t::bind;
|
||||
cmd.args.bind.pipe = p->second.bind_pipe;
|
||||
bind_socket_->process_command(cmd);
|
||||
|
||||
bind_socket_->send_inproc_connected(p->second.endpoint.socket);
|
||||
|
||||
// Send identities
|
||||
options_t& bind_options = endpoints[addr_].options;
|
||||
if (bind_options.recv_identity) {
|
||||
|
||||
msg_t id;
|
||||
int rc = id.init_size (p->second.endpoint.options.identity_size);
|
||||
errno_assert (rc == 0);
|
||||
memcpy (id.data (), p->second.endpoint.options.identity, p->second.endpoint.options.identity_size);
|
||||
id.set_flags (msg_t::identity);
|
||||
bool written = p->second.connect_pipe->write (&id);
|
||||
zmq_assert (written);
|
||||
p->second.connect_pipe->flush ();
|
||||
}
|
||||
if (p->second.endpoint.options.recv_identity) {
|
||||
msg_t id;
|
||||
int rc = id.init_size (bind_options.identity_size);
|
||||
errno_assert (rc == 0);
|
||||
memcpy (id.data (), bind_options.identity, bind_options.identity_size);
|
||||
id.set_flags (msg_t::identity);
|
||||
bool written = p->second.bind_pipe->write (&id);
|
||||
zmq_assert (written);
|
||||
p->second.bind_pipe->flush ();
|
||||
}
|
||||
}
|
||||
|
||||
pending_connections.erase(pending.first, pending.second);
|
||||
|
||||
endpoints_sync.unlock ();
|
||||
}
|
||||
|
||||
// The last used socket ID, or 0 if no socket was used so far. Note that this
|
||||
// is a global variable. Thus, even sockets created in different contexts have
|
||||
// unique IDs.
|
||||
|
14
src/ctx.hpp
14
src/ctx.hpp
@ -40,6 +40,7 @@ namespace zmq
|
||||
class io_thread_t;
|
||||
class socket_base_t;
|
||||
class reaper_t;
|
||||
class pipe_t;
|
||||
|
||||
// Information associated with inproc endpoint. Note that endpoint options
|
||||
// are registered as well so that the peer can access them without a need
|
||||
@ -50,6 +51,13 @@ namespace zmq
|
||||
options_t options;
|
||||
};
|
||||
|
||||
struct pending_connection_t
|
||||
{
|
||||
endpoint_t endpoint;
|
||||
pipe_t* connect_pipe;
|
||||
pipe_t* bind_pipe;
|
||||
};
|
||||
|
||||
// Context object encapsulates all the global state associated with
|
||||
// the library.
|
||||
|
||||
@ -101,6 +109,8 @@ namespace zmq
|
||||
int register_endpoint (const char *addr_, endpoint_t &endpoint_);
|
||||
void unregister_endpoints (zmq::socket_base_t *socket_);
|
||||
endpoint_t find_endpoint (const char *addr_);
|
||||
void pend_connection (const char *addr_, pending_connection_t &pending_connection_);
|
||||
void connect_pending (const char *addr_, zmq::socket_base_t *bind_socket_);
|
||||
|
||||
enum {
|
||||
term_tid = 0,
|
||||
@ -156,6 +166,10 @@ namespace zmq
|
||||
typedef std::map <std::string, endpoint_t> endpoints_t;
|
||||
endpoints_t endpoints;
|
||||
|
||||
// List of inproc connection endpoints pending a bind
|
||||
typedef std::multimap <std::string, pending_connection_t> pending_connections_t;
|
||||
pending_connections_t pending_connections;
|
||||
|
||||
// Synchronisation of access to the list of inproc endpoints.
|
||||
mutex_t endpoints_sync;
|
||||
|
||||
|
@ -49,6 +49,11 @@ uint32_t zmq::object_t::get_tid ()
|
||||
return tid;
|
||||
}
|
||||
|
||||
void zmq::object_t::set_tid(uint32_t id)
|
||||
{
|
||||
tid = id;
|
||||
}
|
||||
|
||||
zmq::ctx_t *zmq::object_t::get_ctx ()
|
||||
{
|
||||
return ctx;
|
||||
@ -122,6 +127,10 @@ void zmq::object_t::process_command (command_t &cmd_)
|
||||
process_reaped ();
|
||||
break;
|
||||
|
||||
case command_t::inproc_connected:
|
||||
process_seqnum ();
|
||||
break;
|
||||
|
||||
case command_t::done:
|
||||
default:
|
||||
zmq_assert (false);
|
||||
@ -143,6 +152,16 @@ zmq::endpoint_t zmq::object_t::find_endpoint (const char *addr_)
|
||||
return ctx->find_endpoint (addr_);
|
||||
}
|
||||
|
||||
void zmq::object_t::pend_connection (const char *addr_, pending_connection_t &pending_connection_)
|
||||
{
|
||||
ctx->pend_connection (addr_, pending_connection_);
|
||||
}
|
||||
|
||||
void zmq::object_t::connect_pending (const char *addr_, zmq::socket_base_t *bind_socket_)
|
||||
{
|
||||
return ctx->connect_pending(addr_, bind_socket_);
|
||||
}
|
||||
|
||||
void zmq::object_t::destroy_socket (socket_base_t *socket_)
|
||||
{
|
||||
ctx->destroy_socket (socket_);
|
||||
@ -297,6 +316,14 @@ void zmq::object_t::send_reaped ()
|
||||
send_command (cmd);
|
||||
}
|
||||
|
||||
void zmq::object_t::send_inproc_connected (zmq::socket_base_t *socket_)
|
||||
{
|
||||
command_t cmd;
|
||||
cmd.destination = socket_;
|
||||
cmd.type = command_t::inproc_connected;
|
||||
send_command (cmd);
|
||||
}
|
||||
|
||||
void zmq::object_t::send_done ()
|
||||
{
|
||||
command_t cmd;
|
||||
|
@ -27,6 +27,7 @@ namespace zmq
|
||||
|
||||
struct i_engine;
|
||||
struct endpoint_t;
|
||||
struct pending_connection_t;
|
||||
struct command_t;
|
||||
class ctx_t;
|
||||
class pipe_t;
|
||||
@ -47,8 +48,10 @@ namespace zmq
|
||||
virtual ~object_t ();
|
||||
|
||||
uint32_t get_tid ();
|
||||
void set_tid(uint32_t id);
|
||||
ctx_t *get_ctx ();
|
||||
void process_command (zmq::command_t &cmd_);
|
||||
void send_inproc_connected (zmq::socket_base_t *socket_);
|
||||
|
||||
protected:
|
||||
|
||||
@ -57,6 +60,9 @@ namespace zmq
|
||||
int register_endpoint (const char *addr_, zmq::endpoint_t &endpoint_);
|
||||
void unregister_endpoints (zmq::socket_base_t *socket_);
|
||||
zmq::endpoint_t find_endpoint (const char *addr_);
|
||||
void pend_connection (const char *addr_, pending_connection_t &pending_connection_);
|
||||
void connect_pending (const char *addr_, zmq::socket_base_t *bind_socket_);
|
||||
|
||||
void destroy_socket (zmq::socket_base_t *socket_);
|
||||
|
||||
// Logs an message.
|
||||
|
@ -342,7 +342,7 @@ int zmq::socket_base_t::bind (const char *addr_)
|
||||
endpoint_t endpoint = {this, options};
|
||||
int rc = register_endpoint (addr_, endpoint);
|
||||
if (rc == 0) {
|
||||
// Save last endpoint URI
|
||||
connect_pending(addr_, this);
|
||||
last_endpoint.assign (addr_);
|
||||
}
|
||||
return rc;
|
||||
@ -435,8 +435,6 @@ int zmq::socket_base_t::connect (const char *addr_)
|
||||
|
||||
// Find the peer endpoint.
|
||||
endpoint_t peer = find_endpoint (addr_);
|
||||
if (!peer.socket)
|
||||
return -1;
|
||||
|
||||
// The total HWM for an inproc connection should be the sum of
|
||||
// the binder's HWM and the connector's HWM.
|
||||
@ -448,7 +446,7 @@ int zmq::socket_base_t::connect (const char *addr_)
|
||||
rcvhwm = options.rcvhwm + peer.options.sndhwm;
|
||||
|
||||
// Create a bi-directional pipe to connect the peers.
|
||||
object_t *parents [2] = {this, peer.socket};
|
||||
object_t *parents [2] = {this, peer.socket == NULL ? this : peer.socket};
|
||||
pipe_t *new_pipes [2] = {NULL, NULL};
|
||||
|
||||
bool conflate = options.conflate &&
|
||||
@ -466,35 +464,45 @@ int zmq::socket_base_t::connect (const char *addr_)
|
||||
// Attach local end of the pipe to this socket object.
|
||||
attach_pipe (new_pipes [0]);
|
||||
|
||||
// If required, send the identity of the local socket to the peer.
|
||||
if (peer.options.recv_identity) {
|
||||
msg_t id;
|
||||
rc = id.init_size (options.identity_size);
|
||||
errno_assert (rc == 0);
|
||||
memcpy (id.data (), options.identity, options.identity_size);
|
||||
id.set_flags (msg_t::identity);
|
||||
bool written = new_pipes [0]->write (&id);
|
||||
zmq_assert (written);
|
||||
new_pipes [0]->flush ();
|
||||
if (!peer.socket)
|
||||
{
|
||||
endpoint_t endpoint = {this, options};
|
||||
pending_connection_t pending_connection = {endpoint, new_pipes [0], new_pipes [1]};
|
||||
pend_connection (addr_, pending_connection);
|
||||
}
|
||||
else
|
||||
{
|
||||
// If required, send the identity of the local socket to the peer.
|
||||
if (peer.options.recv_identity) {
|
||||
|
||||
msg_t id;
|
||||
rc = id.init_size (options.identity_size);
|
||||
errno_assert (rc == 0);
|
||||
memcpy (id.data (), options.identity, options.identity_size);
|
||||
id.set_flags (msg_t::identity);
|
||||
bool written = new_pipes [0]->write (&id);
|
||||
zmq_assert (written);
|
||||
new_pipes [0]->flush ();
|
||||
}
|
||||
|
||||
// If required, send the identity of the peer to the local socket.
|
||||
if (options.recv_identity) {
|
||||
msg_t id;
|
||||
rc = id.init_size (peer.options.identity_size);
|
||||
errno_assert (rc == 0);
|
||||
memcpy (id.data (), peer.options.identity, peer.options.identity_size);
|
||||
id.set_flags (msg_t::identity);
|
||||
bool written = new_pipes [1]->write (&id);
|
||||
zmq_assert (written);
|
||||
new_pipes [1]->flush ();
|
||||
// If required, send the identity of the peer to the local socket.
|
||||
if (options.recv_identity) {
|
||||
msg_t id;
|
||||
rc = id.init_size (peer.options.identity_size);
|
||||
errno_assert (rc == 0);
|
||||
memcpy (id.data (), peer.options.identity, peer.options.identity_size);
|
||||
id.set_flags (msg_t::identity);
|
||||
bool written = new_pipes [1]->write (&id);
|
||||
zmq_assert (written);
|
||||
new_pipes [1]->flush ();
|
||||
}
|
||||
|
||||
// Attach remote end of the pipe to the peer socket. Note that peer's
|
||||
// seqnum was incremented in find_endpoint function. We don't need it
|
||||
// increased here.
|
||||
send_bind (peer.socket, new_pipes [1], false);
|
||||
}
|
||||
|
||||
// Attach remote end of the pipe to the peer socket. Note that peer's
|
||||
// seqnum was incremented in find_endpoint function. We don't need it
|
||||
// increased here.
|
||||
send_bind (peer.socket, new_pipes [1], false);
|
||||
|
||||
// Save last endpoint URI
|
||||
last_endpoint.assign (addr_);
|
||||
|
||||
@ -636,7 +644,7 @@ int zmq::socket_base_t::term_endpoint (const char *addr_)
|
||||
errno = ENOENT;
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
||||
for (inprocs_t::iterator it = range.first; it != range.second; ++it)
|
||||
it->second->terminate(true);
|
||||
inprocs.erase (range.first, range.second);
|
||||
@ -655,7 +663,7 @@ int zmq::socket_base_t::term_endpoint (const char *addr_)
|
||||
if (it->second.second != NULL)
|
||||
it->second.second->terminate(false);
|
||||
term_child (it->second.first);
|
||||
}
|
||||
}
|
||||
endpoints.erase (range.first, range.second);
|
||||
return 0;
|
||||
}
|
||||
@ -1223,20 +1231,20 @@ void zmq::socket_base_t::event_disconnected (std::string &addr_, int fd_)
|
||||
void zmq::socket_base_t::monitor_event (zmq_event_t event_, const std::string& addr_)
|
||||
{
|
||||
if (monitor_socket) {
|
||||
const uint16_t eid = (uint16_t)event_.event ;
|
||||
const uint32_t value = (uint32_t)event_.value ;
|
||||
// prepare and send first message frame
|
||||
// containing event id and value
|
||||
const uint16_t eid = (uint16_t)event_.event;
|
||||
const uint32_t value = (uint32_t)event_.value;
|
||||
// prepare and send first message frame
|
||||
// containing event id and value
|
||||
zmq_msg_t msg;
|
||||
zmq_msg_init_size (&msg, sizeof(eid) + sizeof(value));
|
||||
char* data1 = (char*)zmq_msg_data(&msg);
|
||||
char* data1 = (char*)zmq_msg_data(&msg);
|
||||
memcpy (data1, &eid, sizeof(eid));
|
||||
memcpy (data1+sizeof(eid), &value, sizeof(value));
|
||||
zmq_sendmsg (monitor_socket, &msg, ZMQ_SNDMORE);
|
||||
// prepare and send second message frame
|
||||
// containing the address (endpoint)
|
||||
// prepare and send second message frame
|
||||
// containing the address (endpoint)
|
||||
zmq_msg_init_size (&msg, addr_.size());
|
||||
memcpy(zmq_msg_data(&msg), addr_.c_str(), addr_.size());
|
||||
memcpy(zmq_msg_data(&msg), addr_.c_str(), addr_.size());
|
||||
zmq_sendmsg (monitor_socket, &msg, 0);
|
||||
}
|
||||
}
|
||||
|
@ -36,7 +36,8 @@ noinst_PROGRAMS = test_system \
|
||||
test_spec_pushpull \
|
||||
test_req_request_ids \
|
||||
test_req_strict \
|
||||
test_conflate
|
||||
test_conflate \
|
||||
test_inproc_connect_before_bind
|
||||
|
||||
if !ON_MINGW
|
||||
noinst_PROGRAMS += test_shutdown_stress \
|
||||
@ -80,6 +81,7 @@ test_spec_pushpull_SOURCES = test_spec_pushpull.cpp
|
||||
test_req_request_ids_SOURCES = test_req_request_ids.cpp
|
||||
test_req_strict_SOURCES = test_req_strict.cpp
|
||||
test_conflate_SOURCES = test_conflate.cpp
|
||||
test_inproc_connect_before_bind_SOURCES = test_inproc_connect_before_bind.cpp
|
||||
if !ON_MINGW
|
||||
test_shutdown_stress_SOURCES = test_shutdown_stress.cpp
|
||||
test_pair_ipc_SOURCES = test_pair_ipc.cpp testutil.hpp
|
||||
|
@ -63,6 +63,9 @@ void test_ctx_shutdown()
|
||||
// Spawn a thread to receive on socket
|
||||
void *receiver_thread = zmq_threadstart (&receiver, socket);
|
||||
|
||||
// Wait for thread to start up and block
|
||||
zmq_sleep (1);
|
||||
|
||||
// Shutdown context, if we used destroy here we would deadlock.
|
||||
rc = zmq_ctx_shutdown (ctx);
|
||||
assert (rc == 0);
|
||||
|
341
tests/test_inproc_connect_before_bind.cpp
Normal file
341
tests/test_inproc_connect_before_bind.cpp
Normal file
@ -0,0 +1,341 @@
|
||||
/*
|
||||
Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file
|
||||
|
||||
This file is part of 0MQ.
|
||||
|
||||
0MQ is free software; you can redistribute it and/or modify it under
|
||||
the terms of the GNU Lesser General Public License as published by
|
||||
the Free Software Foundation; either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
0MQ is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU Lesser General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU Lesser General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "../include/zmq_utils.h"
|
||||
#include <stdio.h>
|
||||
#include "testutil.hpp"
|
||||
|
||||
static void pusher (void *ctx)
|
||||
{
|
||||
// Connect first
|
||||
void *connectSocket = zmq_socket (ctx, ZMQ_PAIR);
|
||||
assert (connectSocket);
|
||||
int rc = zmq_connect (connectSocket, "inproc://a");
|
||||
assert (rc == 0);
|
||||
|
||||
// Queue up some data
|
||||
rc = zmq_send_const (connectSocket, "foobar", 6, 0);
|
||||
assert (rc == 6);
|
||||
|
||||
// Cleanup
|
||||
rc = zmq_close (connectSocket);
|
||||
assert (rc == 0);
|
||||
}
|
||||
|
||||
void test_bind_before_connect()
|
||||
{
|
||||
void *ctx = zmq_ctx_new ();
|
||||
assert (ctx);
|
||||
|
||||
// Bind first
|
||||
void *bindSocket = zmq_socket (ctx, ZMQ_PAIR);
|
||||
assert (bindSocket);
|
||||
int rc = zmq_bind (bindSocket, "inproc://a");
|
||||
assert (rc == 0);
|
||||
|
||||
// Now connect
|
||||
void *connectSocket = zmq_socket (ctx, ZMQ_PAIR);
|
||||
assert (connectSocket);
|
||||
rc = zmq_connect (connectSocket, "inproc://a");
|
||||
assert (rc == 0);
|
||||
|
||||
// Queue up some data
|
||||
rc = zmq_send_const (connectSocket, "foobar", 6, 0);
|
||||
assert (rc == 6);
|
||||
|
||||
// Read pending message
|
||||
zmq_msg_t msg;
|
||||
rc = zmq_msg_init (&msg);
|
||||
assert (rc == 0);
|
||||
rc = zmq_msg_recv (&msg, bindSocket, 0);
|
||||
assert (rc == 6);
|
||||
void *data = zmq_msg_data (&msg);
|
||||
assert (memcmp ("foobar", data, 6) == 0);
|
||||
|
||||
// Cleanup
|
||||
rc = zmq_close (connectSocket);
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_close (bindSocket);
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_ctx_term (ctx);
|
||||
assert (rc == 0);
|
||||
}
|
||||
|
||||
void test_connect_before_bind()
|
||||
{
|
||||
void *ctx = zmq_ctx_new ();
|
||||
assert (ctx);
|
||||
|
||||
// Connect first
|
||||
void *connectSocket = zmq_socket (ctx, ZMQ_PAIR);
|
||||
assert (connectSocket);
|
||||
int rc = zmq_connect (connectSocket, "inproc://a");
|
||||
assert (rc == 0);
|
||||
|
||||
// Queue up some data
|
||||
rc = zmq_send_const (connectSocket, "foobar", 6, 0);
|
||||
assert (rc == 6);
|
||||
|
||||
// Now bind
|
||||
void *bindSocket = zmq_socket (ctx, ZMQ_PAIR);
|
||||
assert (bindSocket);
|
||||
rc = zmq_bind (bindSocket, "inproc://a");
|
||||
assert (rc == 0);
|
||||
|
||||
// Read pending message
|
||||
zmq_msg_t msg;
|
||||
rc = zmq_msg_init (&msg);
|
||||
assert (rc == 0);
|
||||
rc = zmq_msg_recv (&msg, bindSocket, 0);
|
||||
assert (rc == 6);
|
||||
void *data = zmq_msg_data (&msg);
|
||||
assert (memcmp ("foobar", data, 6) == 0);
|
||||
|
||||
// Cleanup
|
||||
rc = zmq_close (connectSocket);
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_close (bindSocket);
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_ctx_term (ctx);
|
||||
assert (rc == 0);
|
||||
}
|
||||
|
||||
void test_connect_before_bind_pub_sub()
|
||||
{
|
||||
void *ctx = zmq_ctx_new ();
|
||||
assert (ctx);
|
||||
|
||||
// Connect first
|
||||
void *connectSocket = zmq_socket (ctx, ZMQ_PUB);
|
||||
assert (connectSocket);
|
||||
int rc = zmq_connect (connectSocket, "inproc://a");
|
||||
assert (rc == 0);
|
||||
|
||||
// Queue up some data, this will be dropped
|
||||
rc = zmq_send_const (connectSocket, "before", 6, 0);
|
||||
assert (rc == 6);
|
||||
|
||||
// Now bind
|
||||
void *bindSocket = zmq_socket (ctx, ZMQ_SUB);
|
||||
assert (bindSocket);
|
||||
rc = zmq_setsockopt (bindSocket, ZMQ_SUBSCRIBE, "", 0);
|
||||
assert (rc == 0);
|
||||
rc = zmq_bind (bindSocket, "inproc://a");
|
||||
assert (rc == 0);
|
||||
|
||||
// Wait for pub-sub connection to happen
|
||||
zmq_sleep (1);
|
||||
|
||||
// Queue up some data, this not will be dropped
|
||||
rc = zmq_send_const (connectSocket, "after", 6, 0);
|
||||
assert (rc == 6);
|
||||
|
||||
// Read pending message
|
||||
zmq_msg_t msg;
|
||||
rc = zmq_msg_init (&msg);
|
||||
assert (rc == 0);
|
||||
rc = zmq_msg_recv (&msg, bindSocket, 0);
|
||||
assert (rc == 6);
|
||||
void *data = zmq_msg_data (&msg);
|
||||
assert (memcmp ("after", data, 5) == 0);
|
||||
|
||||
// Cleanup
|
||||
rc = zmq_close (connectSocket);
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_close (bindSocket);
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_ctx_term (ctx);
|
||||
assert (rc == 0);
|
||||
}
|
||||
|
||||
void test_multiple_connects()
|
||||
{
|
||||
const unsigned int no_of_connects = 10;
|
||||
void *ctx = zmq_ctx_new ();
|
||||
assert (ctx);
|
||||
|
||||
int rc;
|
||||
void *connectSocket[no_of_connects];
|
||||
|
||||
// Connect first
|
||||
for (unsigned int i = 0; i < no_of_connects; ++i)
|
||||
{
|
||||
connectSocket [i] = zmq_socket (ctx, ZMQ_PUSH);
|
||||
assert (connectSocket [i]);
|
||||
rc = zmq_connect (connectSocket [i], "inproc://a");
|
||||
assert (rc == 0);
|
||||
|
||||
// Queue up some data
|
||||
rc = zmq_send_const (connectSocket [i], "foobar", 6, 0);
|
||||
assert (rc == 6);
|
||||
}
|
||||
|
||||
// Now bind
|
||||
void *bindSocket = zmq_socket (ctx, ZMQ_PULL);
|
||||
assert (bindSocket);
|
||||
rc = zmq_bind (bindSocket, "inproc://a");
|
||||
assert (rc == 0);
|
||||
|
||||
for (unsigned int i = 0; i < no_of_connects; ++i)
|
||||
{
|
||||
// Read pending message
|
||||
zmq_msg_t msg;
|
||||
rc = zmq_msg_init (&msg);
|
||||
assert (rc == 0);
|
||||
rc = zmq_msg_recv (&msg, bindSocket, 0);
|
||||
assert (rc == 6);
|
||||
void *data = zmq_msg_data (&msg);
|
||||
assert (memcmp ("foobar", data, 6) == 0);
|
||||
}
|
||||
|
||||
// Cleanup
|
||||
for (unsigned int i = 0; i < no_of_connects; ++i)
|
||||
{
|
||||
rc = zmq_close (connectSocket [i]);
|
||||
assert (rc == 0);
|
||||
}
|
||||
|
||||
rc = zmq_close (bindSocket);
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_ctx_term (ctx);
|
||||
assert (rc == 0);
|
||||
}
|
||||
|
||||
void test_multiple_threads()
|
||||
{
|
||||
const unsigned int no_of_threads = 30;
|
||||
void *ctx = zmq_ctx_new ();
|
||||
assert (ctx);
|
||||
|
||||
int rc;
|
||||
void *threads [no_of_threads];
|
||||
|
||||
// Connect first
|
||||
for (unsigned int i = 0; i < no_of_threads; ++i)
|
||||
{
|
||||
threads [i] = zmq_threadstart (&pusher, ctx);
|
||||
}
|
||||
|
||||
// Now bind
|
||||
void *bindSocket = zmq_socket (ctx, ZMQ_PULL);
|
||||
assert (bindSocket);
|
||||
rc = zmq_bind (bindSocket, "inproc://a");
|
||||
assert (rc == 0);
|
||||
|
||||
for (unsigned int i = 0; i < no_of_threads; ++i)
|
||||
{
|
||||
// Read pending message
|
||||
zmq_msg_t msg;
|
||||
rc = zmq_msg_init (&msg);
|
||||
assert (rc == 0);
|
||||
rc = zmq_msg_recv (&msg, bindSocket, 0);
|
||||
assert (rc == 6);
|
||||
void *data = zmq_msg_data (&msg);
|
||||
assert (memcmp ("foobar", data, 6) == 0);
|
||||
}
|
||||
|
||||
// Cleanup
|
||||
for (unsigned int i = 0; i < no_of_threads; ++i)
|
||||
{
|
||||
zmq_threadclose (threads [i]);
|
||||
}
|
||||
|
||||
rc = zmq_close (bindSocket);
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_ctx_term (ctx);
|
||||
assert (rc == 0);
|
||||
}
|
||||
|
||||
void test_identity()
|
||||
{
|
||||
// Create the infrastructure
|
||||
void *ctx = zmq_ctx_new ();
|
||||
assert (ctx);
|
||||
|
||||
void *sc = zmq_socket (ctx, ZMQ_DEALER);
|
||||
assert (sc);
|
||||
|
||||
int rc = zmq_connect (sc, "inproc://a");
|
||||
assert (rc == 0);
|
||||
|
||||
void *sb = zmq_socket (ctx, ZMQ_ROUTER);
|
||||
assert (sb);
|
||||
|
||||
rc = zmq_bind (sb, "inproc://a");
|
||||
assert (rc == 0);
|
||||
|
||||
// Send 2-part message.
|
||||
rc = zmq_send (sc, "A", 1, ZMQ_SNDMORE);
|
||||
assert (rc == 1);
|
||||
rc = zmq_send (sc, "B", 1, 0);
|
||||
assert (rc == 1);
|
||||
|
||||
// Identity comes first.
|
||||
zmq_msg_t msg;
|
||||
rc = zmq_msg_init (&msg);
|
||||
assert (rc == 0);
|
||||
rc = zmq_msg_recv (&msg, sb, 0);
|
||||
assert (rc >= 0);
|
||||
int more = zmq_msg_more (&msg);
|
||||
assert (more == 1);
|
||||
|
||||
// Then the first part of the message body.
|
||||
rc = zmq_msg_recv (&msg, sb, 0);
|
||||
assert (rc == 1);
|
||||
more = zmq_msg_more (&msg);
|
||||
assert (more == 1);
|
||||
|
||||
// And finally, the second part of the message body.
|
||||
rc = zmq_msg_recv (&msg, sb, 0);
|
||||
assert (rc == 1);
|
||||
more = zmq_msg_more (&msg);
|
||||
assert (more == 0);
|
||||
|
||||
// Deallocate the infrastructure.
|
||||
rc = zmq_close (sc);
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_close (sb);
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_ctx_term (ctx);
|
||||
assert (rc == 0);
|
||||
}
|
||||
|
||||
int main (void)
|
||||
{
|
||||
setup_test_environment();
|
||||
|
||||
test_bind_before_connect ();
|
||||
test_connect_before_bind ();
|
||||
test_connect_before_bind_pub_sub ();
|
||||
test_multiple_connects ();
|
||||
test_multiple_threads ();
|
||||
test_identity ();
|
||||
|
||||
return 0;
|
||||
}
|
@ -55,7 +55,7 @@ int main (void)
|
||||
rc = zmq_msg_recv (&msg, sc, 0);
|
||||
assert (rc == 6);
|
||||
data = zmq_msg_data (&msg);
|
||||
assert (memcmp ("foobar", data, 3) == 0);
|
||||
assert (memcmp ("foobar", data, 6) == 0);
|
||||
|
||||
// Cleanup
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user