diff --git a/perf/c/local_lat.c b/perf/c/local_lat.c index 81a2e0c8..92cfadf5 100644 --- a/perf/c/local_lat.c +++ b/perf/c/local_lat.c @@ -35,13 +35,13 @@ int main (int argc, char *argv []) struct zmq_msg_t msg; if (argc != 4) { - printf ("usage: local_lat " - "\n"); + printf ("usage: local_lat " + "\n"); return 1; } bind_to = argv [1]; - roundtrip_count = atoi (argv [2]); - message_size = atoi (argv [3]); + message_size = atoi (argv [2]); + roundtrip_count = atoi (argv [3]); ctx = zmq_init (1, 1); assert (ctx); @@ -68,6 +68,9 @@ int main (int argc, char *argv []) sleep (1); + rc = zmq_close (s); + assert (rc == 0); + rc = zmq_term (ctx); assert (rc == 0); diff --git a/perf/c/local_thr.c b/perf/c/local_thr.c index 64c492df..71ed21cf 100644 --- a/perf/c/local_thr.c +++ b/perf/c/local_thr.c @@ -41,13 +41,12 @@ int main (int argc, char *argv []) double megabits; if (argc != 4) { - printf ("usage: local_thr " - "\n"); + printf ("usage: local_thr \n"); return 1; } bind_to = argv [1]; - message_count = atoi (argv [2]); - message_size = atoi (argv [3]); + message_size = atoi (argv [2]); + message_count = atoi (argv [3]); ctx = zmq_init (1, 1); assert (ctx); @@ -92,6 +91,9 @@ int main (int argc, char *argv []) printf ("mean throughput: %d [msg/s]\n", (int) throughput); printf ("mean throughput: %.3f [Mb/s]\n", (double) megabits); + rc = zmq_close (s); + assert (rc == 0); + rc = zmq_term (ctx); assert (rc == 0); diff --git a/perf/c/remote_lat.c b/perf/c/remote_lat.c index 32329b88..6da1c420 100644 --- a/perf/c/remote_lat.c +++ b/perf/c/remote_lat.c @@ -39,13 +39,13 @@ int main (int argc, char *argv []) double latency; if (argc != 4) { - printf ("usage: remote_lat " - "\n"); + printf ("usage: remote_lat " + "\n"); return 1; } connect_to = argv [1]; - roundtrip_count = atoi (argv [2]); - message_size = atoi (argv [3]); + message_size = atoi (argv [2]); + roundtrip_count = atoi (argv [3]); ctx = zmq_init (1, 1); assert (ctx); @@ -87,6 +87,9 @@ int main (int argc, char *argv []) printf ("roundtrip count: %d\n", (int) roundtrip_count); printf ("average latency: %.3f [us]\n", (double) latency); + rc = zmq_close (s); + assert (rc == 0); + rc = zmq_term (ctx); assert (rc == 0); diff --git a/perf/c/remote_thr.c b/perf/c/remote_thr.c index 1010bc97..9606d00f 100644 --- a/perf/c/remote_thr.c +++ b/perf/c/remote_thr.c @@ -35,13 +35,13 @@ int main (int argc, char *argv []) struct zmq_msg_t msg; if (argc != 4) { - printf ("usage: remote_thr " - "\n"); + printf ("usage: remote_thr " + "\n"); return 1; } connect_to = argv [1]; - message_count = atoi (argv [2]); - message_size = atoi (argv [3]); + message_size = atoi (argv [2]); + message_count = atoi (argv [3]); ctx = zmq_init (1, 1); assert (ctx); @@ -63,6 +63,9 @@ int main (int argc, char *argv []) sleep (10); + rc = zmq_close (s); + assert (rc == 0); + rc = zmq_term (ctx); assert (rc == 0); diff --git a/perf/cpp/local_lat.cpp b/perf/cpp/local_lat.cpp index 9260f0a7..343ca74c 100644 --- a/perf/cpp/local_lat.cpp +++ b/perf/cpp/local_lat.cpp @@ -27,13 +27,13 @@ int main (int argc, char *argv []) { if (argc != 4) { - printf ("usage: local_lat " - "\n"); + printf ("usage: local_lat " + "\n"); return 1; } const char *bind_to = argv [1]; - int roundtrip_count = atoi (argv [2]); - size_t message_size = (size_t) atoi (argv [3]); + size_t message_size = (size_t) atoi (argv [2]); + int roundtrip_count = atoi (argv [3]); zmq::context_t ctx (1, 1); diff --git a/perf/cpp/local_thr.cpp b/perf/cpp/local_thr.cpp index 3e961dea..ca81ba90 100644 --- a/perf/cpp/local_thr.cpp +++ b/perf/cpp/local_thr.cpp @@ -28,13 +28,13 @@ int main (int argc, char *argv []) { if (argc != 4) { - printf ("usage: local_thr " - "\n"); + printf ("usage: local_thr " + "\n"); return 1; } const char *bind_to = argv [1]; - int message_count = atoi (argv [2]); - size_t message_size = (size_t) atoi (argv [3]); + size_t message_size = (size_t) atoi (argv [2]); + int message_count = atoi (argv [3]); zmq::context_t ctx (1, 1); diff --git a/perf/cpp/remote_lat.cpp b/perf/cpp/remote_lat.cpp index 169ed1e5..c3ded102 100644 --- a/perf/cpp/remote_lat.cpp +++ b/perf/cpp/remote_lat.cpp @@ -27,13 +27,13 @@ int main (int argc, char *argv []) { if (argc != 4) { - printf ("usage: remote_lat " - "\n"); + printf ("usage: remote_lat " + "\n"); return 1; } const char *connect_to = argv [1]; - int roundtrip_count = atoi (argv [2]); - size_t message_size = (size_t) atoi (argv [3]); + size_t message_size = (size_t) atoi (argv [2]); + int roundtrip_count = atoi (argv [3]); zmq::context_t ctx (1, 1); diff --git a/perf/cpp/remote_thr.cpp b/perf/cpp/remote_thr.cpp index 06946f57..5474c6ad 100644 --- a/perf/cpp/remote_thr.cpp +++ b/perf/cpp/remote_thr.cpp @@ -27,13 +27,13 @@ int main (int argc, char *argv []) { if (argc != 4) { - printf ("usage: remote_thr " - "\n"); + printf ("usage: remote_thr " + "\n"); return 1; } const char *connect_to = argv [1]; - int message_count = atoi (argv [2]); - size_t message_size = (size_t) atoi (argv [3]); + size_t message_size = (size_t) atoi (argv [2]); + int message_count = atoi (argv [3]); zmq::context_t ctx (1, 1); diff --git a/perf/python/local_lat.py b/perf/python/local_lat.py index 7f1503fb..e9d46e05 100644 --- a/perf/python/local_lat.py +++ b/perf/python/local_lat.py @@ -23,13 +23,13 @@ import libpyzmq def main (): if len (sys.argv) != 4: - print 'usage: local_lat ' + print 'usage: local_lat ' sys.exit (1) try: bind_to = sys.argv [1] - roundtrip_count = int (sys.argv [2]) - message_size = int (sys.argv [3]) + message_size = int (sys.argv [2]) + roundtrip_count = int (sys.argv [3]) except (ValueError, OverflowError), e: print 'message-size and roundtrip-count must be integers' sys.exit (1) diff --git a/perf/python/remote_lat.py b/perf/python/remote_lat.py index 372f5670..f2ee04a9 100644 --- a/perf/python/remote_lat.py +++ b/perf/python/remote_lat.py @@ -23,7 +23,7 @@ import libpyzmq def main (): if len(sys.argv) != 4: - print 'usage: remote_lat ' + print 'usage: remote_lat ' sys.exit (1) try: @@ -49,7 +49,7 @@ def main (): end = datetime.now () delta = (end - start).microseconds + 1000000 * (end - start).seconds - latency = delta / roundtrip_count / 2 + latency = float (delta) / roundtrip_count / 2 print "message size: %.0f [B]" % (message_size, ) print "roundtrip count: %.0f" % (roundtrip_count, ) diff --git a/perf/python/remote_thr.py b/perf/python/remote_thr.py index a80adfd5..bab001dd 100644 --- a/perf/python/remote_thr.py +++ b/perf/python/remote_thr.py @@ -27,7 +27,7 @@ def main (): sys.exit (1) try: - connect_to = argv [1] + connect_to = sys.argv [1] message_size = int (sys.argv [2]) message_count = int (sys.argv [3]) except (ValueError, OverflowError), e: diff --git a/src/app_thread.cpp b/src/app_thread.cpp index e1085948..58fe19d2 100644 --- a/src/app_thread.cpp +++ b/src/app_thread.cpp @@ -51,9 +51,7 @@ zmq::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, int thread_slot_) : zmq::app_thread_t::~app_thread_t () { - // Destroy all the sockets owned by this application thread. - for (sockets_t::iterator it = sockets.begin (); it != sockets.end (); it ++) - delete *it; + zmq_assert (sockets.empty ()); } zmq::i_signaler *zmq::app_thread_t::get_signaler () diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp index 71e20df3..49c21972 100644 --- a/src/dispatcher.cpp +++ b/src/dispatcher.cpp @@ -30,7 +30,9 @@ #include "windows.h" #endif -zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_) +zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_) : + sockets (0), + terminated (false) { #ifdef ZMQ_HAVE_WINDOWS // Intialise Windows sockets. Note that WSAStartup can be called multiple @@ -68,6 +70,20 @@ zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_) io_threads [i]->start (); } +int zmq::dispatcher_t::term () +{ + term_sync.lock (); + zmq_assert (!terminated); + terminated = true; + bool destroy = (sockets == 0); + term_sync.unlock (); + + if (destroy) + delete this; + + return 0; +} + zmq::dispatcher_t::~dispatcher_t () { // Close all application theads, sockets, io_objects etc. @@ -111,9 +127,27 @@ zmq::socket_base_t *zmq::dispatcher_t::create_socket (int type_) } threads_sync.unlock (); + term_sync.lock (); + sockets++; + term_sync.unlock (); + return thread->create_socket (type_); } +void zmq::dispatcher_t::destroy_socket () +{ + // If zmq_term was already called and there are no more sockets, + // terminate the whole 0MQ infrastructure. + term_sync.lock (); + zmq_assert (sockets > 0); + sockets--; + bool destroy = (sockets == 0 && terminated); + term_sync.unlock (); + + if (destroy) + delete this; +} + zmq::app_thread_t *zmq::dispatcher_t::choose_app_thread () { // Check whether thread ID is already assigned. If so, return it. diff --git a/src/dispatcher.hpp b/src/dispatcher.hpp index cb445efe..bd1f6557 100644 --- a/src/dispatcher.hpp +++ b/src/dispatcher.hpp @@ -52,12 +52,18 @@ namespace zmq // signalers. dispatcher_t (int app_threads_, int io_threads_); - // To be called to terminate the whole infrastructure (zmq_term). - ~dispatcher_t (); + // This function is called when user invokes zmq_term. If there are + // no more sockets open it'll cause all the infrastructure to be shut + // down. If there are open sockets still, the deallocation happens + // after the last one is closed. + int term (); // Create a socket. class socket_base_t *create_socket (int type_); + // Destroy a socket. + void destroy_socket (); + // Returns number of thread slots in the dispatcher. To be used by // individual threads to find out how many distinct signals can be // received. @@ -93,6 +99,8 @@ namespace zmq private: + ~dispatcher_t (); + // Returns the app thread associated with the current thread. // NULL if we are out of app thread slots. class app_thread_t *choose_app_thread (); @@ -127,9 +135,20 @@ namespace zmq typedef std::set pipes_t; pipes_t pipes; - // Synchronisation of access to the pipes repository. + // Synchronisation of access to the pipes repository. mutex_t pipes_sync; + // Number of sockets alive. + int sockets; + + // If true, zmq_term was already called. When last socket is closed + // the whole 0MQ infrastructure should be deallocated. + bool terminated; + + // Synchronisation of access to the termination data (socket count + // and 'terminated' flag). + mutex_t term_sync; + dispatcher_t (const dispatcher_t&); void operator = (const dispatcher_t&); }; diff --git a/src/object.cpp b/src/object.cpp index c0ef21cc..1433b7bd 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -53,6 +53,11 @@ int zmq::object_t::get_thread_slot () return thread_slot; } +zmq::dispatcher_t *zmq::object_t::get_dispatcher () +{ + return dispatcher; +} + void zmq::object_t::process_command (command_t &cmd_) { switch (cmd_.type) { diff --git a/src/object.hpp b/src/object.hpp index 250e8568..2e415074 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -40,6 +40,7 @@ namespace zmq ~object_t (); int get_thread_slot (); + dispatcher_t *get_dispatcher (); void process_command (struct command_t &cmd_); // Allow pipe to access corresponding dispatcher functions. diff --git a/src/session.cpp b/src/session.cpp index ac2dd127..bc334e00 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -54,7 +54,12 @@ bool zmq::session_t::read (::zmq_msg_t *msg_) bool zmq::session_t::write (::zmq_msg_t *msg_) { - return out_pipe->write (msg_); + if (out_pipe->write (msg_)) { + zmq_msg_init (msg_); + return true; + } + + return false; } void zmq::session_t::flush () diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 6ad1f551..93a0a4c3 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -24,7 +24,7 @@ #include "socket_base.hpp" #include "app_thread.hpp" -#include "err.hpp" +#include "dispatcher.hpp" #include "zmq_listener.hpp" #include "zmq_connecter.hpp" #include "msg_content.hpp" @@ -34,6 +34,7 @@ #include "owned.hpp" #include "uuid.hpp" #include "pipe.hpp" +#include "err.hpp" zmq::socket_base_t::socket_base_t (app_thread_t *parent_) : object_t (parent_), @@ -288,7 +289,16 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) int zmq::socket_base_t::close () { app_thread->remove_socket (this); + + // Pointer to the dispatcher must be retrieved before the socket is + // deallocated. Afterwards it is not available. + dispatcher_t *dispatcher = get_dispatcher (); delete this; + + // This function must be called after the socket is completely deallocated + // as it may cause termination of the whole 0MQ infrastructure. + dispatcher->destroy_socket (); + return 0; } diff --git a/src/zmq.cpp b/src/zmq.cpp index 49096ad6..0ffd530d 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -183,8 +183,7 @@ void *zmq_init (int app_threads_, int io_threads_) int zmq_term (void *dispatcher_) { - delete (zmq::dispatcher_t*) dispatcher_; - return 0; + return ((zmq::dispatcher_t*) dispatcher_)->term (); } void *zmq_socket (void *dispatcher_, int type_)