diff --git a/src/options.cpp b/src/options.cpp index f1767437..007fceb8 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -43,8 +43,6 @@ zmq::options_t::options_t () : sndtimeo (-1), ipv6 (0), immediate (0), - delay_on_close (true), - delay_on_disconnect (true), filter (false), recv_identity (false), raw_sock (false), diff --git a/src/options.hpp b/src/options.hpp index cf5fbbd7..f665b9fa 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -97,14 +97,6 @@ namespace zmq // on a socket with only connecting pipes would block int immediate; - // If true, session reads all the pending messages from the pipe and - // sends them to the network when socket is closed. - bool delay_on_close; - - // If true, socket reads all the messages from the pipe and delivers - // them to the user when the peer terminates. - bool delay_on_disconnect; - // If 1, (X)SUB socket should filter the messages. If 0, it should not. bool filter; diff --git a/src/pipe.cpp b/src/pipe.cpp index e4b841ad..6c3e0b9e 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -24,7 +24,7 @@ #include "err.hpp" int zmq::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2], - int hwms_ [2], bool delays_ [2]) + int hwms_ [2]) { // Creates two pipe objects. These objects are connected by two ypipes, // each to pass messages in one direction. @@ -35,10 +35,10 @@ int zmq::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2], alloc_assert (upipe2); pipes_ [0] = new (std::nothrow) pipe_t (parents_ [0], upipe1, upipe2, - hwms_ [1], hwms_ [0], delays_ [0]); + hwms_ [1], hwms_ [0]); alloc_assert (pipes_ [0]); pipes_ [1] = new (std::nothrow) pipe_t (parents_ [1], upipe2, upipe1, - hwms_ [0], hwms_ [1], delays_ [1]); + hwms_ [0], hwms_ [1]); alloc_assert (pipes_ [1]); pipes_ [0]->set_peer (pipes_ [1]); @@ -48,7 +48,7 @@ int zmq::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2], } zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, - int inhwm_, int outhwm_, bool delay_) : + int inhwm_, int outhwm_) : object_t (parent_), inpipe (inpipe_), outpipe (outpipe_), @@ -62,7 +62,7 @@ zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, peer (NULL), sink (NULL), state (active), - delay (delay_) + delay (true) { } @@ -314,6 +314,11 @@ void zmq::pipe_t::process_pipe_term_ack () delete this; } +void zmq::pipe_t::set_nodelay () +{ + this->delay = false; +} + void zmq::pipe_t::terminate (bool delay_) { // Overload the value specified at pipe creation. diff --git a/src/pipe.hpp b/src/pipe.hpp index d329b582..867d1b97 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -41,7 +41,7 @@ namespace zmq // pipe receives all the pending messages before terminating, otherwise it // terminates straight away. int pipepair (zmq::object_t *parents_ [2], zmq::pipe_t* pipes_ [2], - int hwms_ [2], bool delays_ [2]); + int hwms_ [2]); struct i_pipe_events { @@ -65,7 +65,7 @@ namespace zmq { // This allows pipepair to create pipe objects. friend int pipepair (zmq::object_t *parents_ [2], - zmq::pipe_t* pipes_ [2], int hwms_ [2], bool delays_ [2]); + zmq::pipe_t* pipes_ [2], int hwms_ [2]); public: @@ -100,6 +100,9 @@ namespace zmq // all the messages on the fly. Causes 'hiccuped' event to be generated // in the peer. void hiccup (); + + // Ensure the pipe wont block on receiving pipe_term. + void set_nodelay (); // Ask pipe to terminate. The termination will happen asynchronously // and user will be notified about actual deallocation by 'terminated' @@ -125,7 +128,7 @@ namespace zmq // Constructor is private. Pipe can only be created using // pipepair function. pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, - int inhwm_, int outhwm_, bool delay_); + int inhwm_, int outhwm_); // Pipepair uses this function to let us know about // the peer pipe object. diff --git a/src/session_base.cpp b/src/session_base.cpp index 302fec8a..af70b57d 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -299,14 +299,15 @@ int zmq::session_base_t::zap_connect () object_t *parents [2] = {this, peer.socket}; pipe_t *new_pipes [2] = {NULL, NULL}; int hwms [2] = {0, 0}; - bool delays [2] = {false, false}; - int rc = pipepair (parents, new_pipes, hwms, delays); + int rc = pipepair (parents, new_pipes, hwms); errno_assert (rc == 0); // Attach local end of the pipe to this socket object. zap_pipe = new_pipes [0]; + zap_pipe->set_nodelay (); zap_pipe->set_event_sink (this); + new_pipes [1]->set_nodelay (); send_bind (peer.socket, new_pipes [1], false); // Send empty identity if required by the peer. @@ -332,8 +333,7 @@ void zmq::session_base_t::process_attach (i_engine *engine_) object_t *parents [2] = {this, socket}; pipe_t *pipes [2] = {NULL, NULL}; int hwms [2] = {options.rcvhwm, options.sndhwm}; - bool delays [2] = {options.delay_on_close, options.delay_on_disconnect}; - int rc = pipepair (parents, pipes, hwms, delays); + int rc = pipepair (parents, pipes, hwms); errno_assert (rc == 0); // Plug the local end of the pipe. @@ -378,7 +378,7 @@ void zmq::session_base_t::process_term (int linger_) // If the termination of the pipe happens before the term command is // delivered there's nothing much to do. We can proceed with the - // stadard termination immediately. + // standard termination immediately. if (!pipe && !zap_pipe) { proceed_with_term (); return; diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 651651ff..b8c11e19 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -451,8 +451,7 @@ int zmq::socket_base_t::connect (const char *addr_) object_t *parents [2] = {this, peer.socket}; pipe_t *new_pipes [2] = {NULL, NULL}; int hwms [2] = {sndhwm, rcvhwm}; - bool delays [2] = {options.delay_on_disconnect, options.delay_on_close}; - int rc = pipepair (parents, new_pipes, hwms, delays); + int rc = pipepair (parents, new_pipes, hwms); errno_assert (rc == 0); // Attach local end of the pipe to this socket object. @@ -555,8 +554,7 @@ int zmq::socket_base_t::connect (const char *addr_) object_t *parents [2] = {this, session}; pipe_t *new_pipes [2] = {NULL, NULL}; int hwms [2] = {options.sndhwm, options.rcvhwm}; - bool delays [2] = {options.delay_on_disconnect, options.delay_on_close}; - rc = pipepair (parents, new_pipes, hwms, delays); + rc = pipepair (parents, new_pipes, hwms); errno_assert (rc == 0); // Attach local end of the pipe to the socket object.