signaler renamed to mailbox

For historical reasons queue to transfer commands between
threads was called 'signaler'. Given that it was used to
pass commands rather than signals it was renamed to 'mailbox',
see Erlang mailboxes.

Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
This commit is contained in:
Martin Sustrik 2010-11-05 17:39:51 +01:00
parent 9cfdb441f4
commit 9da84a5239
12 changed files with 72 additions and 61 deletions

View File

@ -38,7 +38,7 @@ Component: Devices
Maintainer: Martin Sustrik Maintainer: Martin Sustrik
Contact: sustrik@250bpm.com Contact: sustrik@250bpm.com
Component: Generic Infrastructure (context, signaler, command, pipe) Component: Generic Infrastructure (context, mailbox, command, pipe)
Maintainer: Martin Sustrik Maintainer: Martin Sustrik
Contact: sustrik@250bpm.com Contact: sustrik@250bpm.com

View File

@ -310,7 +310,7 @@
> >
</File> </File>
<File <File
RelativePath="..\..\..\src\signaler.cpp" RelativePath="..\..\..\src\mailbox.cpp"
> >
</File> </File>
<File <File
@ -592,7 +592,7 @@
> >
</File> </File>
<File <File
RelativePath="..\..\..\src\signaler.hpp" RelativePath="..\..\..\src\mailbox.hpp"
> >
</File> </File>
<File <File

View File

@ -87,6 +87,7 @@ libzmq_la_SOURCES = \
kqueue.hpp \ kqueue.hpp \
lb.hpp \ lb.hpp \
likely.hpp \ likely.hpp \
mailbox.hpp \
msg_content.hpp \ msg_content.hpp \
mutex.hpp \ mutex.hpp \
named_session.hpp \ named_session.hpp \
@ -111,7 +112,6 @@ libzmq_la_SOURCES = \
select.hpp \ select.hpp \
semaphore.hpp \ semaphore.hpp \
session.hpp \ session.hpp \
signaler.hpp \
socket_base.hpp \ socket_base.hpp \
stdint.hpp \ stdint.hpp \
streamer.hpp \ streamer.hpp \
@ -150,6 +150,7 @@ libzmq_la_SOURCES = \
ip.cpp \ ip.cpp \
kqueue.cpp \ kqueue.cpp \
lb.cpp \ lb.cpp \
mailbox.cpp \
named_session.cpp \ named_session.cpp \
object.cpp \ object.cpp \
options.cpp \ options.cpp \
@ -169,7 +170,6 @@ libzmq_la_SOURCES = \
req.cpp \ req.cpp \
select.cpp \ select.cpp \
session.cpp \ session.cpp \
signaler.cpp \
socket_base.cpp \ socket_base.cpp \
streamer.cpp \ streamer.cpp \
sub.cpp \ sub.cpp \

View File

