diff --git a/src/Makefile.am b/src/Makefile.am
index 45e8ac0c..9c2fbf12 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -60,6 +60,7 @@ libzmq_la_SOURCES = \
select.hpp \
semaphore.hpp \
session.hpp \
+ signaler.hpp \
socket_base.hpp \
stdint.hpp \
sub.hpp \
@@ -123,6 +124,7 @@ libzmq_la_SOURCES = \
router.cpp \
select.cpp \
session.cpp \
+ signaler.cpp \
socket_base.cpp \
sub.cpp \
tcp_connecter.cpp \
diff --git a/src/config.hpp b/src/config.hpp
index dff3f87c..3984fcf4 100644
--- a/src/config.hpp
+++ b/src/config.hpp
@@ -36,6 +36,9 @@ namespace zmq
// memory allocation by approximately 99.6%
message_pipe_granularity = 256,
+ // Commands in pipe per allocation event.
+ command_pipe_granularity = 16,
+
// Size in bytes of the largest message that is still copied around
// rather than being reference-counted.
max_vsm_size = 29,
diff --git a/src/mailbox.cpp b/src/mailbox.cpp
index 7fdb93ea..9ef3e19c 100644
--- a/src/mailbox.cpp
+++ b/src/mailbox.cpp
@@ -18,439 +18,64 @@
along with this program. If not, see .
*/
-#include "platform.hpp"
-
-#if defined ZMQ_FORCE_SELECT
-#define ZMQ_RCVTIMEO_BASED_ON_SELECT
-#elif defined ZMQ_FORCE_POLL
-#define ZMQ_RCVTIMEO_BASED_ON_POLL
-#elif defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD ||\
- defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_SOLARIS ||\
- defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_QNXNTO ||\
- defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX ||\
- defined ZMQ_HAVE_NETBSD
-#define ZMQ_RCVTIMEO_BASED_ON_POLL
-#elif defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS
-#define ZMQ_RCVTIMEO_BASED_ON_SELECT
-#endif
-
-// On AIX, poll.h has to be included before zmq.h to get consistent
-// definition of pollfd structure (AIX uses 'reqevents' and 'retnevents'
-// instead of 'events' and 'revents' and defines macros to map from POSIX-y
-// names to AIX-specific names).
-#if defined ZMQ_RCVTIMEO_BASED_ON_POLL
-#include
-#elif defined ZMQ_RCVTIMEO_BASED_ON_SELECT
-#if defined ZMQ_HAVE_WINDOWS
-#include "windows.hpp"
-#elif defined ZMQ_HAVE_HPUX
-#include
-#include
-#include
-#elif defined ZMQ_HAVE_OPENVMS
-#include
-#include
-#else
-#include
-#endif
-#endif
-
#include "mailbox.hpp"
#include "err.hpp"
-#include "fd.hpp"
-#include "ip.hpp"
-#if defined ZMQ_HAVE_WINDOWS
-#include "windows.hpp"
-#else
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-#endif
+zmq::mailbox_t::mailbox_t ()
+{
+ // Get the pipe into passive state. That way, if the users starts by
+ // polling on the associated file descriptor it will get woken up when
+ // new command is posted.
+ bool ok = cpipe.read (NULL);
+ zmq_assert (!ok);
+ active = false;
+}
+
+zmq::mailbox_t::~mailbox_t ()
+{
+ // TODO: Retrieve and deallocate commands inside the cpipe.
+}
zmq::fd_t zmq::mailbox_t::get_fd ()
{
- return r;
-}
-
-#if defined ZMQ_HAVE_WINDOWS
-
-zmq::mailbox_t::mailbox_t () :
- blocking (true)
-{
- // Create the socketpair for signalling.
- int rc = make_socketpair (&r, &w);
- errno_assert (rc == 0);
-
- // Set the writer to non-blocking mode.
- unsigned long argp = 1;
- rc = ioctlsocket (w, FIONBIO, &argp);
- wsa_assert (rc != SOCKET_ERROR);
-}
-
-zmq::mailbox_t::~mailbox_t ()
-{
- int rc = closesocket (w);
- wsa_assert (rc != SOCKET_ERROR);
-
- rc = closesocket (r);
- wsa_assert (rc != SOCKET_ERROR);
+ return signaler.get_fd ();
}
void zmq::mailbox_t::send (const command_t &cmd_)
{
- // TODO: Implement SNDBUF auto-resizing as for POSIX platforms.
- // In the mean time, the following code with assert if the send()
- // call would block.
- int nbytes = ::send (w, (char *)&cmd_, sizeof (command_t), 0);
- wsa_assert (nbytes != SOCKET_ERROR);
- zmq_assert (nbytes == sizeof (command_t));
+ sync.lock ();
+ cpipe.write (cmd_, false);
+ bool ok = cpipe.flush ();
+ sync.unlock ();
+ if (!ok)
+ signaler.send ();
}
int zmq::mailbox_t::recv (command_t *cmd_, int timeout_)
{
- // If there's a finite timeout, poll on the fd.
- if (timeout_ > 0)
- return recv_timeout (cmd_, timeout_);
+ // Try to get the command straight away.
+ if (active) {
+ bool ok = cpipe.read (cmd_);
+ if (ok)
+ return 0;
- // If required, switch the reader to blocking or non-blocking mode.
- if ((timeout_ < 0 && !blocking) || (timeout_ == 0 && blocking)) {
- blocking = (timeout_ < 0);
- unsigned long argp = blocking ? 0 : 1;
- int rc = ioctlsocket (r, FIONBIO, &argp);
- wsa_assert (rc != SOCKET_ERROR);
+ // If there are no more commands available, switch into passive state.
+ active = false;
+ signaler.recv ();
}
- // Attempt to read an entire command.
- int nbytes = ::recv (r, (char*) cmd_, sizeof (command_t), 0);
- if (nbytes == -1 && WSAGetLastError () == WSAEWOULDBLOCK) {
- errno = EAGAIN;
+ // Wait for signal from the command sender.
+ int rc = signaler.wait (timeout_);
+ if (rc != 0 && errno == EAGAIN)
return -1;
- }
- // Sanity check for success.
- wsa_assert (nbytes != SOCKET_ERROR);
+ // We've got the signal. Now we can switch into active state.
+ active = true;
- // Check whether we haven't got half of command.
- zmq_assert (nbytes == sizeof (command_t));
-
- return 0;
-}
-
-#else
-
-zmq::mailbox_t::mailbox_t () :
- blocking (true)
-{
-#ifdef PIPE_BUF
- // Make sure that command can be written to the socket in atomic fashion.
- // If this wasn't guaranteed, commands from different threads would be
- // interleaved.
- zmq_assert (sizeof (command_t) <= PIPE_BUF);
-#endif
-
- // Create the socketpair for signaling.
- int rc = make_socketpair (&r, &w);
+ // Get a command.
errno_assert (rc == 0);
-
- // Set the writer to non-blocking mode.
- int flags = fcntl (w, F_GETFL, 0);
- errno_assert (flags >= 0);
- rc = fcntl (w, F_SETFL, flags | O_NONBLOCK);
- errno_assert (rc == 0);
-}
-
-zmq::mailbox_t::~mailbox_t ()
-{
- close (w);
- close (r);
-}
-
-void zmq::mailbox_t::send (const command_t &cmd_)
-{
- // Attempt to write an entire command without blocking.
- ssize_t nbytes;
- do {
- nbytes = ::send (w, &cmd_, sizeof (command_t), 0);
- } while (nbytes == -1 && errno == EINTR);
-
- // Attempt to increase mailbox SNDBUF if the send failed.
- if (nbytes == -1 && errno == EAGAIN) {
- int old_sndbuf, new_sndbuf;
- socklen_t sndbuf_size = sizeof old_sndbuf;
-
- // Retrieve current send buffer size.
- int rc = getsockopt (w, SOL_SOCKET, SO_SNDBUF, &old_sndbuf,
- &sndbuf_size);
- errno_assert (rc == 0);
- new_sndbuf = old_sndbuf * 2;
-
- // Double the new send buffer size.
- rc = setsockopt (w, SOL_SOCKET, SO_SNDBUF, &new_sndbuf, sndbuf_size);
- errno_assert (rc == 0);
-
- // Verify that the OS actually honored the request.
- rc = getsockopt (w, SOL_SOCKET, SO_SNDBUF, &new_sndbuf, &sndbuf_size);
- errno_assert (rc == 0);
- zmq_assert (new_sndbuf > old_sndbuf);
-
- // Retry the sending operation; at this point it must succeed.
- do {
- nbytes = ::send (w, &cmd_, sizeof (command_t), 0);
- } while (nbytes == -1 && errno == EINTR);
- }
- errno_assert (nbytes != -1);
-
- // This should never happen as we've already checked that command size is
- // less than PIPE_BUF.
- zmq_assert (nbytes == sizeof (command_t));
-}
-
-int zmq::mailbox_t::recv (command_t *cmd_, int timeout_)
-{
- // If there's a finite timeout, poll on the fd.
- if (timeout_ > 0)
- return recv_timeout (cmd_, timeout_);
-
-#ifdef MSG_DONTWAIT
-
- // Attempt to read an entire command. Returns EAGAIN if non-blocking
- // mode is requested and a command is not available.
- ssize_t nbytes = ::recv (r, cmd_, sizeof (command_t),
- timeout_ < 0 ? 0 : MSG_DONTWAIT);
- if (nbytes == -1 && (errno == EAGAIN || errno == EINTR))
- return -1;
-#else
-
- // If required, switch the reader to blocking or non-blocking mode.
- if ((timeout_ < 0 && !blocking) || (timeout_ == 0 && blocking)) {
- blocking = (timeout_ < 0);
- int flags = fcntl (r, F_GETFL, 0);
- errno_assert (flags >= 0);
- int rc = fcntl (r, F_SETFL,
- blocking ? flags | O_NONBLOCK : flags & ~O_NONBLOCK);
- errno_assert (rc == 0);
- }
-
- // Attempt to read an entire command.
- ssize_t nbytes = ::recv (r, cmd_, sizeof (command_t), 0);
- if (nbytes == -1 && (errno == EAGAIN || errno == EINTR))
- return -1;
-
-#endif
-
- // Sanity check for success.
- errno_assert (nbytes != -1);
-
- // Check whether we haven't got half of command.
- zmq_assert (nbytes == sizeof (command_t));
-
+ bool ok = cpipe.read (cmd_);
+ zmq_assert (ok);
return 0;
}
-#endif
-
-int zmq::mailbox_t::make_socketpair (fd_t *r_, fd_t *w_)
-{
-#if defined ZMQ_HAVE_WINDOWS
-
- // Windows has no 'socketpair' function. CreatePipe is no good as pipe
- // handles cannot be polled on. Here we create the socketpair by hand.
- *w_ = INVALID_SOCKET;
- *r_ = INVALID_SOCKET;
-
- // Create listening socket.
- SOCKET listener;
- listener = socket (AF_INET, SOCK_STREAM, 0);
- wsa_assert (listener != INVALID_SOCKET);
-
- // Set SO_REUSEADDR and TCP_NODELAY on listening socket.
- BOOL so_reuseaddr = 1;
- int rc = setsockopt (listener, SOL_SOCKET, SO_REUSEADDR,
- (char *)&so_reuseaddr, sizeof (so_reuseaddr));
- wsa_assert (rc != SOCKET_ERROR);
- BOOL tcp_nodelay = 1;
- rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY,
- (char *)&tcp_nodelay, sizeof (tcp_nodelay));
- wsa_assert (rc != SOCKET_ERROR);
-
- // Bind listening socket to any free local port.
- struct sockaddr_in addr;
- memset (&addr, 0, sizeof (addr));
- addr.sin_family = AF_INET;
- addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
- addr.sin_port = 0;
- rc = bind (listener, (const struct sockaddr*) &addr, sizeof (addr));
- wsa_assert (rc != SOCKET_ERROR);
-
- // Retrieve local port listener is bound to (into addr).
- int addrlen = sizeof (addr);
- rc = getsockname (listener, (struct sockaddr*) &addr, &addrlen);
- wsa_assert (rc != SOCKET_ERROR);
-
- // Listen for incomming connections.
- rc = listen (listener, 1);
- wsa_assert (rc != SOCKET_ERROR);
-
- // Create the writer socket.
- *w_ = WSASocket (AF_INET, SOCK_STREAM, 0, NULL, 0, 0);
- wsa_assert (*w_ != INVALID_SOCKET);
-
- // Set TCP_NODELAY on writer socket.
- rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY,
- (char *)&tcp_nodelay, sizeof (tcp_nodelay));
- wsa_assert (rc != SOCKET_ERROR);
-
- // Connect writer to the listener.
- rc = connect (*w_, (sockaddr *) &addr, sizeof (addr));
- wsa_assert (rc != SOCKET_ERROR);
-
- // Accept connection from writer.
- *r_ = accept (listener, NULL, NULL);
- wsa_assert (*r_ != INVALID_SOCKET);
-
- // We don't need the listening socket anymore. Close it.
- rc = closesocket (listener);
- wsa_assert (rc != SOCKET_ERROR);
-
- return 0;
-
-#elif defined ZMQ_HAVE_OPENVMS
-
- // Whilst OpenVMS supports socketpair - it maps to AF_INET only. Further,
- // it does not set the socket options TCP_NODELAY and TCP_NODELACK which
- // can lead to performance problems.
- //
- // The bug will be fixed in V5.6 ECO4 and beyond. In the meantime, we'll
- // create the socket pair manually.
- sockaddr_in lcladdr;
- memset (&lcladdr, 0, sizeof (lcladdr));
- lcladdr.sin_family = AF_INET;
- lcladdr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
- lcladdr.sin_port = 0;
-
- int listener = socket (AF_INET, SOCK_STREAM, 0);
- errno_assert (listener != -1);
-
- int on = 1;
- int rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY, &on, sizeof (on));
- errno_assert (rc != -1);
-
- rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELACK, &on, sizeof (on));
- errno_assert (rc != -1);
-
- rc = bind(listener, (struct sockaddr*) &lcladdr, sizeof (lcladdr));
- errno_assert (rc != -1);
-
- socklen_t lcladdr_len = sizeof (lcladdr);
-
- rc = getsockname (listener, (struct sockaddr*) &lcladdr, &lcladdr_len);
- errno_assert (rc != -1);
-
- rc = listen (listener, 1);
- errno_assert (rc != -1);
-
- *w_ = socket (AF_INET, SOCK_STREAM, 0);
- errno_assert (*w_ != -1);
-
- rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY, &on, sizeof (on));
- errno_assert (rc != -1);
-
- rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELACK, &on, sizeof (on));
- errno_assert (rc != -1);
-
- rc = connect (*w_, (struct sockaddr*) &lcladdr, sizeof (lcladdr));
- errno_assert (rc != -1);
-
- *r_ = accept (listener, NULL, NULL);
- errno_assert (*r_ != -1);
-
- close (listener);
-
- return 0;
-
-#else // All other implementations support socketpair()
-
- int sv [2];
- int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv);
- errno_assert (rc == 0);
- *w_ = sv [0];
- *r_ = sv [1];
- return 0;
-
-#endif
-}
-
-int zmq::mailbox_t::recv_timeout (command_t *cmd_, int timeout_)
-{
-#ifdef ZMQ_RCVTIMEO_BASED_ON_POLL
-
- struct pollfd pfd;
- pfd.fd = r;
- pfd.events = POLLIN;
- int rc = poll (&pfd, 1, timeout_);
- if (unlikely (rc < 0)) {
- zmq_assert (errno == EINTR);
- return -1;
- }
- else if (unlikely (rc == 0)) {
- errno = EAGAIN;
- return -1;
- }
- zmq_assert (rc == 1);
- zmq_assert (pfd.revents & POLLIN);
-
-#elif defined ZMQ_RCVTIMEO_BASED_ON_SELECT
-
- fd_set fds;
- FD_ZERO (&fds);
- FD_SET (r, &fds);
- struct timeval timeout;
- timeout.tv_sec = timeout_ / 1000;
- timeout.tv_usec = timeout_ % 1000 * 1000;
-#ifdef ZMQ_HAVE_WINDOWS
- int rc = select (0, &fds, NULL, NULL, &timeout);
- wsa_assert (rc != SOCKET_ERROR);
-#else
- int rc = select (r + 1, &fds, NULL, NULL, &timeout);
- if (unlikely (rc < 0)) {
- zmq_assert (errno == EINTR);
- return -1;
- }
-#endif
- if (unlikely (rc == 0)) {
- errno = EAGAIN;
- return -1;
- }
- zmq_assert (rc == 1);
-
-
-#else
-#error
-#endif
-
- // The file descriptor is ready for reading. Extract one command out of it.
-#ifdef ZMQ_HAVE_WINDOWS
- int nbytes = ::recv (r, (char*) cmd_, sizeof (command_t), 0);
- wsa_assert (nbytes != SOCKET_ERROR);
-#else
- ssize_t nbytes = ::recv (r, cmd_, sizeof (command_t), 0);
- if (unlikely (rc < 0 && errno == EINTR))
- return -1;
- errno_assert (nbytes > 0);
-#endif
- zmq_assert (nbytes == sizeof (command_t));
- return 0;
-}
-
-#if defined ZMQ_RCVTIMEO_BASED_ON_SELECT
-#undef ZMQ_RCVTIMEO_BASED_ON_SELECT
-#endif
-#if defined ZMQ_RCVTIMEO_BASED_ON_POLL
-#undef ZMQ_RCVTIMEO_BASED_ON_POLL
-#endif
-
diff --git a/src/mailbox.hpp b/src/mailbox.hpp
index eb02e397..0675b990 100644
--- a/src/mailbox.hpp
+++ b/src/mailbox.hpp
@@ -24,10 +24,12 @@
#include
#include "platform.hpp"
+#include "signaler.hpp"
#include "fd.hpp"
-#include "stdint.hpp"
#include "config.hpp"
#include "command.hpp"
+#include "ypipe.hpp"
+#include "mutex.hpp"
namespace zmq
{
@@ -45,21 +47,22 @@ namespace zmq
private:
- // Platform-dependent function to create a socketpair.
- static int make_socketpair (fd_t *r_, fd_t *w_);
+ // The pipe to store actual commands.
+ typedef ypipe_t cpipe_t;
+ cpipe_t cpipe;
- // Receives a command with the specific timeout.
- // This function is not to be used for non-blocking or inifinitely
- // blocking recvs.
- int recv_timeout (command_t *cmd_, int timeout_);
+ // Signaler to pass signals from writer thread to reader thread.
+ signaler_t signaler;
- // Write & read end of the socketpair.
- fd_t w;
- fd_t r;
+ // There's only one thread receiving from the mailbox, but there
+ // is arbitrary number of threads sending. Given that ypipe requires
+ // synchronised access on both of its endpoints, we have to synchronise
+ // the sending side.
+ mutex_t sync;
- // Used on platforms where there's no MSG_DONTWAIT functionality.
- // True if the read socket is set to the blocking state.
- bool blocking;
+ // True if the underlying pipe is active, ie. when we are allowed to
+ // read commands from it.
+ bool active;
// Disable copying of mailbox_t object.
mailbox_t (const mailbox_t&);
diff --git a/src/signaler.cpp b/src/signaler.cpp
new file mode 100644
index 00000000..2ecfb980
--- /dev/null
+++ b/src/signaler.cpp
@@ -0,0 +1,340 @@
+/*
+ Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see .
+*/
+
+#include "platform.hpp"
+
+#if defined ZMQ_FORCE_SELECT
+#define ZMQ_SIGNALER_WAIT_BASED_ON_SELECT
+#elif defined ZMQ_FORCE_POLL
+#define ZMQ_SIGNALER_WAIT_BASED_ON_POLL
+#elif defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD ||\
+ defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_SOLARIS ||\
+ defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_QNXNTO ||\
+ defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX ||\
+ defined ZMQ_HAVE_NETBSD
+#define ZMQ_SIGNALER_WAIT_BASED_ON_POLL
+#elif defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS
+#define ZMQ_SIGNALER_WAIT_BASED_ON_SELECT
+#endif
+
+// On AIX, poll.h has to be included before zmq.h to get consistent
+// definition of pollfd structure (AIX uses 'reqevents' and 'retnevents'
+// instead of 'events' and 'revents' and defines macros to map from POSIX-y
+// names to AIX-specific names).
+#if defined ZMQ_SIGNALER_WAIT_BASED_ON_POLL
+#include
+#elif defined ZMQ_SIGNALER_WAIT_BASED_ON_SELECT
+#if defined ZMQ_HAVE_WINDOWS
+#include "windows.hpp"
+#elif defined ZMQ_HAVE_HPUX
+#include
+#include
+#include
+#elif defined ZMQ_HAVE_OPENVMS
+#include
+#include
+#else
+#include
+#endif
+#endif
+
+#include "signaler.hpp"
+#include "likely.hpp"
+#include "err.hpp"
+#include "fd.hpp"
+#include "ip.hpp"
+
+#if defined ZMQ_HAVE_WINDOWS
+#include "windows.hpp"
+#else
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#endif
+
+zmq::signaler_t::signaler_t ()
+{
+ // Create the socketpair for signaling.
+ int rc = make_fdpair (&r, &w);
+ errno_assert (rc == 0);
+
+ // Set both fds to non-blocking mode.
+#if defined ZMQ_HAVE_WINDOWS
+ unsigned long argp = 1;
+ rc = ioctlsocket (w, FIONBIO, &argp);
+ wsa_assert (rc != SOCKET_ERROR);
+ rc = ioctlsocket (r, FIONBIO, &argp);
+ wsa_assert (rc != SOCKET_ERROR);
+#else
+ int flags = fcntl (w, F_GETFL, 0);
+ errno_assert (flags >= 0);
+ rc = fcntl (w, F_SETFL, flags | O_NONBLOCK);
+ errno_assert (rc == 0);
+ flags = fcntl (r, F_GETFL, 0);
+ errno_assert (flags >= 0);
+ rc = fcntl (r, F_SETFL, flags | O_NONBLOCK);
+ errno_assert (rc == 0);
+#endif
+}
+
+zmq::signaler_t::~signaler_t ()
+{
+#if defined ZMQ_HAVE_WINDOWS
+ int rc = closesocket (w);
+ wsa_assert (rc != SOCKET_ERROR);
+ rc = closesocket (r);
+ wsa_assert (rc != SOCKET_ERROR);
+#else
+ close (w);
+ close (r);
+#endif
+}
+
+zmq::fd_t zmq::signaler_t::get_fd ()
+{
+ return r;
+}
+
+void zmq::signaler_t::send ()
+{
+#if defined ZMQ_HAVE_WINDOWS
+ unsigned char dummy = 0;
+ int nbytes = ::send (w, &dummy, sizeof (dummy), 0);
+ wsa_assert (nbytes != SOCKET_ERROR);
+ zmq_assert (nbytes == sizeof (dummy));
+#else
+ unsigned char dummy = 0;
+ while (true) {
+ ssize_t nbytes = ::send (w, &dummy, sizeof (dummy), 0);
+ if (unlikely (nbytes == -1 && errno == EINTR))
+ continue;
+ zmq_assert (nbytes == sizeof (dummy));
+ break;
+ }
+#endif
+}
+
+int zmq::signaler_t::wait (int timeout_)
+{
+#ifdef ZMQ_SIGNALER_WAIT_BASED_ON_POLL
+
+ struct pollfd pfd;
+ pfd.fd = r;
+ pfd.events = POLLIN;
+ int rc = poll (&pfd, 1, timeout_);
+ if (unlikely (rc < 0)) {
+ zmq_assert (errno == EINTR);
+ return -1;
+ }
+ else if (unlikely (rc == 0)) {
+ errno = EAGAIN;
+ return -1;
+ }
+ zmq_assert (rc == 1);
+ zmq_assert (pfd.revents & POLLIN);
+ return 0;
+
+#elif defined ZMQ_SIGNALER_WAIT_BASED_ON_SELECT
+
+ fd_set fds;
+ FD_ZERO (&fds);
+ FD_SET (r, &fds);
+ struct timeval timeout;
+ timeout.tv_sec = timeout_ / 1000;
+ timeout.tv_usec = timeout_ % 1000 * 1000;
+#ifdef ZMQ_HAVE_WINDOWS
+ int rc = select (0, &fds, NULL, NULL, &timeout);
+ wsa_assert (rc != SOCKET_ERROR);
+#else
+ int rc = select (r + 1, &fds, NULL, NULL, &timeout);
+ if (unlikely (rc < 0)) {
+ zmq_assert (errno == EINTR);
+ return -1;
+ }
+#endif
+ if (unlikely (rc == 0)) {
+ errno = EAGAIN;
+ return -1;
+ }
+ zmq_assert (rc == 1);
+ return 0;
+
+#else
+#error
+#endif
+}
+
+void zmq::signaler_t::recv ()
+{
+ // Attempt to read a signal.
+ unsigned char dummy;
+#if ZMQ_HAVE_WINDOWS
+ int nbytes = ::recv (r, &dummy, sizeof (dummy), 0);
+ wsa_assert (nbytes != SOCKET_ERROR);
+#else
+ ssize_t nbytes = ::recv (r, &dummy, sizeof (dummy), 0);
+ errno_assert (nbytes >= 0);
+#endif
+ zmq_assert (nbytes == sizeof (dummy));
+ zmq_assert (dummy == 0);
+}
+
+int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
+{
+#if defined ZMQ_HAVE_WINDOWS
+
+ // Windows has no 'socketpair' function. CreatePipe is no good as pipe
+ // handles cannot be polled on. Here we create the socketpair by hand.
+ *w_ = INVALID_SOCKET;
+ *r_ = INVALID_SOCKET;
+
+ // Create listening socket.
+ SOCKET listener;
+ listener = socket (AF_INET, SOCK_STREAM, 0);
+ wsa_assert (listener != INVALID_SOCKET);
+
+ // Set SO_REUSEADDR and TCP_NODELAY on listening socket.
+ BOOL so_reuseaddr = 1;
+ int rc = setsockopt (listener, SOL_SOCKET, SO_REUSEADDR,
+ (char *)&so_reuseaddr, sizeof (so_reuseaddr));
+ wsa_assert (rc != SOCKET_ERROR);
+ BOOL tcp_nodelay = 1;
+ rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY,
+ (char *)&tcp_nodelay, sizeof (tcp_nodelay));
+ wsa_assert (rc != SOCKET_ERROR);
+
+ // Bind listening socket to any free local port.
+ struct sockaddr_in addr;
+ memset (&addr, 0, sizeof (addr));
+ addr.sin_family = AF_INET;
+ addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
+ addr.sin_port = 0;
+ rc = bind (listener, (const struct sockaddr*) &addr, sizeof (addr));
+ wsa_assert (rc != SOCKET_ERROR);
+
+ // Retrieve local port listener is bound to (into addr).
+ int addrlen = sizeof (addr);
+ rc = getsockname (listener, (struct sockaddr*) &addr, &addrlen);
+ wsa_assert (rc != SOCKET_ERROR);
+
+ // Listen for incomming connections.
+ rc = listen (listener, 1);
+ wsa_assert (rc != SOCKET_ERROR);
+
+ // Create the writer socket.
+ *w_ = WSASocket (AF_INET, SOCK_STREAM, 0, NULL, 0, 0);
+ wsa_assert (*w_ != INVALID_SOCKET);
+
+ // Set TCP_NODELAY on writer socket.
+ rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY,
+ (char *)&tcp_nodelay, sizeof (tcp_nodelay));
+ wsa_assert (rc != SOCKET_ERROR);
+
+ // Connect writer to the listener.
+ rc = connect (*w_, (sockaddr *) &addr, sizeof (addr));
+ wsa_assert (rc != SOCKET_ERROR);
+
+ // Accept connection from writer.
+ *r_ = accept (listener, NULL, NULL);
+ wsa_assert (*r_ != INVALID_SOCKET);
+
+ // We don't need the listening socket anymore. Close it.
+ rc = closesocket (listener);
+ wsa_assert (rc != SOCKET_ERROR);
+
+ return 0;
+
+#elif defined ZMQ_HAVE_OPENVMS
+
+ // Whilst OpenVMS supports socketpair - it maps to AF_INET only. Further,
+ // it does not set the socket options TCP_NODELAY and TCP_NODELACK which
+ // can lead to performance problems.
+ //
+ // The bug will be fixed in V5.6 ECO4 and beyond. In the meantime, we'll
+ // create the socket pair manually.
+ sockaddr_in lcladdr;
+ memset (&lcladdr, 0, sizeof (lcladdr));
+ lcladdr.sin_family = AF_INET;
+ lcladdr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
+ lcladdr.sin_port = 0;
+
+ int listener = socket (AF_INET, SOCK_STREAM, 0);
+ errno_assert (listener != -1);
+
+ int on = 1;
+ int rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY, &on, sizeof (on));
+ errno_assert (rc != -1);
+
+ rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELACK, &on, sizeof (on));
+ errno_assert (rc != -1);
+
+ rc = bind(listener, (struct sockaddr*) &lcladdr, sizeof (lcladdr));
+ errno_assert (rc != -1);
+
+ socklen_t lcladdr_len = sizeof (lcladdr);
+
+ rc = getsockname (listener, (struct sockaddr*) &lcladdr, &lcladdr_len);
+ errno_assert (rc != -1);
+
+ rc = listen (listener, 1);
+ errno_assert (rc != -1);
+
+ *w_ = socket (AF_INET, SOCK_STREAM, 0);
+ errno_assert (*w_ != -1);
+
+ rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY, &on, sizeof (on));
+ errno_assert (rc != -1);
+
+ rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELACK, &on, sizeof (on));
+ errno_assert (rc != -1);
+
+ rc = connect (*w_, (struct sockaddr*) &lcladdr, sizeof (lcladdr));
+ errno_assert (rc != -1);
+
+ *r_ = accept (listener, NULL, NULL);
+ errno_assert (*r_ != -1);
+
+ close (listener);
+
+ return 0;
+
+#else // All other implementations support socketpair()
+
+ int sv [2];
+ int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv);
+ errno_assert (rc == 0);
+ *w_ = sv [0];
+ *r_ = sv [1];
+ return 0;
+
+#endif
+}
+
+#if defined ZMQ_SIGNALER_WAIT_BASED_ON_SELECT
+#undef ZMQ_SIGNALER_WAIT_BASED_ON_SELECT
+#endif
+#if defined ZMQ_SIGNALER_WAIT_BASED_ON_POLL
+#undef ZMQ_SIGNALER_WAIT_BASED_ON_POLL
+#endif
+
diff --git a/src/signaler.hpp b/src/signaler.hpp
new file mode 100644
index 00000000..2ebff410
--- /dev/null
+++ b/src/signaler.hpp
@@ -0,0 +1,63 @@
+/*
+ Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see .
+*/
+
+#ifndef __ZMQ_SIGNALER_HPP_INCLUDED__
+#define __ZMQ_SIGNALER_HPP_INCLUDED__
+
+#include "fd.hpp"
+
+namespace zmq
+{
+
+ // This is a cross-platform equivalent to signal_fd. However, as opposed
+ // to signal_fd there can be at most one signal in the signaler at any
+ // given moment. Attempt to send a signal before receiving the previous
+ // one will result in undefined behaviour.
+
+ class signaler_t
+ {
+ public:
+
+ signaler_t ();
+ ~signaler_t ();
+
+ fd_t get_fd ();
+ void send ();
+ int wait (int timeout_);
+ void recv ();
+
+ private:
+
+ // Creates a pair of filedescriptors that will be used
+ // to pass the signals.
+ static int make_fdpair (fd_t *r_, fd_t *w_);
+
+ // Write & read end of the socketpair.
+ fd_t w;
+ fd_t r;
+
+ // Disable copying of signaler_t object.
+ signaler_t (const signaler_t&);
+ const signaler_t &operator = (const signaler_t&);
+ };
+
+}
+
+#endif
diff --git a/tests/test_shutdown_stress.cpp b/tests/test_shutdown_stress.cpp
index ef81758d..b3ee90f8 100644
--- a/tests/test_shutdown_stress.cpp
+++ b/tests/test_shutdown_stress.cpp
@@ -23,7 +23,7 @@
#include
#include
-#define THREAD_COUNT 10
+#define THREAD_COUNT 100
extern "C"
{