Move the pipe termination code to socket_base_t

So far, the pipe termination code was spread among socket type
classes, fair queuer, load balancer, etc. This patch moves
all the associated logic to a single place.

Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
This commit is contained in:
Martin Sustrik 2011-05-23 20:30:01 +02:00
parent acf0b0e515
commit 0f6f7276e3
29 changed files with 190 additions and 366 deletions

View File

@ -38,7 +38,8 @@ namespace zmq
inline array_item_t () : inline array_item_t () :
array_index1 (-1), array_index1 (-1),
array_index2 (-1) array_index2 (-1),
array_index3 (-1)
{ {
} }
@ -68,10 +69,21 @@ namespace zmq
return array_index2; return array_index2;
} }
inline void set_array_index3 (int index_)
{
array_index3 = index_;
}
inline int get_array_index3 ()
{
return array_index3;
}
private: private:
int array_index1; int array_index1;
int array_index2; int array_index2;
int array_index3;
array_item_t (const array_item_t&); array_item_t (const array_item_t&);
const array_item_t &operator = (const array_item_t&); const array_item_t &operator = (const array_item_t&);
@ -117,8 +129,10 @@ namespace zmq
if (item_) { if (item_) {
if (N == 1) if (N == 1)
item_->set_array_index1 ((int) items.size ()); item_->set_array_index1 ((int) items.size ());
else else if (N == 2)
item_->set_array_index2 ((int) items.size ()); item_->set_array_index2 ((int) items.size ());
else
item_->set_array_index3 ((int) items.size ());
} }
items.push_back (item_); items.push_back (item_);
} }
@ -127,16 +141,20 @@ namespace zmq
{ {
if (N == 1) if (N == 1)
erase (item_->get_array_index1 ()); erase (item_->get_array_index1 ());
else else if (N == 2)
erase (item_->get_array_index2 ()); erase (item_->get_array_index2 ());
else
erase (item_->get_array_index3 ());
} }
inline void erase (size_type index_) { inline void erase (size_type index_) {
if (items.back ()) { if (items.back ()) {
if (N == 1) if (N == 1)
items.back ()->set_array_index1 ((int) index_); items.back ()->set_array_index1 ((int) index_);
else else if (N == 2)
items.back ()->set_array_index2 ((int) index_); items.back ()->set_array_index2 ((int) index_);
else
items.back ()->set_array_index3 ((int) index_);
} }
items [index_] = items.back (); items [index_] = items.back ();
items.pop_back (); items.pop_back ();
@ -150,12 +168,18 @@ namespace zmq
if (items [index2_]) if (items [index2_])
items [index2_]->set_array_index1 ((int) index1_); items [index2_]->set_array_index1 ((int) index1_);
} }
else { else if (N == 2) {
if (items [index1_]) if (items [index1_])
items [index1_]->set_array_index2 ((int) index2_); items [index1_]->set_array_index2 ((int) index2_);
if (items [index2_]) if (items [index2_])
items [index2_]->set_array_index2 ((int) index1_); items [index2_]->set_array_index2 ((int) index1_);
} }
else {
if (items [index1_])
items [index1_]->set_array_index3 ((int) index2_);
if (items [index2_])
items [index2_]->set_array_index3 ((int) index1_);
}
std::swap (items [index1_], items [index2_]); std::swap (items [index1_], items [index2_]);
} }
@ -168,8 +192,10 @@ namespace zmq
{ {
if (N == 1) if (N == 1)
return (size_type) item_->get_array_index1 (); return (size_type) item_->get_array_index1 ();
else else if (N == 2)
return (size_type) item_->get_array_index2 (); return (size_type) item_->get_array_index2 ();
else
return (size_type) item_->get_array_index3 ();
} }
private: private:

View File

@ -21,16 +21,13 @@
#include "dist.hpp" #include "dist.hpp"
#include "pipe.hpp" #include "pipe.hpp"
#include "err.hpp" #include "err.hpp"
#include "own.hpp"
#include "msg.hpp" #include "msg.hpp"
#include "likely.hpp" #include "likely.hpp"
zmq::dist_t::dist_t (own_t *sink_) : zmq::dist_t::dist_t () :
active (0), active (0),
eligible (0), eligible (0),
more (false), more (false)
sink (sink_),
terminating (false)
{ {
} }
@ -55,21 +52,6 @@ void zmq::dist_t::attach (pipe_t *pipe_)
active++; active++;
eligible++; eligible++;
} }
if (unlikely (terminating)) {
sink->register_term_acks (1);
pipe_->terminate ();
}
}
void zmq::dist_t::terminate ()
{
zmq_assert (!terminating);
terminating = true;
sink->register_term_acks ((int) pipes.size ());
for (pipes_t::size_type i = 0; i != pipes.size (); i++)
pipes [i]->terminate ();
} }
void zmq::dist_t::terminated (pipe_t *pipe_) void zmq::dist_t::terminated (pipe_t *pipe_)
@ -81,9 +63,6 @@ void zmq::dist_t::terminated (pipe_t *pipe_)
if (pipes.index (pipe_) < eligible) if (pipes.index (pipe_) < eligible)
eligible--; eligible--;
pipes.erase (pipe_); pipes.erase (pipe_);
if (unlikely (terminating))
sink->unregister_term_ack ();
} }
void zmq::dist_t::activated (pipe_t *pipe_) void zmq::dist_t::activated (pipe_t *pipe_)

View File

