diff --git a/AUTHORS b/AUTHORS index 53982ad8..97da6a02 100644 --- a/AUTHORS +++ b/AUTHORS @@ -66,6 +66,7 @@ Pieter Hintjens Piotr Trojanek Robert G. Jakabosky Sebastian Otaegui +Stefan Radomski Steven McCoy Stuart Webster Tamara Kustarova diff --git a/doc/zmq_disconnect.txt b/doc/zmq_disconnect.txt index 7c8ca4d8..13a4c2b7 100644 --- a/doc/zmq_disconnect.txt +++ b/doc/zmq_disconnect.txt @@ -34,6 +34,8 @@ The endpoint supplied is invalid. The 0MQ 'context' associated with the specified 'socket' was terminated. *ENOTSOCK*:: The provided 'socket' was invalid. +*ENOENT*:: +The provided endpoint is not connected. EXAMPLE diff --git a/src/socket_base.cpp b/src/socket_base.cpp index ca681c0d..ba382c74 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -478,6 +478,10 @@ int zmq::socket_base_t::connect (const char *addr_) // Save last endpoint URI options.last_endpoint.assign (addr_); + // remember inproc connections for disconnect + inprocs.insert (inprocs_t::value_type (std::string (addr_), pipes[0])); + inprocs.insert (inprocs_t::value_type (std::string (addr_), pipes[1])); + return 0; } @@ -584,10 +588,38 @@ int zmq::socket_base_t::term_endpoint (const char *addr_) if (unlikely (rc != 0)) return -1; + // Parse addr_ string. + std::string protocol; + std::string address; + rc = parse_uri (addr_, protocol, address); + if (rc != 0) + return -1; + + rc = check_protocol (protocol); + if (rc != 0) + return -1; + + // Disconnect an inproc socket + if (protocol == "inproc") { + std::pair range = inprocs.equal_range (std::string (addr_)); + if (range.first == range.second) { + errno = ENOENT; + return -1; + } + + for (inprocs_t::iterator it = range.first; it != range.second; ++it) + it->second->terminate(true); + inprocs.erase (range.first, range.second); + return 0; + } + + // Find the endpoints range (if any) corresponding to the addr_ string. std::pair range = endpoints.equal_range (std::string (addr_)); - if (range.first == range.second) + if (range.first == range.second) { + errno = ENOENT; return -1; + } for (endpoints_t::iterator it = range.first; it != range.second; ++it) term_child (it->second); diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 7b6b3e79..b32782c9 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -170,6 +170,10 @@ namespace zmq typedef std::multimap endpoints_t; endpoints_t endpoints; + // Map of open inproc endpoints. + typedef std::multimap inprocs_t; + inprocs_t inprocs; + // To be called after processing commands or invoking any command // handlers explicitly. If required, it will deallocate the socket. void check_destroy (); diff --git a/tests/Makefile.am b/tests/Makefile.am index cb96e66a..e4b8fe70 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -17,7 +17,9 @@ noinst_PROGRAMS = test_pair_inproc \ test_last_endpoint \ test_term_endpoint \ test_monitor \ - test_router_mandatory + test_router_mandatory \ + test_disconnect_inproc + if !ON_MINGW noinst_PROGRAMS += test_shutdown_stress \ @@ -40,6 +42,7 @@ test_connect_delay_SOURCES = test_connect_delay.cpp test_last_endpoint_SOURCES = test_last_endpoint.cpp test_term_endpoint_SOURCES = test_term_endpoint.cpp test_monitor_SOURCES = test_monitor.cpp +test_disconnect_inproc_SOURCES = test_disconnect_inproc.cpp test_router_mandatory_SOURCES = test_router_mandatory.cpp if !ON_MINGW diff --git a/tests/test_disconnect_inproc.cpp b/tests/test_disconnect_inproc.cpp new file mode 100644 index 00000000..33b42160 --- /dev/null +++ b/tests/test_disconnect_inproc.cpp @@ -0,0 +1,119 @@ +#include +#include +#include +#include + +/// Initialize a zeromq message with a given null-terminated string +#define ZMQ_PREPARE_STRING(msg, data, size) \ +zmq_msg_init(&msg) && printf("zmq_msg_init: %s\n", zmq_strerror(errno)); \ +zmq_msg_init_size (&msg, size + 1) && printf("zmq_msg_init_size: %s\n",zmq_strerror(errno)); \ +memcpy(zmq_msg_data(&msg), data, size + 1); + +int publicationsReceived = 0; +bool isSubscribed = false; + +int main(int argc, char** argv) { + void* context = zmq_ctx_new(); + void* pubSocket; + void* subSocket; + + (pubSocket = zmq_socket(context, ZMQ_XPUB)) || printf("zmq_socket: %s\n", zmq_strerror(errno)); + (subSocket = zmq_socket(context, ZMQ_SUB)) || printf("zmq_socket: %s\n", zmq_strerror(errno)); + zmq_setsockopt(subSocket, ZMQ_SUBSCRIBE, "foo", 3) && printf("zmq_setsockopt: %s\n",zmq_strerror(errno)); + + zmq_bind(pubSocket, "inproc://someInProcDescriptor") && printf("zmq_bind: %s\n", zmq_strerror(errno)); + //zmq_bind(pubSocket, "tcp://*:30010") && printf("zmq_bind: %s\n", zmq_strerror(errno)); + + int32_t more; + size_t more_size = sizeof(more); + int iteration = 0; + + while(1) { + zmq_pollitem_t items [] = { + { subSocket, 0, ZMQ_POLLIN, 0 }, // read publications + { pubSocket, 0, ZMQ_POLLIN, 0 }, // read subscriptions + }; + zmq_poll(items, 2, 500); + + if (items[1].revents & ZMQ_POLLIN) { + while (1) { + zmq_msg_t msg; + zmq_msg_init (&msg); + zmq_msg_recv (&msg, pubSocket, 0); + int msgSize = zmq_msg_size(&msg); + char* buffer = (char*)zmq_msg_data(&msg); + + if (buffer[0] == 0) { + assert(isSubscribed); + printf("unsubscribing from '%s'\n", strndup(buffer + 1, msgSize - 1)); + isSubscribed = false; + } else { + assert(!isSubscribed); + printf("subscribing on '%s'\n", strndup(buffer + 1, msgSize - 1)); + isSubscribed = true; + } + + zmq_getsockopt (pubSocket, ZMQ_RCVMORE, &more, &more_size); + zmq_msg_close (&msg); + + if (!more) + break; // Last message part + } + } + + if (items[0].revents & ZMQ_POLLIN) { + while (1) { + zmq_msg_t msg; + zmq_msg_init (&msg); + zmq_msg_recv (&msg, subSocket, 0); + int msgSize = zmq_msg_size(&msg); + char* buffer = (char*)zmq_msg_data(&msg); + + printf("received on subscriber '%s'\n", strndup(buffer, msgSize)); + + zmq_getsockopt (subSocket, ZMQ_RCVMORE, &more, &more_size); + zmq_msg_close (&msg); + + if (!more) { + publicationsReceived++; + break; // Last message part + } + } + } + + if (iteration == 1) { + zmq_connect(subSocket, "inproc://someInProcDescriptor") && printf("zmq_connect: %s\n", zmq_strerror(errno)); + //zmq_connect(subSocket, "tcp://127.0.0.1:30010") && printf("zmq_connect: %s\n", zmq_strerror(errno)); + } + + if (iteration == 4) { + zmq_disconnect(subSocket, "inproc://someInProcDescriptor") && printf("zmq_disconnect(%d): %s\n", errno, zmq_strerror(errno)); + //zmq_disconnect(subSocket, "tcp://127.0.0.1:30010") && printf("zmq_disconnect: %s\n", zmq_strerror(errno)); + } + + if (iteration == 10) { + break; + } + + zmq_msg_t channelEnvlp; + ZMQ_PREPARE_STRING(channelEnvlp, "foo", 3); + zmq_sendmsg(pubSocket, &channelEnvlp, ZMQ_SNDMORE) >= 0 || printf("zmq_sendmsg: %s\n",zmq_strerror(errno)); + zmq_msg_close(&channelEnvlp) && printf("zmq_msg_close: %s\n",zmq_strerror(errno)); + + zmq_msg_t message; + ZMQ_PREPARE_STRING(message, "this is foo!", 12); + zmq_sendmsg(pubSocket, &message, 0) >= 0 || printf("zmq_sendmsg: %s\n",zmq_strerror(errno)); + zmq_msg_close(&message) && printf("zmq_msg_close: %s\n",zmq_strerror(errno)); + + iteration++; + } + + assert(publicationsReceived == 3); + assert(!isSubscribed); + + zmq_close(pubSocket) && printf("zmq_close: %s", zmq_strerror(errno)); + zmq_close(subSocket) && printf("zmq_close: %s", zmq_strerror(errno)); + + zmq_ctx_destroy(context); + return 0; +} \ No newline at end of file