mirror of
https://github.com/zeromq/libzmq.git
synced 2025-01-15 18:38:00 +08:00
Merge pull request #81 from olafmandel/issue_1015
Backport of test and fix for zeromq/libzmq#1015
This commit is contained in:
commit
9766baa9d3
16
src/ctx.cpp
16
src/ctx.cpp
@ -445,6 +445,14 @@ void zmq::ctx_t::connect_inproc_sockets(zmq::socket_base_t *bind_socket_, option
|
||||
bind_socket_->inc_seqnum();
|
||||
pending_connection_.bind_pipe->set_tid(bind_socket_->get_tid());
|
||||
|
||||
if (!bind_options.recv_identity) {
|
||||
msg_t msg;
|
||||
const bool ok = pending_connection_.bind_pipe->read (&msg);
|
||||
zmq_assert (ok);
|
||||
const int rc = msg.close ();
|
||||
errno_assert (rc == 0);
|
||||
}
|
||||
|
||||
if (side_ == bind_side)
|
||||
{
|
||||
command_t cmd;
|
||||
@ -476,14 +484,6 @@ void zmq::ctx_t::connect_inproc_sockets(zmq::socket_base_t *bind_socket_, option
|
||||
pending_connection_.connect_pipe->set_hwms(hwms [1], hwms [0]);
|
||||
pending_connection_.bind_pipe->set_hwms(hwms [0], hwms [1]);
|
||||
|
||||
if (!bind_options.recv_identity) {
|
||||
msg_t msg;
|
||||
const bool ok = pending_connection_.bind_pipe->read (&msg);
|
||||
zmq_assert (ok);
|
||||
const int rc = msg.close ();
|
||||
errno_assert (rc == 0);
|
||||
}
|
||||
|
||||
if (pending_connection_.endpoint.options.recv_identity) {
|
||||
msg_t id;
|
||||
int rc = id.init_size (bind_options.identity_size);
|
||||
|
@ -36,6 +36,40 @@ static void pusher (void *ctx)
|
||||
assert (rc == 0);
|
||||
}
|
||||
|
||||
static void simult_conn (void *payload)
|
||||
{
|
||||
// Pull out arguments - context followed by endpoint string
|
||||
void* ctx = (void*)((void**)payload)[0];
|
||||
char* endpt = (char*)((void**)payload)[1];
|
||||
|
||||
// Connect
|
||||
void *connectSocket = zmq_socket (ctx, ZMQ_SUB);
|
||||
assert (connectSocket);
|
||||
int rc = zmq_connect (connectSocket, endpt);
|
||||
assert (rc == 0);
|
||||
|
||||
// Cleanup
|
||||
rc = zmq_close (connectSocket);
|
||||
assert (rc == 0);
|
||||
}
|
||||
|
||||
static void simult_bind (void *payload)
|
||||
{
|
||||
// Pull out arguments - context followed by endpoint string
|
||||
void* ctx = (void*)((void**)payload)[0];
|
||||
char* endpt = (char*)((void**)payload)[1];
|
||||
|
||||
// Bind
|
||||
void *bindSocket = zmq_socket (ctx, ZMQ_PUB);
|
||||
assert (bindSocket);
|
||||
int rc = zmq_bind (bindSocket, endpt);
|
||||
assert (rc == 0);
|
||||
|
||||
// Cleanup
|
||||
rc = zmq_close (bindSocket);
|
||||
assert (rc == 0);
|
||||
}
|
||||
|
||||
void test_bind_before_connect()
|
||||
{
|
||||
void *ctx = zmq_ctx_new ();
|
||||
@ -268,6 +302,42 @@ void test_multiple_threads()
|
||||
assert (rc == 0);
|
||||
}
|
||||
|
||||
void test_simultaneous_connect_bind_threads ()
|
||||
{
|
||||
const unsigned int no_of_times = 50;
|
||||
void *ctx = zmq_ctx_new ();
|
||||
assert (ctx);
|
||||
|
||||
void *threads[no_of_times*2];
|
||||
void *thr_args[no_of_times][2];
|
||||
char endpts[no_of_times][20];
|
||||
|
||||
// Set up thread arguments: context followed by endpoint string
|
||||
for (unsigned int i = 0; i < no_of_times; ++i)
|
||||
{
|
||||
thr_args[i][0] = (void*) ctx;
|
||||
thr_args[i][1] = (void*) endpts[i];
|
||||
sprintf (endpts[i], "inproc://foo_%d", i);
|
||||
}
|
||||
|
||||
// Spawn all threads as simultaneously as possible
|
||||
for (unsigned int i = 0; i < no_of_times; ++i)
|
||||
{
|
||||
threads[i*2+0] = zmq_threadstart (&simult_conn, (void*)thr_args[i]);
|
||||
threads[i*2+1] = zmq_threadstart (&simult_bind, (void*)thr_args[i]);
|
||||
}
|
||||
|
||||
// Close all threads
|
||||
for (unsigned int i = 0; i < no_of_times; ++i)
|
||||
{
|
||||
zmq_threadclose (threads[i*2+0]);
|
||||
zmq_threadclose (threads[i*2+1]);
|
||||
}
|
||||
|
||||
int rc = zmq_ctx_term (ctx);
|
||||
assert (rc == 0);
|
||||
}
|
||||
|
||||
void test_identity()
|
||||
{
|
||||
// Create the infrastructure
|
||||
@ -350,6 +420,7 @@ int main (void)
|
||||
test_connect_before_bind_pub_sub ();
|
||||
test_multiple_connects ();
|
||||
test_multiple_threads ();
|
||||
test_simultaneous_connect_bind_threads ();
|
||||
test_identity ();
|
||||
test_connect_only ();
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user