diff --git a/src/options.cpp b/src/options.cpp index 010222c8..17e04659 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -43,8 +43,6 @@ zmq::options_t::options_t () : sndtimeo (-1), ipv6 (0), immediate (0), - delay_on_close (true), - delay_on_disconnect (true), filter (false), recv_identity (false), raw_sock (false), diff --git a/src/options.hpp b/src/options.hpp index 358b2858..4eea0368 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -97,14 +97,6 @@ namespace zmq // on a socket with only connecting pipes would block int immediate; - // If true, session reads all the pending messages from the pipe and - // sends them to the network when socket is closed. - bool delay_on_close; - - // If true, socket reads all the messages from the pipe and delivers - // them to the user when the peer terminates. - bool delay_on_disconnect; - // If 1, (X)SUB socket should filter the messages. If 0, it should not. bool filter; diff --git a/src/pipe.cpp b/src/pipe.cpp index 5669005a..fc21b3b8 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -27,7 +27,7 @@ #include "ypipe_conflate.hpp" int zmq::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2], - int hwms_ [2], bool delays_ [2], bool conflate_ [2]) + int hwms_ [2], bool conflate_ [2]) { // Creates two pipe objects. These objects are connected by two ypipes, // each to pass messages in one direction. @@ -50,10 +50,10 @@ int zmq::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2], alloc_assert (upipe2); pipes_ [0] = new (std::nothrow) pipe_t (parents_ [0], upipe1, upipe2, - hwms_ [1], hwms_ [0], delays_ [0], conflate_ [0]); + hwms_ [1], hwms_ [0], conflate_ [0]); alloc_assert (pipes_ [0]); pipes_ [1] = new (std::nothrow) pipe_t (parents_ [1], upipe2, upipe1, - hwms_ [0], hwms_ [1], delays_ [1], conflate_ [1]); + hwms_ [0], hwms_ [1], conflate_ [1]); alloc_assert (pipes_ [1]); pipes_ [0]->set_peer (pipes_ [1]); @@ -63,7 +63,7 @@ int zmq::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2], } zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, - int inhwm_, int outhwm_, bool delay_, bool conflate_) : + int inhwm_, int outhwm_, bool conflate_) : object_t (parent_), inpipe (inpipe_), outpipe (outpipe_), @@ -77,7 +77,7 @@ zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, peer (NULL), sink (NULL), state (active), - delay (delay_), + delay (true), conflate (conflate_) { } @@ -334,6 +334,11 @@ void zmq::pipe_t::process_pipe_term_ack () delete this; } +void zmq::pipe_t::set_nodelay () +{ + this->delay = false; +} + void zmq::pipe_t::terminate (bool delay_) { // Overload the value specified at pipe creation. diff --git a/src/pipe.hpp b/src/pipe.hpp index 5405110c..8ee4e59c 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -43,7 +43,7 @@ namespace zmq // If conflate is true, only the most recently arrived message could be // read (older messages are discarded) int pipepair (zmq::object_t *parents_ [2], zmq::pipe_t* pipes_ [2], - int hwms_ [2], bool delays_ [2], bool conflate_ [2]); + int hwms_ [2], bool conflate_ [2]); struct i_pipe_events { @@ -66,10 +66,9 @@ namespace zmq public array_item_t <3> { // This allows pipepair to create pipe objects. - friend int pipepair (zmq::object_t *parents_ [2], - zmq::pipe_t* pipes_ [2], int hwms_ [2], bool delays_ [2], - bool conflate_ [2]); - + friend int pipepair (zmq::object_t *parents_ [2], zmq::pipe_t* pipes_ [2], + int hwms_ [2], bool conflate_ [2]); + public: // Specifies the object to send events to. @@ -103,6 +102,9 @@ namespace zmq // all the messages on the fly. Causes 'hiccuped' event to be generated // in the peer. void hiccup (); + + // Ensure the pipe wont block on receiving pipe_term. + void set_nodelay (); // Ask pipe to terminate. The termination will happen asynchronously // and user will be notified about actual deallocation by 'terminated' @@ -128,7 +130,7 @@ namespace zmq // Constructor is private. Pipe can only be created using // pipepair function. pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, - int inhwm_, int outhwm_, bool delay_, bool conflate_); + int inhwm_, int outhwm_, bool conflate_); // Pipepair uses this function to let us know about // the peer pipe object. diff --git a/src/session_base.cpp b/src/session_base.cpp index 025bafd1..efb04379 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -299,15 +299,16 @@ int zmq::session_base_t::zap_connect () object_t *parents [2] = {this, peer.socket}; pipe_t *new_pipes [2] = {NULL, NULL}; int hwms [2] = {0, 0}; - bool delays [2] = {false, false}; bool conflates [2] = {false, false}; - int rc = pipepair (parents, new_pipes, hwms, delays, conflates); + int rc = pipepair (parents, new_pipes, hwms, conflates); errno_assert (rc == 0); // Attach local end of the pipe to this socket object. zap_pipe = new_pipes [0]; + zap_pipe->set_nodelay (); zap_pipe->set_event_sink (this); + new_pipes [1]->set_nodelay (); send_bind (peer.socket, new_pipes [1], false); // Send empty identity if required by the peer. @@ -342,9 +343,8 @@ void zmq::session_base_t::process_attach (i_engine *engine_) int hwms [2] = {conflate? -1 : options.rcvhwm, conflate? -1 : options.sndhwm}; - bool delays [2] = {options.delay_on_close, options.delay_on_disconnect}; bool conflates [2] = {conflate, conflate}; - int rc = pipepair (parents, pipes, hwms, delays, conflates); + int rc = pipepair (parents, pipes, hwms, conflates); errno_assert (rc == 0); // Plug the local end of the pipe. @@ -389,7 +389,7 @@ void zmq::session_base_t::process_term (int linger_) // If the termination of the pipe happens before the term command is // delivered there's nothing much to do. We can proceed with the - // stadard termination immediately. + // standard termination immediately. if (!pipe && !zap_pipe) { proceed_with_term (); return; diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 64c1cbb0..84a49234 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -459,9 +459,8 @@ int zmq::socket_base_t::connect (const char *addr_) options.type == ZMQ_SUB); int hwms [2] = {conflate? -1 : sndhwm, conflate? -1 : rcvhwm}; - bool delays [2] = {options.delay_on_disconnect, options.delay_on_close}; bool conflates [2] = {conflate, conflate}; - int rc = pipepair (parents, new_pipes, hwms, delays, conflates); + int rc = pipepair (parents, new_pipes, hwms, conflates); errno_assert (rc == 0); // Attach local end of the pipe to this socket object. @@ -573,9 +572,8 @@ int zmq::socket_base_t::connect (const char *addr_) int hwms [2] = {conflate? -1 : options.sndhwm, conflate? -1 : options.rcvhwm}; - bool delays [2] = {options.delay_on_disconnect, options.delay_on_close}; bool conflates [2] = {conflate, conflate}; - rc = pipepair (parents, new_pipes, hwms, delays, conflates); + rc = pipepair (parents, new_pipes, hwms, conflates); errno_assert (rc == 0); // Attach local end of the pipe to the socket object.