diff --git a/.gitignore b/.gitignore index fa21b500..7a6b1026 100644 --- a/.gitignore +++ b/.gitignore @@ -59,6 +59,7 @@ tests/test_spec_rep tests/test_spec_req tests/test_spec_router tests/test_req_request_ids +tests/test_req_send_resets src/platform.hpp* src/stamp-h1 perf/local_lat diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt index 7ee3ff91..28912f46 100644 --- a/doc/zmq_setsockopt.txt +++ b/doc/zmq_setsockopt.txt @@ -13,10 +13,10 @@ SYNOPSIS *int zmq_setsockopt (void '*socket', int 'option_name', const void '*option_value', size_t 'option_len');* Caution: All options, with the exception of ZMQ_SUBSCRIBE, ZMQ_UNSUBSCRIBE, -ZMQ_LINGER, ZMQ_ROUTER_MANDATORY, ZMQ_PROBE_ROUTER, ZMQ_XPUB_VERBOSE only -take effect for subsequent socket bind/connects. Specifically, security -options take effect for subsequent binds/connects and can be changed at any -time to affect subsequent binds and/or connects. +ZMQ_LINGER, ZMQ_ROUTER_MANDATORY, ZMQ_PROBE_ROUTER, ZMQ_XPUB_VERBOSE, +ZMQ_REQ_SEND_RESETS only take effect for subsequent socket bind/connects. +Specifically, security options take effect for subsequent binds/connects and can be +changed at any time to affect subsequent binds and/or connects. DESCRIPTION ----------- @@ -476,6 +476,28 @@ Default value:: 0 Applicable socket types:: ZMQ_REQ +ZMQ_REQ_SEND_RESETS: reset request-reply sequence by sending another message +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +When set to 0, a REQ socket does not allow initiating a new request with +_zmq_send(3)_ until the reply to the previous one has been received. +When set to 1, sending another message is allowed and has the effect of +disconnecting the underlying connection to the peer from which the reply was +expected, triggering a reconnection attempt on transports that support it. +The request-reply state machine is reset and a new request is sent to the +next available peer. + +When this option is enabled, also enable ZMQ_REQ_REQUEST_IDS to ensure correct +matching of requests and replies. Otherwise a late reply to an aborted request +can be reported as the reply to the superseding request. + +[horizontal] +Option value type:: int +Option value unit:: 0, 1 +Default value:: 0 +Applicable socket types:: ZMQ_REQ + + ZMQ_TCP_KEEPALIVE: Override SO_KEEPALIVE socket option ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ diff --git a/include/zmq.h b/include/zmq.h index c18d52c2..98a2bd57 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -277,6 +277,7 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval); #define ZMQ_CURVE_SERVERKEY 50 #define ZMQ_PROBE_ROUTER 51 #define ZMQ_REQ_REQUEST_IDS 52 +#define ZMQ_REQ_SEND_RESETS 53 /* Message options */ #define ZMQ_MORE 1 diff --git a/src/req.cpp b/src/req.cpp index 2f295210..c68ec8e9 100644 --- a/src/req.cpp +++ b/src/req.cpp @@ -30,7 +30,8 @@ zmq::req_t::req_t (class ctx_t *parent_, uint32_t tid_, int sid_) : message_begins (true), reply_pipe (NULL), request_id_frames_enabled (false), - request_id (generate_random()) + request_id (generate_random()), + send_resets (false) { options.type = ZMQ_REQ; } @@ -42,10 +43,17 @@ zmq::req_t::~req_t () int zmq::req_t::xsend (msg_t *msg_) { // If we've sent a request and we still haven't got the reply, - // we can't send another request. + // we can't send another request unless the send_resets option is enabled. if (receiving_reply) { - errno = EFSM; - return -1; + if (!send_resets) { + errno = EFSM; + return -1; + } + + if (reply_pipe) + reply_pipe->terminate (false); + receiving_reply = false; + message_begins = true; } // First part of the request is the request identity. @@ -197,6 +205,13 @@ int zmq::req_t::xsetsockopt(int option_, const void *optval_, size_t optvallen_) } break; + case ZMQ_REQ_SEND_RESETS: + if (is_int && value >= 0) { + send_resets = value; + return 0; + } + break; + default: break; } diff --git a/src/req.hpp b/src/req.hpp index 20558a3c..1a7abf3a 100644 --- a/src/req.hpp +++ b/src/req.hpp @@ -43,7 +43,7 @@ namespace zmq int xrecv (zmq::msg_t *msg_); bool xhas_in (); bool xhas_out (); - int xsetsockopt(int option_, const void *optval_, size_t optvallen_); + int xsetsockopt (int option_, const void *optval_, size_t optvallen_); protected: @@ -71,8 +71,9 @@ namespace zmq // request is sent. uint32_t request_id; - // If true, send() will reset its internal state instead of failing if - // a previous request is still pending. + // If true, send() will reset its internal state and terminate the + // reply_pipe's connection instead of failing if a previous request is + // still pending. bool send_resets; req_t (const req_t&); diff --git a/tests/Makefile.am b/tests/Makefile.am index da16981b..7e975fdf 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -30,7 +30,8 @@ noinst_PROGRAMS = test_pair_inproc \ test_spec_dealer \ test_spec_router \ test_spec_pushpull \ - test_req_request_ids + test_req_request_ids \ + test_req_send_resets if !ON_MINGW noinst_PROGRAMS += test_shutdown_stress \ @@ -67,6 +68,7 @@ test_spec_dealer_SOURCES = test_spec_dealer.cpp test_spec_router_SOURCES = test_spec_router.cpp test_spec_pushpull_SOURCES = test_spec_pushpull.cpp test_req_request_ids_SOURCES = test_req_request_ids.cpp +test_req_send_resets_SOURCES = test_req_send_resets.cpp if !ON_MINGW test_shutdown_stress_SOURCES = test_shutdown_stress.cpp test_pair_ipc_SOURCES = test_pair_ipc.cpp testutil.hpp diff --git a/tests/test_req_send_resets.cpp b/tests/test_req_send_resets.cpp new file mode 100644 index 00000000..e3c2228d --- /dev/null +++ b/tests/test_req_send_resets.cpp @@ -0,0 +1,120 @@ +/* + Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include +#include +#include +#include "testutil.hpp" + +int main (void) +{ + void *ctx = zmq_ctx_new (); + assert (ctx); + + void *req = zmq_socket (ctx, ZMQ_REQ); + assert (req); + + int enabled = 1; + int rc = zmq_setsockopt (req, ZMQ_REQ_SEND_RESETS, &enabled, sizeof (int)); + assert (rc == 0); + + rc = zmq_setsockopt (req, ZMQ_REQ_REQUEST_IDS, &enabled, sizeof (int)); + assert (rc == 0); + + rc = zmq_bind (req, "tcp://*:5555"); + assert (rc == 0); + + const size_t services = 5; + void *rep [services]; + for (size_t peer = 0; peer < services; peer++) { + rep [peer] = zmq_socket (ctx, ZMQ_REP); + assert (rep [peer]); + + int timeout = 100; + rc = zmq_setsockopt (rep [peer], ZMQ_RCVTIMEO, &timeout, sizeof (int)); + assert (rc == 0); + + rc = zmq_connect (rep [peer], "tcp://localhost:5555"); + assert (rc == 0); + } + // We have to give the connects time to finish otherwise the requests + // will not properly round-robin. We could alternatively connect the + // REQ sockets to the REP sockets. + struct timespec t = { 0, 250 * 1000000 }; + nanosleep (&t, NULL); + + + // Case 1: Second send() before a reply arrives in a pipe. + + // Send a request, ensure it arrives, don't send a reply + s_send_seq (req, "A", "B", SEQ_END); + s_recv_seq (rep [0], "A", "B", SEQ_END); + + // Send another request on the REQ socket + s_send_seq (req, "C", "D", SEQ_END); + s_recv_seq (rep [1], "C", "D", SEQ_END); + + // Send a reply to the first request - that should be discarded by the REQ + s_send_seq (rep [0], "WRONG", SEQ_END); + + // Send the expected reply + s_send_seq (rep [1], "OK", SEQ_END); + s_recv_seq (req, "OK", SEQ_END); + + + // Another standard req-rep cycle, just to check + s_send_seq (req, "E", SEQ_END); + s_recv_seq (rep [2], "E", SEQ_END); + s_send_seq (rep [2], "F", "G", SEQ_END); + s_recv_seq (req, "F", "G", SEQ_END); + + + // Case 2: Second send() after a reply is already in a pipe on the REQ. + + // Send a request, ensure it arrives, send a reply + s_send_seq (req, "H", SEQ_END); + s_recv_seq (rep [3], "H", SEQ_END); + s_send_seq (rep [3], "BAD", SEQ_END); + + // Wait for message to be there. + rc = zmq_poll (0, 0, 100); + assert (rc == 0); + + // Without receiving that reply, send another request on the REQ socket + s_send_seq (req, "I", SEQ_END); + s_recv_seq (rep [4], "I", SEQ_END); + + // Send the expected reply + s_send_seq (rep [4], "GOOD", SEQ_END); + s_recv_seq (req, "GOOD", SEQ_END); + + + close_zero_linger (req); + for (size_t peer = 0; peer < services; peer++) + close_zero_linger (rep [peer]); + + // Wait for disconnects. + rc = zmq_poll (0, 0, 100); + assert (rc == 0); + + rc = zmq_ctx_term (ctx); + assert (rc == 0); + + return 0 ; +}