diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 8034d143..651651ff 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -376,7 +376,7 @@ int zmq::socket_base_t::bind (const char *addr_) // Save last endpoint URI listener->get_address (last_endpoint); - add_endpoint (addr_, (own_t *) listener); + add_endpoint (addr_, (own_t *) listener, NULL); return 0; } @@ -395,7 +395,7 @@ int zmq::socket_base_t::bind (const char *addr_) // Save last endpoint URI listener->get_address (last_endpoint); - add_endpoint (addr_, (own_t *) listener); + add_endpoint (addr_, (own_t *) listener, NULL); return 0; } #endif @@ -548,6 +548,7 @@ int zmq::socket_base_t::connect (const char *addr_) // PGM does not support subscription forwarding; ask for all data to be // sent to this pipe. bool icanhasall = protocol == "pgm" || protocol == "epgm"; + pipe_t *newpipe = NULL; if (options.immediate != 1 || icanhasall) { // Create a bi-directional pipe. @@ -560,6 +561,7 @@ int zmq::socket_base_t::connect (const char *addr_) // Attach local end of the pipe to the socket object. attach_pipe (new_pipes [0], icanhasall); + newpipe = new_pipes [0]; // Attach remote end of the pipe to the session object later on. session->attach_pipe (new_pipes [1]); @@ -568,15 +570,15 @@ int zmq::socket_base_t::connect (const char *addr_) // Save last endpoint URI paddr->to_string (last_endpoint); - add_endpoint (addr_, (own_t *) session); + add_endpoint (addr_, (own_t *) session, newpipe); return 0; } -void zmq::socket_base_t::add_endpoint (const char *addr_, own_t *endpoint_) +void zmq::socket_base_t::add_endpoint (const char *addr_, own_t *endpoint_, pipe_t *pipe) { // Activate the session. Make it a child of this socket. launch_child (endpoint_); - endpoints.insert (endpoints_t::value_type (std::string (addr_), endpoint_)); + endpoints.insert (endpoints_t::value_type (std::string (addr_), endpoint_pipe_t(endpoint_, pipe))); } int zmq::socket_base_t::term_endpoint (const char *addr_) @@ -631,8 +633,12 @@ int zmq::socket_base_t::term_endpoint (const char *addr_) return -1; } - for (endpoints_t::iterator it = range.first; it != range.second; ++it) - term_child (it->second); + for (endpoints_t::iterator it = range.first; it != range.second; ++it) { + // If we have an associated pipe, terminate it. + if (it->second.second != NULL) + it->second.second->terminate(false); + term_child (it->second.first); + } endpoints.erase (range.first, range.second); return 0; } diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 61bfbc15..7b1f707a 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -158,10 +158,11 @@ namespace zmq private: // Creates new endpoint ID and adds the endpoint to the map. - void add_endpoint (const char *addr_, own_t *endpoint_); + void add_endpoint (const char *addr_, own_t *endpoint_, pipe_t *pipe); // Map of open endpoints. - typedef std::multimap endpoints_t; + typedef std::pair endpoint_pipe_t; + typedef std::multimap endpoints_t; endpoints_t endpoints; // Map of open inproc endpoints.