From 6ced7027a0861e22a20bfa0a872b55cade6323ec Mon Sep 17 00:00:00 2001 From: Pieter Hintjens Date: Fri, 23 Jan 2015 15:25:40 +0100 Subject: [PATCH] Problem: commit afb24b53 broke ZMQ_STREAM contract Symptom is that ZMQ_STREAM sockets in 4.1.0 and 4.1.1 generate zero sized messages on each new connection, unlike 4.0.x which did not do this. Person who made this commit also changed test cases so that contract breakage did not show. Same person was later banned for persistently poor form in CZMQ contributions. Solution: enable connect notifications on ZMQ_STREAM sockets using a new ZMQ_STREAM_NOTIFY setting. By default, socket does not deliver notifications, and behaves as in 4.0.x. Fixes #1316 --- doc/zmq_setsockopt.txt | 13 +++++++++++++ include/zmq.h | 1 + src/options.cpp | 3 ++- src/options.hpp | 3 ++- src/router.cpp | 20 ++++++++++---------- src/router.hpp | 2 +- src/session_base.cpp | 5 +++-- src/stream.cpp | 12 +++++++++++- src/stream_engine.cpp | 22 ++++++++++++---------- tests/test_connect_rid.cpp | 10 ++-------- tests/test_stream.cpp | 8 ++++++++ tests/test_stream_disconnect.cpp | 9 +++++++-- tests/testutil.hpp | 2 +- 13 files changed, 73 insertions(+), 37 deletions(-) diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt index 1441fad5..cca37c49 100644 --- a/doc/zmq_setsockopt.txt +++ b/doc/zmq_setsockopt.txt @@ -670,6 +670,19 @@ Default value:: -1 (infinite) Applicable socket types:: all +ZMQ_STREAM_NOTIFY: send connect notifications +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +Enables connect notifications on a STREAM socket, when set to 1. By default a +STREAM socket does not notify new connections. When notifications are enabled, +it delivers a zero-length message to signal new client connections. + +[horizontal] +Option value type:: int +Option value unit:: 0, 1 +Default value:: 0 +Applicable socket types:: ZMQ_STREAM + + ZMQ_SUBSCRIBE: Establish message filter ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ The 'ZMQ_SUBSCRIBE' option shall establish a new message filter on a 'ZMQ_SUB' diff --git a/include/zmq.h b/include/zmq.h index 9687d328..83b42ccb 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -297,6 +297,7 @@ ZMQ_EXPORT const char *zmq_msg_gets (zmq_msg_t *msg, const char *property); #define ZMQ_BLOCKY 70 #define ZMQ_XPUB_MANUAL 71 #define ZMQ_XPUB_WELCOME_MSG 72 +#define ZMQ_STREAM_NOTIFY 73 /* Message options */ #define ZMQ_MORE 1 diff --git a/src/options.cpp b/src/options.cpp index ae58f0a5..2d6e95b0 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -46,7 +46,8 @@ zmq::options_t::options_t () : immediate (0), filter (false), recv_identity (false), - raw_sock (false), + raw_socket (false), + raw_notify (false), tcp_keepalive (-1), tcp_keepalive_cnt (-1), tcp_keepalive_idle (-1), diff --git a/src/options.hpp b/src/options.hpp index 144eaeaf..c6475745 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -112,7 +112,8 @@ namespace zmq bool recv_identity; // if true, router socket accepts non-zmq tcp connections - bool raw_sock; + bool raw_socket; + bool raw_notify; // Provide connect notifications // Addres of SOCKS proxy std::string socks_proxy_address; diff --git a/src/router.cpp b/src/router.cpp index 7b061be1..fe67433e 100644 --- a/src/router.cpp +++ b/src/router.cpp @@ -33,14 +33,14 @@ zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) : more_out (false), next_rid (generate_random ()), mandatory (false), - // raw_sock functionality in ROUTER is deprecated - raw_sock (false), + // raw_socket functionality in ROUTER is deprecated + raw_socket (false), probe_router (false), handover (false) { options.type = ZMQ_ROUTER; options.recv_identity = true; - options.raw_sock = false; + options.raw_socket = false; prefetched_id.init (); prefetched_msg.init (); @@ -96,10 +96,10 @@ int zmq::router_t::xsetsockopt (int option_, const void *optval_, break; case ZMQ_ROUTER_RAW: if (is_int && value >= 0) { - raw_sock = (value != 0); - if (raw_sock) { + raw_socket = (value != 0); + if (raw_socket) { options.recv_identity = false; - options.raw_sock = true; + options.raw_socket = true; } return 0; } @@ -223,7 +223,7 @@ int zmq::router_t::xsend (msg_t *msg_) } // Ignore the MORE flag for raw-sock or assert? - if (options.raw_sock) + if (options.raw_socket) msg_->reset_flags (msg_t::more); // Check whether this is the last part of the message. @@ -235,7 +235,7 @@ int zmq::router_t::xsend (msg_t *msg_) // Close the remote connection if user has asked to do so // by sending zero length message. // Pending messages in the pipe will be dropped (on receiving term- ack) - if (raw_sock && msg_->size() == 0) { + if (raw_socket && msg_->size() == 0) { current_out->terminate (false); int rc = msg_->close (); errno_assert (rc == 0); @@ -397,14 +397,14 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_) zmq_assert(false); // Not allowed to duplicate an existing rid } else - if (options.raw_sock) { // Always assign identity for raw-socket + if (options.raw_socket) { // Always assign identity for raw-socket unsigned char buf [5]; buf [0] = 0; put_uint32 (buf + 1, next_rid++); identity = blob_t (buf, sizeof buf); } else - if (!options.raw_sock) { + if (!options.raw_socket) { // Pick up handshake cases and also case where next identity is set msg.init (); ok = pipe_->read (&msg); diff --git a/src/router.hpp b/src/router.hpp index 9a76c8d5..30c95fe8 100644 --- a/src/router.hpp +++ b/src/router.hpp @@ -111,7 +111,7 @@ namespace zmq // If true, report EAGAIN to the caller instead of silently dropping // the message targeting an unknown peer. bool mandatory; - bool raw_sock; + bool raw_socket; // if true, send an empty message to every connected router peer bool probe_router; diff --git a/src/session_base.cpp b/src/session_base.cpp index f8ba0629..5ddc0b83 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -213,14 +213,15 @@ void zmq::session_base_t::pipe_terminated (pipe_t *pipe_) cancel_timer (linger_timer_id); has_linger_timer = false; } - } else + } + else if (pipe_ == zap_pipe) zap_pipe = NULL; else // Remove the pipe from the detached pipes set terminating_pipes.erase (pipe_); - if (!is_terminating () && options.raw_sock) { + if (!is_terminating () && options.raw_socket) { if (engine) { engine->terminate (); engine = NULL; diff --git a/src/stream.cpp b/src/stream.cpp index 1062e5f1..11180b5c 100644 --- a/src/stream.cpp +++ b/src/stream.cpp @@ -33,7 +33,7 @@ zmq::stream_t::stream_t (class ctx_t *parent_, uint32_t tid_, int sid_) : next_rid (generate_random ()) { options.type = ZMQ_STREAM; - options.raw_sock = true; + options.raw_socket = true; prefetched_id.init (); prefetched_msg.init (); @@ -167,6 +167,8 @@ int zmq::stream_t::xsend (msg_t *msg_) int zmq::stream_t::xsetsockopt (int option_, const void *optval_, size_t optvallen_) { + bool is_int = (optvallen_ == sizeof (int)); + int value = is_int? *((int *) optval_): 0; switch (option_) { case ZMQ_CONNECT_RID: if (optval_ && optvallen_) { @@ -174,6 +176,14 @@ int zmq::stream_t::xsetsockopt (int option_, const void *optval_, return 0; } break; + + case ZMQ_STREAM_NOTIFY: + if (is_int && (value == 0 || value == 1)) { + options.raw_notify = value; + return 0; + } + break; + default: break; } diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index 4bcf0475..3b134e25 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -180,7 +180,7 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_, handle = add_fd (s); io_error = false; - if (options.raw_sock) { + if (options.raw_socket) { // no handshaking for raw sock, instantiate raw encoder and decoders encoder = new (std::nothrow) raw_encoder_t (out_batch_size); alloc_assert (encoder); @@ -194,13 +194,15 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_, next_msg = &stream_engine_t::pull_msg_from_session; process_msg = &stream_engine_t::push_msg_to_session; - // For raw sockets, send an initial 0-length message to the - // application so that it knows a peer has connected. - msg_t connector; - connector.init(); - push_msg_to_session (&connector); - connector.close(); - session->flush (); + if (options.raw_notify) { + // For raw sockets, send an initial 0-length message to the + // application so that it knows a peer has connected. + msg_t connector; + connector.init(); + push_msg_to_session (&connector); + connector.close(); + session->flush (); + } } else { // start optional timer, to prevent handshake hanging on no input @@ -914,7 +916,7 @@ int zmq::stream_engine_t::write_subscription_msg (msg_t *msg_) void zmq::stream_engine_t::error (error_reason_t reason) { - if (options.raw_sock) { + if (options.raw_socket) { // For raw sockets, send a final 0-length message to the application // so that it knows the peer has been disconnected. msg_t terminator; @@ -934,7 +936,7 @@ void zmq::stream_engine_t::set_handshake_timer () { zmq_assert (!has_handshake_timer); - if (!options.raw_sock && options.handshake_ivl > 0) { + if (!options.raw_socket && options.handshake_ivl > 0) { add_timer (options.handshake_ivl, handshake_timer_id); has_handshake_timer = true; } diff --git a/tests/test_connect_rid.cpp b/tests/test_connect_rid.cpp index 266428be..a83ed1e4 100644 --- a/tests/test_connect_rid.cpp +++ b/tests/test_connect_rid.cpp @@ -63,19 +63,13 @@ void test_stream_2_stream(){ // Accept data on the bound stream. ret = zmq_recv (rbind, buff, 256, 0); - assert (ret && 0 == buff[0]); - assert (0 == buff[0]); - ret = zmq_recv (rbind, buff, 256, 0); - assert (0 == ret); - - // Handle close of the socket. - ret = zmq_recv (rbind, buff, 256, 0); assert (ret); assert (0 == buff[0]); ret = zmq_recv (rbind, buff+128, 128, 0); - assert (5 == ret); + assert (5 == ret); assert ('h' == buff[128]); + // Handle close of the socket. ret = zmq_unbind (rbind, bindip); assert(0 == ret); ret = zmq_close (rbind); diff --git a/tests/test_stream.cpp b/tests/test_stream.cpp index a0a42eba..20a03ca5 100644 --- a/tests/test_stream.cpp +++ b/tests/test_stream.cpp @@ -54,6 +54,9 @@ test_stream_to_dealer (void) int zero = 0; rc = zmq_setsockopt (stream, ZMQ_LINGER, &zero, sizeof (zero)); assert (rc == 0); + int enabled = 1; + rc = zmq_setsockopt (stream, ZMQ_STREAM_NOTIFY, &enabled, sizeof (enabled)); + assert (rc == 0); rc = zmq_bind (stream, "tcp://127.0.0.1:5556"); assert (rc == 0); @@ -182,11 +185,16 @@ test_stream_to_stream (void) void *server = zmq_socket (ctx, ZMQ_STREAM); assert (server); + int enabled = 1; + rc = zmq_setsockopt (server, ZMQ_STREAM_NOTIFY, &enabled, sizeof (enabled)); + assert (rc == 0); rc = zmq_bind (server, "tcp://127.0.0.1:9070"); assert (rc == 0); void *client = zmq_socket (ctx, ZMQ_STREAM); assert (client); + rc = zmq_setsockopt (client, ZMQ_STREAM_NOTIFY, &enabled, sizeof (enabled)); + assert (rc == 0); rc = zmq_connect (client, "tcp://localhost:9070"); assert (rc == 0); uint8_t id [256]; diff --git a/tests/test_stream_disconnect.cpp b/tests/test_stream_disconnect.cpp index 1233c859..7c221c46 100644 --- a/tests/test_stream_disconnect.cpp +++ b/tests/test_stream_disconnect.cpp @@ -55,15 +55,20 @@ int main(int, char**) { setup_test_environment(); - void* context = zmq_ctx_new (); - void* sockets [2]; + void *context = zmq_ctx_new (); + void *sockets [2]; int rc = 0; sockets [SERVER] = zmq_socket (context, ZMQ_STREAM); + int enabled = 1; + rc = zmq_setsockopt (sockets [SERVER], ZMQ_STREAM_NOTIFY, &enabled, sizeof (enabled)); + assert (rc == 0); rc = zmq_bind (sockets [SERVER], "tcp://0.0.0.0:6666"); assert (rc == 0); sockets [CLIENT] = zmq_socket (context, ZMQ_STREAM); + rc = zmq_setsockopt (sockets [CLIENT], ZMQ_STREAM_NOTIFY, &enabled, sizeof (enabled)); + assert (rc == 0); rc = zmq_connect (sockets [CLIENT], "tcp://localhost:6666"); assert (rc == 0); diff --git a/tests/testutil.hpp b/tests/testutil.hpp index 1b7a7728..082e711c 100644 --- a/tests/testutil.hpp +++ b/tests/testutil.hpp @@ -22,7 +22,7 @@ #include "../include/zmq.h" #include "../src/stdint.hpp" -#include "platform.hpp" +#include "../src/platform.hpp" // This defines the settle time used in tests; raise this if we // get test failures on slower systems due to binds/connects not