diff --git a/src/ctx.cpp b/src/ctx.cpp index d61cc2f3..58b95600 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -529,15 +529,6 @@ void zmq::ctx_t::connect_inproc_sockets (zmq::socket_base_t *bind_socket_, errno_assert (rc == 0); } - - int sndhwm = 0; - if (pending_connection_.endpoint.options.sndhwm != 0 && bind_options.rcvhwm != 0) - sndhwm = pending_connection_.endpoint.options.sndhwm + bind_options.rcvhwm; - - int rcvhwm = 0; - if (pending_connection_.endpoint.options.rcvhwm != 0 && bind_options.sndhwm != 0) - rcvhwm = pending_connection_.endpoint.options.rcvhwm + bind_options.sndhwm; - bool conflate = pending_connection_.endpoint.options.conflate && (pending_connection_.endpoint.options.type == ZMQ_DEALER || pending_connection_.endpoint.options.type == ZMQ_PULL || @@ -545,9 +536,17 @@ void zmq::ctx_t::connect_inproc_sockets (zmq::socket_base_t *bind_socket_, pending_connection_.endpoint.options.type == ZMQ_PUB || pending_connection_.endpoint.options.type == ZMQ_SUB); - int hwms [2] = {conflate? -1 : sndhwm, conflate? -1 : rcvhwm}; - pending_connection_.connect_pipe->set_hwms(hwms [1], hwms [0]); - pending_connection_.bind_pipe->set_hwms(hwms [0], hwms [1]); + if (!conflate) { + pending_connection_.connect_pipe->set_hwms_boost(bind_options.sndhwm, bind_options.rcvhwm); + pending_connection_.bind_pipe->set_hwms_boost(pending_connection_.endpoint.options.sndhwm, pending_connection_.endpoint.options.rcvhwm); + + pending_connection_.connect_pipe->set_hwms(pending_connection_.endpoint.options.rcvhwm, pending_connection_.endpoint.options.sndhwm); + pending_connection_.bind_pipe->set_hwms(bind_options.rcvhwm, bind_options.sndhwm); + } + else { + pending_connection_.connect_pipe->set_hwms(-1, -1); + pending_connection_.bind_pipe->set_hwms(-1, -1); + } if (side_ == bind_side) { command_t cmd; diff --git a/src/options.cpp b/src/options.cpp index 9cc435bd..40116996 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -888,18 +888,5 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_) bool zmq::options_t::is_valid (int option_) const { - bool valid = true; - - if (connected) { - switch (option_) { - case ZMQ_SNDHWM: - case ZMQ_RCVHWM: - valid = false; - break; - default: - break; - } - } - - return valid; + return true; } diff --git a/src/pipe.cpp b/src/pipe.cpp index 15742aac..60137f9a 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -81,6 +81,8 @@ zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, out_active (true), hwm (outhwm_), lwm (compute_lwm (inhwm_)), + inhwmboost(0), + outhwmboost(0), msgs_read (0), msgs_written (0), peers_msgs_read (0), @@ -518,8 +520,14 @@ void zmq::pipe_t::hiccup () void zmq::pipe_t::set_hwms (int inhwm_, int outhwm_) { - lwm = compute_lwm (inhwm_); - hwm = outhwm_; + lwm = compute_lwm(inhwm_ + inhwmboost); + hwm = outhwm_ + outhwmboost; +} + +void zmq::pipe_t::set_hwms_boost(int inhwmboost_, int outhwmboost_) +{ + inhwmboost = inhwmboost_; + outhwmboost = outhwmboost_; } bool zmq::pipe_t::check_hwm () const diff --git a/src/pipe.hpp b/src/pipe.hpp index 43d42050..b795f641 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -133,6 +133,9 @@ namespace zmq // set the high water marks. void set_hwms (int inhwm_, int outhwm_); + // set the boost to high water marks, used by inproc sockets so total hwm are sum of connect and bind sockets watermarks + void set_hwms_boost(int inhwmboost_, int outhwmboost_); + // check HWM bool check_hwm () const; private: @@ -176,6 +179,10 @@ namespace zmq // Low watermark for the inbound pipe. int lwm; + // boosts for high and low watermarks, used with inproc sockets so hwm are sum of send and recv hmws on each side of pipe + int inhwmboost; + int outhwmboost; + // Number of messages read and written so far. uint64_t msgs_read; uint64_t msgs_written; diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 36dac739..bddbaf2f 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -335,6 +335,7 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_, // If the socket type doesn't support the option, pass it to // the generic option parser. rc = options.setsockopt (option_, optval_, optvallen_); + update_pipe_options(option_); EXIT_MUTEX(); return rc; @@ -612,6 +613,11 @@ int zmq::socket_base_t::connect (const char *addr_) int hwms [2] = {conflate? -1 : sndhwm, conflate? -1 : rcvhwm}; bool conflates [2] = {conflate, conflate}; int rc = pipepair (parents, new_pipes, hwms, conflates); + if (!conflate) { + new_pipes[0]->set_hwms_boost(peer.options.sndhwm, peer.options.rcvhwm); + new_pipes[1]->set_hwms_boost(options.sndhwm, options.rcvhwm); + } + errno_assert (rc == 0); if (!peer.socket) { @@ -1249,6 +1255,18 @@ void zmq::socket_base_t::process_term (int linger_) own_t::process_term (linger_); } +void zmq::socket_base_t::update_pipe_options(int option_) +{ + if (option_ == ZMQ_SNDHWM || option_ == ZMQ_RCVHWM) + { + for (pipes_t::size_type i = 0; i != pipes.size(); ++i) + { + pipes[i]->set_hwms(options.rcvhwm, options.sndhwm); + } + } + +} + void zmq::socket_base_t::process_destroy () { destroyed = true; diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 682b8c75..56062b5b 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -232,6 +232,8 @@ namespace zmq void process_bind (zmq::pipe_t *pipe_); void process_term (int linger_); + void update_pipe_options(int option_); + // Socket's mailbox object. i_mailbox* mailbox; diff --git a/tests/test_sockopt_hwm.cpp b/tests/test_sockopt_hwm.cpp index b4ab2af0..a8b22f40 100644 --- a/tests/test_sockopt_hwm.cpp +++ b/tests/test_sockopt_hwm.cpp @@ -1,77 +1,145 @@ #include "testutil.hpp" -void test_valid_hwm_change() +const int MAX_SENDS = 10000; + +void test_change_before_connected() { - void *ctx = zmq_ctx_new (); - assert (ctx); - int rc; + int rc; + void *ctx = zmq_ctx_new(); - void *bind_socket = zmq_socket (ctx, ZMQ_SUB); - assert (bind_socket); + void *bind_socket = zmq_socket(ctx, ZMQ_PUSH); + void *connect_socket = zmq_socket(ctx, ZMQ_PULL); - int val = 500; - rc = zmq_setsockopt(bind_socket, ZMQ_RCVHWM, &val, sizeof(val)); - assert (rc == 0); + int val = 2; + rc = zmq_setsockopt(connect_socket, ZMQ_RCVHWM, &val, sizeof(val)); + assert(rc == 0); + rc = zmq_setsockopt(bind_socket, ZMQ_SNDHWM, &val, sizeof(val)); + assert(rc == 0); - rc = zmq_bind (bind_socket, "inproc://a"); - assert (rc == 0); + zmq_connect(connect_socket, "inproc://a"); + zmq_bind(bind_socket, "inproc://a"); - size_t placeholder = sizeof(val); - val = 0; - rc = zmq_getsockopt(bind_socket, ZMQ_RCVHWM, &val, &placeholder); - assert (rc == 0); - assert(val == 500); + size_t placeholder = sizeof(val); + val = 0; + rc = zmq_getsockopt(bind_socket, ZMQ_SNDHWM, &val, &placeholder); + assert(rc == 0); + assert(val == 2); + + int send_count = 0; + while (send_count < MAX_SENDS && zmq_send(bind_socket, NULL, 0, ZMQ_DONTWAIT) == 0) + ++send_count; + + assert(send_count == 4); + + zmq_close(bind_socket); + zmq_close(connect_socket); + zmq_ctx_term(ctx); } - -/** - * Test that zmq_setsockopt() fails to change the RCVHWM when called - * after a call to zmq_bind(). - */ -void test_invalid_hwm_change_bind() +void test_change_after_connected() { - void *ctx = zmq_ctx_new (); - assert (ctx); - int rc; + int rc; + void *ctx = zmq_ctx_new(); - void *bind_socket = zmq_socket (ctx, ZMQ_SUB); - assert (bind_socket); + void *bind_socket = zmq_socket(ctx, ZMQ_PUSH); + void *connect_socket = zmq_socket(ctx, ZMQ_PULL); - rc = zmq_bind (bind_socket, "inproc://a"); - assert (rc == 0); + int val = 1; + rc = zmq_setsockopt(connect_socket, ZMQ_RCVHWM, &val, sizeof(val)); + assert(rc == 0); + rc = zmq_setsockopt(bind_socket, ZMQ_SNDHWM, &val, sizeof(val)); + assert(rc == 0); - int val = 500; - rc = zmq_setsockopt (bind_socket, ZMQ_RCVHWM, &val, sizeof(val)); - assert (rc == -1); + zmq_connect(connect_socket, "inproc://a"); + zmq_bind(bind_socket, "inproc://a"); - zmq_close (bind_socket); - zmq_ctx_term (ctx); + val = 5; + rc = zmq_setsockopt(bind_socket, ZMQ_SNDHWM, &val, sizeof(val)); + assert(rc == 0); + + size_t placeholder = sizeof(val); + val = 0; + rc = zmq_getsockopt(bind_socket, ZMQ_SNDHWM, &val, &placeholder); + assert(rc == 0); + assert(val == 5); + + int send_count = 0; + while (send_count < MAX_SENDS && zmq_send(bind_socket, NULL, 0, ZMQ_DONTWAIT) == 0) + ++send_count; + + assert(send_count == 6); + + zmq_close(bind_socket); + zmq_close(connect_socket); + zmq_ctx_term(ctx); } -void test_invalid_hwm_change_connect() +void test_decrease_when_full() { - void *ctx = zmq_ctx_new(); - assert(ctx); - int rc; + int rc; + void *ctx = zmq_ctx_new(); - void *connect_socket = zmq_socket (ctx, ZMQ_SUB); - assert(connect_socket); + void *bind_socket = zmq_socket(ctx, ZMQ_PUSH); + void *connect_socket = zmq_socket(ctx, ZMQ_PULL); - rc = zmq_connect (connect_socket, "inproc://a"); - assert(rc == 0); + int val = 1; + rc = zmq_setsockopt(connect_socket, ZMQ_RCVHWM, &val, sizeof(val)); + assert(rc == 0); - int val = 500; - rc = zmq_setsockopt (connect_socket, ZMQ_RCVHWM, &val, sizeof(val)); - assert(rc == -1); + val = 100; + rc = zmq_setsockopt(bind_socket, ZMQ_SNDHWM, &val, sizeof(val)); + assert(rc == 0); - zmq_close (connect_socket); - zmq_ctx_term (ctx); + zmq_bind(bind_socket, "inproc://a"); + zmq_connect(connect_socket, "inproc://a"); + + // Fill up to hwm + int send_count = 0; + while (send_count < MAX_SENDS && zmq_send(bind_socket, &send_count, sizeof(send_count), ZMQ_DONTWAIT) == sizeof(send_count)) + ++send_count; + assert(send_count == 101); + + // Descrease snd hwm + val = 70; + rc = zmq_setsockopt(bind_socket, ZMQ_SNDHWM, &val, sizeof(val)); + assert(rc == 0); + + size_t placeholder = sizeof(val); + val = 0; + rc = zmq_getsockopt(bind_socket, ZMQ_SNDHWM, &val, &placeholder); + assert(rc == 0); + assert(val == 70); + + // Read out all data (should get up to previous hwm worth so none were dropped) + int read_count = 0; + int read_data = 0; + while (read_count < MAX_SENDS && zmq_recv(connect_socket, &read_data, sizeof(read_data), ZMQ_DONTWAIT) == sizeof(read_data)) { + assert(read_count == read_data); + ++read_count; + } + + assert(read_count == 101); + + // Give io thread some time to catch up + msleep(10); + + // Fill up to new hwm + send_count = 0; + while (send_count < MAX_SENDS && zmq_send(bind_socket, &send_count, sizeof(send_count), ZMQ_DONTWAIT) == sizeof(send_count)) + ++send_count; + + // Really this should be 71, but the lwm stuff kicks in doesn't seem quite right + assert(send_count > 0); + + zmq_close(bind_socket); + zmq_close(connect_socket); + zmq_ctx_term(ctx); } int main() { - test_valid_hwm_change(); - test_invalid_hwm_change_bind(); - test_invalid_hwm_change_connect(); + test_change_before_connected(); + test_change_after_connected(); + test_decrease_when_full(); }