From 66c22456b97e1278ec0b24dcd1ff66e619d8fca2 Mon Sep 17 00:00:00 2001 From: Stefan Radomski Date: Tue, 4 Dec 2012 15:14:21 +0100 Subject: [PATCH] Close pipes for inproc sockets on zmq_disconnect - fixes LIBZMQ-476 and LIBZMQ-475 --- AUTHORS | 1 + doc/zmq_disconnect.txt | 2 + src/socket_base.cpp | 33 ++++++++- src/socket_base.hpp | 4 ++ tests/Makefile.am | 5 +- tests/test_disconnect_inproc.cpp | 119 +++++++++++++++++++++++++++++++ 6 files changed, 162 insertions(+), 2 deletions(-) create mode 100644 tests/test_disconnect_inproc.cpp diff --git a/AUTHORS b/AUTHORS index 3497eae8..c900ae6c 100644 --- a/AUTHORS +++ b/AUTHORS @@ -66,6 +66,7 @@ Pieter Hintjens Piotr Trojanek Robert G. Jakabosky Sebastian Otaegui +Stefan Radomski Steven McCoy Stuart Webster Tamara Kustarova diff --git a/doc/zmq_disconnect.txt b/doc/zmq_disconnect.txt index 7c8ca4d8..13a4c2b7 100644 --- a/doc/zmq_disconnect.txt +++ b/doc/zmq_disconnect.txt @@ -34,6 +34,8 @@ The endpoint supplied is invalid. The 0MQ 'context' associated with the specified 'socket' was terminated. *ENOTSOCK*:: The provided 'socket' was invalid. +*ENOENT*:: +The provided endpoint is not connected. EXAMPLE diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 2d853976..dc792f61 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -478,6 +478,10 @@ int zmq::socket_base_t::connect (const char *addr_) // Save last endpoint URI options.last_endpoint.assign (addr_); + // remember inproc connections for disconnect + inprocs.insert (inprocs_t::value_type (std::string (addr_), pipes[0])); + inprocs.insert (inprocs_t::value_type (std::string (addr_), pipes[1])); + return 0; } @@ -584,10 +588,37 @@ int zmq::socket_base_t::term_endpoint (const char *addr_) if (unlikely (rc != 0)) return -1; + // Parse addr_ string. + std::string protocol; + std::string address; + rc = parse_uri (addr_, protocol, address); + if (rc != 0) + return -1; + + rc = check_protocol (protocol); + if (rc != 0) + return -1; + + // Disconnect an inproc socket + if (protocol == "inproc") { + std::pair range = inprocs.equal_range (std::string (addr_)); + if (range.first == range.second) { + errno = ENOENT; + return -1; + } + + for (inprocs_t::iterator it = range.first; it != range.second; ++it) + it->second->terminate(true); + inprocs.erase (range.first, range.second); + return 0; + } + // Find the endpoints range (if any) corresponding to the addr_ string. std::pair range = endpoints.equal_range (std::string (addr_)); - if (range.first == range.second) + if (range.first == range.second) { + errno = ENOENT; return -1; + } for (endpoints_t::iterator it = range.first; it != range.second; ++it) term_child (it->second); diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 2d5f259d..5192bd70 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -170,6 +170,10 @@ namespace zmq typedef std::multimap endpoints_t; endpoints_t endpoints; + // Map of open inproc endpoints. + typedef std::multimap inprocs_t; + inprocs_t inprocs; + // To be called after processing commands or invoking any command // handlers explicitly. If required, it will deallocate the socket. void check_destroy (); diff --git a/tests/Makefile.am b/tests/Makefile.am index e35ea470..72fe279e 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -18,7 +18,9 @@ noinst_PROGRAMS = test_pair_inproc \ test_term_endpoint \ test_monitor \ test_router_mandatory \ - test_raw_sock + test_raw_sock \ + test_disconnect_inproc + if !ON_MINGW noinst_PROGRAMS += test_shutdown_stress \ @@ -43,6 +45,7 @@ test_term_endpoint_SOURCES = test_term_endpoint.cpp test_monitor_SOURCES = test_monitor.cpp test_router_mandatory_SOURCES = test_router_mandatory.cpp test_raw_sock_SOURCES = test_raw_sock.cpp +test_disconnect_inproc_SOURCES = test_disconnect_inproc.cpp if !ON_MINGW test_shutdown_stress_SOURCES = test_shutdown_stress.cpp diff --git a/tests/test_disconnect_inproc.cpp b/tests/test_disconnect_inproc.cpp new file mode 100644 index 00000000..33b42160 --- /dev/null +++ b/tests/test_disconnect_inproc.cpp @@ -0,0 +1,119 @@ +#include +#include +#include +#include + +/// Initialize a zeromq message with a given null-terminated string +#define ZMQ_PREPARE_STRING(msg, data, size) \ +zmq_msg_init(&msg) && printf("zmq_msg_init: %s\n", zmq_strerror(errno)); \ +zmq_msg_init_size (&msg, size + 1) && printf("zmq_msg_init_size: %s\n",zmq_strerror(errno)); \ +memcpy(zmq_msg_data(&msg), data, size + 1); + +int publicationsReceived = 0; +bool isSubscribed = false; + +int main(int argc, char** argv) { + void* context = zmq_ctx_new(); + void* pubSocket; + void* subSocket; + + (pubSocket = zmq_socket(context, ZMQ_XPUB)) || printf("zmq_socket: %s\n", zmq_strerror(errno)); + (subSocket = zmq_socket(context, ZMQ_SUB)) || printf("zmq_socket: %s\n", zmq_strerror(errno)); + zmq_setsockopt(subSocket, ZMQ_SUBSCRIBE, "foo", 3) && printf("zmq_setsockopt: %s\n",zmq_strerror(errno)); + + zmq_bind(pubSocket, "inproc://someInProcDescriptor") && printf("zmq_bind: %s\n", zmq_strerror(errno)); + //zmq_bind(pubSocket, "tcp://*:30010") && printf("zmq_bind: %s\n", zmq_strerror(errno)); + + int32_t more; + size_t more_size = sizeof(more); + int iteration = 0; + + while(1) { + zmq_pollitem_t items [] = { + { subSocket, 0, ZMQ_POLLIN, 0 }, // read publications + { pubSocket, 0, ZMQ_POLLIN, 0 }, // read subscriptions + }; + zmq_poll(items, 2, 500); + + if (items[1].revents & ZMQ_POLLIN) { + while (1) { + zmq_msg_t msg; + zmq_msg_init (&msg); + zmq_msg_recv (&msg, pubSocket, 0); + int msgSize = zmq_msg_size(&msg); + char* buffer = (char*)zmq_msg_data(&msg); + + if (buffer[0] == 0) { + assert(isSubscribed); + printf("unsubscribing from '%s'\n", strndup(buffer + 1, msgSize - 1)); + isSubscribed = false; + } else { + assert(!isSubscribed); + printf("subscribing on '%s'\n", strndup(buffer + 1, msgSize - 1)); + isSubscribed = true; + } + + zmq_getsockopt (pubSocket, ZMQ_RCVMORE, &more, &more_size); + zmq_msg_close (&msg); + + if (!more) + break; // Last message part + } + } + + if (items[0].revents & ZMQ_POLLIN) { + while (1) { + zmq_msg_t msg; + zmq_msg_init (&msg); + zmq_msg_recv (&msg, subSocket, 0); + int msgSize = zmq_msg_size(&msg); + char* buffer = (char*)zmq_msg_data(&msg); + + printf("received on subscriber '%s'\n", strndup(buffer, msgSize)); + + zmq_getsockopt (subSocket, ZMQ_RCVMORE, &more, &more_size); + zmq_msg_close (&msg); + + if (!more) { + publicationsReceived++; + break; // Last message part + } + } + } + + if (iteration == 1) { + zmq_connect(subSocket, "inproc://someInProcDescriptor") && printf("zmq_connect: %s\n", zmq_strerror(errno)); + //zmq_connect(subSocket, "tcp://127.0.0.1:30010") && printf("zmq_connect: %s\n", zmq_strerror(errno)); + } + + if (iteration == 4) { + zmq_disconnect(subSocket, "inproc://someInProcDescriptor") && printf("zmq_disconnect(%d): %s\n", errno, zmq_strerror(errno)); + //zmq_disconnect(subSocket, "tcp://127.0.0.1:30010") && printf("zmq_disconnect: %s\n", zmq_strerror(errno)); + } + + if (iteration == 10) { + break; + } + + zmq_msg_t channelEnvlp; + ZMQ_PREPARE_STRING(channelEnvlp, "foo", 3); + zmq_sendmsg(pubSocket, &channelEnvlp, ZMQ_SNDMORE) >= 0 || printf("zmq_sendmsg: %s\n",zmq_strerror(errno)); + zmq_msg_close(&channelEnvlp) && printf("zmq_msg_close: %s\n",zmq_strerror(errno)); + + zmq_msg_t message; + ZMQ_PREPARE_STRING(message, "this is foo!", 12); + zmq_sendmsg(pubSocket, &message, 0) >= 0 || printf("zmq_sendmsg: %s\n",zmq_strerror(errno)); + zmq_msg_close(&message) && printf("zmq_msg_close: %s\n",zmq_strerror(errno)); + + iteration++; + } + + assert(publicationsReceived == 3); + assert(!isSubscribed); + + zmq_close(pubSocket) && printf("zmq_close: %s", zmq_strerror(errno)); + zmq_close(subSocket) && printf("zmq_close: %s", zmq_strerror(errno)); + + zmq_ctx_destroy(context); + return 0; +} \ No newline at end of file