diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt index f62d8423..6fa0cc75 100644 --- a/doc/zmq_setsockopt.txt +++ b/doc/zmq_setsockopt.txt @@ -421,6 +421,23 @@ Default value:: 0 Applicable socket types:: ZMQ_ROUTER +ZMQ_PROBE: automatically send empty packet to every established connection +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Sets the compatible sockets behavior to automatically send an empty packet +to any new connection made (or accepted) by socket. It could help sockets to +auto discovery them-self. It especially important in 'ROUTER' <-> 'ROUTER' connections +where it solves 'who will write first' problems. +NOTE: Don't set this options for sockets working with ZMQ_REP, ZMQ_REQ sockets. +It will interfere with their strict synchronous logic and framing. + +[horizontal] +Option value type:: int +Option value unit:: 0, 1 +Default value:: 0 +Applicable socket types:: ZMQ_ROUTER, ZMQ_DEALER, ZMQ_REP, ZMQ_REQ + + ZMQ_XPUB_VERBOSE: provide all subscription messages on XPUB sockets ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ diff --git a/include/zmq.h b/include/zmq.h index 247114c9..6f499c2c 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -274,7 +274,7 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval); #define ZMQ_CURVE_SERVER 47 #define ZMQ_CURVE_PUBLICKEY 48 #define ZMQ_CURVE_SERVERKEY 49 -#define ZMQ_ROUTER_ANNOUNCE_SELF 50 +#define ZMQ_PROBE 50 /* Message options */ #define ZMQ_MORE 1 diff --git a/src/dealer.cpp b/src/dealer.cpp index 7c6c2dde..33b0eb68 100644 --- a/src/dealer.cpp +++ b/src/dealer.cpp @@ -22,7 +22,8 @@ #include "msg.hpp" zmq::dealer_t::dealer_t (class ctx_t *parent_, uint32_t tid_, int sid_) : - socket_base_t (parent_, tid_, sid_) + socket_base_t (parent_, tid_, sid_), + probe_new_peers(false) { options.type = ZMQ_DEALER; } @@ -37,10 +38,48 @@ void zmq::dealer_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) (void) icanhasall_; zmq_assert (pipe_); + + if (probe_new_peers) { + int rc; + msg_t probe_msg_; + + rc = probe_msg_.init (); + errno_assert (rc == 0); + + rc = pipe_->write (&probe_msg_); + zmq_assert (rc); + pipe_->flush (); + + rc = probe_msg_.close (); + errno_assert (rc == 0); + } + fq.attach (pipe_); lb.attach (pipe_); } +int zmq::dealer_t::xsetsockopt (int option_, const void *optval_, + size_t optvallen_) +{ + bool is_int = (optvallen_ == sizeof (int)); + int value = is_int? *((int *) optval_): 0; + + switch (option_) { + case ZMQ_PROBE: + if (is_int && value >= 0) { + probe_new_peers = value; + return 0; + } + break; + + default: + break; + } + + errno = EINVAL; + return -1; +} + int zmq::dealer_t::xsend (msg_t *msg_) { return lb.send (msg_); diff --git a/src/dealer.hpp b/src/dealer.hpp index aa308117..e693cada 100644 --- a/src/dealer.hpp +++ b/src/dealer.hpp @@ -46,6 +46,7 @@ namespace zmq // Overloads of functions from socket_base_t. void xattach_pipe (zmq::pipe_t *pipe_, bool icanhasall_); + int xsetsockopt (int option_, const void *optval_, size_t optvallen_); int xsend (zmq::msg_t *msg_); int xrecv (zmq::msg_t *msg_); bool xhas_in (); @@ -61,6 +62,9 @@ namespace zmq fq_t fq; lb_t lb; + // if true, send an empty message to every connected peer + bool probe_new_peers; + dealer_t (const dealer_t&); const dealer_t &operator = (const dealer_t&); }; diff --git a/src/router.cpp b/src/router.cpp index 4d6cf6bf..49009c69 100644 --- a/src/router.cpp +++ b/src/router.cpp @@ -34,7 +34,7 @@ zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) : next_peer_id (generate_random ()), mandatory(false), raw_sock(false), - announce_self(false) + probe_new_peers(false) { options.type = ZMQ_ROUTER; options.recv_identity = true; @@ -83,21 +83,21 @@ int zmq::router_t::xsetsockopt (int option_, const void *optval_, return 0; } break; - + case ZMQ_ROUTER_MANDATORY: if (is_int && value >= 0) { mandatory = value; return 0; } break; - - case ZMQ_ROUTER_ANNOUNCE_SELF: + + case ZMQ_PROBE: if (is_int && value >= 0) { - announce_self = value; + probe_new_peers = value; return 0; } break; - + default: break; } @@ -391,14 +391,20 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_) ok = outpipes.insert (outpipes_t::value_type (identity, outpipe)).second; zmq_assert (ok); - if (announce_self) { - msg_t tmp_; - tmp_.init (); - ok = pipe_->write (&tmp_); - zmq_assert (ok); + if (probe_new_peers) { + int rc; + msg_t probe_msg_; + + rc = probe_msg_.init (); + errno_assert (rc == 0); + + rc = pipe_->write (&probe_msg_); + zmq_assert (rc); pipe_->flush (); - tmp_.close (); - }; + + rc = probe_msg_.close (); + errno_assert (rc == 0); + } return true; } diff --git a/src/router.hpp b/src/router.hpp index 426858e9..3b5d1550 100644 --- a/src/router.hpp +++ b/src/router.hpp @@ -47,8 +47,8 @@ namespace zmq // Overloads of functions from socket_base_t. void xattach_pipe (zmq::pipe_t *pipe_, bool icanhasall_); int xsetsockopt (int option_, const void *optval_, size_t optvallen_); - int xsend (msg_t *msg_); - int xrecv (msg_t *msg_); + int xsend (zmq::msg_t *msg_); + int xrecv (zmq::msg_t *msg_); bool xhas_in (); bool xhas_out (); void xread_activated (zmq::pipe_t *pipe_); @@ -112,8 +112,8 @@ namespace zmq bool mandatory; bool raw_sock; - // if true, send an empty message to every connected peer to solve 'who will write first' race condition - bool announce_self; + // if true, send an empty message to every connected peer to solve 'who will write first?' auto discovery problem + bool probe_new_peers; router_t (const router_t&); const router_t &operator = (const router_t&);