From 2388f27bfe334d9a98f417bb0c87cfea5727fce6 Mon Sep 17 00:00:00 2001 From: Stefan Radomski Date: Tue, 4 Dec 2012 13:14:56 +0100 Subject: [PATCH 1/3] Close inproc socket pairs on zmq_disconnect This patch fixes LIBZMQ-476 and LIBZMQ-475 --- AUTHORS | 1 + src/socket_base.cpp | 28 ++++++++ src/socket_base.hpp | 4 ++ tests/Makefile.am | 5 +- tests/test_disconnect_inproc.cpp | 119 +++++++++++++++++++++++++++++++ 5 files changed, 156 insertions(+), 1 deletion(-) create mode 100644 tests/test_disconnect_inproc.cpp diff --git a/AUTHORS b/AUTHORS index 53982ad8..97da6a02 100644 --- a/AUTHORS +++ b/AUTHORS @@ -66,6 +66,7 @@ Pieter Hintjens Piotr Trojanek Robert G. Jakabosky Sebastian Otaegui +Stefan Radomski Steven McCoy Stuart Webster Tamara Kustarova diff --git a/src/socket_base.cpp b/src/socket_base.cpp index ca681c0d..12c9170f 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -478,6 +478,10 @@ int zmq::socket_base_t::connect (const char *addr_) // Save last endpoint URI options.last_endpoint.assign (addr_); + // remember inproc connections for disconnect + inprocs.insert (inprocs_t::value_type (std::string (addr_), pipes[0])); + inprocs.insert (inprocs_t::value_type (std::string (addr_), pipes[1])); + return 0; } @@ -584,6 +588,30 @@ int zmq::socket_base_t::term_endpoint (const char *addr_) if (unlikely (rc != 0)) return -1; + // Parse addr_ string. + std::string protocol; + std::string address; + rc = parse_uri (addr_, protocol, address); + if (rc != 0) + return -1; + + rc = check_protocol (protocol); + if (rc != 0) + return -1; + + // Disconnect an inproc socket + if (protocol == "inproc") { + std::pair range = inprocs.equal_range (std::string (addr_)); + if (range.first == range.second) + return -1; + + for (inprocs_t::iterator it = range.first; it != range.second; ++it) + it->second->terminate(true); + inprocs.erase (range.first, range.second); + return 0; + } + + // Find the endpoints range (if any) corresponding to the addr_ string. std::pair range = endpoints.equal_range (std::string (addr_)); if (range.first == range.second) diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 7b6b3e79..b32782c9 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -170,6 +170,10 @@ namespace zmq typedef std::multimap endpoints_t; endpoints_t endpoints; + // Map of open inproc endpoints. + typedef std::multimap inprocs_t; + inprocs_t inprocs; + // To be called after processing commands or invoking any command // handlers explicitly. If required, it will deallocate the socket. void check_destroy (); diff --git a/tests/Makefile.am b/tests/Makefile.am index cb96e66a..e4b8fe70 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -17,7 +17,9 @@ noinst_PROGRAMS = test_pair_inproc \ test_last_endpoint \ test_term_endpoint \ test_monitor \ - test_router_mandatory + test_router_mandatory \ + test_disconnect_inproc + if !ON_MINGW noinst_PROGRAMS += test_shutdown_stress \ @@ -40,6 +42,7 @@ test_connect_delay_SOURCES = test_connect_delay.cpp test_last_endpoint_SOURCES = test_last_endpoint.cpp test_term_endpoint_SOURCES = test_term_endpoint.cpp test_monitor_SOURCES = test_monitor.cpp +test_disconnect_inproc_SOURCES = test_disconnect_inproc.cpp test_router_mandatory_SOURCES = test_router_mandatory.cpp if !ON_MINGW diff --git a/tests/test_disconnect_inproc.cpp b/tests/test_disconnect_inproc.cpp new file mode 100644 index 00000000..33b42160 --- /dev/null +++ b/tests/test_disconnect_inproc.cpp @@ -0,0 +1,119 @@ +#include +#include +#include +#include + +/// Initialize a zeromq message with a given null-terminated string +#define ZMQ_PREPARE_STRING(msg, data, size) \ +zmq_msg_init(&msg) && printf("zmq_msg_init: %s\n", zmq_strerror(errno)); \ +zmq_msg_init_size (&msg, size + 1) && printf("zmq_msg_init_size: %s\n",zmq_strerror(errno)); \ +memcpy(zmq_msg_data(&msg), data, size + 1); + +int publicationsReceived = 0; +bool isSubscribed = false; + +int main(int argc, char** argv) { + void* context = zmq_ctx_new(); + void* pubSocket; + void* subSocket; + + (pubSocket = zmq_socket(context, ZMQ_XPUB)) || printf("zmq_socket: %s\n", zmq_strerror(errno)); + (subSocket = zmq_socket(context, ZMQ_SUB)) || printf("zmq_socket: %s\n", zmq_strerror(errno)); + zmq_setsockopt(subSocket, ZMQ_SUBSCRIBE, "foo", 3) && printf("zmq_setsockopt: %s\n",zmq_strerror(errno)); + + zmq_bind(pubSocket, "inproc://someInProcDescriptor") && printf("zmq_bind: %s\n", zmq_strerror(errno)); + //zmq_bind(pubSocket, "tcp://*:30010") && printf("zmq_bind: %s\n", zmq_strerror(errno)); + + int32_t more; + size_t more_size = sizeof(more); + int iteration = 0; + + while(1) { + zmq_pollitem_t items [] = { + { subSocket, 0, ZMQ_POLLIN, 0 }, // read publications + { pubSocket, 0, ZMQ_POLLIN, 0 }, // read subscriptions + }; + zmq_poll(items, 2, 500); + + if (items[1].revents & ZMQ_POLLIN) { + while (1) { + zmq_msg_t msg; + zmq_msg_init (&msg); + zmq_msg_recv (&msg, pubSocket, 0); + int msgSize = zmq_msg_size(&msg); + char* buffer = (char*)zmq_msg_data(&msg); + + if (buffer[0] == 0) { + assert(isSubscribed); + printf("unsubscribing from '%s'\n", strndup(buffer + 1, msgSize - 1)); + isSubscribed = false; + } else { + assert(!isSubscribed); + printf("subscribing on '%s'\n", strndup(buffer + 1, msgSize - 1)); + isSubscribed = true; + } + + zmq_getsockopt (pubSocket, ZMQ_RCVMORE, &more, &more_size); + zmq_msg_close (&msg); + + if (!more) + break; // Last message part + } + } + + if (items[0].revents & ZMQ_POLLIN) { + while (1) { + zmq_msg_t msg; + zmq_msg_init (&msg); + zmq_msg_recv (&msg, subSocket, 0); + int msgSize = zmq_msg_size(&msg); + char* buffer = (char*)zmq_msg_data(&msg); + + printf("received on subscriber '%s'\n", strndup(buffer, msgSize)); + + zmq_getsockopt (subSocket, ZMQ_RCVMORE, &more, &more_size); + zmq_msg_close (&msg); + + if (!more) { + publicationsReceived++; + break; // Last message part + } + } + } + + if (iteration == 1) { + zmq_connect(subSocket, "inproc://someInProcDescriptor") && printf("zmq_connect: %s\n", zmq_strerror(errno)); + //zmq_connect(subSocket, "tcp://127.0.0.1:30010") && printf("zmq_connect: %s\n", zmq_strerror(errno)); + } + + if (iteration == 4) { + zmq_disconnect(subSocket, "inproc://someInProcDescriptor") && printf("zmq_disconnect(%d): %s\n", errno, zmq_strerror(errno)); + //zmq_disconnect(subSocket, "tcp://127.0.0.1:30010") && printf("zmq_disconnect: %s\n", zmq_strerror(errno)); + } + + if (iteration == 10) { + break; + } + + zmq_msg_t channelEnvlp; + ZMQ_PREPARE_STRING(channelEnvlp, "foo", 3); + zmq_sendmsg(pubSocket, &channelEnvlp, ZMQ_SNDMORE) >= 0 || printf("zmq_sendmsg: %s\n",zmq_strerror(errno)); + zmq_msg_close(&channelEnvlp) && printf("zmq_msg_close: %s\n",zmq_strerror(errno)); + + zmq_msg_t message; + ZMQ_PREPARE_STRING(message, "this is foo!", 12); + zmq_sendmsg(pubSocket, &message, 0) >= 0 || printf("zmq_sendmsg: %s\n",zmq_strerror(errno)); + zmq_msg_close(&message) && printf("zmq_msg_close: %s\n",zmq_strerror(errno)); + + iteration++; + } + + assert(publicationsReceived == 3); + assert(!isSubscribed); + + zmq_close(pubSocket) && printf("zmq_close: %s", zmq_strerror(errno)); + zmq_close(subSocket) && printf("zmq_close: %s", zmq_strerror(errno)); + + zmq_ctx_destroy(context); + return 0; +} \ No newline at end of file From b0563c21039cfaec805cf3ae31bca1ab111bc7d8 Mon Sep 17 00:00:00 2001 From: Stefan Radomski Date: Tue, 4 Dec 2012 13:52:23 +0100 Subject: [PATCH 2/3] Set errno and update documentation on zmq_disconnect --- doc/zmq_disconnect.txt | 2 ++ src/socket_base.cpp | 8 ++++++-- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/doc/zmq_disconnect.txt b/doc/zmq_disconnect.txt index 7c8ca4d8..a63fbefb 100644 --- a/doc/zmq_disconnect.txt +++ b/doc/zmq_disconnect.txt @@ -34,6 +34,8 @@ The endpoint supplied is invalid. The 0MQ 'context' associated with the specified 'socket' was terminated. *ENOTSOCK*:: The provided 'socket' was invalid. +*ENOTCONN*:: +The provided endpoint is not connected. EXAMPLE diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 12c9170f..df413789 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -602,8 +602,10 @@ int zmq::socket_base_t::term_endpoint (const char *addr_) // Disconnect an inproc socket if (protocol == "inproc") { std::pair range = inprocs.equal_range (std::string (addr_)); - if (range.first == range.second) + if (range.first == range.second) { + errno = ENOTCONN; return -1; + } for (inprocs_t::iterator it = range.first; it != range.second; ++it) it->second->terminate(true); @@ -614,8 +616,10 @@ int zmq::socket_base_t::term_endpoint (const char *addr_) // Find the endpoints range (if any) corresponding to the addr_ string. std::pair range = endpoints.equal_range (std::string (addr_)); - if (range.first == range.second) + if (range.first == range.second) { + errno = ENOTCONN; return -1; + } for (endpoints_t::iterator it = range.first; it != range.second; ++it) term_child (it->second); From 8e6fdc56e1f1ca7692ee3694a732f002fcb81d1b Mon Sep 17 00:00:00 2001 From: Stefan Radomski Date: Tue, 4 Dec 2012 14:14:46 +0100 Subject: [PATCH 3/3] Changed errno to ENOENT for disconnecting unconnected endpoints --- doc/zmq_disconnect.txt | 2 +- src/socket_base.cpp | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/doc/zmq_disconnect.txt b/doc/zmq_disconnect.txt index a63fbefb..13a4c2b7 100644 --- a/doc/zmq_disconnect.txt +++ b/doc/zmq_disconnect.txt @@ -34,7 +34,7 @@ The endpoint supplied is invalid. The 0MQ 'context' associated with the specified 'socket' was terminated. *ENOTSOCK*:: The provided 'socket' was invalid. -*ENOTCONN*:: +*ENOENT*:: The provided endpoint is not connected. diff --git a/src/socket_base.cpp b/src/socket_base.cpp index df413789..ba382c74 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -603,7 +603,7 @@ int zmq::socket_base_t::term_endpoint (const char *addr_) if (protocol == "inproc") { std::pair range = inprocs.equal_range (std::string (addr_)); if (range.first == range.second) { - errno = ENOTCONN; + errno = ENOENT; return -1; } @@ -617,7 +617,7 @@ int zmq::socket_base_t::term_endpoint (const char *addr_) // Find the endpoints range (if any) corresponding to the addr_ string. std::pair range = endpoints.equal_range (std::string (addr_)); if (range.first == range.second) { - errno = ENOTCONN; + errno = ENOENT; return -1; }