diff --git a/src/mutex.hpp b/src/mutex.hpp index 82b23632..db5056e4 100644 --- a/src/mutex.hpp +++ b/src/mutex.hpp @@ -181,6 +181,33 @@ namespace zmq scoped_lock_t (const scoped_lock_t&); const scoped_lock_t &operator = (const scoped_lock_t&); }; + + + struct scoped_optional_lock_t + { + scoped_optional_lock_t (mutex_t* mutex_) + : mutex (mutex_) + { + if(mutex != 0) + mutex->lock (); + } + + ~scoped_optional_lock_t () + { + if(mutex != 0) + mutex->unlock (); + } + + private: + + mutex_t* mutex; + + // Disable copy construction and assignment. + scoped_optional_lock_t (const scoped_lock_t&); + const scoped_optional_lock_t &operator = (const scoped_lock_t&); + }; + + } #endif diff --git a/src/socket_base.cpp b/src/socket_base.cpp index a7d92c72..97b5ac70 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -97,13 +97,7 @@ #include "scatter.hpp" #include "dgram.hpp" -#define ENTER_MUTEX() \ - if (thread_safe) \ - sync.lock(); -#define EXIT_MUTEX(); \ - if (thread_safe) \ - sync.unlock(); bool zmq::socket_base_t::check_tag () { @@ -338,24 +332,21 @@ void zmq::socket_base_t::attach_pipe (pipe_t *pipe_, bool subscribe_to_all_) int zmq::socket_base_t::setsockopt (int option_, const void *optval_, size_t optvallen_) { - ENTER_MUTEX (); + scoped_optional_lock_t sync_lock(thread_safe ? &sync : 0); if (!options.is_valid(option_)) { errno = EINVAL; - EXIT_MUTEX (); return -1; } if (unlikely (ctx_terminated)) { errno = ETERM; - EXIT_MUTEX (); return -1; } // First, check whether specific socket type overloads the option. int rc = xsetsockopt (option_, optval_, optvallen_); if (rc == 0 || errno != EINVAL) { - EXIT_MUTEX (); return rc; } @@ -364,64 +355,55 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_, rc = options.setsockopt (option_, optval_, optvallen_); update_pipe_options(option_); - EXIT_MUTEX (); return rc; } int zmq::socket_base_t::getsockopt (int option_, void *optval_, size_t *optvallen_) { - ENTER_MUTEX (); + scoped_optional_lock_t sync_lock(thread_safe ? &sync : 0); if (unlikely (ctx_terminated)) { errno = ETERM; - EXIT_MUTEX (); return -1; } if (option_ == ZMQ_RCVMORE) { if (*optvallen_ < sizeof (int)) { errno = EINVAL; - EXIT_MUTEX (); return -1; } memset(optval_, 0, *optvallen_); *((int*) optval_) = rcvmore ? 1 : 0; *optvallen_ = sizeof (int); - EXIT_MUTEX (); return 0; } if (option_ == ZMQ_FD) { if (*optvallen_ < sizeof (fd_t)) { errno = EINVAL; - EXIT_MUTEX (); return -1; } if (thread_safe) { // thread safe socket doesn't provide file descriptor errno = EINVAL; - EXIT_MUTEX (); return -1; } *((fd_t*)optval_) = ((mailbox_t*)mailbox)->get_fd(); *optvallen_ = sizeof(fd_t); - EXIT_MUTEX (); return 0; } if (option_ == ZMQ_EVENTS) { if (*optvallen_ < sizeof (int)) { errno = EINVAL; - EXIT_MUTEX (); return -1; } int rc = process_commands (0, false); if (rc != 0 && (errno == EINTR || errno == ETERM)) { - EXIT_MUTEX (); return -1; } errno_assert (rc == 0); @@ -431,108 +413,94 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_, if (has_in ()) *((int*) optval_) |= ZMQ_POLLIN; *optvallen_ = sizeof (int); - EXIT_MUTEX (); return 0; } if (option_ == ZMQ_LAST_ENDPOINT) { if (*optvallen_ < last_endpoint.size () + 1) { errno = EINVAL; - EXIT_MUTEX (); return -1; } strncpy(static_cast (optval_), last_endpoint.c_str(), last_endpoint.size() + 1); *optvallen_ = last_endpoint.size () + 1; - EXIT_MUTEX (); return 0; } if (option_ == ZMQ_THREAD_SAFE) { if (*optvallen_ < sizeof (int)) { errno = EINVAL; - EXIT_MUTEX (); return -1; } memset(optval_, 0, *optvallen_); *((int*) optval_) = thread_safe ? 1 : 0; *optvallen_ = sizeof (int); - EXIT_MUTEX (); return 0; } int rc = options.getsockopt (option_, optval_, optvallen_); - EXIT_MUTEX (); return rc; } int zmq::socket_base_t::join (const char* group_) { - ENTER_MUTEX (); + scoped_optional_lock_t sync_lock(thread_safe ? &sync : 0); int rc = xjoin (group_); - EXIT_MUTEX(); return rc; } int zmq::socket_base_t::leave (const char* group_) { - ENTER_MUTEX (); + scoped_optional_lock_t sync_lock(thread_safe ? &sync : 0); int rc = xleave (group_); - EXIT_MUTEX(); return rc; } int zmq::socket_base_t::add_signaler(signaler_t *s_) { - ENTER_MUTEX (); + scoped_optional_lock_t sync_lock(thread_safe ? &sync : 0); if (!thread_safe) { errno = EINVAL; - EXIT_MUTEX (); return -1; } ((mailbox_safe_t*)mailbox)->add_signaler(s_); - EXIT_MUTEX (); return 0; } int zmq::socket_base_t::remove_signaler(signaler_t *s_) { - ENTER_MUTEX (); + scoped_optional_lock_t sync_lock(thread_safe ? &sync : 0); if (!thread_safe) { errno = EINVAL; - EXIT_MUTEX (); return -1; } ((mailbox_safe_t*)mailbox)->remove_signaler(s_); - EXIT_MUTEX (); return 0; } int zmq::socket_base_t::bind (const char *addr_) { - ENTER_MUTEX (); + scoped_optional_lock_t sync_lock(thread_safe ? &sync : 0); if (unlikely (ctx_terminated)) { errno = ETERM; - EXIT_MUTEX (); return -1; } // Process pending commands, if any. int rc = process_commands (0, false); if (unlikely (rc != 0)) { - EXIT_MUTEX (); return -1; } @@ -540,7 +508,6 @@ int zmq::socket_base_t::bind (const char *addr_) std::string protocol; std::string address; if (parse_uri (addr_, protocol, address) || check_protocol (protocol)) { - EXIT_MUTEX (); return -1; } @@ -552,14 +519,12 @@ int zmq::socket_base_t::bind (const char *addr_) last_endpoint.assign (addr_); options.connected = true; } - EXIT_MUTEX (); return rc; } if (protocol == "pgm" || protocol == "epgm" || protocol == "norm") { // For convenience's sake, bind can be used interchangeable with // connect for PGM, EPGM, NORM transports. - EXIT_MUTEX (); rc = connect (addr_); if (rc != -1) options.connected = true; @@ -569,7 +534,6 @@ int zmq::socket_base_t::bind (const char *addr_) if (protocol == "udp") { if (!(options.type == ZMQ_DGRAM || options.type == ZMQ_DISH)) { errno = ENOCOMPATPROTO; - EXIT_MUTEX (); return -1; } @@ -577,7 +541,6 @@ int zmq::socket_base_t::bind (const char *addr_) io_thread_t *io_thread = choose_io_thread (options.affinity); if (!io_thread) { errno = EMTHREAD; - EXIT_MUTEX (); return -1; } @@ -589,7 +552,6 @@ int zmq::socket_base_t::bind (const char *addr_) rc = paddr->resolved.udp_addr->resolve (address.c_str(), true); if (rc != 0) { LIBZMQ_DELETE(paddr); - EXIT_MUTEX (); return -1; } @@ -620,7 +582,6 @@ int zmq::socket_base_t::bind (const char *addr_) add_endpoint (addr_, (own_t *) session, newpipe); - EXIT_MUTEX (); return 0; } @@ -629,7 +590,6 @@ int zmq::socket_base_t::bind (const char *addr_) io_thread_t *io_thread = choose_io_thread (options.affinity); if (!io_thread) { errno = EMTHREAD; - EXIT_MUTEX (); return -1; } @@ -641,7 +601,6 @@ int zmq::socket_base_t::bind (const char *addr_) if (rc != 0) { LIBZMQ_DELETE(listener); event_bind_failed (address, zmq_errno()); - EXIT_MUTEX (); return -1; } @@ -650,7 +609,6 @@ int zmq::socket_base_t::bind (const char *addr_) add_endpoint (last_endpoint.c_str (), (own_t *) listener, NULL); options.connected = true; - EXIT_MUTEX (); return 0; } @@ -663,7 +621,6 @@ int zmq::socket_base_t::bind (const char *addr_) if (rc != 0) { LIBZMQ_DELETE(listener); event_bind_failed (address, zmq_errno()); - EXIT_MUTEX (); return -1; } @@ -672,7 +629,6 @@ int zmq::socket_base_t::bind (const char *addr_) add_endpoint (last_endpoint.c_str (), (own_t *) listener, NULL); options.connected = true; - EXIT_MUTEX (); return 0; } #endif @@ -685,7 +641,6 @@ int zmq::socket_base_t::bind (const char *addr_) if (rc != 0) { LIBZMQ_DELETE(listener); event_bind_failed (address, zmq_errno()); - EXIT_MUTEX (); return -1; } @@ -694,7 +649,6 @@ int zmq::socket_base_t::bind (const char *addr_) add_endpoint (addr_, (own_t *) listener, NULL); options.connected = true; - EXIT_MUTEX (); return 0; } #endif @@ -707,7 +661,6 @@ int zmq::socket_base_t::bind (const char *addr_) if (rc != 0) { LIBZMQ_DELETE(listener); event_bind_failed (address, zmq_errno ()); - EXIT_MUTEX (); return -1; } @@ -715,30 +668,26 @@ int zmq::socket_base_t::bind (const char *addr_) add_endpoint (last_endpoint.c_str(), (own_t *) listener, NULL); options.connected = true; - EXIT_MUTEX (); return 0; } #endif - EXIT_MUTEX (); zmq_assert (false); return -1; } int zmq::socket_base_t::connect (const char *addr_) { - ENTER_MUTEX (); + scoped_optional_lock_t sync_lock(thread_safe ? &sync : 0); if (unlikely (ctx_terminated)) { errno = ETERM; - EXIT_MUTEX (); return -1; } // Process pending commands, if any. int rc = process_commands (0, false); if (unlikely (rc != 0)) { - EXIT_MUTEX (); return -1; } @@ -746,7 +695,6 @@ int zmq::socket_base_t::connect (const char *addr_) std::string protocol; std::string address; if (parse_uri (addr_, protocol, address) || check_protocol (protocol)) { - EXIT_MUTEX (); return -1; } @@ -852,7 +800,6 @@ int zmq::socket_base_t::connect (const char *addr_) inprocs.insert (inprocs_t::value_type (std::string (addr_), new_pipes [0])); options.connected = true; - EXIT_MUTEX (); return 0; } bool is_single_connect = (options.type == ZMQ_DEALER || @@ -864,7 +811,6 @@ int zmq::socket_base_t::connect (const char *addr_) // There is no valid use for multiple connects for SUB-PUB nor // DEALER-ROUTER nor REQ-REP. Multiple connects produces // nonsensical results. - EXIT_MUTEX (); return 0; } } @@ -873,7 +819,6 @@ int zmq::socket_base_t::connect (const char *addr_) io_thread_t *io_thread = choose_io_thread (options.affinity); if (!io_thread) { errno = EMTHREAD; - EXIT_MUTEX (); return -1; } @@ -918,7 +863,6 @@ int zmq::socket_base_t::connect (const char *addr_) if (rc == -1) { errno = EINVAL; LIBZMQ_DELETE(paddr); - EXIT_MUTEX (); return -1; } // Defer resolution until a socket is opened @@ -932,7 +876,6 @@ int zmq::socket_base_t::connect (const char *addr_) int rc = paddr->resolved.ipc_addr->resolve (address.c_str ()); if (rc != 0) { LIBZMQ_DELETE(paddr); - EXIT_MUTEX (); return -1; } } @@ -942,7 +885,6 @@ if (protocol == "udp") { if (options.type != ZMQ_RADIO) { errno = ENOCOMPATPROTO; LIBZMQ_DELETE(paddr); - EXIT_MUTEX (); return -1; } @@ -951,7 +893,6 @@ if (protocol == "udp") { rc = paddr->resolved.udp_addr->resolve (address.c_str(), false); if (rc != 0) { LIBZMQ_DELETE(paddr); - EXIT_MUTEX (); return -1; } } @@ -966,7 +907,6 @@ if (protocol == "udp") { if (res != NULL) pgm_freeaddrinfo (res); if (rc != 0 || port_number == 0) { - EXIT_MUTEX (); return -1; } } @@ -979,7 +919,6 @@ if (protocol == "udp") { int rc = paddr->resolved.tipc_addr->resolve (address.c_str()); if (rc != 0) { LIBZMQ_DELETE(paddr); - EXIT_MUTEX (); return -1; } } @@ -992,7 +931,6 @@ if (protocol == "udp") { int rc = paddr->resolved.vmci_addr->resolve (address.c_str ()); if (rc != 0) { LIBZMQ_DELETE(paddr); - EXIT_MUTEX (); return -1; } } @@ -1038,7 +976,6 @@ if (protocol == "udp") { paddr->to_string (last_endpoint); add_endpoint (addr_, (own_t *) session, newpipe); - EXIT_MUTEX (); return 0; } @@ -1051,19 +988,17 @@ void zmq::socket_base_t::add_endpoint (const char *addr_, own_t *endpoint_, pipe int zmq::socket_base_t::term_endpoint (const char *addr_) { - ENTER_MUTEX (); + scoped_optional_lock_t sync_lock(thread_safe ? &sync : 0); // Check whether the library haven't been shut down yet. if (unlikely (ctx_terminated)) { errno = ETERM; - EXIT_MUTEX (); return -1; } // Check whether endpoint address passed to the function is valid. if (unlikely (!addr_)) { errno = EINVAL; - EXIT_MUTEX (); return -1; } @@ -1071,7 +1006,6 @@ int zmq::socket_base_t::term_endpoint (const char *addr_) // (from launch_child() for example) we're asked to terminate now. int rc = process_commands (0, false); if (unlikely(rc != 0)) { - EXIT_MUTEX (); return -1; } @@ -1079,27 +1013,23 @@ int zmq::socket_base_t::term_endpoint (const char *addr_) std::string protocol; std::string address; if (parse_uri(addr_, protocol, address) || check_protocol(protocol)) { - EXIT_MUTEX (); return -1; } // Disconnect an inproc socket if (protocol == "inproc") { if (unregister_endpoint (std::string(addr_), this) == 0) { - EXIT_MUTEX (); return 0; } std::pair range = inprocs.equal_range (std::string (addr_)); if (range.first == range.second) { errno = ENOENT; - EXIT_MUTEX (); return -1; } for (inprocs_t::iterator it = range.first; it != range.second; ++it) it->second->terminate (true); inprocs.erase (range.first, range.second); - EXIT_MUTEX (); return 0; } @@ -1137,7 +1067,6 @@ int zmq::socket_base_t::term_endpoint (const char *addr_) range = endpoints.equal_range (resolved_addr); if (range.first == range.second) { errno = ENOENT; - EXIT_MUTEX (); return -1; } @@ -1148,32 +1077,28 @@ int zmq::socket_base_t::term_endpoint (const char *addr_) term_child (it->second.first); } endpoints.erase (range.first, range.second); - EXIT_MUTEX (); return 0; } int zmq::socket_base_t::send (msg_t *msg_, int flags_) { - ENTER_MUTEX (); + scoped_optional_lock_t sync_lock(thread_safe ? &sync : 0); // Check whether the library haven't been shut down yet. if (unlikely (ctx_terminated)) { errno = ETERM; - EXIT_MUTEX (); return -1; } // Check whether message passed to the function is valid. if (unlikely (!msg_ || !msg_->check ())) { errno = EFAULT; - EXIT_MUTEX (); return -1; } // Process pending commands, if any. int rc = process_commands (0, true); if (unlikely (rc != 0)) { - EXIT_MUTEX (); return -1; } @@ -1189,18 +1114,15 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_) // Try to send the message using method in each socket class rc = xsend (msg_); if (rc == 0) { - EXIT_MUTEX (); return 0; } if (unlikely (errno != EAGAIN)) { - EXIT_MUTEX (); return -1; } // In case of non-blocking send we'll simply propagate // the error - including EAGAIN - up the stack. if (flags_ & ZMQ_DONTWAIT || options.sndtimeo == 0) { - EXIT_MUTEX (); return -1; } @@ -1214,45 +1136,39 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_) // If timeout is reached in the meantime, return EAGAIN. while (true) { if (unlikely (process_commands (timeout, false) != 0)) { - EXIT_MUTEX (); return -1; } rc = xsend (msg_); if (rc == 0) break; if (unlikely (errno != EAGAIN)) { - EXIT_MUTEX (); return -1; } if (timeout > 0) { timeout = (int) (end - clock.now_ms ()); if (timeout <= 0) { errno = EAGAIN; - EXIT_MUTEX (); return -1; } } } - EXIT_MUTEX (); return 0; } int zmq::socket_base_t::recv (msg_t *msg_, int flags_) { - ENTER_MUTEX (); + scoped_optional_lock_t sync_lock(thread_safe ? &sync : 0); // Check whether the library haven't been shut down yet. if (unlikely (ctx_terminated)) { errno = ETERM; - EXIT_MUTEX (); return -1; } // Check whether message passed to the function is valid. if (unlikely (!msg_ || !msg_->check ())) { errno = EFAULT; - EXIT_MUTEX (); return -1; } @@ -1266,7 +1182,6 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_) // ticks is more efficient than doing RDTSC all the time. if (++ticks == inbound_poll_rate) { if (unlikely (process_commands (0, false) != 0)) { - EXIT_MUTEX (); return -1; } ticks = 0; @@ -1275,14 +1190,12 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_) // Get the message. int rc = xrecv (msg_); if (unlikely (rc != 0 && errno != EAGAIN)) { - EXIT_MUTEX (); return -1; } // If we have the message, return immediately. if (rc == 0) { extract_flags (msg_); - EXIT_MUTEX (); return 0; } @@ -1292,19 +1205,16 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_) // If it's not, return EAGAIN. if (flags_ & ZMQ_DONTWAIT || options.rcvtimeo == 0) { if (unlikely (process_commands (0, false) != 0)) { - EXIT_MUTEX (); return -1; } ticks = 0; rc = xrecv (msg_); if (rc < 0) { - EXIT_MUTEX (); return rc; } extract_flags (msg_); - EXIT_MUTEX (); return 0; } @@ -1318,7 +1228,6 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_) bool block = (ticks != 0); while (true) { if (unlikely (process_commands (block ? timeout : 0, false) != 0)) { - EXIT_MUTEX (); return -1; } rc = xrecv (msg_); @@ -1327,7 +1236,6 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_) break; } if (unlikely (errno != EAGAIN)) { - EXIT_MUTEX (); return -1; } block = true; @@ -1335,20 +1243,18 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_) timeout = (int) (end - clock.now_ms ()); if (timeout <= 0) { errno = EAGAIN; - EXIT_MUTEX (); return -1; } } } extract_flags (msg_); - EXIT_MUTEX (); return 0; } int zmq::socket_base_t::close () { - ENTER_MUTEX (); + scoped_optional_lock_t sync_lock(thread_safe ? &sync : 0); // Remove all existing signalers for thread safe sockets if (thread_safe) @@ -1357,7 +1263,6 @@ int zmq::socket_base_t::close () // Mark the socket as dead tag = 0xdeadbeef; - EXIT_MUTEX (); // Transfer the ownership of the socket from this application thread // to the reaper thread which will take care of the rest of shutdown @@ -1387,7 +1292,7 @@ void zmq::socket_base_t::start_reaping (poller_t *poller_) if (!thread_safe) fd = ((mailbox_t*)mailbox)->get_fd(); else { - ENTER_MUTEX (); + scoped_optional_lock_t sync_lock(thread_safe ? &sync : 0); reaper_signaler = new signaler_t(); @@ -1398,7 +1303,6 @@ void zmq::socket_base_t::start_reaping (poller_t *poller_) // Send a signal to make sure reaper handle existing commands reaper_signaler->send(); - EXIT_MUTEX (); } handle = poller->add_fd (fd, this); @@ -1581,14 +1485,13 @@ void zmq::socket_base_t::in_event () // of the reaper thread. Process any commands from other threads/sockets // that may be available at the moment. Ultimately, the socket will // be destroyed. - ENTER_MUTEX (); + scoped_optional_lock_t sync_lock(thread_safe ? &sync : 0); // If the socket is thread safe we need to unsignal the reaper signaler if (thread_safe) reaper_signaler->recv(); process_commands (0, false); - EXIT_MUTEX(); check_destroy(); }