0
0
mirror of https://github.com/zeromq/libzmq.git synced 2025-01-01 19:05:18 +08:00

Multi-hop REQ/REP, part XI., finalise the XREQ/XREP functionality

This commit is contained in:
Martin Sustrik 2010-02-16 18:30:38 +01:00
parent 2ddce20535
commit b9caa319e2
33 changed files with 171 additions and 72 deletions

View File

@ -35,7 +35,7 @@ zmq::downstream_t::~downstream_t ()
}
void zmq::downstream_t::xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_)
class writer_t *outpipe_, const blob_t &peer_identity_)
{
zmq_assert (!inpipe_ && outpipe_);
lb.attach (outpipe_);

View File

@ -34,7 +34,8 @@ namespace zmq
~downstream_t ();
// Overloads of functions from socket_base_t.
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
const blob_t &peer_identity_);
void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);

View File

@ -20,6 +20,8 @@
#ifndef __ZMQ_I_ENDPOINT_HPP_INCLUDED__
#define __ZMQ_I_ENDPOINT_HPP_INCLUDED__
#include "blob.hpp"
namespace zmq
{
@ -28,7 +30,7 @@ namespace zmq
virtual ~i_endpoint () {}
virtual void attach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_) = 0;
class writer_t *outpipe_, const blob_t &peer_identity_) = 0;
virtual void detach_inpipe (class reader_t *pipe_) = 0;
virtual void detach_outpipe (class writer_t *pipe_) = 0;
virtual void kill (class reader_t *pipe_) = 0;

View File

@ -41,10 +41,11 @@ namespace zmq
// are messages to send available.
virtual void revive () = 0;
// Start tracing the message route. Engine should add the identity
// supplied to all inbound messages and trim identity from all the
// outbound messages.
virtual void traceroute (const blob_t &identity_) = 0;
// Engine should add the prefix supplied to all inbound messages.
virtual void add_prefix (const blob_t &identity_) = 0;
// Engine should trim prefix from all the outbound messages.
virtual void trim_prefix () = 0;
};
}

View File

@ -42,7 +42,7 @@ zmq::p2p_t::~p2p_t ()
}
void zmq::p2p_t::xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_)
class writer_t *outpipe_, const blob_t &peer_identity_)
{
zmq_assert (!inpipe && !outpipe);
inpipe = inpipe_;

View File

@ -33,7 +33,8 @@ namespace zmq
~p2p_t ();
// Overloads of functions from socket_base_t.
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
const blob_t &peer_identity_);
void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);

View File

@ -88,7 +88,13 @@ void zmq::pgm_receiver_t::revive ()
zmq_assert (false);
}
void zmq::pgm_receiver_t::traceroute (const blob_t &identity_)
void zmq::pgm_receiver_t::add_prefix (const blob_t &identity_)
{
// No need for tracerouting functionality in PGM socket at the moment.
zmq_assert (false);
}
void zmq::pgm_receiver_t::trim_prefix ()
{
// No need for tracerouting functionality in PGM socket at the moment.
zmq_assert (false);

View File

@ -54,7 +54,8 @@ namespace zmq
void plug (struct i_inout *inout_);
void unplug ();
void revive ();
void traceroute (const blob_t &identity_);
void add_prefix (const blob_t &identity_);
void trim_prefix ();
// i_poll_events interface implementation.
void in_event ();

View File

@ -102,7 +102,13 @@ void zmq::pgm_sender_t::revive ()
out_event ();
}
void zmq::pgm_sender_t::traceroute (const blob_t &identity_)
void zmq::pgm_sender_t::add_prefix (const blob_t &identity_)
{
// No need for tracerouting functionality in PGM socket at the moment.
zmq_assert (false);
}
void zmq::pgm_sender_t::trim_prefix ()
{
// No need for tracerouting functionality in PGM socket at the moment.
zmq_assert (false);

View File

@ -52,7 +52,8 @@ namespace zmq
void plug (struct i_inout *inout_);
void unplug ();
void revive ();
void traceroute (const blob_t &identity_);
void add_prefix (const blob_t &identity_);
void trim_prefix ();
// i_poll_events interface implementation.
void in_event ();

View File

@ -39,7 +39,7 @@ zmq::pub_t::~pub_t ()
}
void zmq::pub_t::xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_)
class writer_t *outpipe_, const blob_t &peer_identity_)
{
zmq_assert (!inpipe_);
out_pipes.push_back (outpipe_);

View File

@ -34,7 +34,8 @@ namespace zmq
~pub_t ();
// Overloads of functions from socket_base_t.
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
const blob_t &peer_identity_);
void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);