@ -49,9 +49,9 @@ zmq::ctx_t::ctx_t (uint32_t io_threads_) :
HIBYTE (wsa_data.wVersion) == 2); HIBYTE (wsa_data.wVersion) == 2);
#endif #endif
// Initialise the array of signalers. // Initialise the array of mailboxes.
slot_count = max_sockets + io_threads_; slot_count = max_sockets + io_threads_;
slots = (signaler_t**) malloc (sizeof (signaler_t*) * slot_count); slots = (mailbox_t**) malloc (sizeof (mailbox_t*) * slot_count);
zmq_assert (slots); zmq_assert (slots);
// Create I/O thread objects and launch them. // Create I/O thread objects and launch them.
@ -59,7 +59,7 @@ zmq::ctx_t::ctx_t (uint32_t io_threads_) :
io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i); io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i);
zmq_assert (io_thread); zmq_assert (io_thread);
io_threads.push_back (io_thread); io_threads.push_back (io_thread);
slots [i] = io_thread->get_signaler (); slots [i] = io_thread->get_mailbox ();
io_thread->start (); io_thread->start ();
} }
@ -92,8 +92,8 @@ zmq::ctx_t::~ctx_t ()
for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) for (io_threads_t::size_type i = 0; i != io_threads.size (); i++)
delete io_threads [i]; delete io_threads [i];
// Deallocate the array of slots. No special work is // Deallocate the array of mailboxes. No special work is
// needed as signalers themselves were deallocated with their // needed as mailboxes themselves were deallocated with their
// corresponding io_thread/socket objects. // corresponding io_thread/socket objects.
free (slots); free (slots);
@ -178,7 +178,7 @@ zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
uint32_t slot = empty_slots.back (); uint32_t slot = empty_slots.back ();
empty_slots.pop_back (); empty_slots.pop_back ();
// Create the socket and register its signaler. // Create the socket and register its mailbox.
socket_base_t *s = socket_base_t::create (type_, this, slot); socket_base_t *s = socket_base_t::create (type_, this, slot);
if (!s) { if (!s) {
empty_slots.push_back (slot); empty_slots.push_back (slot);
@ -186,7 +186,7 @@ zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
return NULL; return NULL;
} }
sockets.push_back (s); sockets.push_back (s);
slots [slot] = s->get_signaler (); slots [slot] = s->get_mailbox ();
slot_sync.unlock (); slot_sync.unlock ();

View File

@ -26,7 +26,7 @@
#include "../include/zmq.h" #include "../include/zmq.h"
#include "signaler.hpp" #include "mailbox.hpp"
#include "semaphore.hpp" #include "semaphore.hpp"
#include "ypipe.hpp" #include "ypipe.hpp"
#include "array.hpp" #include "array.hpp"
@ -117,9 +117,9 @@ namespace zmq
typedef std::vector <class io_thread_t*> io_threads_t; typedef std::vector <class io_thread_t*> io_threads_t;
io_threads_t io_threads; io_threads_t io_threads;
// Array of pointers to signalers for both application and I/O threads. // Array of pointers to mailboxes for both application and I/O threads.
uint32_t slot_count; uint32_t slot_count;
signaler_t **slots; mailbox_t **slots;
// List of inproc endpoints within this context. // List of inproc endpoints within this context.
typedef std::map <std::string, class socket_base_t*> endpoints_t; typedef std::map <std::string, class socket_base_t*> endpoints_t;

View File

@ -32,8 +32,8 @@ zmq::io_thread_t::io_thread_t (ctx_t *ctx_, uint32_t tid_) :
poller = new (std::nothrow) poller_t; poller = new (std::nothrow) poller_t;
zmq_assert (poller); zmq_assert (poller);
signaler_handle = poller->add_fd (signaler.get_fd (), this); mailbox_handle = poller->add_fd (mailbox.get_fd (), this);
poller->set_pollin (signaler_handle); poller->set_pollin (mailbox_handle);
} }
zmq::io_thread_t::~io_thread_t () zmq::io_thread_t::~io_thread_t ()
@ -52,9 +52,9 @@ void zmq::io_thread_t::stop ()
send_stop (); send_stop ();
} }
zmq::signaler_t *zmq::io_thread_t::get_signaler () zmq::mailbox_t *zmq::io_thread_t::get_mailbox ()
{ {
return &signaler; return &mailbox;
} }
int zmq::io_thread_t::get_load () int zmq::io_thread_t::get_load ()
@ -71,7 +71,7 @@ void zmq::io_thread_t::in_event ()
// Get the next command. If there is none, exit. // Get the next command. If there is none, exit.
command_t cmd; command_t cmd;
int rc = signaler.recv (&cmd, false); int rc = mailbox.recv (&cmd, false);
if (rc != 0 && errno == EINTR) if (rc != 0 && errno == EINTR)
continue; continue;
if (rc != 0 && errno == EAGAIN) if (rc != 0 && errno == EAGAIN)
@ -103,6 +103,6 @@ zmq::poller_t *zmq::io_thread_t::get_poller ()
void zmq::io_thread_t::process_stop () void zmq::io_thread_t::process_stop ()
{ {
poller->rm_fd (signaler_handle); poller->rm_fd (mailbox_handle);
poller->stop (); poller->stop ();
} }