@ -35,17 +35,16 @@ namespace zmq
{ {
public: public:
dist_t (class own_t *sink_); dist_t ();
~dist_t (); ~dist_t ();
void attach (class pipe_t *pipe_); void attach (class pipe_t *pipe_);
void terminate ();
int send (class msg_t *msg_, int flags_);
bool has_out ();
void activated (class pipe_t *pipe_); void activated (class pipe_t *pipe_);
void terminated (class pipe_t *pipe_); void terminated (class pipe_t *pipe_);
int send (class msg_t *msg_, int flags_);
bool has_out ();
private: private:
// Write the message to the pipe. Make the pipe inactive if writing // Write the message to the pipe. Make the pipe inactive if writing
@ -74,12 +73,6 @@ namespace zmq
// True if last we are in the middle of a multipart message. // True if last we are in the middle of a multipart message.
bool more; bool more;
// Object to send events to.
class own_t *sink;
// If true, termination process is already underway.
bool terminating;
dist_t (const dist_t&); dist_t (const dist_t&);
const dist_t &operator = (const dist_t&); const dist_t &operator = (const dist_t&);
}; };

View File

@ -21,15 +21,12 @@
#include "fq.hpp" #include "fq.hpp"
#include "pipe.hpp" #include "pipe.hpp"
#include "err.hpp" #include "err.hpp"
#include "own.hpp"
#include "msg.hpp" #include "msg.hpp"
zmq::fq_t::fq_t (own_t *sink_) : zmq::fq_t::fq_t () :
active (0), active (0),
current (0), current (0),
more (false), more (false)
sink (sink_),
terminating (false)
{ {
} }
@ -43,20 +40,10 @@ void zmq::fq_t::attach (pipe_t *pipe_)
pipes.push_back (pipe_); pipes.push_back (pipe_);
pipes.swap (active, pipes.size () - 1); pipes.swap (active, pipes.size () - 1);
active++; active++;
// If we are already terminating, ask the pipe to terminate straight away.
if (terminating) {
sink->register_term_acks (1);
pipe_->terminate ();
}
} }
void zmq::fq_t::terminated (pipe_t *pipe_) void zmq::fq_t::terminated (pipe_t *pipe_)
{ {
// Make sure that we are not closing current pipe while
// message is half-read.
zmq_assert (terminating || (!more || pipes [current] != pipe_));
// Remove the pipe from the list; adjust number of active pipes // Remove the pipe from the list; adjust number of active pipes
// accordingly. // accordingly.
if (pipes.index (pipe_) < active) { if (pipes.index (pipe_) < active) {
@ -65,19 +52,6 @@ void zmq::fq_t::terminated (pipe_t *pipe_)
current = 0; current = 0;
} }
pipes.erase (pipe_); pipes.erase (pipe_);
if (terminating)
sink->unregister_term_ack ();
}
void zmq::fq_t::terminate ()
{
zmq_assert (!terminating);
terminating = true;
sink->register_term_acks ((int) pipes.size ());
for (pipes_t::size_type i = 0; i != pipes.size (); i++)
pipes [i]->terminate ();
} }
void zmq::fq_t::activated (pipe_t *pipe_) void zmq::fq_t::activated (pipe_t *pipe_)

View File

@ -35,18 +35,16 @@ namespace zmq
{ {
public: public:
fq_t (class own_t *sink_); fq_t ();
~fq_t (); ~fq_t ();
void attach (pipe_t *pipe_); void attach (pipe_t *pipe_);
void terminate (); void activated (pipe_t *pipe_);
void terminated (pipe_t *pipe_);
int recv (msg_t *msg_, int flags_); int recv (msg_t *msg_, int flags_);
bool has_in (); bool has_in ();
void activated (pipe_t *pipe_);
void terminated (pipe_t *pipe_);
private: private:
// Inbound pipes. // Inbound pipes.
@ -64,12 +62,6 @@ namespace zmq
// there are following parts still waiting in the current pipe. // there are following parts still waiting in the current pipe.
bool more; bool more;
// Object to send events to.
class own_t *sink;
// If true, termination process is already underway.
bool terminating;
fq_t (const fq_t&); fq_t (const fq_t&);
const fq_t &operator = (const fq_t&); const fq_t &operator = (const fq_t&);
}; };

View File

@ -21,16 +21,13 @@
#include "lb.hpp" #include "lb.hpp"
#include "pipe.hpp" #include "pipe.hpp"
#include "err.hpp" #include "err.hpp"
#include "own.hpp"
#include "msg.hpp" #include "msg.hpp"
zmq::lb_t::lb_t (own_t *sink_) : zmq::lb_t::lb_t () :
active (0), active (0),
current (0), current (0),
more (false), more (false),
dropping (false), dropping (false)
sink (sink_),
terminating (false)
{ {
} }
@ -44,21 +41,6 @@ void zmq::lb_t::attach (pipe_t *pipe_)
pipes.push_back (pipe_); pipes.push_back (pipe_);
pipes.swap (active, pipes.size () - 1); pipes.swap (active, pipes.size () - 1);
active++; active++;
if (terminating) {
sink->register_term_acks (1);
pipe_->terminate ();
}
}
void zmq::lb_t::terminate ()
{
zmq_assert (!terminating);
terminating = true;
sink->register_term_acks ((int) pipes.size ());
for (pipes_t::size_type i = 0; i != pipes.size (); i++)
pipes [i]->terminate ();
} }
void zmq::lb_t::terminated (pipe_t *pipe_) void zmq::lb_t::terminated (pipe_t *pipe_)
@ -78,9 +60,6 @@ void zmq::lb_t::terminated (pipe_t *pipe_)
current = 0; current = 0;
} }
pipes.erase (pipe_); pipes.erase (pipe_);
if (terminating)
sink->unregister_term_ack ();
} }
void zmq::lb_t::activated (pipe_t *pipe_) void zmq::lb_t::activated (pipe_t *pipe_)

View File

@ -34,17 +34,16 @@ namespace zmq
{ {
public: public:
lb_t (class own_t *sink_); lb_t ();
~lb_t (); ~lb_t ();
void attach (pipe_t *pipe_); void attach (pipe_t *pipe_);
void terminate ();
int send (msg_t *msg_, int flags_);
bool has_out ();
void activated (pipe_t *pipe_); void activated (pipe_t *pipe_);
void terminated (pipe_t *pipe_); void terminated (pipe_t *pipe_);
int send (msg_t *msg_, int flags_);
bool has_out ();
private: private:
// List of outbound pipes. // List of outbound pipes.
@ -64,12 +63,6 @@ namespace zmq
// True if we are dropping current message. // True if we are dropping current message.
bool dropping; bool dropping;
// Object to send events to.
class own_t *sink;
// If true, termination process is already underway.
bool terminating;
lb_t (const lb_t&); lb_t (const lb_t&);
const lb_t &operator = (const lb_t&); const lb_t &operator = (const lb_t&);
}; };

