From 97324398a7d647c3a2cdba7597db2b8057c60c30 Mon Sep 17 00:00:00 2001 From: shripchenko Date: Thu, 23 May 2013 01:49:40 -0700 Subject: [PATCH 1/5] refactored ZMQ_ROUTER_ANNOUNCE_SELF code. renamed it to ZMQ_PROBE_NEW_PEERS. implement it for DEALER tocket. +documentation --- doc/zmq_setsockopt.txt | 10 ++++------ src/dealer.cpp | 6 +++--- src/router.cpp | 7 +++++-- 3 files changed, 12 insertions(+), 11 deletions(-) diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt index 6fa0cc75..7f8fdc93 100644 --- a/doc/zmq_setsockopt.txt +++ b/doc/zmq_setsockopt.txt @@ -421,21 +421,19 @@ Default value:: 0 Applicable socket types:: ZMQ_ROUTER -ZMQ_PROBE: automatically send empty packet to every established connection +ZMQ_PROBE_NEW_PEERS: automatically send empty packet to every established connection ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -Sets the compatible sockets behavior to automatically send an empty packet +Sets the 'ROUTER' & 'DEALER' sockets behavior to automatically send an empty packet to any new connection made (or accepted) by socket. It could help sockets to -auto discovery them-self. It especially important in 'ROUTER' <-> 'ROUTER' connections +auto discovery themself. It especially important in 'ROUTER' <-> 'ROUTER' connections where it solves 'who will write first' problems. -NOTE: Don't set this options for sockets working with ZMQ_REP, ZMQ_REQ sockets. -It will interfere with their strict synchronous logic and framing. [horizontal] Option value type:: int Option value unit:: 0, 1 Default value:: 0 -Applicable socket types:: ZMQ_ROUTER, ZMQ_DEALER, ZMQ_REP, ZMQ_REQ +Applicable socket types:: ZMQ_ROUTER, ZMQ_DEALER ZMQ_XPUB_VERBOSE: provide all subscription messages on XPUB sockets diff --git a/src/dealer.cpp b/src/dealer.cpp index 33b0eb68..8fb2f63a 100644 --- a/src/dealer.cpp +++ b/src/dealer.cpp @@ -40,14 +40,14 @@ void zmq::dealer_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) zmq_assert (pipe_); if (probe_new_peers) { - int rc; + int rc, ok; msg_t probe_msg_; rc = probe_msg_.init (); errno_assert (rc == 0); - rc = pipe_->write (&probe_msg_); - zmq_assert (rc); + ok = pipe_->write (&probe_msg_); + zmq_assert (ok); pipe_->flush (); rc = probe_msg_.close (); diff --git a/src/router.cpp b/src/router.cpp index 49009c69..7357310d 100644 --- a/src/router.cpp +++ b/src/router.cpp @@ -398,12 +398,15 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_) rc = probe_msg_.init (); errno_assert (rc == 0); - rc = pipe_->write (&probe_msg_); - zmq_assert (rc); + ok = pipe_->write (&probe_msg_); pipe_->flush (); rc = probe_msg_.close (); errno_assert (rc == 0); + + // Ignore not probed peers + if (!ok) + return false; } return true; From 9c980e17dd6f83bc01f9b4442fe90d53e9562850 Mon Sep 17 00:00:00 2001 From: shripchenko Date: Fri, 24 May 2013 07:09:53 -0700 Subject: [PATCH 2/5] changed option name. +documentation changes --- doc/zmq_setsockopt.txt | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt index 7f8fdc93..6fa0cc75 100644 --- a/doc/zmq_setsockopt.txt +++ b/doc/zmq_setsockopt.txt @@ -421,19 +421,21 @@ Default value:: 0 Applicable socket types:: ZMQ_ROUTER -ZMQ_PROBE_NEW_PEERS: automatically send empty packet to every established connection +ZMQ_PROBE: automatically send empty packet to every established connection ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -Sets the 'ROUTER' & 'DEALER' sockets behavior to automatically send an empty packet +Sets the compatible sockets behavior to automatically send an empty packet to any new connection made (or accepted) by socket. It could help sockets to -auto discovery themself. It especially important in 'ROUTER' <-> 'ROUTER' connections +auto discovery them-self. It especially important in 'ROUTER' <-> 'ROUTER' connections where it solves 'who will write first' problems. +NOTE: Don't set this options for sockets working with ZMQ_REP, ZMQ_REQ sockets. +It will interfere with their strict synchronous logic and framing. [horizontal] Option value type:: int Option value unit:: 0, 1 Default value:: 0 -Applicable socket types:: ZMQ_ROUTER, ZMQ_DEALER +Applicable socket types:: ZMQ_ROUTER, ZMQ_DEALER, ZMQ_REP, ZMQ_REQ ZMQ_XPUB_VERBOSE: provide all subscription messages on XPUB sockets From dbd58f8e151681dfe210af93be3e2b72c10dc40f Mon Sep 17 00:00:00 2001 From: Pieter Hintjens Date: Wed, 5 Jun 2013 12:42:25 +0200 Subject: [PATCH 3/5] Fixed out-of-date reference --- doc/zmq_socket.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/zmq_socket.txt b/doc/zmq_socket.txt index 6c8712b7..708d9777 100644 --- a/doc/zmq_socket.txt +++ b/doc/zmq_socket.txt @@ -147,7 +147,7 @@ message before passing it to the application. Messages received are fair-queued from among all connected peers. When sending messages a 'ZMQ_ROUTER' socket shall remove the first part of the message and use it to determine the _identity_ of the peer the message shall be routed to. If the peer does not exist anymore -the message shall be silently discarded by default, unless 'ZMQ_ROUTER_BEHAVIOR' +the message shall be silently discarded by default, unless 'ZMQ_ROUTER_MANDATORY' socket option is set to '1'. When a 'ZMQ_ROUTER' socket enters the 'mute' state due to having reached the From 2344131db3800e95a05e3ed11ca0e31aed468166 Mon Sep 17 00:00:00 2001 From: Pieter Hintjens Date: Wed, 5 Jun 2013 15:25:52 +0200 Subject: [PATCH 4/5] Packaging of ZMQ_PROBE - Cleaned up man page a little - Wrote test case tests/test_router_probe.cpp --- .gitignore | 1 + doc/zmq_setsockopt.txt | 26 +++++++------ tests/Makefile.am | 2 + tests/test_router_probe.cpp | 75 +++++++++++++++++++++++++++++++++++++ 4 files changed, 92 insertions(+), 12 deletions(-) create mode 100644 tests/test_router_probe.cpp diff --git a/.gitignore b/.gitignore index 57dc9040..7eac8490 100644 --- a/.gitignore +++ b/.gitignore @@ -48,6 +48,7 @@ tests/test_disconnect_inproc tests/test_ctx_options tests/test_iov tests/test_security +tests/test_router_probe src/platform.hpp* src/stamp-h1 perf/local_lat diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt index 6fa0cc75..3417fd09 100644 --- a/doc/zmq_setsockopt.txt +++ b/doc/zmq_setsockopt.txt @@ -13,8 +13,8 @@ SYNOPSIS *int zmq_setsockopt (void '*socket', int 'option_name', const void '*option_value', size_t 'option_len');* Caution: All options, with the exception of ZMQ_SUBSCRIBE, ZMQ_UNSUBSCRIBE, -ZMQ_LINGER, ZMQ_ROUTER_MANDATORY and ZMQ_XPUB_VERBOSE only take effect for -subsequent socket bind/connects. +ZMQ_LINGER, ZMQ_ROUTER_MANDATORY, ZMQ_PROBE, and ZMQ_XPUB_VERBOSE only take +effect for subsequent socket bind/connects. DESCRIPTION ----------- @@ -392,7 +392,7 @@ Applicable socket types:: all, only for connection-oriented transports. ZMQ_ROUTER_MANDATORY: accept only routable messages on ROUTER sockets ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -Sets the 'ROUTER' socket behavior when an unroutable message is encountered. A +Sets the ROUTER socket behavior when an unroutable message is encountered. A value of `0` is the default and discards the message silently when it cannot be routed. A value of `1` returns an 'EHOSTUNREACH' error code if the message cannot be routed. @@ -407,7 +407,7 @@ Applicable socket types:: ZMQ_ROUTER ZMQ_ROUTER_RAW: switch ROUTER socket to raw mode ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -Sets the raw mode on the 'ROUTER', when set to 1. When the ROUTER socket is in +Sets the raw mode on the ROUTER, when set to 1. When the ROUTER socket is in raw mode, and when using the tcp:// transport, it will read and write TCP data without 0MQ framing. This lets 0MQ applications talk to non-0MQ applications. When using raw mode, you cannot set explicit identities, and the ZMQ_MSGMORE @@ -421,15 +421,17 @@ Default value:: 0 Applicable socket types:: ZMQ_ROUTER -ZMQ_PROBE: automatically send empty packet to every established connection -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +ZMQ_PROBE: bootstrap connections to ROUTER sockets +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -Sets the compatible sockets behavior to automatically send an empty packet -to any new connection made (or accepted) by socket. It could help sockets to -auto discovery them-self. It especially important in 'ROUTER' <-> 'ROUTER' connections -where it solves 'who will write first' problems. -NOTE: Don't set this options for sockets working with ZMQ_REP, ZMQ_REQ sockets. -It will interfere with their strict synchronous logic and framing. +When set to 1, the socket will automatically send an empty message when a +new connection is made or accepted. You may set this on REQ, DEALER, or +ROUTER sockets connected to a ROUTER socket. The application must filter +such empty messages. The ZMQ_PROBE option in effect provides the ROUTER +application with an event signaling the arrival of a new peer. + +NOTE: do not set this option on a socket that talks to any other socket +types: the results are undefined. [horizontal] Option value type:: int diff --git a/tests/Makefile.am b/tests/Makefile.am index 276609a8..249b4804 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -18,6 +18,7 @@ noinst_PROGRAMS = test_pair_inproc \ test_term_endpoint \ test_monitor \ test_router_mandatory \ + test_router_probe \ test_raw_sock \ test_disconnect_inproc \ test_ctx_options \ @@ -46,6 +47,7 @@ test_last_endpoint_SOURCES = test_last_endpoint.cpp test_term_endpoint_SOURCES = test_term_endpoint.cpp test_monitor_SOURCES = test_monitor.cpp test_router_mandatory_SOURCES = test_router_mandatory.cpp +test_router_probe_SOURCES = test_router_probe.cpp test_raw_sock_SOURCES = test_raw_sock.cpp test_disconnect_inproc_SOURCES = test_disconnect_inproc.cpp test_ctx_options_SOURCES = test_ctx_options.cpp diff --git a/tests/test_router_probe.cpp b/tests/test_router_probe.cpp new file mode 100644 index 00000000..38eb6fc7 --- /dev/null +++ b/tests/test_router_probe.cpp @@ -0,0 +1,75 @@ +/* + Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "../include/zmq.h" +#include +#include +#undef NDEBUG +#include + +int main (void) +{ + void *ctx = zmq_ctx_new (); + assert (ctx); + + // Create server and bind to endpoint + void *server = zmq_socket (ctx, ZMQ_ROUTER); + assert (server); + int rc = zmq_bind (server, "tcp://*:5560"); + assert (rc == 0); + + // Create client and connect to server, doing a probe + void *client = zmq_socket (ctx, ZMQ_DEALER); + assert (client); + rc = zmq_setsockopt (client, ZMQ_IDENTITY, "X", 1); + assert (rc == 0); + int probe = 1; + rc = zmq_setsockopt (client, ZMQ_PROBE, &probe, sizeof (probe)); + assert (rc == 0); + rc = zmq_connect (client, "tcp://localhost:5560"); + assert (rc == 0); + + // We expect an identity=X + empty message from client + unsigned char buffer [255]; + rc = zmq_recv (server, buffer, 255, 0); + assert (rc == 1); + assert (buffer [0] == 'X'); + rc = zmq_recv (server, buffer, 255, 0); + assert (rc == 0); + + // Send a message to client now + rc = zmq_send (server, "X", 1, ZMQ_SNDMORE); + assert (rc == 1); + rc = zmq_send (server, "Hello", 5, 0); + assert (rc == 5); + + rc = zmq_recv (client, buffer, 255, 0); + assert (rc == 5); + + rc = zmq_close (server); + assert (rc == 0); + + rc = zmq_close (client); + assert (rc == 0); + + rc = zmq_ctx_term (ctx); + assert (rc == 0); + + return 0 ; +} From a9679da764a279b368603b9160c0f81a419c2160 Mon Sep 17 00:00:00 2001 From: Pieter Hintjens Date: Wed, 5 Jun 2013 15:55:15 +0200 Subject: [PATCH 5/5] Packaging on ZMQ_PROBE_ROUTER - renamed to ZMQ_PROBE_ROUTER --- .gitignore | 2 +- doc/zmq_setsockopt.txt | 14 +++++++------- include/zmq.h | 2 +- src/dealer.cpp | 14 ++++++-------- src/dealer.hpp | 4 ++-- src/router.cpp | 16 +++++++--------- src/router.hpp | 4 ++-- tests/Makefile.am | 4 ++-- ...st_router_probe.cpp => test_probe_router.cpp} | 4 +++- 9 files changed, 31 insertions(+), 33 deletions(-) rename tests/{test_router_probe.cpp => test_probe_router.cpp} (91%) diff --git a/.gitignore b/.gitignore index 7eac8490..ce586a13 100644 --- a/.gitignore +++ b/.gitignore @@ -48,7 +48,7 @@ tests/test_disconnect_inproc tests/test_ctx_options tests/test_iov tests/test_security -tests/test_router_probe +tests/test_probe_router src/platform.hpp* src/stamp-h1 perf/local_lat diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt index 3417fd09..3eecec6e 100644 --- a/doc/zmq_setsockopt.txt +++ b/doc/zmq_setsockopt.txt @@ -13,8 +13,8 @@ SYNOPSIS *int zmq_setsockopt (void '*socket', int 'option_name', const void '*option_value', size_t 'option_len');* Caution: All options, with the exception of ZMQ_SUBSCRIBE, ZMQ_UNSUBSCRIBE, -ZMQ_LINGER, ZMQ_ROUTER_MANDATORY, ZMQ_PROBE, and ZMQ_XPUB_VERBOSE only take -effect for subsequent socket bind/connects. +ZMQ_LINGER, ZMQ_ROUTER_MANDATORY, ZMQ_PROBE_ROUTER, and ZMQ_XPUB_VERBOSE +only take effect for subsequent socket bind/connects. DESCRIPTION ----------- @@ -421,14 +421,14 @@ Default value:: 0 Applicable socket types:: ZMQ_ROUTER -ZMQ_PROBE: bootstrap connections to ROUTER sockets -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +ZMQ_PROBE_ROUTER: bootstrap connections to ROUTER sockets +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ When set to 1, the socket will automatically send an empty message when a new connection is made or accepted. You may set this on REQ, DEALER, or ROUTER sockets connected to a ROUTER socket. The application must filter -such empty messages. The ZMQ_PROBE option in effect provides the ROUTER -application with an event signaling the arrival of a new peer. +such empty messages. The ZMQ_PROBE_ROUTER option in effect provides the +ROUTER application with an event signaling the arrival of a new peer. NOTE: do not set this option on a socket that talks to any other socket types: the results are undefined. @@ -437,7 +437,7 @@ types: the results are undefined. Option value type:: int Option value unit:: 0, 1 Default value:: 0 -Applicable socket types:: ZMQ_ROUTER, ZMQ_DEALER, ZMQ_REP, ZMQ_REQ +Applicable socket types:: ZMQ_ROUTER, ZMQ_DEALER, ZMQ_REQ ZMQ_XPUB_VERBOSE: provide all subscription messages on XPUB sockets diff --git a/include/zmq.h b/include/zmq.h index 6f499c2c..475f86dd 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -274,7 +274,7 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval); #define ZMQ_CURVE_SERVER 47 #define ZMQ_CURVE_PUBLICKEY 48 #define ZMQ_CURVE_SERVERKEY 49 -#define ZMQ_PROBE 50 +#define ZMQ_PROBE_ROUTER 50 /* Message options */ #define ZMQ_MORE 1 diff --git a/src/dealer.cpp b/src/dealer.cpp index 8fb2f63a..d1d5c94e 100644 --- a/src/dealer.cpp +++ b/src/dealer.cpp @@ -23,7 +23,7 @@ zmq::dealer_t::dealer_t (class ctx_t *parent_, uint32_t tid_, int sid_) : socket_base_t (parent_, tid_, sid_), - probe_new_peers(false) + probe_router (false) { options.type = ZMQ_DEALER; } @@ -39,14 +39,12 @@ void zmq::dealer_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) zmq_assert (pipe_); - if (probe_new_peers) { - int rc, ok; + if (probe_router) { msg_t probe_msg_; - - rc = probe_msg_.init (); + int rc = probe_msg_.init (); errno_assert (rc == 0); - ok = pipe_->write (&probe_msg_); + int ok = pipe_->write (&probe_msg_); zmq_assert (ok); pipe_->flush (); @@ -65,9 +63,9 @@ int zmq::dealer_t::xsetsockopt (int option_, const void *optval_, int value = is_int? *((int *) optval_): 0; switch (option_) { - case ZMQ_PROBE: + case ZMQ_PROBE_ROUTER: if (is_int && value >= 0) { - probe_new_peers = value; + probe_router = value; return 0; } break; diff --git a/src/dealer.hpp b/src/dealer.hpp index e693cada..361a9568 100644 --- a/src/dealer.hpp +++ b/src/dealer.hpp @@ -62,8 +62,8 @@ namespace zmq fq_t fq; lb_t lb; - // if true, send an empty message to every connected peer - bool probe_new_peers; + // if true, send an empty message to every connected router peer + bool probe_router; dealer_t (const dealer_t&); const dealer_t &operator = (const dealer_t&); diff --git a/src/router.cpp b/src/router.cpp index 7357310d..8d2e89f6 100644 --- a/src/router.cpp +++ b/src/router.cpp @@ -32,9 +32,9 @@ zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) : current_out (NULL), more_out (false), next_peer_id (generate_random ()), - mandatory(false), - raw_sock(false), - probe_new_peers(false) + mandatory (false), + raw_sock (false), + probe_router (false) { options.type = ZMQ_ROUTER; options.recv_identity = true; @@ -91,9 +91,9 @@ int zmq::router_t::xsetsockopt (int option_, const void *optval_, } break; - case ZMQ_PROBE: + case ZMQ_PROBE_ROUTER: if (is_int && value >= 0) { - probe_new_peers = value; + probe_router = value; return 0; } break; @@ -391,11 +391,9 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_) ok = outpipes.insert (outpipes_t::value_type (identity, outpipe)).second; zmq_assert (ok); - if (probe_new_peers) { - int rc; + if (probe_router) { msg_t probe_msg_; - - rc = probe_msg_.init (); + int rc = probe_msg_.init (); errno_assert (rc == 0); ok = pipe_->write (&probe_msg_); diff --git a/src/router.hpp b/src/router.hpp index 3b5d1550..98717714 100644 --- a/src/router.hpp +++ b/src/router.hpp @@ -112,8 +112,8 @@ namespace zmq bool mandatory; bool raw_sock; - // if true, send an empty message to every connected peer to solve 'who will write first?' auto discovery problem - bool probe_new_peers; + // if true, send an empty message to every connected router peer + bool probe_router; router_t (const router_t&); const router_t &operator = (const router_t&); diff --git a/tests/Makefile.am b/tests/Makefile.am index 249b4804..8503677c 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -18,7 +18,7 @@ noinst_PROGRAMS = test_pair_inproc \ test_term_endpoint \ test_monitor \ test_router_mandatory \ - test_router_probe \ + test_probe_router \ test_raw_sock \ test_disconnect_inproc \ test_ctx_options \ @@ -47,7 +47,7 @@ test_last_endpoint_SOURCES = test_last_endpoint.cpp test_term_endpoint_SOURCES = test_term_endpoint.cpp test_monitor_SOURCES = test_monitor.cpp test_router_mandatory_SOURCES = test_router_mandatory.cpp -test_router_probe_SOURCES = test_router_probe.cpp +test_probe_router_SOURCES = test_probe_router.cpp test_raw_sock_SOURCES = test_raw_sock.cpp test_disconnect_inproc_SOURCES = test_disconnect_inproc.cpp test_ctx_options_SOURCES = test_ctx_options.cpp diff --git a/tests/test_router_probe.cpp b/tests/test_probe_router.cpp similarity index 91% rename from tests/test_router_probe.cpp rename to tests/test_probe_router.cpp index 38eb6fc7..28cf3650 100644 --- a/tests/test_router_probe.cpp +++ b/tests/test_probe_router.cpp @@ -36,11 +36,13 @@ int main (void) // Create client and connect to server, doing a probe void *client = zmq_socket (ctx, ZMQ_DEALER); + // Trying this results in the first recv waiting forever + // void *client = zmq_socket (ctx, ZMQ_ROUTER); assert (client); rc = zmq_setsockopt (client, ZMQ_IDENTITY, "X", 1); assert (rc == 0); int probe = 1; - rc = zmq_setsockopt (client, ZMQ_PROBE, &probe, sizeof (probe)); + rc = zmq_setsockopt (client, ZMQ_PROBE_ROUTER, &probe, sizeof (probe)); assert (rc == 0); rc = zmq_connect (client, "tcp://localhost:5560"); assert (rc == 0);