diff --git a/src/msg.hpp b/src/msg.hpp index f2f8fcf6..8c846708 100644 --- a/src/msg.hpp +++ b/src/msg.hpp @@ -50,6 +50,7 @@ namespace zmq enum { more = 1, + identity = 64, shared = 128 }; diff --git a/src/options.cpp b/src/options.cpp index aa94a218..4db1a6cb 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -46,7 +46,9 @@ zmq::options_t::options_t () : ipv4only (1), delay_on_close (true), delay_on_disconnect (true), - filter (false) + filter (false), + send_identity (false), + recv_identity (false) { } diff --git a/src/options.hpp b/src/options.hpp index d017c001..bfc9dc79 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -99,6 +99,12 @@ namespace zmq // If 1, (X)SUB socket should filter the messages. If 0, it should not. bool filter; + + // Sends identity to all new connections. + bool send_identity; + + // Receivers identity from all new connections. + bool recv_identity; }; } diff --git a/src/pipe.cpp b/src/pipe.cpp index 9f44c948..25dd51cc 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -65,8 +65,7 @@ zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, peer (NULL), sink (NULL), state (active), - delay (delay_), - pipe_id (0) + delay (delay_) { } @@ -88,14 +87,14 @@ void zmq::pipe_t::set_event_sink (i_pipe_events *sink_) sink = sink_; } -void zmq::pipe_t::set_pipe_id (uint32_t id_) +void zmq::pipe_t::set_identity (const blob_t &identity_) { - pipe_id = id_; + identity = identity_; } -uint32_t zmq::pipe_t::get_pipe_id () +zmq::blob_t zmq::pipe_t::get_identity () { - return pipe_id; + return identity; } bool zmq::pipe_t::check_read () diff --git a/src/pipe.hpp b/src/pipe.hpp index 4533e584..75a20210 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -1,6 +1,7 @@ /* Copyright (c) 2009-2011 250bpm s.r.o. Copyright (c) 2007-2009 iMatix Corporation + Copyright (c) 2011 VMware, Inc. Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -28,6 +29,7 @@ #include "object.hpp" #include "stdint.hpp" #include "array.hpp" +#include "blob.hpp" namespace zmq { @@ -71,8 +73,8 @@ namespace zmq void set_event_sink (i_pipe_events *sink_); // Pipe endpoint can store an opaque ID to be used by its clients. - void set_pipe_id (uint32_t id_); - uint32_t get_pipe_id (); + void set_identity (const blob_t &identity_); + blob_t get_identity (); // Returns true if there is at least one message to read in the pipe. bool check_read (); @@ -183,8 +185,8 @@ namespace zmq // asks us to. bool delay; - // Opaque ID. To be used by the clients, not the pipe itself. - uint32_t pipe_id; + // Identity of the writer. Used uniquely by the reader side. + blob_t identity; // Returns true if the message is delimiter; false otherwise. static bool is_delimiter (msg_t &msg_); diff --git a/src/req.cpp b/src/req.cpp index 40c4765f..3ba1ec01 100644 --- a/src/req.cpp +++ b/src/req.cpp @@ -147,23 +147,32 @@ zmq::req_session_t::req_session_t (io_thread_t *io_thread_, bool connect_, zmq::req_session_t::~req_session_t () { + state = options.recv_identity ? identity : bottom; } int zmq::req_session_t::write (msg_t *msg_) { - if (state == bottom) { + switch (state) { + case bottom: if (msg_->flags () == msg_t::more && msg_->size () == 0) { state = body; return xreq_session_t::write (msg_); } - } - else { + break; + case body: if (msg_->flags () == msg_t::more) return xreq_session_t::write (msg_); if (msg_->flags () == 0) { state = bottom; return xreq_session_t::write (msg_); } + break; + case identity: + if (msg_->flags () == 0) { + state = bottom; + return xreq_session_t::write (msg_); + } + break; } errno = EFAULT; return -1; diff --git a/src/req.hpp b/src/req.hpp index 61066cad..8fae9d40 100644 --- a/src/req.hpp +++ b/src/req.hpp @@ -71,6 +71,7 @@ namespace zmq private: enum { + identity, bottom, body } state; diff --git a/src/session_base.cpp b/src/session_base.cpp index 4c5e5127..f2ee7139 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -112,7 +112,9 @@ zmq::session_base_t::session_base_t (class io_thread_t *io_thread_, engine (NULL), socket (socket_), io_thread (io_thread_), - has_linger_timer (false) + has_linger_timer (false), + send_identity (options_.send_identity), + recv_identity (options_.recv_identity) { if (protocol_) protocol = protocol_; @@ -146,6 +148,16 @@ void zmq::session_base_t::attach_pipe (pipe_t *pipe_) int zmq::session_base_t::read (msg_t *msg_) { + // First message to send is identity (if required). + if (send_identity) { + zmq_assert (!(msg_->flags () & msg_t::more)); + msg_->init_size (options.identity_size); + memcpy (msg_->data (), options.identity, options.identity_size); + send_identity = false; + incomplete_in = false; + return 0; + } + if (!pipe || !pipe->read (msg_)) { errno = EAGAIN; return -1; @@ -157,6 +169,12 @@ int zmq::session_base_t::read (msg_t *msg_) int zmq::session_base_t::write (msg_t *msg_) { + // First message to receive is identity (if required). + if (recv_identity) { + msg_->set_flags (msg_t::identity); + recv_identity = false; + } + if (pipe && pipe->write (msg_)) { int rc = msg_->init (); errno_assert (rc == 0); diff --git a/src/session_base.hpp b/src/session_base.hpp index 86a670f8..c89628f7 100644 --- a/src/session_base.hpp +++ b/src/session_base.hpp @@ -1,6 +1,7 @@ /* Copyright (c) 2009-2011 250bpm s.r.o. Copyright (c) 2007-2009 iMatix Corporation + Copyright (c) 2011 VMware, Inc. Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -119,6 +120,10 @@ namespace zmq // True is linger timer is running. bool has_linger_timer; + // If true, identity is to be sent/recvd from the network. + bool send_identity; + bool recv_identity; + // Protocol and address to use when connecting. std::string protocol; std::string address; diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 583818b2..a59ba698 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -845,7 +845,16 @@ void zmq::socket_base_t::terminated (pipe_t *pipe_) void zmq::socket_base_t::extract_flags (msg_t *msg_) { + // Test whether IDENTITY flag is valid for this socket type. + if (unlikely (msg_->flags () & msg_t::identity)) { + zmq_assert (options.recv_identity); +printf ("identity recvd\n"); + } + + + // Remove MORE flag. rcvmore = msg_->flags () & msg_t::more ? true : false; if (rcvmore) msg_->reset_flags (msg_t::more); } + diff --git a/src/xrep.cpp b/src/xrep.cpp index 350d7520..ea19e56d 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -43,6 +43,9 @@ zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) : // all the outstanding requests from that peer. // options.delay_on_disconnect = false; + options.send_identity = true; + options.recv_identity = true; + prefetched_msg.init (); } @@ -56,33 +59,22 @@ void zmq::xrep_t::xattach_pipe (pipe_t *pipe_) { zmq_assert (pipe_); - // Generate a new peer ID. Take care to avoid duplicates. - outpipes_t::iterator it = outpipes.lower_bound (next_peer_id); - if (!outpipes.empty ()) { - while (true) { - if (it == outpipes.end ()) - it = outpipes.begin (); - if (it->first != next_peer_id) - break; - ++next_peer_id; - ++it; - } - } + // Generate a new unique peer identity. + unsigned char buf [5]; + buf [0] = 0; + put_uint32 (buf + 1, next_peer_id); + blob_t identity (buf, 5); + ++next_peer_id; // Add the pipe to the map out outbound pipes. outpipe_t outpipe = {pipe_, true}; bool ok = outpipes.insert (outpipes_t::value_type ( - next_peer_id, outpipe)).second; + identity, outpipe)).second; zmq_assert (ok); // Add the pipe to the list of inbound pipes. - pipe_->set_pipe_id (next_peer_id); - fq.attach (pipe_); - - // Advance next peer ID so that if new connection is dropped shortly after - // its creation we don't accidentally get two subsequent peers with - // the same ID. - ++next_peer_id; + pipe_->set_identity (identity); + fq.attach (pipe_); } void zmq::xrep_t::xterminated (pipe_t *pipe_) @@ -133,26 +125,25 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_) more_out = true; - // Find the pipe associated with the peer ID stored in the prefix. + // Find the pipe associated with the identity stored in the prefix. // If there's no such pipe just silently ignore the message. - if (msg_->size () == 4) { - uint32_t peer_id = get_uint32 ((unsigned char*) msg_->data ()); - outpipes_t::iterator it = outpipes.find (peer_id); + blob_t identity ((unsigned char*) msg_->data (), msg_->size ()); + outpipes_t::iterator it = outpipes.find (identity); - if (it != outpipes.end ()) { - current_out = it->second.pipe; - msg_t empty; - int rc = empty.init (); - errno_assert (rc == 0); - if (!current_out->check_write (&empty)) { - it->second.active = false; - more_out = false; - current_out = NULL; - } - rc = empty.close (); - errno_assert (rc == 0); + if (it != outpipes.end ()) { + current_out = it->second.pipe; + msg_t empty; + int rc = empty.init (); + errno_assert (rc == 0); + if (!current_out->check_write (&empty)) { + it->second.active = false; + more_out = false; + current_out = NULL; } + rc = empty.close (); + errno_assert (rc == 0); } + } int rc = msg_->close (); @@ -204,6 +195,37 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_) if (rc != 0) return -1; + // If identity is received, change the key assigned to the pipe. + if (unlikely (msg_->flags () & msg_t::identity)) { + zmq_assert (!more_in); + + // Empty identity means we can preserve the auto-generated identity. + if (msg_->size () != 0) { + + // Actual change of the identity. + outpipes_t::iterator it = outpipes.begin (); + while (it != outpipes.end ()) { + if (it->second.pipe == pipe) { + blob_t identity ((unsigned char*) msg_->data (), + msg_->size ()); + pipe->set_identity (identity); + outpipes.erase (it); + outpipe_t outpipe = {pipe, true}; + outpipes.insert (outpipes_t::value_type (identity, + outpipe)); + break; + } + ++it; + } + zmq_assert (it != outpipes.end ()); + } + + // After processing the identity, try to get the next message. + rc = fq.recvpipe (msg_, flags_, &pipe); + if (rc != 0) + return -1; + } + // If we are in the middle of reading a message, just return the next part. if (more_in) { more_in = msg_->flags () & msg_t::more ? true : false; @@ -217,9 +239,11 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_) prefetched = true; rc = msg_->close (); errno_assert (rc == 0); - rc = msg_->init_size (4); + + blob_t identity = pipe->get_identity (); + rc = msg_->init_size (identity.size ()); errno_assert (rc == 0); - put_uint32 ((unsigned char*) msg_->data (), pipe->get_pipe_id ()); + memcpy (msg_->data (), identity.data (), identity.size ()); msg_->set_flags (msg_t::more); return 0; } diff --git a/src/xrep.hpp b/src/xrep.hpp index 8cec683d..fc02b11e 100644 --- a/src/xrep.hpp +++ b/src/xrep.hpp @@ -1,6 +1,7 @@ /* Copyright (c) 2009-2011 250bpm s.r.o. Copyright (c) 2011 iMatix Corporation + Copyright (c) 2011 VMware, Inc. Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -27,6 +28,7 @@ #include "socket_base.hpp" #include "session_base.hpp" #include "stdint.hpp" +#include "blob.hpp" #include "msg.hpp" #include "fq.hpp" @@ -78,7 +80,7 @@ namespace zmq }; // Outbound pipes indexed by the peer IDs. - typedef std::map outpipes_t; + typedef std::map outpipes_t; outpipes_t outpipes; // The pipe we are currently writing to. diff --git a/src/xreq.cpp b/src/xreq.cpp index f4f962f4..91317f79 100644 --- a/src/xreq.cpp +++ b/src/xreq.cpp @@ -1,5 +1,6 @@ /* Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2011 VMware, Inc. Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -32,6 +33,9 @@ zmq::xreq_t::xreq_t (class ctx_t *parent_, uint32_t tid_) : // If the socket is closing we can drop all the outbound requests. There'll // be noone to receive the replies anyway. // options.delay_on_close = false; + + options.send_identity = true; + options.recv_identity = true; } zmq::xreq_t::~xreq_t () @@ -52,7 +56,15 @@ int zmq::xreq_t::xsend (msg_t *msg_, int flags_) int zmq::xreq_t::xrecv (msg_t *msg_, int flags_) { - return fq.recv (msg_, flags_); + // XREQ socket doesn't use identities. We can safely drop it and + while (true) { + int rc = fq.recv (msg_, flags_); + if (rc != 0) + return rc; + if (likely (!(msg_->flags () & msg_t::identity))) + break; + } + return 0; } bool zmq::xreq_t::xhas_in () diff --git a/tests/test_invalid_rep.cpp b/tests/test_invalid_rep.cpp index ada7ee11..9c77cc4b 100644 --- a/tests/test_invalid_rep.cpp +++ b/tests/test_invalid_rep.cpp @@ -25,7 +25,7 @@ int main (int argc, char *argv []) { - fprintf (stderr, "test_pair_inproc running...\n"); + fprintf (stderr, "test_invalid_rep running...\n"); // Create REQ/XREP wiring. void *ctx = zmq_init (1); @@ -49,23 +49,24 @@ int main (int argc, char *argv []) assert (rc == 1); // Receive the request. - char addr [4]; + char addr [32]; + int addr_size; char bottom [1]; char body [1]; - rc = zmq_recv (xrep_socket, addr, sizeof (addr), 0); - assert (rc == 4); + addr_size = zmq_recv (xrep_socket, addr, sizeof (addr), 0); + assert (addr_size >= 0); rc = zmq_recv (xrep_socket, bottom, sizeof (bottom), 0); assert (rc == 0); rc = zmq_recv (xrep_socket, body, sizeof (body), 0); assert (rc == 1); // Send invalid reply. - rc = zmq_send (xrep_socket, addr, 4, 0); - assert (rc == 4); + rc = zmq_send (xrep_socket, addr, addr_size, 0); + assert (rc == addr_size); // Send valid reply. - rc = zmq_send (xrep_socket, addr, 4, ZMQ_SNDMORE); - assert (rc == 4); + rc = zmq_send (xrep_socket, addr, addr_size, ZMQ_SNDMORE); + assert (rc == addr_size); rc = zmq_send (xrep_socket, bottom, 0, ZMQ_SNDMORE); assert (rc == 0); rc = zmq_send (xrep_socket, "b", 1, 0);