From 51750a7d2a0ddceb1c51096cac7cd79a442bce42 Mon Sep 17 00:00:00 2001 From: shripchenko Date: Thu, 23 May 2013 01:49:40 -0700 Subject: [PATCH 1/3] refactored ZMQ_ROUTER_ANNOUNCE_SELF code. renamed it to ZMQ_PROBE_NEW_PEERS. implement it for DEALER tocket. +documentation --- doc/zmq_setsockopt.txt | 15 +++++++++++++++ include/zmq.h | 2 +- src/dealer.cpp | 41 ++++++++++++++++++++++++++++++++++++++++- src/dealer.hpp | 4 ++++ src/router.cpp | 35 ++++++++++++++++++++++------------- src/router.hpp | 8 ++++---- 6 files changed, 86 insertions(+), 19 deletions(-) diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt index f62d8423..7f8fdc93 100644 --- a/doc/zmq_setsockopt.txt +++ b/doc/zmq_setsockopt.txt @@ -421,6 +421,21 @@ Default value:: 0 Applicable socket types:: ZMQ_ROUTER +ZMQ_PROBE_NEW_PEERS: automatically send empty packet to every established connection +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Sets the 'ROUTER' & 'DEALER' sockets behavior to automatically send an empty packet +to any new connection made (or accepted) by socket. It could help sockets to +auto discovery themself. It especially important in 'ROUTER' <-> 'ROUTER' connections +where it solves 'who will write first' problems. + +[horizontal] +Option value type:: int +Option value unit:: 0, 1 +Default value:: 0 +Applicable socket types:: ZMQ_ROUTER, ZMQ_DEALER + + ZMQ_XPUB_VERBOSE: provide all subscription messages on XPUB sockets ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ diff --git a/include/zmq.h b/include/zmq.h index 247114c9..dcd81de6 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -274,7 +274,7 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval); #define ZMQ_CURVE_SERVER 47 #define ZMQ_CURVE_PUBLICKEY 48 #define ZMQ_CURVE_SERVERKEY 49 -#define ZMQ_ROUTER_ANNOUNCE_SELF 50 +#define ZMQ_PROBE_NEW_PEERS 50 /* Message options */ #define ZMQ_MORE 1 diff --git a/src/dealer.cpp b/src/dealer.cpp index 7c6c2dde..8da06960 100644 --- a/src/dealer.cpp +++ b/src/dealer.cpp @@ -22,7 +22,8 @@ #include "msg.hpp" zmq::dealer_t::dealer_t (class ctx_t *parent_, uint32_t tid_, int sid_) : - socket_base_t (parent_, tid_, sid_) + socket_base_t (parent_, tid_, sid_), + probe_new_peers(false) { options.type = ZMQ_DEALER; } @@ -37,10 +38,48 @@ void zmq::dealer_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) (void) icanhasall_; zmq_assert (pipe_); + + if (probe_new_peers) { + int rc, ok; + msg_t probe_msg_; + + rc = probe_msg_.init (); + errno_assert (rc == 0); + + ok = pipe_->write (&probe_msg_); + zmq_assert (ok); + pipe_->flush (); + + rc = probe_msg_.close (); + errno_assert (rc == 0); + } + fq.attach (pipe_); lb.attach (pipe_); } +int zmq::dealer_t::xsetsockopt (int option_, const void *optval_, + size_t optvallen_) +{ + bool is_int = (optvallen_ == sizeof (int)); + int value = is_int? *((int *) optval_): 0; + + switch (option_) { + case ZMQ_PROBE_NEW_PEERS: + if (is_int && value >= 0) { + probe_new_peers = value; + return 0; + } + break; + + default: + break; + } + + errno = EINVAL; + return -1; +} + int zmq::dealer_t::xsend (msg_t *msg_) { return lb.send (msg_); diff --git a/src/dealer.hpp b/src/dealer.hpp index aa308117..e693cada 100644 --- a/src/dealer.hpp +++ b/src/dealer.hpp @@ -46,6 +46,7 @@ namespace zmq // Overloads of functions from socket_base_t. void xattach_pipe (zmq::pipe_t *pipe_, bool icanhasall_); + int xsetsockopt (int option_, const void *optval_, size_t optvallen_); int xsend (zmq::msg_t *msg_); int xrecv (zmq::msg_t *msg_); bool xhas_in (); @@ -61,6 +62,9 @@ namespace zmq fq_t fq; lb_t lb; + // if true, send an empty message to every connected peer + bool probe_new_peers; + dealer_t (const dealer_t&); const dealer_t &operator = (const dealer_t&); }; diff --git a/src/router.cpp b/src/router.cpp index d6eb0fb9..4ce2476e 100644 --- a/src/router.cpp +++ b/src/router.cpp @@ -34,7 +34,7 @@ zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) : next_peer_id (generate_random ()), mandatory(false), raw_sock(false), - announce_self(false) + probe_new_peers(false) { options.type = ZMQ_ROUTER; options.recv_identity = true; @@ -85,7 +85,7 @@ int zmq::router_t::xsetsockopt (int option_, const void *optval_, // DEBUGGING PROBLEM WITH TRAVIS CI printf ("E: invalid option value (int=%d value=%d)\n", is_int, value); break; - + case ZMQ_ROUTER_MANDATORY: if (is_int && value >= 0) { mandatory = value; @@ -94,14 +94,14 @@ int zmq::router_t::xsetsockopt (int option_, const void *optval_, // DEBUGGING PROBLEM WITH TRAVIS CI printf ("E: invalid option value (int=%d value=%d)\n", is_int, value); break; - - case ZMQ_ROUTER_ANNOUNCE_SELF: + + case ZMQ_PROBE_NEW_PEERS: if (is_int && value >= 0) { - announce_self = value; + probe_new_peers = value; return 0; } break; - + default: break; } @@ -397,14 +397,23 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_) ok = outpipes.insert (outpipes_t::value_type (identity, outpipe)).second; zmq_assert (ok); - if (announce_self) { - msg_t tmp_; - tmp_.init (); - ok = pipe_->write (&tmp_); - zmq_assert (ok); + if (probe_new_peers) { + int rc; + msg_t probe_msg_; + + rc = probe_msg_.init (); + errno_assert (rc == 0); + + ok = pipe_->write (&probe_msg_); pipe_->flush (); - tmp_.close (); - }; + + rc = probe_msg_.close (); + errno_assert (rc == 0); + + // Ignore not probed peers + if (!ok) + return false; + } return true; } diff --git a/src/router.hpp b/src/router.hpp index 426858e9..3b5d1550 100644 --- a/src/router.hpp +++ b/src/router.hpp @@ -47,8 +47,8 @@ namespace zmq // Overloads of functions from socket_base_t. void xattach_pipe (zmq::pipe_t *pipe_, bool icanhasall_); int xsetsockopt (int option_, const void *optval_, size_t optvallen_); - int xsend (msg_t *msg_); - int xrecv (msg_t *msg_); + int xsend (zmq::msg_t *msg_); + int xrecv (zmq::msg_t *msg_); bool xhas_in (); bool xhas_out (); void xread_activated (zmq::pipe_t *pipe_); @@ -112,8 +112,8 @@ namespace zmq bool mandatory; bool raw_sock; - // if true, send an empty message to every connected peer to solve 'who will write first' race condition - bool announce_self; + // if true, send an empty message to every connected peer to solve 'who will write first?' auto discovery problem + bool probe_new_peers; router_t (const router_t&); const router_t &operator = (const router_t&); From aec989fb5dcdbd9043057c94035621dbac8b8e33 Mon Sep 17 00:00:00 2001 From: shripchenko Date: Thu, 23 May 2013 02:02:18 -0700 Subject: [PATCH 2/3] small refactoring --- src/dealer.cpp | 6 +++--- src/router.cpp | 7 ++----- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/src/dealer.cpp b/src/dealer.cpp index 8da06960..8d86cc54 100644 --- a/src/dealer.cpp +++ b/src/dealer.cpp @@ -40,14 +40,14 @@ void zmq::dealer_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) zmq_assert (pipe_); if (probe_new_peers) { - int rc, ok; + int rc; msg_t probe_msg_; rc = probe_msg_.init (); errno_assert (rc == 0); - ok = pipe_->write (&probe_msg_); - zmq_assert (ok); + rc = pipe_->write (&probe_msg_); + zmq_assert (rc); pipe_->flush (); rc = probe_msg_.close (); diff --git a/src/router.cpp b/src/router.cpp index 4ce2476e..654e4516 100644 --- a/src/router.cpp +++ b/src/router.cpp @@ -404,15 +404,12 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_) rc = probe_msg_.init (); errno_assert (rc == 0); - ok = pipe_->write (&probe_msg_); + rc = pipe_->write (&probe_msg_); + zmq_assert (rc); pipe_->flush (); rc = probe_msg_.close (); errno_assert (rc == 0); - - // Ignore not probed peers - if (!ok) - return false; } return true; From f805e4dd03d012e644b4153866c70e99490b0349 Mon Sep 17 00:00:00 2001 From: shripchenko Date: Fri, 24 May 2013 07:09:53 -0700 Subject: [PATCH 3/3] changed option name. +documentation changes --- doc/zmq_setsockopt.txt | 10 ++++++---- include/zmq.h | 2 +- src/dealer.cpp | 2 +- src/router.cpp | 2 +- 4 files changed, 9 insertions(+), 7 deletions(-) diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt index 7f8fdc93..6fa0cc75 100644 --- a/doc/zmq_setsockopt.txt +++ b/doc/zmq_setsockopt.txt @@ -421,19 +421,21 @@ Default value:: 0 Applicable socket types:: ZMQ_ROUTER -ZMQ_PROBE_NEW_PEERS: automatically send empty packet to every established connection +ZMQ_PROBE: automatically send empty packet to every established connection ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -Sets the 'ROUTER' & 'DEALER' sockets behavior to automatically send an empty packet +Sets the compatible sockets behavior to automatically send an empty packet to any new connection made (or accepted) by socket. It could help sockets to -auto discovery themself. It especially important in 'ROUTER' <-> 'ROUTER' connections +auto discovery them-self. It especially important in 'ROUTER' <-> 'ROUTER' connections where it solves 'who will write first' problems. +NOTE: Don't set this options for sockets working with ZMQ_REP, ZMQ_REQ sockets. +It will interfere with their strict synchronous logic and framing. [horizontal] Option value type:: int Option value unit:: 0, 1 Default value:: 0 -Applicable socket types:: ZMQ_ROUTER, ZMQ_DEALER +Applicable socket types:: ZMQ_ROUTER, ZMQ_DEALER, ZMQ_REP, ZMQ_REQ ZMQ_XPUB_VERBOSE: provide all subscription messages on XPUB sockets diff --git a/include/zmq.h b/include/zmq.h index dcd81de6..6f499c2c 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -274,7 +274,7 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval); #define ZMQ_CURVE_SERVER 47 #define ZMQ_CURVE_PUBLICKEY 48 #define ZMQ_CURVE_SERVERKEY 49 -#define ZMQ_PROBE_NEW_PEERS 50 +#define ZMQ_PROBE 50 /* Message options */ #define ZMQ_MORE 1 diff --git a/src/dealer.cpp b/src/dealer.cpp index 8d86cc54..33b0eb68 100644 --- a/src/dealer.cpp +++ b/src/dealer.cpp @@ -65,7 +65,7 @@ int zmq::dealer_t::xsetsockopt (int option_, const void *optval_, int value = is_int? *((int *) optval_): 0; switch (option_) { - case ZMQ_PROBE_NEW_PEERS: + case ZMQ_PROBE: if (is_int && value >= 0) { probe_new_peers = value; return 0; diff --git a/src/router.cpp b/src/router.cpp index 654e4516..f49e6b4b 100644 --- a/src/router.cpp +++ b/src/router.cpp @@ -95,7 +95,7 @@ int zmq::router_t::xsetsockopt (int option_, const void *optval_, printf ("E: invalid option value (int=%d value=%d)\n", is_int, value); break; - case ZMQ_PROBE_NEW_PEERS: + case ZMQ_PROBE: if (is_int && value >= 0) { probe_new_peers = value; return 0;