View File

@ -44,7 +44,7 @@ zmq::rep_t::~rep_t ()
}
void zmq::rep_t::xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_)
class writer_t *outpipe_, const blob_t &peer_identity_)
{
zmq_assert (inpipe_ && outpipe_);
zmq_assert (in_pipes.size () == out_pipes.size ());

View File

@ -34,7 +34,8 @@ namespace zmq
~rep_t ();
// Overloads of functions from socket_base_t.
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
const blob_t &peer_identity_);
void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);

View File

@ -39,7 +39,7 @@ zmq::req_t::~req_t ()
}
void zmq::req_t::xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_)
class writer_t *outpipe_, const blob_t &peer_identity_)
{
zmq_assert (inpipe_ && outpipe_);
zmq_assert (in_pipes.size () == out_pipes.size ());

View File

@ -34,7 +34,8 @@ namespace zmq
~req_t ();
// Overloads of functions from socket_base_t.
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
const blob_t &peer_identity_);
void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);

View File

@ -124,7 +124,7 @@ uint64_t zmq::session_t::get_ordinal ()
}
void zmq::session_t::attach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_)
class writer_t *outpipe_, const blob_t &peer_identity_)
{
if (inpipe_) {
zmq_assert (!in_pipe);
@ -251,4 +251,9 @@ void zmq::session_t::process_attach (i_engine *engine_,
zmq_assert (engine_);
engine = engine_;
engine->plug (this);
// Once the initial handshaking is over tracerouting should trim prefixes
// from outbound messages.
if (options.traceroute)
engine->trim_prefix ();
}

View File

@ -51,7 +51,8 @@ namespace zmq
uint64_t get_ordinal ();
// i_endpoint interface implementation.
void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
const blob_t &peer_identity_);
void detach_inpipe (class reader_t *pipe_);
void detach_outpipe (class writer_t *pipe_);
void kill (class reader_t *pipe_);

View File

@ -169,7 +169,7 @@ int zmq::socket_base_t::connect (const char *addr_)
// Attach the pipes to this socket object.
attach_pipes (in_pipe ? &in_pipe->reader : NULL,
out_pipe ? &out_pipe->writer : NULL);
out_pipe ? &out_pipe->writer : NULL, blob_t ());
// Attach the pipes to the peer socket. Note that peer's seqnum
// was incremented in find_endpoint function. The callee is notified
@ -211,11 +211,11 @@ int zmq::socket_base_t::connect (const char *addr_)
// Attach the pipes to the socket object.
attach_pipes (in_pipe ? &in_pipe->reader : NULL,
out_pipe ? &out_pipe->writer : NULL);
out_pipe ? &out_pipe->writer : NULL, blob_t ());
// Attach the pipes to the session object.
session->attach_pipes (out_pipe ? &out_pipe->reader : NULL,
in_pipe ? &in_pipe->writer : NULL);
in_pipe ? &in_pipe->writer : NULL, blob_t ());
}
// Activate the session.
@ -553,13 +553,13 @@ void zmq::socket_base_t::revive (reader_t *pipe_)
}
void zmq::socket_base_t::attach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_)
class writer_t *outpipe_, const blob_t &peer_identity_)
{
if (inpipe_)
inpipe_->set_endpoint (this);
if (outpipe_)
outpipe_->set_endpoint (this);
xattach_pipes (inpipe_, outpipe_);
xattach_pipes (inpipe_, outpipe_, peer_identity_);
}
void zmq::socket_base_t::detach_inpipe (class reader_t *pipe_)
@ -582,7 +582,7 @@ void zmq::socket_base_t::process_own (owned_t *object_)
void zmq::socket_base_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_,
const blob_t &peer_identity_)
{
attach_pipes (in_pipe_, out_pipe_);
attach_pipes (in_pipe_, out_pipe_, peer_identity_);
}
void zmq::socket_base_t::process_term_req (owned_t *object_)