View File

@ -153,6 +153,11 @@ void zmq::own_t::terminate ()
send_term_req (owner, this); send_term_req (owner, this);
} }
bool zmq::own_t::is_terminating ()
{
return terminating;
}
void zmq::own_t::process_term (int linger_) void zmq::own_t::process_term (int linger_)
{ {
// Double termination should never happen. // Double termination should never happen.
@ -173,7 +178,6 @@ void zmq::own_t::process_term (int linger_)
void zmq::own_t::register_term_acks (int count_) void zmq::own_t::register_term_acks (int count_)
{ {
term_acks += count_; term_acks += count_;
printf ("reg %d acks (%p, %d)\n", count_, (void*) this, term_acks);
} }
void zmq::own_t::unregister_term_ack () void zmq::own_t::unregister_term_ack ()
@ -181,8 +185,6 @@ void zmq::own_t::unregister_term_ack ()
zmq_assert (term_acks > 0); zmq_assert (term_acks > 0);
term_acks--; term_acks--;
printf ("unreg 1 acks (%p, %d)\n", (void*) this, term_acks);
// This may be a last ack we are waiting for before termination... // This may be a last ack we are waiting for before termination...
check_term_acks (); check_term_acks ();
} }

View File

@ -76,6 +76,9 @@ namespace zmq
// called more than once. // called more than once.
void terminate (); void terminate ();
// Returns true if the object is in process of termination.
bool is_terminating ();
// Derived object destroys own_t. There's no point in allowing // Derived object destroys own_t. There's no point in allowing
// others to invoke the destructor. At the same time, it has to be // others to invoke the destructor. At the same time, it has to be
// virtual so that generic own_t deallocation mechanism destroys // virtual so that generic own_t deallocation mechanism destroys

View File

