From 64e1c181f71299614975b2809e112803719221b1 Mon Sep 17 00:00:00 2001 From: Richard Newton Date: Tue, 10 Sep 2013 13:30:00 +0100 Subject: [PATCH] Implement non-blocking shutdown command that unblocks other threads waiting on blocking operations. --- CMakeLists.txt | 1 + doc/zmq_ctx_shutdown.txt | 52 ++++++++++++++++++++++ include/zmq.h | 1 + src/ctx.cpp | 19 ++++++++ src/ctx.hpp | 9 ++++ src/zmq.cpp | 10 +++++ tests/Makefile.am | 1 + tests/test_ctx_destroy.cpp | 90 ++++++++++++++++++++++++++++++++++++++ 8 files changed, 183 insertions(+) create mode 100644 doc/zmq_ctx_shutdown.txt create mode 100644 tests/test_ctx_destroy.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index a258c084..cfceada2 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -581,6 +581,7 @@ set(tests test_system test_connect_delay test_connect_resolve + test_ctx_destroy test_ctx_options test_disconnect_inproc test_hwm diff --git a/doc/zmq_ctx_shutdown.txt b/doc/zmq_ctx_shutdown.txt new file mode 100644 index 00000000..0ddd760b --- /dev/null +++ b/doc/zmq_ctx_shutdown.txt @@ -0,0 +1,52 @@ +zmq_ctx_shutdown(3) +================== + + +NAME +---- +zmq_ctx_shutdown - shutdown a 0MQ context + + +SYNOPSIS +-------- +*int zmq_ctx_shutdown (void '*context');* + + +DESCRIPTION +----------- +The _zmq_ctx_shutdown()_ function shall shutdown the 0MQ context 'context'. + +Context shutdown will cause any blocking operations currently in progress on +sockets open within 'context' to return immediately with an error code of ETERM. +With the exception of _zmq_close()_, any further operations on sockets open within +'context' shall fail with an error code of ETERM. + +This function is optional, client code is still required to call the linkzmq:zmq_ctx_term[3] +function to free all resources allocated by zeromq. + + +RETURN VALUE +------------ +The _zmq_ctx_shutdown()_ function shall return zero if successful. Otherwise +it shall return `-1` and set 'errno' to one of the values defined below. + + +ERRORS +------ +*EFAULT*:: +The provided 'context' was invalid. + + +SEE ALSO +-------- +linkzmq:zmq[7] +linkzmq:zmq_init[3] +linkzmq:zmq_ctx_term[3] +linkzmq:zmq_close[3] +linkzmq:zmq_setsockopt[3] + + +AUTHORS +------- +This page was written by the 0MQ community. To make a change please +read the 0MQ Contribution Policy at . diff --git a/include/zmq.h b/include/zmq.h index d76f49ce..a1c6c6e0 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -178,6 +178,7 @@ ZMQ_EXPORT const char *zmq_strerror (int errnum); ZMQ_EXPORT void *zmq_ctx_new (void); ZMQ_EXPORT int zmq_ctx_term (void *context); +ZMQ_EXPORT int zmq_ctx_shutdown (void *ctx_); ZMQ_EXPORT int zmq_ctx_set (void *context, int option, int optval); ZMQ_EXPORT int zmq_ctx_get (void *context, int option); diff --git a/src/ctx.cpp b/src/ctx.cpp index 3723c28a..c6e8e57a 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -143,6 +143,25 @@ int zmq::ctx_t::terminate () return 0; } +int zmq::ctx_t::shutdown () +{ + slot_sync.lock (); + if (!starting && !terminating) { + terminating = true; + + // Send stop command to sockets so that any blocking calls + // can be interrupted. If there are no sockets we can ask reaper + // thread to stop. + for (sockets_t::size_type i = 0; i != sockets.size (); i++) + sockets [i]->stop (); + if (sockets.empty ()) + reaper->stop (); + } + slot_sync.unlock (); + + return 0; +} + int zmq::ctx_t::set (int option_, int optval_) { int rc = 0; diff --git a/src/ctx.hpp b/src/ctx.hpp index 05e0d297..bb498a65 100644 --- a/src/ctx.hpp +++ b/src/ctx.hpp @@ -69,6 +69,15 @@ namespace zmq // after the last one is closed. int terminate (); + // This function starts the terminate process by unblocking any blocking + // operations currently in progress and stopping any more socket activity + // (except zmq_close). + // This function is non-blocking. + // terminate must still be called afterwards. + // This function is optional, terminate will unblock any current + // operations as well. + int shutdown(); + // Set and get context properties. int set (int option_, int optval_); int get (int option_); diff --git a/src/zmq.cpp b/src/zmq.cpp index ff3e60fa..d9116285 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -190,6 +190,16 @@ int zmq_ctx_term (void *ctx_) return rc; } +int zmq_ctx_shutdown (void *ctx_) +{ + if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) { + errno = EFAULT; + return -1; + } + + return ((zmq::ctx_t*) ctx_)->shutdown (); +} + int zmq_ctx_set (void *ctx_, int option_, int optval_) { if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) { diff --git a/tests/Makefile.am b/tests/Makefile.am index bb7e7dc7..3efe2454 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -24,6 +24,7 @@ noinst_PROGRAMS = test_system \ test_stream \ test_disconnect_inproc \ test_ctx_options \ + test_ctx_destroy \ test_security_null \ test_security_plain \ test_security_curve \ diff --git a/tests/test_ctx_destroy.cpp b/tests/test_ctx_destroy.cpp new file mode 100644 index 00000000..ce31e42a --- /dev/null +++ b/tests/test_ctx_destroy.cpp @@ -0,0 +1,90 @@ +/* + Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "../include/zmq.h" +#include "../include/zmq_utils.h" +#include +#include "testutil.hpp" + +static void receiver (void *socket) +{ + char buffer[16]; + int rc = zmq_recv (socket, &buffer, sizeof (buffer), 0); + assert(rc == -1); +} + +void test_ctx_destroy() +{ + int rc; + + // Set up our context and sockets + void *ctx = zmq_ctx_new (); + assert (ctx); + + void *socket = zmq_socket (ctx, ZMQ_PULL); + assert (socket); + + // Close the socket + rc = zmq_close (socket); + assert (rc == 0); + + // Destroy the context + rc = zmq_ctx_destroy (ctx); + assert (rc == 0); +} + +void test_ctx_shutdown() +{ + int rc; + + // Set up our context and sockets + void *ctx = zmq_ctx_new (); + assert (ctx); + + void *socket = zmq_socket (ctx, ZMQ_PULL); + assert (socket); + + // Spawn a thread to receive on socket + void *receiver_thread = zmq_threadstart (&receiver, socket); + + // Shutdown context, if we used destroy here we would deadlock. + rc = zmq_ctx_shutdown (ctx); + assert (rc == 0); + + // Wait for thread to finish + zmq_threadclose (receiver_thread); + + // Close the socket. + rc = zmq_close (socket); + assert (rc == 0); + + // Destory the context, will now not hang as we have closed the socket. + rc = zmq_ctx_destroy (ctx); + assert (rc == 0); +} + +int main (void) +{ + setup_test_environment(); + + test_ctx_destroy(); + test_ctx_shutdown(); + + return 0; +}