View File

@ -87,7 +87,8 @@ namespace zmq
class session_t *find_session (uint64_t ordinal_);
// i_endpoint interface implementation.
void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
const blob_t &peer_identity_);
void detach_inpipe (class reader_t *pipe_);
void detach_outpipe (class writer_t *pipe_);
void kill (class reader_t *pipe_);
@ -100,7 +101,7 @@ namespace zmq
// Pipe management is done by individual socket types.
virtual void xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_) = 0;
class writer_t *outpipe_, const blob_t &peer_identity_) = 0;
virtual void xdetach_inpipe (class reader_t *pipe_) = 0;
virtual void xdetach_outpipe (class writer_t *pipe_) = 0;
virtual void xkill (class reader_t *pipe_) = 0;

View File

@ -39,7 +39,7 @@ zmq::sub_t::~sub_t ()
}
void zmq::sub_t::xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_)
class writer_t *outpipe_, const blob_t &peer_identity_)
{
zmq_assert (inpipe_ && !outpipe_);
fq.attach (inpipe_);

View File

@ -39,7 +39,8 @@ namespace zmq
protected:
// Overloads of functions from socket_base_t.
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
const blob_t &peer_identity_);
void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);

View File

@ -34,7 +34,7 @@ zmq::upstream_t::~upstream_t ()
}
void zmq::upstream_t::xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_)
class writer_t *outpipe_, const blob_t &peer_identity_)
{
zmq_assert (inpipe_ && !outpipe_);
fq.attach (inpipe_);

View File

@ -34,7 +34,8 @@ namespace zmq
~upstream_t ();
// Overloads of functions from socket_base_t.
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
const blob_t &peer_identity_);
void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);

View File

