From 7841b0dd0f94e050bce753c394cd034ab3184ee9 Mon Sep 17 00:00:00 2001 From: Richard Newton Date: Sat, 14 Sep 2013 17:27:18 +0100 Subject: [PATCH] Support high water mark on inproc socket connect before bind. --- src/ctx.cpp | 99 ++++++++++++++++++++------------- src/ctx.hpp | 2 + src/object.hpp | 3 +- src/pipe.cpp | 5 ++ src/pipe.hpp | 3 + tests/test_hwm.cpp | 136 +++++++++++++++++++++++++++++++++------------ 6 files changed, 173 insertions(+), 75 deletions(-) diff --git a/src/ctx.cpp b/src/ctx.cpp index 74bbef05..a05b2b15 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -406,12 +406,7 @@ void zmq::ctx_t::pend_connection (const char *addr_, pending_connection_t &pendi else { // Bind has happened in the mean time, connect directly - it->second.socket->inc_seqnum(); - pending_connection_.bind_pipe->set_tid(it->second.socket->get_tid()); - command_t cmd; - cmd.type = command_t::bind; - cmd.args.bind.pipe = pending_connection_.bind_pipe; - it->second.socket->process_command(cmd); + connect_inproc_sockets(it->second.socket, it->second.options, pending_connection_, connect_side); } endpoints_sync.unlock (); @@ -425,38 +420,7 @@ void zmq::ctx_t::connect_pending (const char *addr_, zmq::socket_base_t *bind_so for (pending_connections_t::iterator p = pending.first; p != pending.second; ++p) { - bind_socket_->inc_seqnum(); - p->second.bind_pipe->set_tid(bind_socket_->get_tid()); - command_t cmd; - cmd.type = command_t::bind; - cmd.args.bind.pipe = p->second.bind_pipe; - bind_socket_->process_command(cmd); - - bind_socket_->send_inproc_connected(p->second.endpoint.socket); - - // Send identities - options_t& bind_options = endpoints[addr_].options; - if (bind_options.recv_identity) { - - msg_t id; - int rc = id.init_size (p->second.endpoint.options.identity_size); - errno_assert (rc == 0); - memcpy (id.data (), p->second.endpoint.options.identity, p->second.endpoint.options.identity_size); - id.set_flags (msg_t::identity); - bool written = p->second.connect_pipe->write (&id); - zmq_assert (written); - p->second.connect_pipe->flush (); - } - if (p->second.endpoint.options.recv_identity) { - msg_t id; - int rc = id.init_size (bind_options.identity_size); - errno_assert (rc == 0); - memcpy (id.data (), bind_options.identity, bind_options.identity_size); - id.set_flags (msg_t::identity); - bool written = p->second.bind_pipe->write (&id); - zmq_assert (written); - p->second.bind_pipe->flush (); - } + connect_inproc_sockets(bind_socket_, endpoints[addr_].options, p->second, bind_side); } pending_connections.erase(pending.first, pending.second); @@ -464,6 +428,65 @@ void zmq::ctx_t::connect_pending (const char *addr_, zmq::socket_base_t *bind_so endpoints_sync.unlock (); } +void zmq::ctx_t::connect_inproc_sockets(zmq::socket_base_t *bind_socket_, options_t& bind_options, pending_connection_t &pending_connection_, side side_) +{ + bind_socket_->inc_seqnum(); + pending_connection_.bind_pipe->set_tid(bind_socket_->get_tid()); + + if (side_ == bind_side) + { + command_t cmd; + cmd.type = command_t::bind; + cmd.args.bind.pipe = pending_connection_.bind_pipe; + bind_socket_->process_command(cmd); + bind_socket_->send_inproc_connected(pending_connection_.endpoint.socket); + } + else + { + pending_connection_.connect_pipe->send_bind(bind_socket_, pending_connection_.bind_pipe, false); + } + + int sndhwm = 0; + if (pending_connection_.endpoint.options.sndhwm != 0 && bind_options.rcvhwm != 0) + sndhwm = pending_connection_.endpoint.options.sndhwm + bind_options.rcvhwm; + int rcvhwm = 0; + if (pending_connection_.endpoint.options.rcvhwm != 0 && bind_options.sndhwm != 0) + rcvhwm = pending_connection_.endpoint.options.rcvhwm + bind_options.sndhwm; + + bool conflate = pending_connection_.endpoint.options.conflate && + (pending_connection_.endpoint.options.type == ZMQ_DEALER || + pending_connection_.endpoint.options.type == ZMQ_PULL || + pending_connection_.endpoint.options.type == ZMQ_PUSH || + pending_connection_.endpoint.options.type == ZMQ_PUB || + pending_connection_.endpoint.options.type == ZMQ_SUB); + + int hwms [2] = {conflate? -1 : sndhwm, conflate? -1 : rcvhwm}; + pending_connection_.connect_pipe->set_hwms(hwms [1], hwms [0]); + pending_connection_.bind_pipe->set_hwms(hwms [0], hwms [1]); + + if (bind_options.recv_identity) { + + msg_t id; + int rc = id.init_size (pending_connection_.endpoint.options.identity_size); + errno_assert (rc == 0); + memcpy (id.data (), pending_connection_.endpoint.options.identity, pending_connection_.endpoint.options.identity_size); + id.set_flags (msg_t::identity); + bool written = pending_connection_.connect_pipe->write (&id); + zmq_assert (written); + pending_connection_.connect_pipe->flush (); + } + if (pending_connection_.endpoint.options.recv_identity) { + msg_t id; + int rc = id.init_size (bind_options.identity_size); + errno_assert (rc == 0); + memcpy (id.data (), bind_options.identity, bind_options.identity_size); + id.set_flags (msg_t::identity); + bool written = pending_connection_.bind_pipe->write (&id); + zmq_assert (written); + pending_connection_.bind_pipe->flush (); + } +} + // The last used socket ID, or 0 if no socket was used so far. Note that this // is a global variable. Thus, even sockets created in different contexts have // unique IDs. diff --git a/src/ctx.hpp b/src/ctx.hpp index 6fd9e369..9b6d8631 100644 --- a/src/ctx.hpp +++ b/src/ctx.hpp @@ -195,6 +195,8 @@ namespace zmq // the process that created this context. Used to detect forking. pid_t pid; #endif + enum side { connect_side, bind_side }; + void connect_inproc_sockets(zmq::socket_base_t *bind_socket_, options_t& bind_options, pending_connection_t &pending_connection_, side side_); }; } diff --git a/src/object.hpp b/src/object.hpp index 6bc52e4a..7655c38f 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -52,6 +52,7 @@ namespace zmq ctx_t *get_ctx (); void process_command (zmq::command_t &cmd_); void send_inproc_connected (zmq::socket_base_t *socket_); + void send_bind (zmq::own_t *destination_, zmq::pipe_t *pipe_, bool inc_seqnum_ = true); protected: @@ -80,8 +81,6 @@ namespace zmq zmq::own_t *object_); void send_attach (zmq::session_base_t *destination_, zmq::i_engine *engine_, bool inc_seqnum_ = true); - void send_bind (zmq::own_t *destination_, zmq::pipe_t *pipe_, - bool inc_seqnum_ = true); void send_activate_read (zmq::pipe_t *destination_); void send_activate_write (zmq::pipe_t *destination_, uint64_t msgs_read_); diff --git a/src/pipe.cpp b/src/pipe.cpp index fc21b3b8..8db6d146 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -478,3 +478,8 @@ void zmq::pipe_t::hiccup () send_hiccup (peer, (void*) inpipe); } +void zmq::pipe_t::set_hwms (int inhwm_, int outhwm_) +{ + lwm = compute_lwm (inhwm_); + hwm = outhwm_; +} diff --git a/src/pipe.hpp b/src/pipe.hpp index 8ee4e59c..dfb35d33 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -112,6 +112,9 @@ namespace zmq // before actual shutdown. void terminate (bool delay_); + // set the high water marks. + void set_hwms (int inhwm_, int outhwm_); + private: // Type of the underlying lock-free pipe. diff --git a/tests/test_hwm.cpp b/tests/test_hwm.cpp index 7f26176a..c6709a56 100644 --- a/tests/test_hwm.cpp +++ b/tests/test_hwm.cpp @@ -22,61 +22,127 @@ #include #include "testutil.hpp" -int main (void) +const int MAX_SENDS = 10000; + +enum TestType { BIND_FIRST, CONNECT_FIRST }; + +int count_msg (int send_hwm, int recv_hwm, TestType testType) { - setup_test_environment(); void *ctx = zmq_ctx_new (); assert (ctx); + int rc; - // Create pair of socket, each with high watermark of 2. Thus the total - // buffer space should be 4 messages. - void *sb = zmq_socket (ctx, ZMQ_PULL); - assert (sb); - int hwm = 2; - int rc = zmq_setsockopt (sb, ZMQ_RCVHWM, &hwm, sizeof (hwm)); - assert (rc == 0); - rc = zmq_bind (sb, "inproc://a"); - assert (rc == 0); - - void *sc = zmq_socket (ctx, ZMQ_PUSH); - assert (sc); - rc = zmq_setsockopt (sc, ZMQ_SNDHWM, &hwm, sizeof (hwm)); - assert (rc == 0); - rc = zmq_connect (sc, "inproc://a"); - assert (rc == 0); - - // Try to send 10 messages. Only 4 should succeed. - for (int i = 0; i < 10; i++) + void *bind_socket; + void *connect_socket; + if (testType == BIND_FIRST) { - int rc = zmq_send (sc, NULL, 0, ZMQ_DONTWAIT); - if (i < 4) - assert (rc == 0); - else - assert (rc < 0 && errno == EAGAIN); - } + // Set up bind socket + bind_socket = zmq_socket (ctx, ZMQ_PULL); + assert (bind_socket); + rc = zmq_setsockopt (bind_socket, ZMQ_RCVHWM, &recv_hwm, sizeof (recv_hwm)); + assert (rc == 0); + rc = zmq_bind (bind_socket, "inproc://a"); + assert (rc == 0); - // There should be now 4 messages pending, consume them. - for (int i = 0; i != 4; i++) { - rc = zmq_recv (sb, NULL, 0, 0); + // Set up connect socket + connect_socket = zmq_socket (ctx, ZMQ_PUSH); + assert (connect_socket); + rc = zmq_setsockopt (connect_socket, ZMQ_SNDHWM, &send_hwm, sizeof (send_hwm)); + assert (rc == 0); + rc = zmq_connect (connect_socket, "inproc://a"); + assert (rc == 0); + } + else + { + // Set up connect socket + connect_socket = zmq_socket (ctx, ZMQ_PUSH); + assert (connect_socket); + rc = zmq_setsockopt (connect_socket, ZMQ_SNDHWM, &send_hwm, sizeof (send_hwm)); + assert (rc == 0); + rc = zmq_connect (connect_socket, "inproc://a"); + assert (rc == 0); + + // Set up bind socket + bind_socket = zmq_socket (ctx, ZMQ_PULL); + assert (bind_socket); + rc = zmq_setsockopt (bind_socket, ZMQ_RCVHWM, &recv_hwm, sizeof (recv_hwm)); + assert (rc == 0); + rc = zmq_bind (bind_socket, "inproc://a"); assert (rc == 0); } + // Send until we block + int send_count = 0; + while (send_count < MAX_SENDS && zmq_send (connect_socket, NULL, 0, ZMQ_DONTWAIT) == 0) + ++send_count; + + // Now receive all sent messages + int recv_count = 0; + while (zmq_recv (bind_socket, NULL, 0, ZMQ_DONTWAIT) == 0) + ++recv_count; + + assert (send_count == recv_count); + // Now it should be possible to send one more. - rc = zmq_send (sc, NULL, 0, 0); + rc = zmq_send (connect_socket, NULL, 0, 0); assert (rc == 0); // Consume the remaining message. - rc = zmq_recv (sb, NULL, 0, 0); + rc = zmq_recv (bind_socket, NULL, 0, 0); assert (rc == 0); - rc = zmq_close (sc); + // Clean up + rc = zmq_close (connect_socket); assert (rc == 0); - rc = zmq_close (sb); + rc = zmq_close (bind_socket); assert (rc == 0); rc = zmq_ctx_term (ctx); assert (rc == 0); - return 0; + return send_count; +} + +int test_inproc_bind_first (int send_hwm, int recv_hwm) +{ + return count_msg(send_hwm, recv_hwm, BIND_FIRST); +} + +int test_inproc_connect_first (int send_hwm, int recv_hwm) +{ + return count_msg(send_hwm, recv_hwm, CONNECT_FIRST); +} + +int main (void) +{ + setup_test_environment(); + + int count; + + // Infinite send and receive buffer + count = test_inproc_bind_first (0, 0); + assert (count == MAX_SENDS); + count = test_inproc_connect_first (0, 0); + assert (count == MAX_SENDS); + + // Infinite send buffer + count = test_inproc_bind_first (1, 0); + assert (count == MAX_SENDS); + count = test_inproc_connect_first (1, 0); + assert (count == MAX_SENDS); + + // Infinite receive buffer + count = test_inproc_bind_first (0, 1); + assert (count == MAX_SENDS); + count = test_inproc_connect_first (0, 1); + assert (count == MAX_SENDS); + + // Send and recv buffers 1, so total that can be queued is 2 + count = test_inproc_bind_first (1, 1); + assert (count == 2); + count = test_inproc_connect_first (1, 1); + assert (count == 2); + + return 0; }