From 9da84a5239e5356e34d872c2b5af1d19b9c7eb4f Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Fri, 5 Nov 2010 17:39:51 +0100 Subject: [PATCH] signaler renamed to mailbox For historical reasons queue to transfer commands between threads was called 'signaler'. Given that it was used to pass commands rather than signals it was renamed to 'mailbox', see Erlang mailboxes. Signed-off-by: Martin Sustrik --- MAINTAINERS | 2 +- builds/msvc/libzmq/libzmq.vcproj | 4 ++-- src/Makefile.am | 4 ++-- src/ctx.cpp | 14 ++++++------ src/ctx.hpp | 6 +++--- src/io_thread.cpp | 12 +++++------ src/io_thread.hpp | 15 ++++++------- src/{signaler.cpp => mailbox.cpp} | 36 ++++++++++++++++++++----------- src/{signaler.hpp => mailbox.hpp} | 16 +++++++------- src/own.cpp | 2 +- src/socket_base.cpp | 12 +++++------ src/socket_base.hpp | 10 ++++----- 12 files changed, 72 insertions(+), 61 deletions(-) rename src/{signaler.cpp => mailbox.cpp} (95%) rename src/{signaler.hpp => mailbox.hpp} (81%) diff --git a/MAINTAINERS b/MAINTAINERS index 9d65ad4b..8e572eb0 100644 --- a/MAINTAINERS +++ b/MAINTAINERS @@ -38,7 +38,7 @@ Component: Devices Maintainer: Martin Sustrik Contact: sustrik@250bpm.com -Component: Generic Infrastructure (context, signaler, command, pipe) +Component: Generic Infrastructure (context, mailbox, command, pipe) Maintainer: Martin Sustrik Contact: sustrik@250bpm.com diff --git a/builds/msvc/libzmq/libzmq.vcproj b/builds/msvc/libzmq/libzmq.vcproj index ffe8a17a..456b5ed0 100644 --- a/builds/msvc/libzmq/libzmq.vcproj +++ b/builds/msvc/libzmq/libzmq.vcproj @@ -310,7 +310,7 @@ > get_signaler (); + slots [i] = io_thread->get_mailbox (); io_thread->start (); } @@ -92,8 +92,8 @@ zmq::ctx_t::~ctx_t () for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) delete io_threads [i]; - // Deallocate the array of slots. No special work is - // needed as signalers themselves were deallocated with their + // Deallocate the array of mailboxes. No special work is + // needed as mailboxes themselves were deallocated with their // corresponding io_thread/socket objects. free (slots); @@ -178,7 +178,7 @@ zmq::socket_base_t *zmq::ctx_t::create_socket (int type_) uint32_t slot = empty_slots.back (); empty_slots.pop_back (); - // Create the socket and register its signaler. + // Create the socket and register its mailbox. socket_base_t *s = socket_base_t::create (type_, this, slot); if (!s) { empty_slots.push_back (slot); @@ -186,7 +186,7 @@ zmq::socket_base_t *zmq::ctx_t::create_socket (int type_) return NULL; } sockets.push_back (s); - slots [slot] = s->get_signaler (); + slots [slot] = s->get_mailbox (); slot_sync.unlock (); diff --git a/src/ctx.hpp b/src/ctx.hpp index 5a3a6aa4..0f2dd522 100644 --- a/src/ctx.hpp +++ b/src/ctx.hpp @@ -26,7 +26,7 @@ #include "../include/zmq.h" -#include "signaler.hpp" +#include "mailbox.hpp" #include "semaphore.hpp" #include "ypipe.hpp" #include "array.hpp" @@ -117,9 +117,9 @@ namespace zmq typedef std::vector io_threads_t; io_threads_t io_threads; - // Array of pointers to signalers for both application and I/O threads. + // Array of pointers to mailboxes for both application and I/O threads. uint32_t slot_count; - signaler_t **slots; + mailbox_t **slots; // List of inproc endpoints within this context. typedef std::map endpoints_t; diff --git a/src/io_thread.cpp b/src/io_thread.cpp index aacf8430..7ba8905f 100644 --- a/src/io_thread.cpp +++ b/src/io_thread.cpp @@ -32,8 +32,8 @@ zmq::io_thread_t::io_thread_t (ctx_t *ctx_, uint32_t tid_) : poller = new (std::nothrow) poller_t; zmq_assert (poller); - signaler_handle = poller->add_fd (signaler.get_fd (), this); - poller->set_pollin (signaler_handle); + mailbox_handle = poller->add_fd (mailbox.get_fd (), this); + poller->set_pollin (mailbox_handle); } zmq::io_thread_t::~io_thread_t () @@ -52,9 +52,9 @@ void zmq::io_thread_t::stop () send_stop (); } -zmq::signaler_t *zmq::io_thread_t::get_signaler () +zmq::mailbox_t *zmq::io_thread_t::get_mailbox () { - return &signaler; + return &mailbox; } int zmq::io_thread_t::get_load () @@ -71,7 +71,7 @@ void zmq::io_thread_t::in_event () // Get the next command. If there is none, exit. command_t cmd; - int rc = signaler.recv (&cmd, false); + int rc = mailbox.recv (&cmd, false); if (rc != 0 && errno == EINTR) continue; if (rc != 0 && errno == EAGAIN) @@ -103,6 +103,6 @@ zmq::poller_t *zmq::io_thread_t::get_poller () void zmq::io_thread_t::process_stop () { - poller->rm_fd (signaler_handle); + poller->rm_fd (mailbox_handle); poller->stop (); } diff --git a/src/io_thread.hpp b/src/io_thread.hpp index a0704fc9..b01eecb3 100644 --- a/src/io_thread.hpp +++ b/src/io_thread.hpp @@ -26,7 +26,7 @@ #include "object.hpp" #include "poller.hpp" #include "i_poll_events.hpp" -#include "signaler.hpp" +#include "mailbox.hpp" namespace zmq { @@ -50,8 +50,8 @@ namespace zmq // Ask underlying thread to stop. void stop (); - // Returns signaler associated with this I/O thread. - signaler_t *get_signaler (); + // Returns mailbox associated with this I/O thread. + mailbox_t *get_mailbox (); // i_poll_events implementation. void in_event (); @@ -69,12 +69,11 @@ namespace zmq private: - // Poll thread gets notifications about incoming commands using - // this signaler. - signaler_t signaler; + // I/O thread accesses incoming commands via this mailbox. + mailbox_t mailbox; - // Handle associated with signaler's file descriptor. - poller_t::handle_t signaler_handle; + // Handle associated with mailbox' file descriptor. + poller_t::handle_t mailbox_handle; // I/O multiplexing is performed using a poller object. poller_t *poller; diff --git a/src/signaler.cpp b/src/mailbox.cpp similarity index 95% rename from src/signaler.cpp rename to src/mailbox.cpp index a6920782..927c230f 100644 --- a/src/signaler.cpp +++ b/src/mailbox.cpp @@ -17,7 +17,7 @@ along with this program. If not, see . */ -#include "signaler.hpp" +#include "mailbox.hpp" #include "platform.hpp" #include "err.hpp" #include "fd.hpp" @@ -35,14 +35,14 @@ #include #endif -zmq::fd_t zmq::signaler_t::get_fd () +zmq::fd_t zmq::mailbox_t::get_fd () { return r; } #if defined ZMQ_HAVE_WINDOWS -zmq::signaler_t::signaler_t () +zmq::mailbox_t::mailbox_t () { // Create the socketpair for signalling. int rc = make_socketpair (&r, &w); @@ -59,7 +59,7 @@ zmq::signaler_t::signaler_t () wsa_assert (rc != SOCKET_ERROR); } -zmq::signaler_t::~signaler_t () +zmq::mailbox_t::~mailbox_t () { int rc = closesocket (w); wsa_assert (rc != SOCKET_ERROR); @@ -68,7 +68,7 @@ zmq::signaler_t::~signaler_t () wsa_assert (rc != SOCKET_ERROR); } -void zmq::signaler_t::send (const command_t &cmd_) +void zmq::mailbox_t::send (const command_t &cmd_) { // TODO: Implement SNDBUF auto-resizing as for POSIX platforms. // In the mean time, the following code with assert if the send() @@ -78,7 +78,7 @@ void zmq::signaler_t::send (const command_t &cmd_) zmq_assert (nbytes == sizeof (command_t)); } -int zmq::signaler_t::recv (command_t *cmd_, bool block_) +int zmq::mailbox_t::recv (command_t *cmd_, bool block_) { if (block_) { // Set the reader to blocking mode. @@ -115,7 +115,7 @@ int zmq::signaler_t::recv (command_t *cmd_, bool block_) #else // !ZMQ_HAVE_WINDOWS -zmq::signaler_t::signaler_t () +zmq::mailbox_t::mailbox_t () { #ifdef PIPE_BUF // Make sure that command can be written to the socket in atomic fashion. @@ -143,35 +143,40 @@ zmq::signaler_t::signaler_t () #endif } -zmq::signaler_t::~signaler_t () +zmq::mailbox_t::~mailbox_t () { close (w); close (r); } -void zmq::signaler_t::send (const command_t &cmd_) +void zmq::mailbox_t::send (const command_t &cmd_) { // Attempt to write an entire command without blocking. ssize_t nbytes; do { nbytes = ::send (w, &cmd_, sizeof (command_t), 0); } while (nbytes == -1 && errno == EINTR); - // Attempt to increase signaler SNDBUF if the send failed. + + // Attempt to increase mailbox SNDBUF if the send failed. if (nbytes == -1 && errno == EAGAIN) { int old_sndbuf, new_sndbuf; socklen_t sndbuf_size = sizeof old_sndbuf; + // Retrieve current send buffer size. int rc = getsockopt (w, SOL_SOCKET, SO_SNDBUF, &old_sndbuf, &sndbuf_size); errno_assert (rc == 0); new_sndbuf = old_sndbuf * 2; + // Double the new send buffer size. rc = setsockopt (w, SOL_SOCKET, SO_SNDBUF, &new_sndbuf, sndbuf_size); errno_assert (rc == 0); + // Verify that the OS actually honored the request. rc = getsockopt (w, SOL_SOCKET, SO_SNDBUF, &new_sndbuf, &sndbuf_size); errno_assert (rc == 0); zmq_assert (new_sndbuf > old_sndbuf); + // Retry the sending operation; at this point it must succeed. do { nbytes = ::send (w, &cmd_, sizeof (command_t), 0); @@ -184,7 +189,7 @@ void zmq::signaler_t::send (const command_t &cmd_) zmq_assert (nbytes == sizeof (command_t)); } -int zmq::signaler_t::recv (command_t *cmd_, bool block_) +int zmq::mailbox_t::recv (command_t *cmd_, bool block_) { #ifdef MSG_DONTWAIT // Attempt to read an entire command. Returns EAGAIN if non-blocking @@ -195,33 +200,40 @@ int zmq::signaler_t::recv (command_t *cmd_, bool block_) return -1; #else if (block_) { + // Set the reader to blocking mode. int flags = fcntl (r, F_GETFL, 0); errno_assert (flags >= 0); int rc = fcntl (r, F_SETFL, flags & ~O_NONBLOCK); errno_assert (rc == 0); } + // Attempt to read an entire command. Returns EAGAIN if non-blocking // and a command is not available. int err = 0; ssize_t nbytes = ::recv (r, cmd_, sizeof (command_t), 0); if (nbytes == -1 && (errno == EAGAIN || errno == EINTR)) { + // Save value of errno if we wish to pass it to caller. err = errno; } + if (block_) { + // Re-set the reader to non-blocking mode. int flags = fcntl (r, F_GETFL, 0); errno_assert (flags >= 0); int rc = fcntl (r, F_SETFL, flags | O_NONBLOCK); errno_assert (rc == 0); } + // If the recv failed, return with the saved errno if set. if (err != 0) { errno = err; return -1; } #endif + // Sanity check for success. errno_assert (nbytes != -1); @@ -233,7 +245,7 @@ int zmq::signaler_t::recv (command_t *cmd_, bool block_) #endif -int zmq::signaler_t::make_socketpair (fd_t *r_, fd_t *w_) +int zmq::mailbox_t::make_socketpair (fd_t *r_, fd_t *w_) { #if defined ZMQ_HAVE_WINDOWS diff --git a/src/signaler.hpp b/src/mailbox.hpp similarity index 81% rename from src/signaler.hpp rename to src/mailbox.hpp index faf3f1f3..dc49aad9 100644 --- a/src/signaler.hpp +++ b/src/mailbox.hpp @@ -17,8 +17,8 @@ along with this program. If not, see . */ -#ifndef __ZMQ_SIGNALER_HPP_INCLUDED__ -#define __ZMQ_SIGNALER_HPP_INCLUDED__ +#ifndef __ZMQ_MAILBOX_HPP_INCLUDED__ +#define __ZMQ_MAILBOX_HPP_INCLUDED__ #include @@ -31,12 +31,12 @@ namespace zmq { - class signaler_t + class mailbox_t { public: - signaler_t (); - ~signaler_t (); + mailbox_t (); + ~mailbox_t (); fd_t get_fd (); void send (const command_t &cmd_); @@ -51,9 +51,9 @@ namespace zmq // Platform-dependent function to create a socketpair. static int make_socketpair (fd_t *r_, fd_t *w_); - // Disable copying of signaler_t object. - signaler_t (const signaler_t&); - void operator = (const signaler_t&); + // Disable copying of mailbox_t object. + mailbox_t (const mailbox_t&); + void operator = (const mailbox_t&); }; } diff --git a/src/own.cpp b/src/own.cpp index 955113ab..15d25675 100644 --- a/src/own.cpp +++ b/src/own.cpp @@ -83,7 +83,7 @@ void zmq::own_t::launch_sibling (own_t *object_) { // At this point it is important that object is plugged in before its // owner has a chance to terminate it. Thus, 'plug' command is sent before - // the 'own' command. Given that the signaler preserves ordering of + // the 'own' command. Given that the mailbox preserves ordering of // commands, 'term' command from the owner cannot make it to the object // before the already written 'plug' command. diff --git a/src/socket_base.cpp b/src/socket_base.cpp index c6728fe5..a10ed0e5 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -118,9 +118,9 @@ zmq::socket_base_t::~socket_base_t () sessions_sync.unlock (); } -zmq::signaler_t *zmq::socket_base_t::get_signaler () +zmq::mailbox_t *zmq::socket_base_t::get_mailbox () { - return &signaler; + return &mailbox; } void zmq::socket_base_t::stop () @@ -227,7 +227,7 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_, errno = EINVAL; return -1; } - *((fd_t*) optval_) = signaler.get_fd (); + *((fd_t*) optval_) = mailbox.get_fd (); *optvallen_ = sizeof (fd_t); return 0; } @@ -613,7 +613,7 @@ int zmq::socket_base_t::process_commands (bool block_, bool throttle_) int rc; command_t cmd; if (block_) { - rc = signaler.recv (&cmd, true); + rc = mailbox.recv (&cmd, true); if (rc == -1 && errno == EINTR) return -1; errno_assert (rc == 0); @@ -640,7 +640,7 @@ int zmq::socket_base_t::process_commands (bool block_, bool throttle_) } // Check whether there are any commands pending for this thread. - rc = signaler.recv (&cmd, false); + rc = mailbox.recv (&cmd, false); } // Process all the commands available at the moment. @@ -651,7 +651,7 @@ int zmq::socket_base_t::process_commands (bool block_, bool throttle_) return -1; errno_assert (rc == 0); cmd.destination->process_command (cmd); - rc = signaler.recv (&cmd, false); + rc = mailbox.recv (&cmd, false); } if (ctx_terminated) { diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 0fdfefd7..69de24dd 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -30,7 +30,7 @@ #include "mutex.hpp" #include "stdint.hpp" #include "atomic_counter.hpp" -#include "signaler.hpp" +#include "mailbox.hpp" #include "stdint.hpp" #include "blob.hpp" #include "own.hpp" @@ -48,8 +48,8 @@ namespace zmq static socket_base_t *create (int type_, class ctx_t *parent_, uint32_t tid_); - // Returns the signaler associated with this socket. - signaler_t *get_signaler (); + // Returns the mailbox associated with this socket. + mailbox_t *get_mailbox (); // Interrupt blocking call if the socket is stuck in one. // This function can be called from a different thread! @@ -148,8 +148,8 @@ namespace zmq const blob_t &peer_identity_); void process_unplug (); - // App thread's signaler object. - signaler_t signaler; + // Socket's mailbox object. + mailbox_t mailbox; // Timestamp of when commands were processed the last time. uint64_t last_tsc;