@ -25,8 +25,7 @@
zmq::pair_t::pair_t (class ctx_t *parent_, uint32_t tid_) : zmq::pair_t::pair_t (class ctx_t *parent_, uint32_t tid_) :
socket_base_t (parent_, tid_), socket_base_t (parent_, tid_),
pipe (NULL), pipe (NULL)
terminating (false)
{ {
options.type = ZMQ_PAIR; options.type = ZMQ_PAIR;
} }
@ -39,44 +38,22 @@ zmq::pair_t::~pair_t ()
void zmq::pair_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_) void zmq::pair_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
{ {
zmq_assert (!pipe); zmq_assert (!pipe);
pipe = pipe_; pipe = pipe_;
pipe->set_event_sink (this);
if (terminating) {
register_term_acks (1);
pipe_->terminate ();
}
} }
void zmq::pair_t::terminated (pipe_t *pipe_) void zmq::pair_t::xterminated (pipe_t *pipe_)
{ {
zmq_assert (pipe_ == pipe); zmq_assert (pipe_ == pipe);
pipe = NULL; pipe = NULL;
if (terminating)
unregister_term_ack ();
} }
void zmq::pair_t::process_term (int linger_) void zmq::pair_t::xread_activated (pipe_t *pipe_)
{
terminating = true;
if (pipe) {
register_term_acks (1);
pipe->terminate ();
}
socket_base_t::process_term (linger_);
}
void zmq::pair_t::read_activated (pipe_t *pipe_)
{ {
// There's just one pipe. No lists of active and inactive pipes. // There's just one pipe. No lists of active and inactive pipes.
// There's nothing to do here. // There's nothing to do here.
} }
void zmq::pair_t::write_activated (pipe_t *pipe_) void zmq::pair_t::xwrite_activated (pipe_t *pipe_)
{ {
// There's just one pipe. No lists of active and inactive pipes. // There's just one pipe. No lists of active and inactive pipes.
// There's nothing to do here. // There's nothing to do here.

View File

@ -22,14 +22,12 @@
#define __ZMQ_PAIR_HPP_INCLUDED__ #define __ZMQ_PAIR_HPP_INCLUDED__
#include "socket_base.hpp" #include "socket_base.hpp"
#include "pipe.hpp"
namespace zmq namespace zmq
{ {
class pair_t : class pair_t :
public socket_base_t, public socket_base_t
public i_pipe_events
{ {
public: public:
@ -42,21 +40,14 @@ namespace zmq
int xrecv (class msg_t *msg_, int flags_); int xrecv (class msg_t *msg_, int flags_);
bool xhas_in (); bool xhas_in ();
bool xhas_out (); bool xhas_out ();
void xread_activated (class pipe_t *pipe_);
// i_pipe_events interface implementation. void xwrite_activated (class pipe_t *pipe_);
void read_activated (class pipe_t *pipe_); void xterminated (class pipe_t *pipe_);
void write_activated (class pipe_t *pipe_);
void terminated (class pipe_t *pipe_);
private: private:
// Hook into termination process.
void process_term (int linger_);
class pipe_t *pipe; class pipe_t *pipe;
bool terminating;
pair_t (const pair_t&); pair_t (const pair_t&);
const pair_t &operator = (const pair_t&); const pair_t &operator = (const pair_t&);
}; };

View File

@ -108,7 +108,7 @@ bool zmq::pipe_t::check_read ()
// If pipe_term was already received but wasn't processed because // If pipe_term was already received but wasn't processed because
// of pending messages, we can ack it now. // of pending messages, we can ack it now.
if (terminating) if (term_recvd)
send_pipe_term_ack (peer); send_pipe_term_ack (peer);
return false; return false;
@ -133,7 +133,7 @@ bool zmq::pipe_t::read (msg_t *msg_)
// If pipe_term was already received but wasn't processed because // If pipe_term was already received but wasn't processed because
// of pending messages, we can ack it now. // of pending messages, we can ack it now.
if (terminating) if (term_recvd)
send_pipe_term_ack (peer); send_pipe_term_ack (peer);
return false; return false;

View File

@ -21,10 +21,10 @@
#include "pull.hpp" #include "pull.hpp"
#include "err.hpp" #include "err.hpp"
#include "msg.hpp" #include "msg.hpp"
#include "pipe.hpp"
zmq::pull_t::pull_t (class ctx_t *parent_, uint32_t tid_) : zmq::pull_t::pull_t (class ctx_t *parent_, uint32_t tid_) :
socket_base_t (parent_, tid_), socket_base_t (parent_, tid_)
fq (this)
{ {
options.type = ZMQ_PULL; options.type = ZMQ_PULL;
} }
@ -36,32 +36,19 @@ zmq::pull_t::~pull_t ()
void zmq::pull_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_) void zmq::pull_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
{ {
zmq_assert (pipe_); zmq_assert (pipe_);
pipe_->set_event_sink (this);
fq.attach (pipe_); fq.attach (pipe_);
} }
void zmq::pull_t::read_activated (pipe_t *pipe_) void zmq::pull_t::xread_activated (pipe_t *pipe_)
{ {
fq.activated (pipe_); fq.activated (pipe_);
} }
void zmq::pull_t::write_activated (pipe_t *pipe_) void zmq::pull_t::xterminated (pipe_t *pipe_)
{
// There are no outbound messages in pull socket. This should never happen.
zmq_assert (false);
}
void zmq::pull_t::terminated (pipe_t *pipe_)
{ {
fq.terminated (pipe_); fq.terminated (pipe_);
} }
void zmq::pull_t::process_term (int linger_)
{
fq.terminate ();
socket_base_t::process_term (linger_);
}
int zmq::pull_t::xrecv (msg_t *msg_, int flags_) int zmq::pull_t::xrecv (msg_t *msg_, int flags_)
{ {
return fq.recv (msg_, flags_); return fq.recv (msg_, flags_);

View File

@ -22,15 +22,13 @@
#define __ZMQ_PULL_HPP_INCLUDED__ #define __ZMQ_PULL_HPP_INCLUDED__
#include "socket_base.hpp" #include "socket_base.hpp"
#include "pipe.hpp"
#include "fq.hpp" #include "fq.hpp"
namespace zmq namespace zmq
{ {
class pull_t : class pull_t :
public socket_base_t, public socket_base_t
public i_pipe_events
{ {
public: public:
@ -43,17 +41,11 @@ namespace zmq
void xattach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_); void xattach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_);
int xrecv (class msg_t *msg_, int flags_); int xrecv (class msg_t *msg_, int flags_);
bool xhas_in (); bool xhas_in ();
void xread_activated (class pipe_t *pipe_);
void xterminated (class pipe_t *pipe_);
private: private:
// i_pipe_events interface implementation.
void read_activated (pipe_t *pipe_);
void write_activated (pipe_t *pipe_);
void terminated (pipe_t *pipe_);
// Hook into the termination process.
void process_term (int linger_);
// Fair queueing object for inbound pipes. // Fair queueing object for inbound pipes.
fq_t fq; fq_t fq;

View File

@ -24,8 +24,7 @@
#include "msg.hpp" #include "msg.hpp"
zmq::push_t::push_t (class ctx_t *parent_, uint32_t tid_) : zmq::push_t::push_t (class ctx_t *parent_, uint32_t tid_) :
socket_base_t (parent_, tid_), socket_base_t (parent_, tid_)
lb (this)
{ {
options.type = ZMQ_PUSH; options.type = ZMQ_PUSH;
} }
@ -37,32 +36,19 @@ zmq::push_t::~push_t ()
void zmq::push_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_) void zmq::push_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
{ {
zmq_assert (pipe_); zmq_assert (pipe_);
pipe_->set_event_sink (this);
lb.attach (pipe_); lb.attach (pipe_);
} }
void zmq::push_t::read_activated (pipe_t *pipe_) void zmq::push_t::xwrite_activated (pipe_t *pipe_)
{
// There are no inbound messages in push socket. This should never happen.
zmq_assert (false);
}
void zmq::push_t::write_activated (pipe_t *pipe_)
{ {
lb.activated (pipe_); lb.activated (pipe_);
} }
void zmq::push_t::terminated (pipe_t *pipe_) void zmq::push_t::xterminated (pipe_t *pipe_)
{ {
lb.terminated (pipe_); lb.terminated (pipe_);
} }
void zmq::push_t::process_term (int linger_)
{
lb.terminate ();
socket_base_t::process_term (linger_);
}
int zmq::push_t::xsend (msg_t *msg_, int flags_) int zmq::push_t::xsend (msg_t *msg_, int flags_)
{ {
return lb.send (msg_, flags_); return lb.send (msg_, flags_);

View File

@ -22,15 +22,13 @@
#define __ZMQ_PUSH_HPP_INCLUDED__ #define __ZMQ_PUSH_HPP_INCLUDED__
#include "socket_base.hpp" #include "socket_base.hpp"
#include "pipe.hpp"
#include "lb.hpp" #include "lb.hpp"
namespace zmq namespace zmq
{ {
class push_t : class push_t :
public socket_base_t, public socket_base_t
public i_pipe_events
{ {
public: public:
@ -43,17 +41,11 @@ namespace zmq
void xattach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_); void xattach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_);
int xsend (class msg_t *msg_, int flags_); int xsend (class msg_t *msg_, int flags_);
bool xhas_out (); bool xhas_out ();
void xwrite_activated (class pipe_t *pipe_);
void xterminated (class pipe_t *pipe_);
private: private:
// i_pipe_events interface implementation.
void read_activated (pipe_t *pipe_);
void write_activated (pipe_t *pipe_);
void terminated (pipe_t *pipe_);
// Hook into the termination process.
void process_term (int linger_);
// Load balancer managing the outbound pipes. // Load balancer managing the outbound pipes.
lb_t lb; lb_t lb;

View File

@ -87,7 +87,7 @@ void zmq::reaper_t::process_stop ()
{ {
terminating = true; terminating = true;
// If there are no sockets beig reaped finish immediately. // If there are no sockets being reaped finish immediately.
if (!sockets) { if (!sockets) {
send_done (); send_done ();
poller->rm_fd (mailbox_handle); poller->rm_fd (mailbox_handle);
@ -100,10 +100,6 @@ void zmq::reaper_t::process_reap (socket_base_t *socket_)
// Add the socket to the poller. // Add the socket to the poller.
socket_->start_reaping (poller); socket_->start_reaping (poller);
// Start termination of associated I/O object hierarchy.
socket_->terminate ();
socket_->check_destroy ();
++sockets; ++sockets;
} }

View File

@ -211,10 +211,15 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_)
return 0; return 0;
} }
void zmq::socket_base_t::attach_pipe (class pipe_t *pipe_, void zmq::socket_base_t::attach_pipe (pipe_t *pipe_,
const blob_t &peer_identity_) const blob_t &peer_identity_)
{ {
// If the peer haven't specified it's identity, let's generate one. // First, register the pipe so that we can terminate it later on.
pipe_->set_event_sink (this);
pipes.push_back (pipe_);
// Then, pass the pipe to the specific socket type.
// If the peer haven't specified it's identity, let's generate one.
if (peer_identity_.size ()) { if (peer_identity_.size ()) {
xattach_pipe (pipe_, peer_identity_); xattach_pipe (pipe_, peer_identity_);
} }
@ -223,6 +228,13 @@ void zmq::socket_base_t::attach_pipe (class pipe_t *pipe_,
generate_uuid ((unsigned char*) identity.data () + 1); generate_uuid ((unsigned char*) identity.data () + 1);
xattach_pipe (pipe_, identity); xattach_pipe (pipe_, identity);
} }
// If the socket is already being closed, ask any new pipes to terminate
// straight away.
if (is_terminating ()) {
register_term_acks (1);
pipe_->terminate ();
}
} }
int zmq::socket_base_t::setsockopt (int option_, const void *optval_, int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
@ -635,9 +647,15 @@ zmq::session_t *zmq::socket_base_t::find_session (const blob_t &name_)
void zmq::socket_base_t::start_reaping (poller_t *poller_) void zmq::socket_base_t::start_reaping (poller_t *poller_)
{ {
// Plug the socket to the reaper thread.
poller = poller_; poller = poller_;
handle = poller->add_fd (mailbox.get_fd (), this); handle = poller->add_fd (mailbox.get_fd (), this);
poller->set_pollin (handle); poller->set_pollin (handle);
// Initialise the termination and check whether it can be deallocated
// immediately.
terminate ();
check_destroy ();
} }
int zmq::socket_base_t::process_commands (bool block_, bool throttle_) int zmq::socket_base_t::process_commands (bool block_, bool throttle_)
@ -720,6 +738,11 @@ void zmq::socket_base_t::process_term (int linger_)
// will be initiated. // will be initiated.
unregister_endpoints (this); unregister_endpoints (this);
// Ask all attached pipes to terminate.
for (pipes_t::size_type i = 0; i != pipes.size (); ++i)
pipes [i]->terminate ();
register_term_acks (pipes.size ());
// Continue the termination process immediately. // Continue the termination process immediately.
own_t::process_term (linger_); own_t::process_term (linger_);
} }
@ -758,6 +781,15 @@ int zmq::socket_base_t::xrecv (msg_t *msg_, int options_)
return -1; return -1;
} }
void zmq::socket_base_t::xread_activated (pipe_t *pipe_)
{
zmq_assert (false);
}
void zmq::socket_base_t::xwrite_activated (pipe_t *pipe_)
{
zmq_assert (false);
}
void zmq::socket_base_t::in_event () void zmq::socket_base_t::in_event ()
{ {
// Process any commands from other threads/sockets that may be available // Process any commands from other threads/sockets that may be available
@ -794,3 +826,26 @@ void zmq::socket_base_t::check_destroy ()
own_t::process_destroy (); own_t::process_destroy ();
} }
} }
void zmq::socket_base_t::read_activated (pipe_t *pipe_)
{
xread_activated (pipe_);
}
void zmq::socket_base_t::write_activated (pipe_t *pipe_)
{
xwrite_activated (pipe_);
}
void zmq::socket_base_t::terminated (pipe_t *pipe_)
{
// Notify the specific socket type about the pipe termination.
xterminated (pipe_);
// Remove the pipe from the list of attached pipes and confirm its
// termination if we are already shutting down.
pipes.erase (pipe_);
if (is_terminating ())
unregister_term_ack ();
}

View File

@ -34,6 +34,7 @@
#include "mailbox.hpp" #include "mailbox.hpp"
#include "stdint.hpp" #include "stdint.hpp"
#include "blob.hpp" #include "blob.hpp"
#include "pipe.hpp"
#include "own.hpp" #include "own.hpp"
namespace zmq namespace zmq
@ -42,7 +43,8 @@ namespace zmq
class socket_base_t : class socket_base_t :
public own_t, public own_t,
public array_item_t, public array_item_t,
public i_poll_events public i_poll_events,
public i_pipe_events
{ {
friend class reaper_t; friend class reaper_t;
@ -81,14 +83,6 @@ namespace zmq
void unregister_session (const blob_t &name_); void unregister_session (const blob_t &name_);
class session_t *find_session (const blob_t &name_); class session_t *find_session (const blob_t &name_);
// i_reader_events interface implementation.
void activated (class reader_t *pipe_);
void terminated (class reader_t *pipe_);
// i_writer_events interface implementation.
void activated (class writer_t *pipe_);
void terminated (class writer_t *pipe_);
// Using this function reaper thread ask the socket to regiter with // Using this function reaper thread ask the socket to regiter with
// its poller. // its poller.
void start_reaping (poller_t *poller_); void start_reaping (poller_t *poller_);
@ -99,9 +93,10 @@ namespace zmq
void out_event (); void out_event ();
void timer_event (int id_); void timer_event (int id_);
// To be called after processing commands or invoking any command // i_pipe_events interface implementation.
// handlers explicitly. If required, it will deallocate the socket. void read_activated (pipe_t *pipe_);
void check_destroy (); void write_activated (pipe_t *pipe_);
void terminated (pipe_t *pipe_);
protected: protected:
@ -127,16 +122,20 @@ namespace zmq
virtual bool xhas_in (); virtual bool xhas_in ();
virtual int xrecv (class msg_t *msg_, int options_); virtual int xrecv (class msg_t *msg_, int options_);
// We are declaring termination handler as protected so that // i_pipe_events will be forwarded to these functions.
// individual socket types can hook into the termination process virtual void xread_activated (pipe_t *pipe_);
// by overloading it. virtual void xwrite_activated (pipe_t *pipe_);
void process_term (int linger_); virtual void xterminated (pipe_t *pipe_) = 0;
// Delay actual destruction of the socket. // Delay actual destruction of the socket.
void process_destroy (); void process_destroy ();
private: private:
// To be called after processing commands or invoking any command
// handlers explicitly. If required, it will deallocate the socket.
void check_destroy ();
// Used to check whether the object is a socket. // Used to check whether the object is a socket.
uint32_t tag; uint32_t tag;
@ -156,7 +155,7 @@ namespace zmq
// bind, is available and compatible with the socket type. // bind, is available and compatible with the socket type.
int check_protocol (const std::string &protocol_); int check_protocol (const std::string &protocol_);
// If no identity is set, generate one and call xattach_pipe (). // Register the pipe with this socket.
void attach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_); void attach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_);
// Processes commands sent to this socket (if any). If 'block' is // Processes commands sent to this socket (if any). If 'block' is
@ -169,10 +168,15 @@ namespace zmq
void process_stop (); void process_stop ();
void process_bind (class pipe_t *pipe_, const blob_t &peer_identity_); void process_bind (class pipe_t *pipe_, const blob_t &peer_identity_);
void process_unplug (); void process_unplug ();
void process_term (int linger_);
// Socket's mailbox object. // Socket's mailbox object.
mailbox_t mailbox; mailbox_t mailbox;
// List of attached pipes.
typedef array_t <pipe_t, 3> pipes_t;
pipes_t pipes;
// Reaper's poller and handle of this socket within it. // Reaper's poller and handle of this socket within it.
poller_t *poller; poller_t *poller;
poller_t::handle_t handle; poller_t::handle_t handle;

View File

@ -37,7 +37,7 @@ namespace zmq
int xsetsockopt (int option_, const void *optval_, size_t optvallen_); int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (class msg_t *msg_, int options_); int xsend (class msg_t *msg_, int options_);
bool xhas_out (); bool xhas_out ();
private: private:

View File

@ -24,8 +24,7 @@
#include "msg.hpp" #include "msg.hpp"
zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_) : zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_) :
socket_base_t (parent_, tid_), socket_base_t (parent_, tid_)
dist (this)
{ {
options.type = ZMQ_XPUB; options.type = ZMQ_XPUB;
} }
@ -37,35 +36,19 @@ zmq::xpub_t::~xpub_t ()
void zmq::xpub_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_) void zmq::xpub_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
{ {
zmq_assert (pipe_); zmq_assert (pipe_);
pipe_->set_event_sink (this);
dist.attach (pipe_); dist.attach (pipe_);
} }
void zmq::xpub_t::read_activated (pipe_t *pipe_) void zmq::xpub_t::xwrite_activated (pipe_t *pipe_)
{
// PUB socket never receives messages. This should never happen.
zmq_assert (false);
}
void zmq::xpub_t::write_activated (pipe_t *pipe_)
{ {
dist.activated (pipe_); dist.activated (pipe_);
} }
void zmq::xpub_t::terminated (pipe_t *pipe_) void zmq::xpub_t::xterminated (pipe_t *pipe_)
{ {
dist.terminated (pipe_); dist.terminated (pipe_);
} }
void zmq::xpub_t::process_term (int linger_)
{
// Terminate the outbound pipes.
dist.terminate ();
// Continue with the termination immediately.
socket_base_t::process_term (linger_);
}
int zmq::xpub_t::xsend (msg_t *msg_, int flags_) int zmq::xpub_t::xsend (msg_t *msg_, int flags_)
{ {
return dist.send (msg_, flags_); return dist.send (msg_, flags_);

View File

@ -23,15 +23,13 @@
#include "socket_base.hpp" #include "socket_base.hpp"
#include "array.hpp" #include "array.hpp"
#include "pipe.hpp"
#include "dist.hpp" #include "dist.hpp"
namespace zmq namespace zmq
{ {
class xpub_t : class xpub_t :
public socket_base_t, public socket_base_t
public i_pipe_events
{ {
public: public:
@ -44,17 +42,11 @@ namespace zmq
bool xhas_out (); bool xhas_out ();
int xrecv (class msg_t *msg_, int flags_); int xrecv (class msg_t *msg_, int flags_);
bool xhas_in (); bool xhas_in ();
void xwrite_activated (class pipe_t *pipe_);
void xterminated (class pipe_t *pipe_);
private: private:
// i_pipe_events interface implementation.
void read_activated (pipe_t *pipe_);
void write_activated (pipe_t *pipe_);
void terminated (pipe_t *pipe_);
// Hook into the termination process.
void process_term (int linger_);
// Distributor of messages holding the list of outbound pipes. // Distributor of messages holding the list of outbound pipes.
dist_t dist; dist_t dist;

View File

@ -28,8 +28,7 @@ zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) :
prefetched (false), prefetched (false),
more_in (false), more_in (false),
current_out (NULL), current_out (NULL),
more_out (false), more_out (false)
terminating (false)
{ {
options.type = ZMQ_XREP; options.type = ZMQ_XREP;
@ -47,7 +46,6 @@ zmq::xrep_t::~xrep_t ()
void zmq::xrep_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_) void zmq::xrep_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
{ {
zmq_assert (pipe_); zmq_assert (pipe_);
pipe_->set_event_sink (this);
// Add the pipe to the map out outbound pipes. // Add the pipe to the map out outbound pipes.
// TODO: What if new connection has same peer identity as the old one? // TODO: What if new connection has same peer identity as the old one?
@ -59,27 +57,9 @@ void zmq::xrep_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
// Add the pipe to the list of inbound pipes. // Add the pipe to the list of inbound pipes.
inpipe_t inpipe = {pipe_, peer_identity_, true}; inpipe_t inpipe = {pipe_, peer_identity_, true};
inpipes.push_back (inpipe); inpipes.push_back (inpipe);
// In case we are already terminating, ask this pipe to terminate as well.
if (terminating) {
register_term_acks (1);
pipe_->terminate ();
}
} }
void zmq::xrep_t::process_term (int linger_) void zmq::xrep_t::xterminated (pipe_t *pipe_)
{
terminating = true;
register_term_acks ((int) (inpipes.size () + outpipes.size ()));
for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end (); ++it)
it->pipe->terminate ();
socket_base_t::process_term (linger_);
}
void zmq::xrep_t::terminated (pipe_t *pipe_)
{ {
for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end (); for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end ();
++it) { ++it) {
@ -89,8 +69,6 @@ void zmq::xrep_t::terminated (pipe_t *pipe_)
inpipes.erase (it); inpipes.erase (it);
if (current_in >= inpipes.size ()) if (current_in >= inpipes.size ())
current_in = 0; current_in = 0;
if (terminating)
unregister_term_ack ();
goto clean_outpipes; goto clean_outpipes;
} }
} }
@ -103,15 +81,13 @@ clean_outpipes:
outpipes.erase (it); outpipes.erase (it);
if (pipe_ == current_out) if (pipe_ == current_out)
current_out = NULL; current_out = NULL;
if (terminating)
unregister_term_ack ();
return; return;
} }
} }
zmq_assert (false); zmq_assert (false);
} }
void zmq::xrep_t::read_activated (pipe_t *pipe_) void zmq::xrep_t::xread_activated (pipe_t *pipe_)
{ {
for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end (); for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end ();
++it) { ++it) {
@ -124,7 +100,7 @@ void zmq::xrep_t::read_activated (pipe_t *pipe_)
zmq_assert (false); zmq_assert (false);
} }
void zmq::xrep_t::write_activated (pipe_t *pipe_) void zmq::xrep_t::xwrite_activated (pipe_t *pipe_)
{ {
for (outpipes_t::iterator it = outpipes.begin (); for (outpipes_t::iterator it = outpipes.begin ();
it != outpipes.end (); ++it) { it != outpipes.end (); ++it) {
@ -312,3 +288,4 @@ bool zmq::xrep_t::xhas_out ()
} }

View File

@ -26,7 +26,6 @@
#include "socket_base.hpp" #include "socket_base.hpp"
#include "blob.hpp" #include "blob.hpp"
#include "pipe.hpp"
#include "msg.hpp" #include "msg.hpp"
namespace zmq namespace zmq
@ -34,8 +33,7 @@ namespace zmq
// TODO: This class uses O(n) scheduling. Rewrite it to use O(1) algorithm. // TODO: This class uses O(n) scheduling. Rewrite it to use O(1) algorithm.
class xrep_t : class xrep_t :
public socket_base_t, public socket_base_t
public i_pipe_events
{ {
public: public:
@ -48,6 +46,9 @@ namespace zmq
int xrecv (class msg_t *msg_, int flags_); int xrecv (class msg_t *msg_, int flags_);
bool xhas_in (); bool xhas_in ();
bool xhas_out (); bool xhas_out ();
void xread_activated (class pipe_t *pipe_);
void xwrite_activated (class pipe_t *pipe_);
void xterminated (class pipe_t *pipe_);
protected: protected:
@ -56,14 +57,6 @@ namespace zmq
private: private:
// Hook into the termination process.
void process_term (int linger_);
// i_pipe_events interface implementation.
void read_activated (pipe_t *pipe_);
void write_activated (pipe_t *pipe_);
void terminated (pipe_t *pipe_);
struct inpipe_t struct inpipe_t
{ {
class pipe_t *pipe; class pipe_t *pipe;
@ -103,9 +96,6 @@ namespace zmq
// If true, more outgoing message parts are expected. // If true, more outgoing message parts are expected.
bool more_out; bool more_out;
// If true, termination process is already underway.
bool terminating;
xrep_t (const xrep_t&); xrep_t (const xrep_t&);
const xrep_t &operator = (const xrep_t&); const xrep_t &operator = (const xrep_t&);
}; };

View File

@ -23,9 +23,7 @@
#include "msg.hpp" #include "msg.hpp"
zmq::xreq_t::xreq_t (class ctx_t *parent_, uint32_t tid_) : zmq::xreq_t::xreq_t (class ctx_t *parent_, uint32_t tid_) :
socket_base_t (parent_, tid_), socket_base_t (parent_, tid_)
fq (this),
lb (this)
{ {
options.type = ZMQ_XREQ; options.type = ZMQ_XREQ;
} }
@ -34,21 +32,13 @@ zmq::xreq_t::~xreq_t ()
{ {
} }
void zmq::xreq_t::xattach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_) void zmq::xreq_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
{ {
zmq_assert (pipe_); zmq_assert (pipe_);
pipe_->set_event_sink (this);
fq.attach (pipe_); fq.attach (pipe_);
lb.attach (pipe_); lb.attach (pipe_);
} }
void zmq::xreq_t::process_term (int linger_)
{
fq.terminate ();
lb.terminate ();
socket_base_t::process_term (linger_);
}
int zmq::xreq_t::xsend (msg_t *msg_, int flags_) int zmq::xreq_t::xsend (msg_t *msg_, int flags_)
{ {
return lb.send (msg_, flags_); return lb.send (msg_, flags_);
@ -69,17 +59,17 @@ bool zmq::xreq_t::xhas_out ()
return lb.has_out (); return lb.has_out ();
} }
void zmq::xreq_t::read_activated (pipe_t *pipe_) void zmq::xreq_t::xread_activated (pipe_t *pipe_)
{ {
fq.activated (pipe_); fq.activated (pipe_);
} }
void zmq::xreq_t::write_activated (pipe_t *pipe_) void zmq::xreq_t::xwrite_activated (pipe_t *pipe_)
{ {
lb.activated (pipe_); lb.activated (pipe_);
} }
void zmq::xreq_t::terminated (pipe_t *pipe_) void zmq::xreq_t::xterminated (pipe_t *pipe_)
{ {
fq.terminated (pipe_); fq.terminated (pipe_);
lb.terminated (pipe_); lb.terminated (pipe_);

View File

@ -23,7 +23,6 @@
#define __ZMQ_XREQ_HPP_INCLUDED__ #define __ZMQ_XREQ_HPP_INCLUDED__
#include "socket_base.hpp" #include "socket_base.hpp"
#include "pipe.hpp"
#include "fq.hpp" #include "fq.hpp"
#include "lb.hpp" #include "lb.hpp"
@ -31,8 +30,7 @@ namespace zmq
{ {
class xreq_t : class xreq_t :
public socket_base_t, public socket_base_t
public i_pipe_events
{ {
public: public:
@ -47,17 +45,12 @@ namespace zmq
int xrecv (class msg_t *msg_, int flags_); int xrecv (class msg_t *msg_, int flags_);
bool xhas_in (); bool xhas_in ();
bool xhas_out (); bool xhas_out ();
void xread_activated (class pipe_t *pipe_);
void xwrite_activated (class pipe_t *pipe_);
void xterminated (class pipe_t *pipe_);
private: private:
// i_pipe_events interface implementation.
void read_activated (pipe_t *pipe_);
void write_activated (pipe_t *pipe_);
void terminated (pipe_t *pipe_);
// Hook into the termination process.
void process_term (int linger_);
// Messages are fair-queued from inbound pipes. And load-balanced to // Messages are fair-queued from inbound pipes. And load-balanced to
// the outbound pipes. // the outbound pipes.
fq_t fq; fq_t fq;

View File

@ -25,7 +25,6 @@
zmq::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_) : zmq::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_) :
socket_base_t (parent_, tid_), socket_base_t (parent_, tid_),
fq (this),
has_message (false), has_message (false),
more (false) more (false)
{ {
@ -43,32 +42,19 @@ zmq::xsub_t::~xsub_t ()
void zmq::xsub_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_) void zmq::xsub_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
{ {
zmq_assert (pipe_); zmq_assert (pipe_);
pipe_->set_event_sink (this);
fq.attach (pipe_); fq.attach (pipe_);
} }
void zmq::xsub_t::read_activated (pipe_t *pipe_) void zmq::xsub_t::xread_activated (pipe_t *pipe_)
{ {
fq.activated (pipe_); fq.activated (pipe_);
} }
void zmq::xsub_t::write_activated (pipe_t *pipe_) void zmq::xsub_t::xterminated (pipe_t *pipe_)
{
// SUB socket never sends messages. This should never happen.
zmq_assert (false);
}
void zmq::xsub_t::terminated (pipe_t *pipe_)
{ {
fq.terminated (pipe_); fq.terminated (pipe_);
} }
void zmq::xsub_t::process_term (int linger_)
{
fq.terminate ();
socket_base_t::process_term (linger_);
}
int zmq::xsub_t::xsend (msg_t *msg_, int options_) int zmq::xsub_t::xsend (msg_t *msg_, int options_)
{ {
size_t size = msg_->size (); size_t size = msg_->size ();

View File

@ -23,7 +23,6 @@
#include "trie.hpp" #include "trie.hpp"
#include "socket_base.hpp" #include "socket_base.hpp"
#include "pipe.hpp"
#include "msg.hpp" #include "msg.hpp"
#include "fq.hpp" #include "fq.hpp"
@ -31,8 +30,7 @@ namespace zmq
{ {
class xsub_t : class xsub_t :
public socket_base_t, public socket_base_t
public i_pipe_events
{ {
public: public:
@ -47,17 +45,11 @@ namespace zmq
bool xhas_out (); bool xhas_out ();
int xrecv (class msg_t *msg_, int flags_); int xrecv (class msg_t *msg_, int flags_);
bool xhas_in (); bool xhas_in ();
void xread_activated (class pipe_t *pipe_);
void xterminated (class pipe_t *pipe_);
private: private:
// i_pipe_events interface implementation.
void read_activated (pipe_t *pipe_);
void write_activated (pipe_t *pipe_);
void terminated (pipe_t *pipe_);
// Hook into the termination process.
void process_term (int linger_);
// Check whether the message matches at least one subscription. // Check whether the message matches at least one subscription.
bool match (class msg_t *msg_); bool match (class msg_t *msg_);

View File

@ -58,7 +58,7 @@ int main (int argc, char *argv [])
ctx = zmq_init (7); ctx = zmq_init (7);
assert (ctx); assert (ctx);
s1 = zmq_socket (ctx, ZMQ_REP); s1 = zmq_socket (ctx, ZMQ_PUB);
assert (s1); assert (s1);
rc = zmq_bind (s1, "tcp://127.0.0.1:5560"); rc = zmq_bind (s1, "tcp://127.0.0.1:5560");