@ -21,6 +21,7 @@
#include "xrep.hpp"
#include "err.hpp"
#include "pipe.hpp"
zmq::xrep_t::xrep_t (class app_thread_t *parent_) :
socket_base_t (parent_)
@ -42,12 +43,15 @@ zmq::xrep_t::~xrep_t ()
}
void zmq::xrep_t::xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_)
class writer_t *outpipe_, const blob_t &peer_identity_)
{
zmq_assert (inpipe_ && outpipe_);
fq.attach (inpipe_);
zmq_assert (false);
// TODO: What if new connection has same peer identity as the old one?
bool ok = outpipes.insert (std::make_pair (
peer_identity_, outpipe_)).second;
zmq_assert (ok);
}
void zmq::xrep_t::xdetach_inpipe (class reader_t *pipe_)
@ -58,6 +62,12 @@ void zmq::xrep_t::xdetach_inpipe (class reader_t *pipe_)
void zmq::xrep_t::xdetach_outpipe (class writer_t *pipe_)
{
for (outpipes_t::iterator it = outpipes.begin ();
it != outpipes.end (); ++it)
if (it->second == pipe_) {
outpipes.erase (it);
return;
}
zmq_assert (false);
}
@ -80,8 +90,35 @@ int zmq::xrep_t::xsetsockopt (int option_, const void *optval_,
int zmq::xrep_t::xsend (zmq_msg_t *msg_, int flags_)
{
zmq_assert (false);
return -1;
unsigned char *data = (unsigned char*) zmq_msg_data (msg_);
size_t size = zmq_msg_size (msg_);
// Check whether the message is well-formed.
zmq_assert (size >= 1);
zmq_assert (size_t (*data + 1) <= size);
// Find the corresponding outbound pipe. If there's none, just drop the
// message.
// TODO: There's an allocation here! It's the critical path! Get rid of it!
blob_t identity (data + 1, *data);
outpipes_t::iterator it = outpipes.find (identity);
if (it == outpipes.end ()) {
int rc = zmq_msg_close (msg_);
zmq_assert (rc == 0);
rc = zmq_msg_init (msg_);
zmq_assert (rc == 0);
return 0;
}
// Push message to the selected pipe.
it->second->write (msg_);
it->second->flush ();
// Detach the message from the data buffer.
int rc = zmq_msg_init (msg_);
zmq_assert (rc == 0);
return 0;
}
int zmq::xrep_t::xflush ()
@ -102,8 +139,10 @@ bool zmq::xrep_t::xhas_in ()
bool zmq::xrep_t::xhas_out ()
{
zmq_assert (false);
return false;
// In theory, XREP socket is always ready for writing. Whether actual
// attempt to write succeeds depends on whitch pipe the message is going
// to be routed to.
return true;
}

View File

@ -20,7 +20,10 @@
#ifndef __ZMQ_XREP_HPP_INCLUDED__
#define __ZMQ_XREP_HPP_INCLUDED__
#include <map>
#include "socket_base.hpp"
#include "blob.hpp"
#include "fq.hpp"
namespace zmq
@ -34,7 +37,8 @@ namespace zmq
~xrep_t ();
// Overloads of functions from socket_base_t.
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
const blob_t &peer_identity_);
void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
@ -51,6 +55,10 @@ namespace zmq
// Inbound messages are fair-queued.
fq_t fq;
// Outbound pipes indexed by the peer names.
typedef std::map <blob_t, class writer_t*> outpipes_t;
outpipes_t outpipes;
xrep_t (const xrep_t&);
void operator = (const xrep_t&);
};

View File

@ -34,7 +34,7 @@ zmq::xreq_t::~xreq_t ()
}
void zmq::xreq_t::xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_)
class writer_t *outpipe_, const blob_t &peer_identity_)
{
zmq_assert (inpipe_ && outpipe_);
fq.attach (inpipe_);

View File

@ -35,7 +35,8 @@ namespace zmq
~xreq_t ();
// Overloads of functions from socket_base_t.
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
const blob_t &peer_identity_);
void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);

View File

