diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt index 1441fad5..cca37c49 100644 --- a/doc/zmq_setsockopt.txt +++ b/doc/zmq_setsockopt.txt @@ -670,6 +670,19 @@ Default value:: -1 (infinite) Applicable socket types:: all +ZMQ_STREAM_NOTIFY: send connect notifications +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +Enables connect notifications on a STREAM socket, when set to 1. By default a +STREAM socket does not notify new connections. When notifications are enabled, +it delivers a zero-length message to signal new client connections. + +[horizontal] +Option value type:: int +Option value unit:: 0, 1 +Default value:: 0 +Applicable socket types:: ZMQ_STREAM + + ZMQ_SUBSCRIBE: Establish message filter ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ The 'ZMQ_SUBSCRIBE' option shall establish a new message filter on a 'ZMQ_SUB' diff --git a/include/zmq.h b/include/zmq.h index 9687d328..83b42ccb 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -297,6 +297,7 @@ ZMQ_EXPORT const char *zmq_msg_gets (zmq_msg_t *msg, const char *property); #define ZMQ_BLOCKY 70 #define ZMQ_XPUB_MANUAL 71 #define ZMQ_XPUB_WELCOME_MSG 72 +#define ZMQ_STREAM_NOTIFY 73 /* Message options */ #define ZMQ_MORE 1 diff --git a/src/options.cpp b/src/options.cpp index ae58f0a5..2d6e95b0 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -46,7 +46,8 @@ zmq::options_t::options_t () : immediate (0), filter (false), recv_identity (false), - raw_sock (false), + raw_socket (false), + raw_notify (false), tcp_keepalive (-1), tcp_keepalive_cnt (-1), tcp_keepalive_idle (-1), diff --git a/src/options.hpp b/src/options.hpp index 144eaeaf..c6475745 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -112,7 +112,8 @@ namespace zmq bool recv_identity; // if true, router socket accepts non-zmq tcp connections - bool raw_sock; + bool raw_socket; + bool raw_notify; // Provide connect notifications // Addres of SOCKS proxy std::string socks_proxy_address; diff --git a/src/router.cpp b/src/router.cpp index 6755bf38..ac8e124d 100644 --- a/src/router.cpp +++ b/src/router.cpp @@ -33,14 +33,14 @@ zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) : more_out (false), next_rid (generate_random ()), mandatory (false), - // raw_sock functionality in ROUTER is deprecated - raw_sock (false), + // raw_socket functionality in ROUTER is deprecated + raw_socket (false), probe_router (false), handover (false) { options.type = ZMQ_ROUTER; options.recv_identity = true; - options.raw_sock = false; + options.raw_socket = false; prefetched_id.init (); prefetched_msg.init (); @@ -96,10 +96,10 @@ int zmq::router_t::xsetsockopt (int option_, const void *optval_, break; case ZMQ_ROUTER_RAW: if (is_int && value >= 0) { - raw_sock = (value != 0); - if (raw_sock) { + raw_socket = (value != 0); + if (raw_socket) { options.recv_identity = false; - options.raw_sock = true; + options.raw_socket = true; } return 0; } @@ -223,7 +223,7 @@ int zmq::router_t::xsend (msg_t *msg_) } // Ignore the MORE flag for raw-sock or assert? - if (options.raw_sock) + if (options.raw_socket) msg_->reset_flags (msg_t::more); // Check whether this is the last part of the message. @@ -235,7 +235,7 @@ int zmq::router_t::xsend (msg_t *msg_) // Close the remote connection if user has asked to do so // by sending zero length message. // Pending messages in the pipe will be dropped (on receiving term- ack) - if (raw_sock && msg_->size() == 0) { + if (raw_socket && msg_->size() == 0) { current_out->terminate (false); int rc = msg_->close (); errno_assert (rc == 0); @@ -401,14 +401,14 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_) zmq_assert(false); // Not allowed to duplicate an existing rid } else - if (options.raw_sock) { // Always assign identity for raw-socket + if (options.raw_socket) { // Always assign identity for raw-socket unsigned char buf [5]; buf [0] = 0; put_uint32 (buf + 1, next_rid++); identity = blob_t (buf, sizeof buf); } else - if (!options.raw_sock) { + if (!options.raw_socket) { // Pick up handshake cases and also case where next identity is set msg.init (); ok = pipe_->read (&msg); diff --git a/src/router.hpp b/src/router.hpp index 9a76c8d5..30c95fe8 100644 --- a/src/router.hpp +++ b/src/router.hpp @@ -111,7 +111,7 @@ namespace zmq // If true, report EAGAIN to the caller instead of silently dropping // the message targeting an unknown peer. bool mandatory; - bool raw_sock; + bool raw_socket; // if true, send an empty message to every connected router peer bool probe_router; diff --git a/src/session_base.cpp b/src/session_base.cpp index f8ba0629..5ddc0b83 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -213,14 +213,15 @@ void zmq::session_base_t::pipe_terminated (pipe_t *pipe_) cancel_timer (linger_timer_id); has_linger_timer = false; } - } else + } + else if (pipe_ == zap_pipe) zap_pipe = NULL; else // Remove the pipe from the detached pipes set terminating_pipes.erase (pipe_); - if (!is_terminating () && options.raw_sock) { + if (!is_terminating () && options.raw_socket) { if (engine) { engine->terminate (); engine = NULL; diff --git a/src/stream.cpp b/src/stream.cpp index 1062e5f1..11180b5c 100644 --- a/src/stream.cpp +++ b/src/stream.cpp @@ -33,7 +33,7 @@ zmq::stream_t::stream_t (class ctx_t *parent_, uint32_t tid_, int sid_) : next_rid (generate_random ()) { options.type = ZMQ_STREAM; - options.raw_sock = true; + options.raw_socket = true; prefetched_id.init (); prefetched_msg.init (); @@ -167,6 +167,8 @@ int zmq::stream_t::xsend (msg_t *msg_) int zmq::stream_t::xsetsockopt (int option_, const void *optval_, size_t optvallen_) { + bool is_int = (optvallen_ == sizeof (int)); + int value = is_int? *((int *) optval_): 0; switch (option_) { case ZMQ_CONNECT_RID: if (optval_ && optvallen_) { @@ -174,6 +176,14 @@ int zmq::stream_t::xsetsockopt (int option_, const void *optval_, return 0; } break; + + case ZMQ_STREAM_NOTIFY: + if (is_int && (value == 0 || value == 1)) { + options.raw_notify = value; + return 0; + } + break; + default: break; } diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index ef44aed0..0960ad48 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -180,7 +180,7 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_, handle = add_fd (s); io_error = false; - if (options.raw_sock) { + if (options.raw_socket) { // no handshaking for raw sock, instantiate raw encoder and decoders encoder = new (std::nothrow) raw_encoder_t (out_batch_size); alloc_assert (encoder); @@ -194,13 +194,15 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_, next_msg = &stream_engine_t::pull_msg_from_session; process_msg = &stream_engine_t::push_msg_to_session; - // For raw sockets, send an initial 0-length message to the - // application so that it knows a peer has connected. - msg_t connector; - connector.init(); - push_msg_to_session (&connector); - connector.close(); - session->flush (); + if (options.raw_notify) { + // For raw sockets, send an initial 0-length message to the + // application so that it knows a peer has connected. + msg_t connector; + connector.init(); + push_msg_to_session (&connector); + connector.close(); + session->flush (); + } } else { // start optional timer, to prevent handshake hanging on no input @@ -911,7 +913,7 @@ int zmq::stream_engine_t::write_subscription_msg (msg_t *msg_) void zmq::stream_engine_t::error (error_reason_t reason) { - if (options.raw_sock) { + if (options.raw_socket) { // For raw sockets, send a final 0-length message to the application // so that it knows the peer has been disconnected. msg_t terminator; @@ -931,7 +933,7 @@ void zmq::stream_engine_t::set_handshake_timer () { zmq_assert (!has_handshake_timer); - if (!options.raw_sock && options.handshake_ivl > 0) { + if (!options.raw_socket && options.handshake_ivl > 0) { add_timer (options.handshake_ivl, handshake_timer_id); has_handshake_timer = true; } diff --git a/tests/test_connect_rid.cpp b/tests/test_connect_rid.cpp index 266428be..a83ed1e4 100644 --- a/tests/test_connect_rid.cpp +++ b/tests/test_connect_rid.cpp @@ -63,19 +63,13 @@ void test_stream_2_stream(){ // Accept data on the bound stream. ret = zmq_recv (rbind, buff, 256, 0); - assert (ret && 0 == buff[0]); - assert (0 == buff[0]); - ret = zmq_recv (rbind, buff, 256, 0); - assert (0 == ret); - - // Handle close of the socket. - ret = zmq_recv (rbind, buff, 256, 0); assert (ret); assert (0 == buff[0]); ret = zmq_recv (rbind, buff+128, 128, 0); - assert (5 == ret); + assert (5 == ret); assert ('h' == buff[128]); + // Handle close of the socket. ret = zmq_unbind (rbind, bindip); assert(0 == ret); ret = zmq_close (rbind); diff --git a/tests/test_stream.cpp b/tests/test_stream.cpp index a0a42eba..20a03ca5 100644 --- a/tests/test_stream.cpp +++ b/tests/test_stream.cpp @@ -54,6 +54,9 @@ test_stream_to_dealer (void) int zero = 0; rc = zmq_setsockopt (stream, ZMQ_LINGER, &zero, sizeof (zero)); assert (rc == 0); + int enabled = 1; + rc = zmq_setsockopt (stream, ZMQ_STREAM_NOTIFY, &enabled, sizeof (enabled)); + assert (rc == 0); rc = zmq_bind (stream, "tcp://127.0.0.1:5556"); assert (rc == 0); @@ -182,11 +185,16 @@ test_stream_to_stream (void) void *server = zmq_socket (ctx, ZMQ_STREAM); assert (server); + int enabled = 1; + rc = zmq_setsockopt (server, ZMQ_STREAM_NOTIFY, &enabled, sizeof (enabled)); + assert (rc == 0); rc = zmq_bind (server, "tcp://127.0.0.1:9070"); assert (rc == 0); void *client = zmq_socket (ctx, ZMQ_STREAM); assert (client); + rc = zmq_setsockopt (client, ZMQ_STREAM_NOTIFY, &enabled, sizeof (enabled)); + assert (rc == 0); rc = zmq_connect (client, "tcp://localhost:9070"); assert (rc == 0); uint8_t id [256]; diff --git a/tests/test_stream_disconnect.cpp b/tests/test_stream_disconnect.cpp index 1233c859..7c221c46 100644 --- a/tests/test_stream_disconnect.cpp +++ b/tests/test_stream_disconnect.cpp @@ -55,15 +55,20 @@ int main(int, char**) { setup_test_environment(); - void* context = zmq_ctx_new (); - void* sockets [2]; + void *context = zmq_ctx_new (); + void *sockets [2]; int rc = 0; sockets [SERVER] = zmq_socket (context, ZMQ_STREAM); + int enabled = 1; + rc = zmq_setsockopt (sockets [SERVER], ZMQ_STREAM_NOTIFY, &enabled, sizeof (enabled)); + assert (rc == 0); rc = zmq_bind (sockets [SERVER], "tcp://0.0.0.0:6666"); assert (rc == 0); sockets [CLIENT] = zmq_socket (context, ZMQ_STREAM); + rc = zmq_setsockopt (sockets [CLIENT], ZMQ_STREAM_NOTIFY, &enabled, sizeof (enabled)); + assert (rc == 0); rc = zmq_connect (sockets [CLIENT], "tcp://localhost:6666"); assert (rc == 0); diff --git a/tests/testutil.hpp b/tests/testutil.hpp index 1b7a7728..082e711c 100644 --- a/tests/testutil.hpp +++ b/tests/testutil.hpp @@ -22,7 +22,7 @@ #include "../include/zmq.h" #include "../src/stdint.hpp" -#include "platform.hpp" +#include "../src/platform.hpp" // This defines the settle time used in tests; raise this if we // get test failures on slower systems due to binds/connects not