diff --git a/include/zmq.h b/include/zmq.h
index 34ce80c5..fad51ca5 100644
--- a/include/zmq.h
+++ b/include/zmq.h
@@ -67,7 +67,7 @@ extern "C" {
// single accept. There's no message routing or message filtering involved.
#define ZMQ_P2P 0
-// Socket to distribute data. Recv fuction is not implemeted for this socket
+// Socket to distribute data. Recv function is not implemented for this socket
// type. Messages are distributed in fanout fashion to all peers.
#define ZMQ_PUB 1
diff --git a/src/Makefile.am b/src/Makefile.am
index 396e3a30..b6a4540d 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -19,6 +19,7 @@ libzmq_la_SOURCES = \
io_object.hpp \
io_thread.hpp \
ip.hpp \
+ i_endpoint.hpp \
i_poller.hpp \
i_poll_events.hpp \
i_signaler.hpp \
@@ -66,6 +67,7 @@ libzmq_la_SOURCES = \
object.cpp \
options.cpp \
owned.cpp \
+ pipe.cpp \
poll.cpp \
select.cpp \
session.cpp \
diff --git a/src/app_thread.cpp b/src/app_thread.cpp
index 74ba3570..db73ec14 100644
--- a/src/app_thread.cpp
+++ b/src/app_thread.cpp
@@ -77,7 +77,7 @@ bool zmq::app_thread_t::make_current ()
return true;
}
-void zmq::app_thread_t::process_commands (bool block_)
+void zmq::app_thread_t::process_commands (bool block_, bool throttle_)
{
ypollset_t::signals_t signals;
if (block_)
@@ -91,24 +91,26 @@ void zmq::app_thread_t::process_commands (bool block_)
// depending on CPU speed: It's ~1ms on 3GHz CPU, ~2ms on 1.5GHz CPU
// etc. The optimisation makes sense only on platforms where getting
// a timestamp is a very cheap operation (tens of nanoseconds).
+ if (throttle_) {
- // Get timestamp counter.
+ // Get timestamp counter.
#if defined __GNUC__
- uint32_t low;
- uint32_t high;
- __asm__ volatile ("rdtsc" : "=a" (low), "=d" (high));
- uint64_t current_time = (uint64_t) high << 32 | low;
+ uint32_t low;
+ uint32_t high;
+ __asm__ volatile ("rdtsc" : "=a" (low), "=d" (high));
+ uint64_t current_time = (uint64_t) high << 32 | low;
#elif defined _MSC_VER
- uint64_t current_time = __rdtsc ();
+ uint64_t current_time = __rdtsc ();
#else
#error
#endif
- // Check whether certain time have elapsed since last command
- // processing.
- if (current_time - last_processing_time <= max_command_delay)
- return;
- last_processing_time = current_time;
+            // Check whether a certain time has elapsed since the last
+            // command processing.
+ if (current_time - last_processing_time <= max_command_delay)
+ return;
+ last_processing_time = current_time;
+ }
#endif
// Check whether there are any commands pending for this thread.
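The throttle above leans on the x86 timestamp counter. A rough standalone sketch of the same pattern (the helper names and the delay constant are illustrative, not taken from the patch):

#include <stdint.h>

//  Illustrative only: the real max_command_delay is defined elsewhere
//  in the tree and tuned to CPU speed.
static const uint64_t max_command_delay = 3000000;
static uint64_t last_processing_time = 0;

static inline uint64_t rdtsc_now ()
{
    uint32_t low, high;
    __asm__ volatile ("rdtsc" : "=a" (low), "=d" (high));
    return (uint64_t) high << 32 | low;
}

static bool throttle_allows_processing ()
{
    uint64_t current_time = rdtsc_now ();
    if (current_time - last_processing_time <= max_command_delay)
        return false;    //  Too soon since the last poll; skip it.
    last_processing_time = current_time;
    return true;         //  Enough ticks elapsed; poll for commands.
}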
diff --git a/src/app_thread.hpp b/src/app_thread.hpp
index e7bbf707..e45b1b2e 100644
--- a/src/app_thread.hpp
+++ b/src/app_thread.hpp
@@ -53,7 +53,9 @@ namespace zmq
// Processes commands sent to this thread (if any). If 'block' is
// set to true, returns only after at least one command was processed.
- void process_commands (bool block_);
+    // If the 'throttle' argument is true, commands are processed at most
+    // once in a predefined time period.
+ void process_commands (bool block_, bool throttle_);
// Create a socket of a specified type.
class socket_base_t *create_socket (int type_);
diff --git a/src/command.hpp b/src/command.hpp
index 41c7d6c0..d3bad79a 100644
--- a/src/command.hpp
+++ b/src/command.hpp
@@ -39,6 +39,7 @@ namespace zmq
own,
attach,
bind,
+ revive,
term_req,
term,
term_ack
@@ -65,10 +66,18 @@ namespace zmq
class zmq_engine_t *engine;
} attach;
- // Sent between objects to establish pipe(s) between them.
+ // Sent from session to socket to establish pipe(s) between them.
struct {
+ class owned_t *session;
+ class reader_t *in_pipe;
+ class writer_t *out_pipe;
} bind;
+ // Sent by pipe writer to inform dormant pipe reader that there
+ // are messages in the pipe.
+ struct {
+ } revive;
+
    // Sent by the I/O object to the socket to request the shutdown
// the I/O object.
struct {
diff --git a/src/config.hpp b/src/config.hpp
index 88b93d7a..17e67b95 100644
--- a/src/config.hpp
+++ b/src/config.hpp
@@ -38,6 +38,14 @@ namespace zmq
// footprint of dispatcher.
command_pipe_granularity = 4,
+    // Determines how often the socket polls for new commands when it
+    // still has unprocessed messages to handle. Thus, if it is set to 100,
+    // the socket will process 100 inbound messages before doing the poll.
+    // If there are no unprocessed messages available, the poll is done
+    // immediately. Decreasing the value trades overall latency for more
+    // real-time behaviour (fewer latency peaks).
+ inbound_poll_rate = 100,
+
// Maximal batching size for engines with receiving functionality.
// So, if there are 10 messages that fit into the batch size, all of
// them may be read by a single 'recv' system call, thus avoiding
diff --git a/src/i_endpoint.hpp b/src/i_endpoint.hpp
new file mode 100644
index 00000000..bb7409e1
--- /dev/null
+++ b/src/i_endpoint.hpp
@@ -0,0 +1,33 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_I_ENDPOINT_HPP_INCLUDED__
+#define __ZMQ_I_ENDPOINT_HPP_INCLUDED__
+
+namespace zmq
+{
+
+ struct i_endpoint
+ {
+ virtual void revive (class reader_t *pipe_) = 0;
+ };
+
+}
+
+#endif
diff --git a/src/object.cpp b/src/object.cpp
index 0a257509..4d54ebf5 100644
--- a/src/object.cpp
+++ b/src/object.cpp
@@ -20,6 +20,7 @@
#include "object.hpp"
#include "dispatcher.hpp"
#include "err.hpp"
+#include "pipe.hpp"
#include "io_thread.hpp"
#include "simple_semaphore.hpp"
#include "owned.hpp"
@@ -57,6 +58,10 @@ void zmq::object_t::process_command (command_t &cmd_)
{
switch (cmd_.type) {
+ case command_t::revive:
+ process_revive ();
+ break;
+
case command_t::stop:
process_stop ();
break;
@@ -74,7 +79,8 @@ void zmq::object_t::process_command (command_t &cmd_)
return;
case command_t::bind:
- process_bind ();
+ process_bind (cmd_.args.bind.session,
+ cmd_.args.bind.in_pipe, cmd_.args.bind.out_pipe);
return;
case command_t::term_req:
@@ -140,11 +146,23 @@ void zmq::object_t::send_attach (session_t *destination_, zmq_engine_t *engine_)
send_command (cmd);
}
-void zmq::object_t::send_bind (object_t *destination_)
+void zmq::object_t::send_bind (object_t *destination_, owned_t *session_,
+ reader_t *in_pipe_, writer_t *out_pipe_)
{
command_t cmd;
cmd.destination = destination_;
cmd.type = command_t::bind;
+ cmd.args.bind.session = session_;
+ cmd.args.bind.in_pipe = in_pipe_;
+ cmd.args.bind.out_pipe = out_pipe_;
+ send_command (cmd);
+}
+
+void zmq::object_t::send_revive (object_t *destination_)
+{
+ command_t cmd;
+ cmd.destination = destination_;
+ cmd.type = command_t::revive;
send_command (cmd);
}
@@ -194,7 +212,13 @@ void zmq::object_t::process_attach (zmq_engine_t *engine_)
zmq_assert (false);
}
-void zmq::object_t::process_bind ()
+void zmq::object_t::process_bind (owned_t *session_,
+ reader_t *in_pipe_, writer_t *out_pipe_)
+{
+ zmq_assert (false);
+}
+
+void zmq::object_t::process_revive ()
{
zmq_assert (false);
}
diff --git a/src/object.hpp b/src/object.hpp
index 31c8c400..0dbac24a 100644
--- a/src/object.hpp
+++ b/src/object.hpp
@@ -24,7 +24,6 @@
namespace zmq
{
-
// Base class for all objects that participate in inter-thread
// communication.
@@ -58,7 +57,9 @@ namespace zmq
class owned_t *object_);
void send_attach (class session_t *destination_,
class zmq_engine_t *engine_);
- void send_bind (object_t *destination_);
+ void send_bind (object_t *destination_, class owned_t *session_,
+ class reader_t *in_pipe_, class writer_t *out_pipe_);
+ void send_revive (class object_t *destination_);
void send_term_req (class socket_base_t *destination_,
class owned_t *object_);
void send_term (class owned_t *destination_);
@@ -70,7 +71,9 @@ namespace zmq
virtual void process_plug ();
virtual void process_own (class owned_t *object_);
virtual void process_attach (class zmq_engine_t *engine_);
- virtual void process_bind ();
+ virtual void process_bind (class owned_t *session_,
+ class reader_t *in_pipe_, class writer_t *out_pipe_);
+ virtual void process_revive ();
virtual void process_term_req (class owned_t *object_);
virtual void process_term ();
virtual void process_term_ack ();
diff --git a/src/pipe.cpp b/src/pipe.cpp
new file mode 100644
index 00000000..50166313
--- /dev/null
+++ b/src/pipe.cpp
@@ -0,0 +1,112 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include
+
+#include "pipe.hpp"
+
+zmq::reader_t::reader_t (object_t *parent_, pipe_t *pipe_,
+ uint64_t hwm_, uint64_t lwm_) :
+ object_t (parent_),
+ pipe (pipe_),
+ peer (&pipe_->writer),
+ hwm (hwm_),
+ lwm (lwm_),
+ index (-1),
+ endpoint (NULL)
+{
+}
+
+zmq::reader_t::~reader_t ()
+{
+}
+
+bool zmq::reader_t::read (zmq_msg_t *msg_)
+{
+ return pipe->read (msg_);
+
+ // TODO: Adjust the size of the pipe.
+}
+
+void zmq::reader_t::set_endpoint (i_endpoint *endpoint_)
+{
+ endpoint = endpoint_;
+}
+
+void zmq::reader_t::set_index (int index_)
+{
+ index = index_;
+}
+
+int zmq::reader_t::get_index ()
+{
+ return index;
+}
+
+void zmq::reader_t::process_revive ()
+{
+ endpoint->revive (this);
+}
+
+zmq::writer_t::writer_t (object_t *parent_, pipe_t *pipe_,
+ uint64_t hwm_, uint64_t lwm_) :
+ object_t (parent_),
+ pipe (pipe_),
+ peer (&pipe_->reader),
+ hwm (hwm_),
+ lwm (lwm_)
+{
+}
+
+zmq::writer_t::~writer_t ()
+{
+}
+
+bool zmq::writer_t::check_write (uint64_t size_)
+{
+ // TODO: Check whether hwm is exceeded.
+
+ return true;
+}
+
+bool zmq::writer_t::write (struct zmq_msg_t *msg_)
+{
+ pipe->write (*msg_);
+ return true;
+
+ // TODO: Adjust size of the pipe.
+}
+
+void zmq::writer_t::flush ()
+{
+ if (!pipe->flush ())
+ send_revive (peer);
+}
+
+zmq::pipe_t::pipe_t (object_t *reader_parent_, object_t *writer_parent_,
+ uint64_t hwm_, uint64_t lwm_) :
+ reader (reader_parent_, this, hwm_, lwm_),
+ writer (writer_parent_, this, hwm_, lwm_)
+{
+}
+
+zmq::pipe_t::~pipe_t ()
+{
+}
+
diff --git a/src/pipe.hpp b/src/pipe.hpp
index 28e4b4d7..d48fc47d 100644
--- a/src/pipe.hpp
+++ b/src/pipe.hpp
@@ -22,15 +22,117 @@
#include "../include/zmq.h"
+#include "stdint.hpp"
+#include "i_endpoint.hpp"
#include "ypipe.hpp"
#include "config.hpp"
+#include "object.hpp"
namespace zmq
{
+ class reader_t : public object_t
+ {
+ public:
+
+ reader_t (class object_t *parent_, class pipe_t *pipe_,
+ uint64_t hwm_, uint64_t lwm_);
+ ~reader_t ();
+
+        // Reads a message from the underlying pipe.
+ bool read (struct zmq_msg_t *msg_);
+
+        // Manipulation of the pipe's endpoint and index.
+ void set_endpoint (i_endpoint *endpoint_);
+ void set_index (int index_);
+ int get_index ();
+
+ private:
+
+ // Command handlers.
+ void process_revive ();
+
+ // The underlying pipe.
+ class pipe_t *pipe;
+
+ // Pipe writer associated with the other side of the pipe.
+ class object_t *peer;
+
+ // High and low watermarks for in-memory storage (in bytes).
+ uint64_t hwm;
+ uint64_t lwm;
+
+ // Positions of head and tail of the pipe (in bytes).
+ uint64_t head;
+ uint64_t tail;
+ uint64_t last_sent_head;
+
+ // Index of the pipe in the socket's list of inbound pipes.
+ int index;
+
+ // Endpoint (either session or socket) the pipe is attached to.
+ i_endpoint *endpoint;
+
+ reader_t (const reader_t&);
+ void operator = (const reader_t&);
+ };
+
+ class writer_t : public object_t
+ {
+ public:
+
+ writer_t (class object_t *parent_, class pipe_t *pipe_,
+ uint64_t hwm_, uint64_t lwm_);
+ ~writer_t ();
+
+        // Checks whether a message with the specified size can be written
+        // to the pipe. If writing the message would cause the high
+        // watermark to be exceeded, the function returns false.
+ bool check_write (uint64_t size_);
+
+ // Writes a message to the underlying pipe. Returns false if the
+        // message cannot be written because the high watermark was reached.
+ bool write (struct zmq_msg_t *msg_);
+
+        // Flush the messages downstream.
+ void flush ();
+
+ private:
+
+ // The underlying pipe.
+ class pipe_t *pipe;
+
+ // Pipe reader associated with the other side of the pipe.
+ class object_t *peer;
+
+ // High and low watermarks for in-memory storage (in bytes).
+ uint64_t hwm;
+ uint64_t lwm;
+
+ // Positions of head and tail of the pipe (in bytes).
+ uint64_t head;
+ uint64_t tail;
+
+ writer_t (const writer_t&);
+ void operator = (const writer_t&);
+ };
+
// Message pipe.
class pipe_t : public ypipe_t
{
+ public:
+
+ pipe_t (object_t *reader_parent_, object_t *writer_parent_,
+ uint64_t hwm_, uint64_t lwm_);
+ ~pipe_t ();
+
+ reader_t reader;
+ writer_t writer;
+
+ private:
+
+ pipe_t (const pipe_t&);
+ void operator = (const pipe_t&);
};
}
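A minimal usage sketch of the new pipe halves, assuming both ends are driven from a single thread (in the patch the two halves are handed to different objects, e.g. a socket and a session); the parent objects and watermark values below are placeholders:

//  Sketch only: reader_parent/writer_parent are existing object_t
//  instances; the 0/0 watermarks are placeholders.
zmq::pipe_t *pipe = new zmq::pipe_t (reader_parent, writer_parent, 0, 0);

zmq_msg_t msg;
zmq_msg_init_size (&msg, 5);
//  ... fill zmq_msg_data (&msg) with the payload ...

//  Writer side: check the watermark, write, then flush. If the reader
//  has gone to sleep, flush () sends it a 'revive' command.
if (pipe->writer.check_write (zmq_msg_size (&msg))) {
    pipe->writer.write (&msg);
    pipe->writer.flush ();
}

//  Reader side: read () returns false once the pipe has run dry.
zmq_msg_t incoming;
zmq_msg_init (&incoming);
if (pipe->reader.read (&incoming)) {
    //  ... process the message ...
}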
diff --git a/src/session.cpp b/src/session.cpp
index fc1f858d..115fb850 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -20,12 +20,17 @@
#include "session.hpp"
#include "zmq_engine.hpp"
#include "err.hpp"
+#include "pipe.hpp"
zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_,
- const char *name_) :
+ const char *name_, const options_t &options_) :
owned_t (parent_, owner_),
+ in_pipe (NULL),
+ active (false),
+ out_pipe (NULL),
engine (NULL),
- name (name_)
+ name (name_),
+ options (options_)
{
}
@@ -33,18 +38,48 @@ zmq::session_t::~session_t ()
{
}
+void zmq::session_t::set_inbound_pipe (reader_t *pipe_)
+{
+ zmq_assert (!in_pipe);
+ in_pipe = pipe_;
+ active = true;
+ in_pipe->set_endpoint (this);
+}
+void zmq::session_t::set_outbound_pipe (writer_t *pipe_)
+{
+ zmq_assert (!out_pipe);
+ out_pipe = pipe_;
+}
+
+
bool zmq::session_t::read (::zmq_msg_t *msg_)
{
- return false;
+ if (!active)
+ return false;
+
+ bool fetched = in_pipe->read (msg_);
+ if (!fetched)
+ active = false;
+
+ return fetched;
}
bool zmq::session_t::write (::zmq_msg_t *msg_)
{
- return false;
+ return out_pipe->write (msg_);
}
void zmq::session_t::flush ()
{
+ out_pipe->flush ();
+}
+
+void zmq::session_t::revive (reader_t *pipe_)
+{
+ zmq_assert (in_pipe == pipe_);
+ active = true;
+ if (engine)
+ engine->revive ();
}
void zmq::session_t::process_plug ()
@@ -56,6 +91,19 @@ void zmq::session_t::process_plug ()
// We should syslog it and drop the session. TODO
zmq_assert (ok);
+    // If the session was created by the 'connect' function, it already has
+    // the pipes set. Otherwise, it is being created by the listener and
+    // the pipes are yet to be created.
+ if (!in_pipe && !out_pipe) {
+ pipe_t *inbound = new pipe_t (this, owner, options.hwm, options.lwm);
+ zmq_assert (inbound);
+ in_pipe = &inbound->reader;
+ pipe_t *outbound = new pipe_t (owner, this, options.hwm, options.lwm);
+ zmq_assert (outbound);
+ out_pipe = &outbound->writer;
+ send_bind (owner, this, &outbound->reader, &inbound->writer);
+ }
+
owned_t::process_plug ();
}
diff --git a/src/session.hpp b/src/session.hpp
index 6d6bcf7b..b79fb4b0 100644
--- a/src/session.hpp
+++ b/src/session.hpp
@@ -23,17 +23,22 @@
#include <string>
#include "i_inout.hpp"
+#include "i_endpoint.hpp"
#include "owned.hpp"
#include "options.hpp"
namespace zmq
{
- class session_t : public owned_t, public i_inout
+ class session_t : public owned_t, public i_inout, public i_endpoint
{
public:
- session_t (object_t *parent_, socket_base_t *owner_, const char *name_);
+ session_t (object_t *parent_, socket_base_t *owner_, const char *name_,
+ const options_t &options_);
+
+ void set_inbound_pipe (class reader_t *pipe_);
+ void set_outbound_pipe (class writer_t *pipe_);
private:
@@ -44,17 +49,32 @@ namespace zmq
bool write (::zmq_msg_t *msg_);
void flush ();
+ // i_endpoint interface implementation.
+ void revive (class reader_t *pipe_);
+
// Handlers for incoming commands.
void process_plug ();
void process_unplug ();
void process_attach (class zmq_engine_t *engine_);
+ // Inbound pipe, i.e. one the session is getting messages from.
+ class reader_t *in_pipe;
+
+ // If true, in_pipe is active. Otherwise there are no messages to get.
+ bool active;
+
+    // Outbound pipe, i.e. one the session is sending messages to.
+ class writer_t *out_pipe;
+
class zmq_engine_t *engine;
// The name of the session. One that is used to register it with
// socket-level repository of sessions.
std::string name;
+ // Inherited socket options.
+ options_t options;
+
session_t (const session_t&);
void operator = (const session_t&);
};
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index fb7bdcfd..68fc82b0 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -27,18 +27,23 @@
#include "err.hpp"
#include "zmq_listener.hpp"
#include "zmq_connecter.hpp"
+#include "msg_content.hpp"
#include "io_thread.hpp"
#include "session.hpp"
#include "config.hpp"
#include "owned.hpp"
#include "uuid.hpp"
+#include "pipe.hpp"
zmq::socket_base_t::socket_base_t (app_thread_t *parent_) :
object_t (parent_),
+ current (0),
+ active (0),
pending_term_acks (0),
+ ticks (0),
app_thread (parent_),
shutting_down (false)
-{
+{
}
zmq::socket_base_t::~socket_base_t ()
@@ -65,7 +70,7 @@ zmq::socket_base_t::~socket_base_t ()
// Process commands till we get all the termination acknowledgements.
while (pending_term_acks)
- app_thread->process_commands (true);
+ app_thread->process_commands (true, false);
}
// Check whether there are no session leaks.
@@ -150,8 +155,28 @@ int zmq::socket_base_t::connect (const char *addr_)
// Create the session.
io_thread_t *io_thread = choose_io_thread (options.affinity);
- session_t *session = new session_t (io_thread, this, session_name.c_str ());
+ session_t *session = new session_t (io_thread, this, session_name.c_str (),
+ options);
zmq_assert (session);
+
+ // Create inbound pipe.
+ pipe_t *in_pipe = new pipe_t (this, session, options.hwm, options.lwm);
+ zmq_assert (in_pipe);
+ in_pipe->reader.set_endpoint (this);
+ session->set_outbound_pipe (&in_pipe->writer);
+ in_pipes.push_back (std::make_pair (&in_pipe->reader, session));
+ in_pipes.back ().first->set_index (active);
+ in_pipes [active].first->set_index (in_pipes.size () - 1);
+ std::swap (in_pipes.back (), in_pipes [active]);
+ active++;
+
+ // Create outbound pipe.
+ pipe_t *out_pipe = new pipe_t (session, this, options.hwm, options.lwm);
+ zmq_assert (out_pipe);
+ session->set_inbound_pipe (&out_pipe->reader);
+ out_pipes.push_back (std::make_pair (&out_pipe->writer, session));
+
+ // Activate the session.
send_plug (session);
send_own (this, session);
@@ -173,17 +198,79 @@ int zmq::socket_base_t::connect (const char *addr_)
int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_)
{
- zmq_assert (false);
+ // Process pending commands, if any.
+ app_thread->process_commands (false, true);
+
+ // Try to send the message.
+ bool sent = distribute (msg_, !(flags_ & ZMQ_NOFLUSH));
+
+ if (!(flags_ & ZMQ_NOBLOCK)) {
+
+ // Oops, we couldn't send the message. Wait for the next
+ // command, process it and try to send the message again.
+ while (!sent) {
+ app_thread->process_commands (true, false);
+ sent = distribute (msg_, !(flags_ & ZMQ_NOFLUSH));
+ }
+ }
+ else if (!sent) {
+ errno = EAGAIN;
+ return -1;
+ }
+
+ return 0;
}
int zmq::socket_base_t::flush ()
{
- zmq_assert (false);
+ for (out_pipes_t::iterator it = out_pipes.begin (); it != out_pipes.end ();
+ it++)
+ it->first->flush ();
+
+ return 0;
}
int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
{
- zmq_assert (false);
+    // If the message cannot be fetched immediately, there are two scenarios.
+    // For non-blocking recv, commands are processed in case there's a message
+    // we don't yet know about already waiting. If there isn't one, return
+    // EAGAIN. In the blocking scenario, commands are processed over and over
+    // again until we are able to fetch a message.
+ bool fetched = fetch (msg_);
+ if (!fetched) {
+ if (flags_ & ZMQ_NOBLOCK) {
+ app_thread->process_commands (false, false);
+ fetched = fetch (msg_);
+ }
+ else {
+ while (!fetched) {
+ app_thread->process_commands (true, false);
+ ticks = 0;
+ fetched = fetch (msg_);
+ }
+ }
+ }
+
+    // Once every inbound_poll_rate messages check for signals and process
+    // incoming commands. This happens only when we skip polling altogether
+    // because there are messages available all the time. If a poll occurs,
+    // ticks is set to zero and thus we avoid this code.
+    //
+    // Note that 'recv' uses a different command throttling algorithm (the
+    // one described above) from the one used by 'send'. This is because
+    // counting ticks is more efficient than doing rdtsc all the time.
+ if (++ticks == inbound_poll_rate) {
+ app_thread->process_commands (false, false);
+ ticks = 0;
+ }
+
+ if (!fetched) {
+ errno = EAGAIN;
+ return -1;
+ }
+
+ return 0;
}
int zmq::socket_base_t::close ()
@@ -229,11 +316,35 @@ zmq::session_t *zmq::socket_base_t::find_session (const char *name_)
return it->second;
}
+void zmq::socket_base_t::revive (reader_t *pipe_)
+{
+ // Move the pipe to the list of active pipes.
+ in_pipes_t::size_type index = (in_pipes_t::size_type) pipe_->get_index ();
+ in_pipes [index].first->set_index (active);
+ in_pipes [active].first->set_index (index);
+ std::swap (in_pipes [index], in_pipes [active]);
+ active++;
+}
+
void zmq::socket_base_t::process_own (owned_t *object_)
{
io_objects.insert (object_);
}
+void zmq::socket_base_t::process_bind (owned_t *session_,
+ reader_t *in_pipe_, writer_t *out_pipe_)
+{
+ zmq_assert (in_pipe_);
+ in_pipe_->set_endpoint (this);
+ in_pipes.push_back (std::make_pair (in_pipe_, session_));
+ in_pipes.back ().first->set_index (active);
+ in_pipes [active].first->set_index (in_pipes.size () - 1);
+ std::swap (in_pipes.back (), in_pipes [active]);
+ active++;
+ zmq_assert (out_pipe_);
+ out_pipes.push_back (std::make_pair (out_pipe_, session_));
+}
+
void zmq::socket_base_t::process_term_req (owned_t *object_)
{
// When shutting down we can ignore termination requests from owned
@@ -260,3 +371,107 @@ void zmq::socket_base_t::process_term_ack ()
zmq_assert (pending_term_acks);
pending_term_acks--;
}
+
+bool zmq::socket_base_t::distribute (zmq_msg_t *msg_, bool flush_)
+{
+ int pipes_count = out_pipes.size ();
+
+ // If there are no pipes available, simply drop the message.
+ if (pipes_count == 0) {
+ int rc = zmq_msg_close (msg_);
+ zmq_assert (rc == 0);
+ rc = zmq_msg_init (msg_);
+ zmq_assert (rc == 0);
+ return true;
+ }
+
+ // First check whether all pipes are available for writing.
+ for (out_pipes_t::iterator it = out_pipes.begin (); it != out_pipes.end ();
+ it++)
+ if (!it->first->check_write (zmq_msg_size (msg_)))
+ return false;
+
+ msg_content_t *content = (msg_content_t*) msg_->content;
+
+    // For VSMs the copying is straightforward.
+ if (content == (msg_content_t*) ZMQ_VSM) {
+ for (out_pipes_t::iterator it = out_pipes.begin ();
+ it != out_pipes.end (); it++) {
+ it->first->write (msg_);
+ if (flush_)
+ it->first->flush ();
+ }
+ int rc = zmq_msg_init (msg_);
+ zmq_assert (rc == 0);
+ return true;
+ }
+
+    // Optimisation for the case when there's only a single pipe
+    // to send the message to - no refcount adjustment (i.e. no atomic
+    // operations) is needed.
+ if (pipes_count == 1) {
+ out_pipes.begin ()->first->write (msg_);
+ if (flush_)
+ out_pipes.begin ()->first->flush ();
+ int rc = zmq_msg_init (msg_);
+ zmq_assert (rc == 0);
+ return true;
+ }
+
+ // There are at least 2 destinations for the message. That means we have
+ // to deal with reference counting. First add N-1 references to
+ // the content (we are holding one reference anyway, that's why -1).
+ if (msg_->shared)
+ content->refcnt.add (pipes_count - 1);
+ else {
+ content->refcnt.set (pipes_count);
+ msg_->shared = true;
+ }
+
+ // Push the message to all destinations.
+ for (out_pipes_t::iterator it = out_pipes.begin (); it != out_pipes.end ();
+ it++) {
+ it->first->write (msg_);
+ if (flush_)
+ it->first->flush ();
+ }
+
+ // Detach the original message from the data buffer.
+ int rc = zmq_msg_init (msg_);
+ zmq_assert (rc == 0);
+
+ return true;
+}
+
+bool zmq::socket_base_t::fetch (zmq_msg_t *msg_)
+{
+ // Deallocate old content of the message.
+ zmq_msg_close (msg_);
+
+ // Round-robin over the pipes to get next message.
+ for (int count = active; count != 0; count--) {
+
+ bool fetched = in_pipes [current].first->read (msg_);
+
+ // If there's no message in the pipe, move it to the list of
+ // non-active pipes.
+ if (!fetched) {
+ in_pipes [current].first->set_index (active - 1);
+ in_pipes [active - 1].first->set_index (current);
+ std::swap (in_pipes [current], in_pipes [active - 1]);
+ active--;
+ }
+
+ current ++;
+ if (current >= active)
+ current = 0;
+
+ if (fetched)
+ return true;
+ }
+
+ // No message is available. Initialise the output parameter
+ // to be a 0-byte message.
+ zmq_msg_init (msg_);
+ return false;
+}
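The inbound-pipe bookkeeping in process_bind (), revive () and fetch () above keeps readable pipes at the front of the vector: entries [0, active) are live, everything after them is dormant. An isolated sketch of that swap-based partition (the struct and member names here are illustrative, not the actual members):

#include <utility>
#include <vector>

#include "pipe.hpp"

struct inbound_pipes_t
{
    std::vector <zmq::reader_t*> pipes;
    int active;

    //  A dormant pipe was revived: swap it into the active region.
    void activate (int index_)
    {
        pipes [index_]->set_index (active);
        pipes [active]->set_index (index_);
        std::swap (pipes [index_], pipes [active]);
        active++;
    }

    //  An active pipe ran dry: swap it just past the active region.
    void deactivate (int index_)
    {
        pipes [index_]->set_index (active - 1);
        pipes [active - 1]->set_index (index_);
        std::swap (pipes [index_], pipes [active - 1]);
        active--;
    }
};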
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index 20ac4e2e..1f04bda6 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -22,8 +22,11 @@
#include
#include