View File

@ -26,7 +26,7 @@
#include "object.hpp" #include "object.hpp"
#include "poller.hpp" #include "poller.hpp"
#include "i_poll_events.hpp" #include "i_poll_events.hpp"
#include "signaler.hpp" #include "mailbox.hpp"
namespace zmq namespace zmq
{ {
@ -50,8 +50,8 @@ namespace zmq
// Ask underlying thread to stop. // Ask underlying thread to stop.
void stop (); void stop ();
// Returns signaler associated with this I/O thread. // Returns mailbox associated with this I/O thread.
signaler_t *get_signaler (); mailbox_t *get_mailbox ();
// i_poll_events implementation. // i_poll_events implementation.
void in_event (); void in_event ();
@ -69,12 +69,11 @@ namespace zmq
private: private:
// Poll thread gets notifications about incoming commands using // I/O thread accesses incoming commands via this mailbox.
// this signaler. mailbox_t mailbox;
signaler_t signaler;
// Handle associated with signaler's file descriptor. // Handle associated with mailbox' file descriptor.
poller_t::handle_t signaler_handle; poller_t::handle_t mailbox_handle;
// I/O multiplexing is performed using a poller object. // I/O multiplexing is performed using a poller object.
poller_t *poller; poller_t *poller;

View File

@ -17,7 +17,7 @@
along with this program. If not, see <http://www.gnu.org/licenses/>. along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "signaler.hpp" #include "mailbox.hpp"
#include "platform.hpp" #include "platform.hpp"
#include "err.hpp" #include "err.hpp"
#include "fd.hpp" #include "fd.hpp"
@ -35,14 +35,14 @@
#include <sys/socket.h> #include <sys/socket.h>
#endif #endif
zmq::fd_t zmq::signaler_t::get_fd () zmq::fd_t zmq::mailbox_t::get_fd ()
{ {
return r; return r;
} }
#if defined ZMQ_HAVE_WINDOWS #if defined ZMQ_HAVE_WINDOWS
zmq::signaler_t::signaler_t () zmq::mailbox_t::mailbox_t ()
{ {
// Create the socketpair for signalling. // Create the socketpair for signalling.
int rc = make_socketpair (&r, &w); int rc = make_socketpair (&r, &w);
@ -59,7 +59,7 @@ zmq::signaler_t::signaler_t ()
wsa_assert (rc != SOCKET_ERROR); wsa_assert (rc != SOCKET_ERROR);
} }
zmq::signaler_t::~signaler_t () zmq::mailbox_t::~mailbox_t ()
{ {
int rc = closesocket (w); int rc = closesocket (w);
wsa_assert (rc != SOCKET_ERROR); wsa_assert (rc != SOCKET_ERROR);
@ -68,7 +68,7 @@ zmq::signaler_t::~signaler_t ()
wsa_assert (rc != SOCKET_ERROR); wsa_assert (rc != SOCKET_ERROR);
} }
void zmq::signaler_t::send (const command_t &cmd_) void zmq::mailbox_t::send (const command_t &cmd_)
{ {
// TODO: Implement SNDBUF auto-resizing as for POSIX platforms. // TODO: Implement SNDBUF auto-resizing as for POSIX platforms.
// In the mean time, the following code with assert if the send() // In the mean time, the following code with assert if the send()
@ -78,7 +78,7 @@ void zmq::signaler_t::send (const command_t &cmd_)
zmq_assert (nbytes == sizeof (command_t)); zmq_assert (nbytes == sizeof (command_t));
} }
int zmq::signaler_t::recv (command_t *cmd_, bool block_) int zmq::mailbox_t::recv (command_t *cmd_, bool block_)
{ {
if (block_) { if (block_) {
// Set the reader to blocking mode. // Set the reader to blocking mode.
@ -115,7 +115,7 @@ int zmq::signaler_t::recv (command_t *cmd_, bool block_)
#else // !ZMQ_HAVE_WINDOWS #else // !ZMQ_HAVE_WINDOWS
zmq::signaler_t::signaler_t () zmq::mailbox_t::mailbox_t ()
{ {
#ifdef PIPE_BUF #ifdef PIPE_BUF
// Make sure that command can be written to the socket in atomic fashion. // Make sure that command can be written to the socket in atomic fashion.
@ -143,35 +143,40 @@ zmq::signaler_t::signaler_t ()
#endif #endif
} }
zmq::signaler_t::~signaler_t () zmq::mailbox_t::~mailbox_t ()
{ {
close (w); close (w);
close (r); close (r);
} }
void zmq::signaler_t::send (const command_t &cmd_) void zmq::mailbox_t::send (const command_t &cmd_)
{ {
// Attempt to write an entire command without blocking. // Attempt to write an entire command without blocking.
ssize_t nbytes; ssize_t nbytes;
do { do {
nbytes = ::send (w, &cmd_, sizeof (command_t), 0); nbytes = ::send (w, &cmd_, sizeof (command_t), 0);
} while (nbytes == -1 && errno == EINTR); } while (nbytes == -1 && errno == EINTR);
// Attempt to increase signaler SNDBUF if the send failed.
// Attempt to increase mailbox SNDBUF if the send failed.
if (nbytes == -1 && errno == EAGAIN) { if (nbytes == -1 && errno == EAGAIN) {
int old_sndbuf, new_sndbuf; int old_sndbuf, new_sndbuf;
socklen_t sndbuf_size = sizeof old_sndbuf; socklen_t sndbuf_size = sizeof old_sndbuf;
// Retrieve current send buffer size. // Retrieve current send buffer size.
int rc = getsockopt (w, SOL_SOCKET, SO_SNDBUF, &old_sndbuf, int rc = getsockopt (w, SOL_SOCKET, SO_SNDBUF, &old_sndbuf,
&sndbuf_size); &sndbuf_size);
errno_assert (rc == 0); errno_assert (rc == 0);
new_sndbuf = old_sndbuf * 2; new_sndbuf = old_sndbuf * 2;
// Double the new send buffer size. // Double the new send buffer size.
rc = setsockopt (w, SOL_SOCKET, SO_SNDBUF, &new_sndbuf, sndbuf_size); rc = setsockopt (w, SOL_SOCKET, SO_SNDBUF, &new_sndbuf, sndbuf_size);
errno_assert (rc == 0); errno_assert (rc == 0);
// Verify that the OS actually honored the request. // Verify that the OS actually honored the request.
rc = getsockopt (w, SOL_SOCKET, SO_SNDBUF, &new_sndbuf, &sndbuf_size); rc = getsockopt (w, SOL_SOCKET, SO_SNDBUF, &new_sndbuf, &sndbuf_size);
errno_assert (rc == 0); errno_assert (rc == 0);
zmq_assert (new_sndbuf > old_sndbuf); zmq_assert (new_sndbuf > old_sndbuf);
// Retry the sending operation; at this point it must succeed. // Retry the sending operation; at this point it must succeed.
do { do {
nbytes = ::send (w, &cmd_, sizeof (command_t), 0); nbytes = ::send (w, &cmd_, sizeof (command_t), 0);
@ -184,7 +189,7 @@ void zmq::signaler_t::send (const command_t &cmd_)
zmq_assert (nbytes == sizeof (command_t)); zmq_assert (nbytes == sizeof (command_t));
} }
int zmq::signaler_t::recv (command_t *cmd_, bool block_) int zmq::mailbox_t::recv (command_t *cmd_, bool block_)
{ {
#ifdef MSG_DONTWAIT #ifdef MSG_DONTWAIT
// Attempt to read an entire command. Returns EAGAIN if non-blocking // Attempt to read an entire command. Returns EAGAIN if non-blocking
@ -195,33 +200,40 @@ int zmq::signaler_t::recv (command_t *cmd_, bool block_)
return -1; return -1;
#else #else
if (block_) { if (block_) {
// Set the reader to blocking mode. // Set the reader to blocking mode.
int flags = fcntl (r, F_GETFL, 0); int flags = fcntl (r, F_GETFL, 0);
errno_assert (flags >= 0); errno_assert (flags >= 0);
int rc = fcntl (r, F_SETFL, flags & ~O_NONBLOCK); int rc = fcntl (r, F_SETFL, flags & ~O_NONBLOCK);
errno_assert (rc == 0); errno_assert (rc == 0);
} }
// Attempt to read an entire command. Returns EAGAIN if non-blocking // Attempt to read an entire command. Returns EAGAIN if non-blocking
// and a command is not available. // and a command is not available.
int err = 0; int err = 0;
ssize_t nbytes = ::recv (r, cmd_, sizeof (command_t), 0); ssize_t nbytes = ::recv (r, cmd_, sizeof (command_t), 0);
if (nbytes == -1 && (errno == EAGAIN || errno == EINTR)) { if (nbytes == -1 && (errno == EAGAIN || errno == EINTR)) {
// Save value of errno if we wish to pass it to caller. // Save value of errno if we wish to pass it to caller.
err = errno; err = errno;
} }
if (block_) { if (block_) {
// Re-set the reader to non-blocking mode. // Re-set the reader to non-blocking mode.
int flags = fcntl (r, F_GETFL, 0); int flags = fcntl (r, F_GETFL, 0);
errno_assert (flags >= 0); errno_assert (flags >= 0);
int rc = fcntl (r, F_SETFL, flags | O_NONBLOCK); int rc = fcntl (r, F_SETFL, flags | O_NONBLOCK);
errno_assert (rc == 0); errno_assert (rc == 0);
} }
// If the recv failed, return with the saved errno if set. // If the recv failed, return with the saved errno if set.
if (err != 0) { if (err != 0) {
errno = err; errno = err;
return -1; return -1;
} }
#endif #endif
// Sanity check for success. // Sanity check for success.
errno_assert (nbytes != -1); errno_assert (nbytes != -1);
@ -233,7 +245,7 @@ int zmq::signaler_t::recv (command_t *cmd_, bool block_)
#endif #endif
int zmq::signaler_t::make_socketpair (fd_t *r_, fd_t *w_) int zmq::mailbox_t::make_socketpair (fd_t *r_, fd_t *w_)
{ {
#if defined ZMQ_HAVE_WINDOWS #if defined ZMQ_HAVE_WINDOWS

View File

@ -17,8 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>. along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef __ZMQ_SIGNALER_HPP_INCLUDED__ #ifndef __ZMQ_MAILBOX_HPP_INCLUDED__
#define __ZMQ_SIGNALER_HPP_INCLUDED__ #define __ZMQ_MAILBOX_HPP_INCLUDED__
#include <stddef.h> #include <stddef.h>
@ -31,12 +31,12 @@
namespace zmq namespace zmq
{ {
class signaler_t class mailbox_t
{ {
public: public:
signaler_t (); mailbox_t ();
~signaler_t (); ~mailbox_t ();
fd_t get_fd (); fd_t get_fd ();
void send (const command_t &cmd_); void send (const command_t &cmd_);
@ -51,9 +51,9 @@ namespace zmq
// Platform-dependent function to create a socketpair. // Platform-dependent function to create a socketpair.
static int make_socketpair (fd_t *r_, fd_t *w_); static int make_socketpair (fd_t *r_, fd_t *w_);
// Disable copying of signaler_t object. // Disable copying of mailbox_t object.
signaler_t (const signaler_t&); mailbox_t (const mailbox_t&);
void operator = (const signaler_t&); void operator = (const mailbox_t&);
}; };
} }

View File

@ -83,7 +83,7 @@ void zmq::own_t::launch_sibling (own_t *object_)
{ {
// At this point it is important that object is plugged in before its // At this point it is important that object is plugged in before its
// owner has a chance to terminate it. Thus, 'plug' command is sent before // owner has a chance to terminate it. Thus, 'plug' command is sent before
// the 'own' command. Given that the signaler preserves ordering of // the 'own' command. Given that the mailbox preserves ordering of
// commands, 'term' command from the owner cannot make it to the object // commands, 'term' command from the owner cannot make it to the object
// before the already written 'plug' command. // before the already written 'plug' command.

View File

@ -118,9 +118,9 @@ zmq::socket_base_t::~socket_base_t ()
sessions_sync.unlock (); sessions_sync.unlock ();
} }
zmq::signaler_t *zmq::socket_base_t::get_signaler () zmq::mailbox_t *zmq::socket_base_t::get_mailbox ()
{ {
return &signaler; return &mailbox;
} }
void zmq::socket_base_t::stop () void zmq::socket_base_t::stop ()
@ -227,7 +227,7 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_,
errno = EINVAL; errno = EINVAL;
return -1; return -1;
} }
*((fd_t*) optval_) = signaler.get_fd (); *((fd_t*) optval_) = mailbox.get_fd ();
*optvallen_ = sizeof (fd_t); *optvallen_ = sizeof (fd_t);
return 0; return 0;
} }
@ -613,7 +613,7 @@ int zmq::socket_base_t::process_commands (bool block_, bool throttle_)
int rc; int rc;
command_t cmd; command_t cmd;
if (block_) { if (block_) {
rc = signaler.recv (&cmd, true); rc = mailbox.recv (&cmd, true);
if (rc == -1 && errno == EINTR) if (rc == -1 && errno == EINTR)
return -1; return -1;
errno_assert (rc == 0); errno_assert (rc == 0);
@ -640,7 +640,7 @@ int zmq::socket_base_t::process_commands (bool block_, bool throttle_)
} }
// Check whether there are any commands pending for this thread. // Check whether there are any commands pending for this thread.
rc = signaler.recv (&cmd, false); rc = mailbox.recv (&cmd, false);
} }
// Process all the commands available at the moment. // Process all the commands available at the moment.
@ -651,7 +651,7 @@ int zmq::socket_base_t::process_commands (bool block_, bool throttle_)
return -1; return -1;
errno_assert (rc == 0); errno_assert (rc == 0);
cmd.destination->process_command (cmd); cmd.destination->process_command (cmd);
rc = signaler.recv (&cmd, false); rc = mailbox.recv (&cmd, false);
} }
if (ctx_terminated) { if (ctx_terminated) {

View File

@ -30,7 +30,7 @@
#include "mutex.hpp" #include "mutex.hpp"
#include "stdint.hpp" #include "stdint.hpp"
#include "atomic_counter.hpp" #include "atomic_counter.hpp"
#include "signaler.hpp" #include "mailbox.hpp"
#include "stdint.hpp" #include "stdint.hpp"
#include "blob.hpp" #include "blob.hpp"
#include "own.hpp" #include "own.hpp"
@ -48,8 +48,8 @@ namespace zmq
static socket_base_t *create (int type_, class ctx_t *parent_, static socket_base_t *create (int type_, class ctx_t *parent_,
uint32_t tid_); uint32_t tid_);
// Returns the signaler associated with this socket. // Returns the mailbox associated with this socket.
signaler_t *get_signaler (); mailbox_t *get_mailbox ();
// Interrupt blocking call if the socket is stuck in one. // Interrupt blocking call if the socket is stuck in one.
// This function can be called from a different thread! // This function can be called from a different thread!
@ -148,8 +148,8 @@ namespace zmq
const blob_t &peer_identity_); const blob_t &peer_identity_);
void process_unplug (); void process_unplug ();
// App thread's signaler object. // Socket's mailbox object.
signaler_t signaler; mailbox_t mailbox;
// Timestamp of when commands were processed the last time. // Timestamp of when commands were processed the last time.
uint64_t last_tsc; uint64_t last_tsc;