diff --git a/MAINTAINERS b/MAINTAINERS
index 9d65ad4b..8e572eb0 100644
--- a/MAINTAINERS
+++ b/MAINTAINERS
@@ -38,7 +38,7 @@ Component: Devices
Maintainer: Martin Sustrik
Contact: sustrik@250bpm.com
-Component: Generic Infrastructure (context, signaler, command, pipe)
+Component: Generic Infrastructure (context, mailbox, command, pipe)
Maintainer: Martin Sustrik
Contact: sustrik@250bpm.com
diff --git a/builds/msvc/libzmq/libzmq.vcproj b/builds/msvc/libzmq/libzmq.vcproj
index ffe8a17a..456b5ed0 100644
--- a/builds/msvc/libzmq/libzmq.vcproj
+++ b/builds/msvc/libzmq/libzmq.vcproj
@@ -310,7 +310,7 @@
>
get_signaler ();
+ slots [i] = io_thread->get_mailbox ();
io_thread->start ();
}
@@ -92,8 +92,8 @@ zmq::ctx_t::~ctx_t ()
for (io_threads_t::size_type i = 0; i != io_threads.size (); i++)
delete io_threads [i];
- // Deallocate the array of slots. No special work is
- // needed as signalers themselves were deallocated with their
+ // Deallocate the array of mailboxes. No special work is
+ // needed as mailboxes themselves were deallocated with their
// corresponding io_thread/socket objects.
free (slots);
@@ -178,7 +178,7 @@ zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
uint32_t slot = empty_slots.back ();
empty_slots.pop_back ();
- // Create the socket and register its signaler.
+ // Create the socket and register its mailbox.
socket_base_t *s = socket_base_t::create (type_, this, slot);
if (!s) {
empty_slots.push_back (slot);
@@ -186,7 +186,7 @@ zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
return NULL;
}
sockets.push_back (s);
- slots [slot] = s->get_signaler ();
+ slots [slot] = s->get_mailbox ();
slot_sync.unlock ();
diff --git a/src/ctx.hpp b/src/ctx.hpp
index 5a3a6aa4..0f2dd522 100644
--- a/src/ctx.hpp
+++ b/src/ctx.hpp
@@ -26,7 +26,7 @@
#include "../include/zmq.h"
-#include "signaler.hpp"
+#include "mailbox.hpp"
#include "semaphore.hpp"
#include "ypipe.hpp"
#include "array.hpp"
@@ -117,9 +117,9 @@ namespace zmq
typedef std::vector io_threads_t;
io_threads_t io_threads;
- // Array of pointers to signalers for both application and I/O threads.
+ // Array of pointers to mailboxes for both application and I/O threads.
uint32_t slot_count;
- signaler_t **slots;
+ mailbox_t **slots;
// List of inproc endpoints within this context.
typedef std::map endpoints_t;
diff --git a/src/io_thread.cpp b/src/io_thread.cpp
index aacf8430..7ba8905f 100644
--- a/src/io_thread.cpp
+++ b/src/io_thread.cpp
@@ -32,8 +32,8 @@ zmq::io_thread_t::io_thread_t (ctx_t *ctx_, uint32_t tid_) :
poller = new (std::nothrow) poller_t;
zmq_assert (poller);
- signaler_handle = poller->add_fd (signaler.get_fd (), this);
- poller->set_pollin (signaler_handle);
+ mailbox_handle = poller->add_fd (mailbox.get_fd (), this);
+ poller->set_pollin (mailbox_handle);
}
zmq::io_thread_t::~io_thread_t ()
@@ -52,9 +52,9 @@ void zmq::io_thread_t::stop ()
send_stop ();
}
-zmq::signaler_t *zmq::io_thread_t::get_signaler ()
+zmq::mailbox_t *zmq::io_thread_t::get_mailbox ()
{
- return &signaler;
+ return &mailbox;
}
int zmq::io_thread_t::get_load ()
@@ -71,7 +71,7 @@ void zmq::io_thread_t::in_event ()
// Get the next command. If there is none, exit.
command_t cmd;
- int rc = signaler.recv (&cmd, false);
+ int rc = mailbox.recv (&cmd, false);
if (rc != 0 && errno == EINTR)
continue;
if (rc != 0 && errno == EAGAIN)
@@ -103,6 +103,6 @@ zmq::poller_t *zmq::io_thread_t::get_poller ()
void zmq::io_thread_t::process_stop ()
{
- poller->rm_fd (signaler_handle);
+ poller->rm_fd (mailbox_handle);
poller->stop ();
}
diff --git a/src/io_thread.hpp b/src/io_thread.hpp
index a0704fc9..b01eecb3 100644
--- a/src/io_thread.hpp
+++ b/src/io_thread.hpp
@@ -26,7 +26,7 @@
#include "object.hpp"
#include "poller.hpp"
#include "i_poll_events.hpp"
-#include "signaler.hpp"
+#include "mailbox.hpp"
namespace zmq
{
@@ -50,8 +50,8 @@ namespace zmq
// Ask underlying thread to stop.
void stop ();
- // Returns signaler associated with this I/O thread.
- signaler_t *get_signaler ();
+ // Returns mailbox associated with this I/O thread.
+ mailbox_t *get_mailbox ();
// i_poll_events implementation.
void in_event ();
@@ -69,12 +69,11 @@ namespace zmq
private:
- // Poll thread gets notifications about incoming commands using
- // this signaler.
- signaler_t signaler;
+ // I/O thread accesses incoming commands via this mailbox.
+ mailbox_t mailbox;
- // Handle associated with signaler's file descriptor.
- poller_t::handle_t signaler_handle;
+ // Handle associated with mailbox' file descriptor.
+ poller_t::handle_t mailbox_handle;
// I/O multiplexing is performed using a poller object.
poller_t *poller;
diff --git a/src/signaler.cpp b/src/mailbox.cpp
similarity index 95%
rename from src/signaler.cpp
rename to src/mailbox.cpp
index a6920782..927c230f 100644
--- a/src/signaler.cpp
+++ b/src/mailbox.cpp
@@ -17,7 +17,7 @@
along with this program. If not, see .
*/
-#include "signaler.hpp"
+#include "mailbox.hpp"
#include "platform.hpp"
#include "err.hpp"
#include "fd.hpp"
@@ -35,14 +35,14 @@
#include
#endif
-zmq::fd_t zmq::signaler_t::get_fd ()
+zmq::fd_t zmq::mailbox_t::get_fd ()
{
return r;
}
#if defined ZMQ_HAVE_WINDOWS
-zmq::signaler_t::signaler_t ()
+zmq::mailbox_t::mailbox_t ()
{
// Create the socketpair for signalling.
int rc = make_socketpair (&r, &w);
@@ -59,7 +59,7 @@ zmq::signaler_t::signaler_t ()
wsa_assert (rc != SOCKET_ERROR);
}
-zmq::signaler_t::~signaler_t ()
+zmq::mailbox_t::~mailbox_t ()
{
int rc = closesocket (w);
wsa_assert (rc != SOCKET_ERROR);
@@ -68,7 +68,7 @@ zmq::signaler_t::~signaler_t ()
wsa_assert (rc != SOCKET_ERROR);
}
-void zmq::signaler_t::send (const command_t &cmd_)
+void zmq::mailbox_t::send (const command_t &cmd_)
{
// TODO: Implement SNDBUF auto-resizing as for POSIX platforms.
// In the mean time, the following code with assert if the send()
@@ -78,7 +78,7 @@ void zmq::signaler_t::send (const command_t &cmd_)
zmq_assert (nbytes == sizeof (command_t));
}
-int zmq::signaler_t::recv (command_t *cmd_, bool block_)
+int zmq::mailbox_t::recv (command_t *cmd_, bool block_)
{
if (block_) {
// Set the reader to blocking mode.
@@ -115,7 +115,7 @@ int zmq::signaler_t::recv (command_t *cmd_, bool block_)
#else // !ZMQ_HAVE_WINDOWS
-zmq::signaler_t::signaler_t ()
+zmq::mailbox_t::mailbox_t ()
{
#ifdef PIPE_BUF
// Make sure that command can be written to the socket in atomic fashion.
@@ -143,35 +143,40 @@ zmq::signaler_t::signaler_t ()
#endif
}
-zmq::signaler_t::~signaler_t ()
+zmq::mailbox_t::~mailbox_t ()
{
close (w);
close (r);
}
-void zmq::signaler_t::send (const command_t &cmd_)
+void zmq::mailbox_t::send (const command_t &cmd_)
{
// Attempt to write an entire command without blocking.
ssize_t nbytes;
do {
nbytes = ::send (w, &cmd_, sizeof (command_t), 0);
} while (nbytes == -1 && errno == EINTR);
- // Attempt to increase signaler SNDBUF if the send failed.
+
+ // Attempt to increase mailbox SNDBUF if the send failed.
if (nbytes == -1 && errno == EAGAIN) {
int old_sndbuf, new_sndbuf;
socklen_t sndbuf_size = sizeof old_sndbuf;
+
// Retrieve current send buffer size.
int rc = getsockopt (w, SOL_SOCKET, SO_SNDBUF, &old_sndbuf,
&sndbuf_size);
errno_assert (rc == 0);
new_sndbuf = old_sndbuf * 2;
+
// Double the new send buffer size.
rc = setsockopt (w, SOL_SOCKET, SO_SNDBUF, &new_sndbuf, sndbuf_size);
errno_assert (rc == 0);
+
// Verify that the OS actually honored the request.
rc = getsockopt (w, SOL_SOCKET, SO_SNDBUF, &new_sndbuf, &sndbuf_size);
errno_assert (rc == 0);
zmq_assert (new_sndbuf > old_sndbuf);
+
// Retry the sending operation; at this point it must succeed.
do {
nbytes = ::send (w, &cmd_, sizeof (command_t), 0);
@@ -184,7 +189,7 @@ void zmq::signaler_t::send (const command_t &cmd_)
zmq_assert (nbytes == sizeof (command_t));
}
-int zmq::signaler_t::recv (command_t *cmd_, bool block_)
+int zmq::mailbox_t::recv (command_t *cmd_, bool block_)
{
#ifdef MSG_DONTWAIT
// Attempt to read an entire command. Returns EAGAIN if non-blocking
@@ -195,33 +200,40 @@ int zmq::signaler_t::recv (command_t *cmd_, bool block_)
return -1;
#else
if (block_) {
+
// Set the reader to blocking mode.
int flags = fcntl (r, F_GETFL, 0);
errno_assert (flags >= 0);
int rc = fcntl (r, F_SETFL, flags & ~O_NONBLOCK);
errno_assert (rc == 0);
}
+
// Attempt to read an entire command. Returns EAGAIN if non-blocking
// and a command is not available.
int err = 0;
ssize_t nbytes = ::recv (r, cmd_, sizeof (command_t), 0);
if (nbytes == -1 && (errno == EAGAIN || errno == EINTR)) {
+
// Save value of errno if we wish to pass it to caller.
err = errno;
}
+
if (block_) {
+
// Re-set the reader to non-blocking mode.
int flags = fcntl (r, F_GETFL, 0);
errno_assert (flags >= 0);
int rc = fcntl (r, F_SETFL, flags | O_NONBLOCK);
errno_assert (rc == 0);
}
+
// If the recv failed, return with the saved errno if set.
if (err != 0) {
errno = err;
return -1;
}
#endif
+
// Sanity check for success.
errno_assert (nbytes != -1);
@@ -233,7 +245,7 @@ int zmq::signaler_t::recv (command_t *cmd_, bool block_)
#endif
-int zmq::signaler_t::make_socketpair (fd_t *r_, fd_t *w_)
+int zmq::mailbox_t::make_socketpair (fd_t *r_, fd_t *w_)
{
#if defined ZMQ_HAVE_WINDOWS
diff --git a/src/signaler.hpp b/src/mailbox.hpp
similarity index 81%
rename from src/signaler.hpp
rename to src/mailbox.hpp
index faf3f1f3..dc49aad9 100644
--- a/src/signaler.hpp
+++ b/src/mailbox.hpp
@@ -17,8 +17,8 @@
along with this program. If not, see .
*/
-#ifndef __ZMQ_SIGNALER_HPP_INCLUDED__
-#define __ZMQ_SIGNALER_HPP_INCLUDED__
+#ifndef __ZMQ_MAILBOX_HPP_INCLUDED__
+#define __ZMQ_MAILBOX_HPP_INCLUDED__
#include
@@ -31,12 +31,12 @@
namespace zmq
{
- class signaler_t
+ class mailbox_t
{
public:
- signaler_t ();
- ~signaler_t ();
+ mailbox_t ();
+ ~mailbox_t ();
fd_t get_fd ();
void send (const command_t &cmd_);
@@ -51,9 +51,9 @@ namespace zmq
// Platform-dependent function to create a socketpair.
static int make_socketpair (fd_t *r_, fd_t *w_);
- // Disable copying of signaler_t object.
- signaler_t (const signaler_t&);
- void operator = (const signaler_t&);
+ // Disable copying of mailbox_t object.
+ mailbox_t (const mailbox_t&);
+ void operator = (const mailbox_t&);
};
}
diff --git a/src/own.cpp b/src/own.cpp
index 955113ab..15d25675 100644
--- a/src/own.cpp
+++ b/src/own.cpp
@@ -83,7 +83,7 @@ void zmq::own_t::launch_sibling (own_t *object_)
{
// At this point it is important that object is plugged in before its
// owner has a chance to terminate it. Thus, 'plug' command is sent before
- // the 'own' command. Given that the signaler preserves ordering of
+ // the 'own' command. Given that the mailbox preserves ordering of
// commands, 'term' command from the owner cannot make it to the object
// before the already written 'plug' command.
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index c6728fe5..a10ed0e5 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -118,9 +118,9 @@ zmq::socket_base_t::~socket_base_t ()
sessions_sync.unlock ();
}
-zmq::signaler_t *zmq::socket_base_t::get_signaler ()
+zmq::mailbox_t *zmq::socket_base_t::get_mailbox ()
{
- return &signaler;
+ return &mailbox;
}
void zmq::socket_base_t::stop ()
@@ -227,7 +227,7 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_,
errno = EINVAL;
return -1;
}
- *((fd_t*) optval_) = signaler.get_fd ();
+ *((fd_t*) optval_) = mailbox.get_fd ();
*optvallen_ = sizeof (fd_t);
return 0;
}
@@ -613,7 +613,7 @@ int zmq::socket_base_t::process_commands (bool block_, bool throttle_)
int rc;
command_t cmd;
if (block_) {
- rc = signaler.recv (&cmd, true);
+ rc = mailbox.recv (&cmd, true);
if (rc == -1 && errno == EINTR)
return -1;
errno_assert (rc == 0);
@@ -640,7 +640,7 @@ int zmq::socket_base_t::process_commands (bool block_, bool throttle_)
}
// Check whether there are any commands pending for this thread.
- rc = signaler.recv (&cmd, false);
+ rc = mailbox.recv (&cmd, false);
}
// Process all the commands available at the moment.
@@ -651,7 +651,7 @@ int zmq::socket_base_t::process_commands (bool block_, bool throttle_)
return -1;
errno_assert (rc == 0);
cmd.destination->process_command (cmd);
- rc = signaler.recv (&cmd, false);
+ rc = mailbox.recv (&cmd, false);
}
if (ctx_terminated) {
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index 0fdfefd7..69de24dd 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -30,7 +30,7 @@
#include "mutex.hpp"
#include "stdint.hpp"
#include "atomic_counter.hpp"
-#include "signaler.hpp"
+#include "mailbox.hpp"
#include "stdint.hpp"
#include "blob.hpp"
#include "own.hpp"
@@ -48,8 +48,8 @@ namespace zmq
static socket_base_t *create (int type_, class ctx_t *parent_,
uint32_t tid_);
- // Returns the signaler associated with this socket.
- signaler_t *get_signaler ();
+ // Returns the mailbox associated with this socket.
+ mailbox_t *get_mailbox ();
// Interrupt blocking call if the socket is stuck in one.
// This function can be called from a different thread!
@@ -148,8 +148,8 @@ namespace zmq
const blob_t &peer_identity_);
void process_unplug ();
- // App thread's signaler object.
- signaler_t signaler;
+ // Socket's mailbox object.
+ mailbox_t mailbox;
// Timestamp of when commands were processed the last time.
uint64_t last_tsc;