@ -63,16 +63,22 @@ bool zmq::zmq_decoder_t::one_byte_size_ready ()
// in_progress is initialised at this point so in theory we should
// close it before calling zmq_msg_init_size, however, it's a 0-byte
// message and thus we can treat it as uninitialised...
int rc = zmq_msg_init_size (&in_progress, prefix.size () + *tmpbuf);
if (prefix.empty ()) {
int rc = zmq_msg_init_size (&in_progress, *tmpbuf);
errno_assert (rc == 0);
// Fill in the message prefix if any.
if (!prefix.empty ())
memcpy (zmq_msg_data (&in_progress), prefix.data (),
prefix.size ());
next_step ((unsigned char*) zmq_msg_data (&in_progress) +
prefix.size (), *tmpbuf, &zmq_decoder_t::message_ready);
next_step (zmq_msg_data (&in_progress), *tmpbuf,
&zmq_decoder_t::message_ready);
}
else {
int rc = zmq_msg_init_size (&in_progress,
*tmpbuf + 1 + prefix.size ());
errno_assert (rc == 0);
unsigned char *data = (unsigned char*) zmq_msg_data (&in_progress);
*data = (unsigned char) prefix.size ();
memcpy (data + 1, prefix.data (), *data);
next_step (data + *data + 1, *tmpbuf,
&zmq_decoder_t::message_ready);
}
}
return true;
}
@ -87,15 +93,21 @@ bool zmq::zmq_decoder_t::eight_byte_size_ready ()
// in_progress is initialised at this point so in theory we should
// close it before calling zmq_msg_init_size, however, it's a 0-byte
// message and thus we can treat it as uninitialised...
int rc = zmq_msg_init_size (&in_progress, prefix.size () + size);
if (prefix.empty ()) {
int rc = zmq_msg_init_size (&in_progress, size);
errno_assert (rc == 0);
next_step (zmq_msg_data (&in_progress), size,
&zmq_decoder_t::message_ready);
}
else {
int rc = zmq_msg_init_size (&in_progress, size + 1 + prefix.size ());
errno_assert (rc == 0);
unsigned char *data = (unsigned char*) zmq_msg_data (&in_progress);
*data = (unsigned char) prefix.size ();
memcpy (data + 1, prefix.data (), *data);
next_step (data + *data + 1, size, &zmq_decoder_t::message_ready);
}
// Fill in the message prefix if any.
if (!prefix.empty ())
memcpy (zmq_msg_data (&in_progress), prefix.data (), prefix.size ());
next_step ((unsigned char*) zmq_msg_data (&in_progress) + prefix.size (),
size, &zmq_decoder_t::message_ready);
return true;
}

View File

@ -56,8 +56,9 @@ bool zmq::zmq_encoder_t::size_ready ()
}
else {
size_t prefix_size = *(unsigned char*) zmq_msg_data (&in_progress);
next_step ((unsigned char*) zmq_msg_data (&in_progress) + prefix_size,
zmq_msg_size (&in_progress) - prefix_size,
next_step (
(unsigned char*) zmq_msg_data (&in_progress) + prefix_size + 1,
zmq_msg_size (&in_progress) - prefix_size - 1,
&zmq_encoder_t::message_ready, false);
}
return true;
@ -80,8 +81,14 @@ bool zmq::zmq_encoder_t::message_ready ()
// Get the message size. If the prefix is not to be sent, adjust the
// size accordingly.
size_t size = zmq_msg_size (&in_progress);
if (trim)
size -= *(unsigned char*) zmq_msg_data (&in_progress);
if (trim) {
zmq_assert (size);
size_t prefix_size =
(*(unsigned char*) zmq_msg_data (&in_progress)) + 1;
zmq_assert (prefix_size <= size);
size -= prefix_size;
}
// For messages less than 255 bytes long, write one byte of message size.
// For longer messages write 0xff escape character followed by 8-byte

View File

@ -160,10 +160,14 @@ void zmq::zmq_engine_t::revive ()
out_event ();
}
void zmq::zmq_engine_t::traceroute (const blob_t &identity_)
void zmq::zmq_engine_t::add_prefix (const blob_t &identity_)
{
decoder.add_prefix (identity_);
}
void zmq::zmq_engine_t::trim_prefix ()
{
encoder.trim_prefix ();
decoder.add_prefix (identity_);
}
void zmq::zmq_engine_t::error ()

View File

@ -47,7 +47,8 @@ namespace zmq
void plug (struct i_inout *inout_);
void unplug ();
void revive ();
void traceroute (const blob_t &identity_);
void add_prefix (const blob_t &identity_);
void trim_prefix ();
// i_poll_events interface implementation.
void in_event ();

View File

@ -74,13 +74,9 @@ bool zmq::zmq_init_t::write (::zmq_msg_t *msg_)
// Retreieve the remote identity.
peer_identity.assign ((const unsigned char*) zmq_msg_data (msg_),
zmq_msg_size (msg_));
engine->add_prefix (peer_identity);
received = true;
// Once the initial handshaking is over, XREP sockets should start
// tracerouting individual messages.
if (options.traceroute)
engine->traceroute (peer_identity);
return true;
}