Multi-hop REQ/REP, part XII., generate unique identities for anonymous connections

This commit is contained in:
Martin Sustrik 2010-02-19 15:24:43 +01:00
parent b9caa319e2
commit 75f571c884
7 changed files with 44 additions and 32 deletions

View File

@ -60,9 +60,11 @@ If the socket has no identity, each run of the application is completely
separated from other runs. However, with identity application reconnects to
existing infrastructure left by the previous run. Thus it may receive
messages that were sent in the meantime, it shares pipe limits with the
previous run etc.
previous run etc. Identity should be at least one byte and at most 255 bytes
long. Identities starting with binary zero are reserver for use by 0MQ
infrastructure.
+
Type: string Unit: N/A Default: NULL
Type: BLOB Unit: N/A Default: NULL
*ZMQ_SUBSCRIBE*::
Applicable only to ZMQ_SUB socket type. It establishes new message filter.
@ -72,7 +74,7 @@ beginning with specific prefix (e.g. "animals.mammals.dogs."). Multiple
filters can be attached to a single 'sub' socket. In that case message passes
if it matches at least one of the filters.
+
Type: string Unit: N/A Default: N/A
Type: BLOB Unit: N/A Default: N/A
*ZMQ_UNSUBSCRIBE*::
Applicable only to ZMQ_SUB socket type. Removes existing message filter.
@ -81,7 +83,7 @@ exactly. If there were several instances of the same filter created,
this options removes only one of them, leaving the rest in place
and functional.
+
Type: string Unit: N/A Default: N/A
Type: BLOB Unit: N/A Default: N/A
*ZMQ_RATE*::
This option applies only to sending side of multicast transports (pgm & udp).

View File

@ -77,6 +77,15 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
return 0;
case ZMQ_IDENTITY:
// Empty identity is invalid as well as identity longer than
// 255 bytes. Identity starting with binary zero is invalid
// as these are used for auto-generated identities.
if (optvallen_ < 1 || optvallen_ > 255 ||
*((const unsigned char*) optval_) == 0) {
errno = EINVAL;
return -1;
}
identity.assign ((const unsigned char*) optval_, optvallen_);
return 0;

View File

@ -50,7 +50,7 @@ zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_,
peer_identity (peer_identity_),
options (options_)
{
if (!peer_identity.empty ()) {
if (!peer_identity.empty () && peer_identity [0] != 0) {
if (!owner->register_session (peer_identity, this)) {
// TODO: There's already a session with the specified
@ -103,7 +103,7 @@ void zmq::session_t::detach (owned_t *reconnecter_)
engine = NULL;
// Terminate transient session.
if (!ordinal && peer_identity.empty ())
if (!ordinal && (peer_identity.empty () || peer_identity [0] == 0))
term ();
}
@ -173,7 +173,7 @@ void zmq::session_t::process_unplug ()
// Unregister the session from the socket.
if (ordinal)
owner->unregister_session (ordinal);
else if (!peer_identity.empty ())
else if (!peer_identity.empty () && peer_identity [0] != 0)
owner->unregister_session (peer_identity);
// Ask associated pipes to terminate.

View File

@ -44,6 +44,9 @@ namespace zmq
uuid_t ();
~uuid_t ();
// The length of textual representation of UUID.
enum { uuid_string_len = 36 };
// Returns a pointer to buffer containing the textual
// representation of the UUID. The caller is reponsible to
// free the allocated memory.
@ -51,9 +54,6 @@ namespace zmq
private:
// The length of textual representation of UUID.
enum { uuid_string_len = 36 };
#if defined ZMQ_HAVE_WINDOWS
#ifdef ZMQ_HAVE_MINGW32
typedef unsigned char* RPC_CSTR;

View File

@ -89,7 +89,6 @@ bool zmq::zmq_encoder_t::message_ready ()
size -= prefix_size;
}
// For messages less than 255 bytes long, write one byte of message size.
// For longer messages write 0xff escape character followed by 8-byte
// message size.

View File

@ -17,6 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <string.h>
#include <new>
#include "zmq_engine.hpp"
@ -161,7 +163,7 @@ void zmq::zmq_engine_t::revive ()
}
void zmq::zmq_engine_t::add_prefix (const blob_t &identity_)
{
{
decoder.add_prefix (identity_);
}

View File

@ -17,10 +17,13 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <string.h>
#include "zmq_init.hpp"
#include "zmq_engine.hpp"
#include "io_thread.hpp"
#include "session.hpp"
#include "uuid.hpp"
#include "err.hpp"
zmq::zmq_init_t::zmq_init_t (io_thread_t *parent_, socket_base_t *owner_,
@ -71,10 +74,19 @@ bool zmq::zmq_init_t::write (::zmq_msg_t *msg_)
if (received)
return false;
// Retreieve the remote identity.
peer_identity.assign ((const unsigned char*) zmq_msg_data (msg_),
zmq_msg_size (msg_));
engine->add_prefix (peer_identity);
// Retreieve the remote identity. If it's empty, generate a unique name.
if (!zmq_msg_size (msg_)) {
unsigned char identity [uuid_t::uuid_string_len + 1];
identity [0] = 0;
memcpy (identity + 1, uuid_t ().to_string (), uuid_t::uuid_string_len);
peer_identity.assign (identity, uuid_t::uuid_string_len + 1);
}
else {
peer_identity.assign ((const unsigned char*) zmq_msg_data (msg_),
zmq_msg_size (msg_));
}
if (options.traceroute)
engine->add_prefix (peer_identity);
received = true;
return true;
@ -155,10 +167,11 @@ void zmq::zmq_init_t::finalise ()
return;
}
}
else {
// If the peer has a unique name, find the associated session. If it
// doesn't exist, create it.
else if (!peer_identity.empty ()) {
// If the peer has a unique name, find the associated session.
// If it does not exist, create it.
zmq_assert (!peer_identity.empty ());
session = owner->find_session (peer_identity);
if (!session) {
session = new (std::nothrow) session_t (
@ -173,19 +186,6 @@ void zmq::zmq_init_t::finalise ()
}
}
// If the other party has no specific identity, let's create a
// transient session.
else {
session = new (std::nothrow) session_t (
choose_io_thread (options.affinity), owner, options, blob_t ());
zmq_assert (session);
send_plug (session);
send_own (owner, session);
// Reserve a sequence number for following 'attach' command.
session->inc_seqnum ();
}
// No need to increment seqnum as it was already incremented above.
send_attach (session, engine, peer_identity, false);