mirror of
https://github.com/zeromq/libzmq.git
synced 2025-03-09 15:26:04 +00:00
Add ZMQ_REQ_SEND_RESETS option.
This allows making a new request on a REQ socket by sending a new message. Without the option set, calling send() after the first message is done will continue to return an EFSM error. It's useful for when a REQ is not getting a response. Previously that meant creating a new socket or switching to DEALER.
This commit is contained in:
parent
637f794193
commit
a0cc87a9d9
1
.gitignore
vendored
1
.gitignore
vendored
@ -59,6 +59,7 @@ tests/test_spec_rep
|
|||||||
tests/test_spec_req
|
tests/test_spec_req
|
||||||
tests/test_spec_router
|
tests/test_spec_router
|
||||||
tests/test_req_request_ids
|
tests/test_req_request_ids
|
||||||
|
tests/test_req_send_resets
|
||||||
src/platform.hpp*
|
src/platform.hpp*
|
||||||
src/stamp-h1
|
src/stamp-h1
|
||||||
perf/local_lat
|
perf/local_lat
|
||||||
|
@ -13,10 +13,10 @@ SYNOPSIS
|
|||||||
*int zmq_setsockopt (void '*socket', int 'option_name', const void '*option_value', size_t 'option_len');*
|
*int zmq_setsockopt (void '*socket', int 'option_name', const void '*option_value', size_t 'option_len');*
|
||||||
|
|
||||||
Caution: All options, with the exception of ZMQ_SUBSCRIBE, ZMQ_UNSUBSCRIBE,
|
Caution: All options, with the exception of ZMQ_SUBSCRIBE, ZMQ_UNSUBSCRIBE,
|
||||||
ZMQ_LINGER, ZMQ_ROUTER_MANDATORY, ZMQ_PROBE_ROUTER, ZMQ_XPUB_VERBOSE only
|
ZMQ_LINGER, ZMQ_ROUTER_MANDATORY, ZMQ_PROBE_ROUTER, ZMQ_XPUB_VERBOSE,
|
||||||
take effect for subsequent socket bind/connects. Specifically, security
|
ZMQ_REQ_SEND_RESETS only take effect for subsequent socket bind/connects.
|
||||||
options take effect for subsequent binds/connects and can be changed at any
|
Specifically, security options take effect for subsequent binds/connects and can be
|
||||||
time to affect subsequent binds and/or connects.
|
changed at any time to affect subsequent binds and/or connects.
|
||||||
|
|
||||||
DESCRIPTION
|
DESCRIPTION
|
||||||
-----------
|
-----------
|
||||||
@ -476,6 +476,28 @@ Default value:: 0
|
|||||||
Applicable socket types:: ZMQ_REQ
|
Applicable socket types:: ZMQ_REQ
|
||||||
|
|
||||||
|
|
||||||
|
ZMQ_REQ_SEND_RESETS: reset request-reply sequence by sending another message
|
||||||
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||||
|
|
||||||
|
When set to 0, a REQ socket does not allow initiating a new request with
|
||||||
|
_zmq_send(3)_ until the reply to the previous one has been received.
|
||||||
|
When set to 1, sending another message is allowed and has the effect of
|
||||||
|
disconnecting the underlying connection to the peer from which the reply was
|
||||||
|
expected, triggering a reconnection attempt on transports that support it.
|
||||||
|
The request-reply state machine is reset and a new request is sent to the
|
||||||
|
next available peer.
|
||||||
|
|
||||||
|
When this option is enabled, also enable ZMQ_REQ_REQUEST_IDS to ensure correct
|
||||||
|
matching of requests and replies. Otherwise a late reply to an aborted request
|
||||||
|
can be reported as the reply to the superseding request.
|
||||||
|
|
||||||
|
[horizontal]
|
||||||
|
Option value type:: int
|
||||||
|
Option value unit:: 0, 1
|
||||||
|
Default value:: 0
|
||||||
|
Applicable socket types:: ZMQ_REQ
|
||||||
|
|
||||||
|
|
||||||
ZMQ_TCP_KEEPALIVE: Override SO_KEEPALIVE socket option
|
ZMQ_TCP_KEEPALIVE: Override SO_KEEPALIVE socket option
|
||||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||||
|
|
||||||
|
@ -277,6 +277,7 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval);
|
|||||||
#define ZMQ_CURVE_SERVERKEY 50
|
#define ZMQ_CURVE_SERVERKEY 50
|
||||||
#define ZMQ_PROBE_ROUTER 51
|
#define ZMQ_PROBE_ROUTER 51
|
||||||
#define ZMQ_REQ_REQUEST_IDS 52
|
#define ZMQ_REQ_REQUEST_IDS 52
|
||||||
|
#define ZMQ_REQ_SEND_RESETS 53
|
||||||
|
|
||||||
/* Message options */
|
/* Message options */
|
||||||
#define ZMQ_MORE 1
|
#define ZMQ_MORE 1
|
||||||
|
23
src/req.cpp
23
src/req.cpp
@ -30,7 +30,8 @@ zmq::req_t::req_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
|
|||||||
message_begins (true),
|
message_begins (true),
|
||||||
reply_pipe (NULL),
|
reply_pipe (NULL),
|
||||||
request_id_frames_enabled (false),
|
request_id_frames_enabled (false),
|
||||||
request_id (generate_random())
|
request_id (generate_random()),
|
||||||
|
send_resets (false)
|
||||||
{
|
{
|
||||||
options.type = ZMQ_REQ;
|
options.type = ZMQ_REQ;
|
||||||
}
|
}
|
||||||
@ -42,10 +43,17 @@ zmq::req_t::~req_t ()
|
|||||||
int zmq::req_t::xsend (msg_t *msg_)
|
int zmq::req_t::xsend (msg_t *msg_)
|
||||||
{
|
{
|
||||||
// If we've sent a request and we still haven't got the reply,
|
// If we've sent a request and we still haven't got the reply,
|
||||||
// we can't send another request.
|
// we can't send another request unless the send_resets option is enabled.
|
||||||
if (receiving_reply) {
|
if (receiving_reply) {
|
||||||
errno = EFSM;
|
if (!send_resets) {
|
||||||
return -1;
|
errno = EFSM;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (reply_pipe)
|
||||||
|
reply_pipe->terminate (false);
|
||||||
|
receiving_reply = false;
|
||||||
|
message_begins = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
// First part of the request is the request identity.
|
// First part of the request is the request identity.
|
||||||
@ -197,6 +205,13 @@ int zmq::req_t::xsetsockopt(int option_, const void *optval_, size_t optvallen_)
|
|||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
|
||||||
|
case ZMQ_REQ_SEND_RESETS:
|
||||||
|
if (is_int && value >= 0) {
|
||||||
|
send_resets = value;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -43,7 +43,7 @@ namespace zmq
|
|||||||
int xrecv (zmq::msg_t *msg_);
|
int xrecv (zmq::msg_t *msg_);
|
||||||
bool xhas_in ();
|
bool xhas_in ();
|
||||||
bool xhas_out ();
|
bool xhas_out ();
|
||||||
int xsetsockopt(int option_, const void *optval_, size_t optvallen_);
|
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
|
|
||||||
@ -71,8 +71,9 @@ namespace zmq
|
|||||||
// request is sent.
|
// request is sent.
|
||||||
uint32_t request_id;
|
uint32_t request_id;
|
||||||
|
|
||||||
// If true, send() will reset its internal state instead of failing if
|
// If true, send() will reset its internal state and terminate the
|
||||||
// a previous request is still pending.
|
// reply_pipe's connection instead of failing if a previous request is
|
||||||
|
// still pending.
|
||||||
bool send_resets;
|
bool send_resets;
|
||||||
|
|
||||||
req_t (const req_t&);
|
req_t (const req_t&);
|
||||||
|
@ -30,7 +30,8 @@ noinst_PROGRAMS = test_pair_inproc \
|
|||||||
test_spec_dealer \
|
test_spec_dealer \
|
||||||
test_spec_router \
|
test_spec_router \
|
||||||
test_spec_pushpull \
|
test_spec_pushpull \
|
||||||
test_req_request_ids
|
test_req_request_ids \
|
||||||
|
test_req_send_resets
|
||||||
|
|
||||||
if !ON_MINGW
|
if !ON_MINGW
|
||||||
noinst_PROGRAMS += test_shutdown_stress \
|
noinst_PROGRAMS += test_shutdown_stress \
|
||||||
@ -67,6 +68,7 @@ test_spec_dealer_SOURCES = test_spec_dealer.cpp
|
|||||||
test_spec_router_SOURCES = test_spec_router.cpp
|
test_spec_router_SOURCES = test_spec_router.cpp
|
||||||
test_spec_pushpull_SOURCES = test_spec_pushpull.cpp
|
test_spec_pushpull_SOURCES = test_spec_pushpull.cpp
|
||||||
test_req_request_ids_SOURCES = test_req_request_ids.cpp
|
test_req_request_ids_SOURCES = test_req_request_ids.cpp
|
||||||
|
test_req_send_resets_SOURCES = test_req_send_resets.cpp
|
||||||
if !ON_MINGW
|
if !ON_MINGW
|
||||||
test_shutdown_stress_SOURCES = test_shutdown_stress.cpp
|
test_shutdown_stress_SOURCES = test_shutdown_stress.cpp
|
||||||
test_pair_ipc_SOURCES = test_pair_ipc.cpp testutil.hpp
|
test_pair_ipc_SOURCES = test_pair_ipc.cpp testutil.hpp
|
||||||
|
120
tests/test_req_send_resets.cpp
Normal file
120
tests/test_req_send_resets.cpp
Normal file
@ -0,0 +1,120 @@
|
|||||||
|
/*
|
||||||
|
Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file
|
||||||
|
|
||||||
|
This file is part of 0MQ.
|
||||||
|
|
||||||
|
0MQ is free software; you can redistribute it and/or modify it under
|
||||||
|
the terms of the GNU Lesser General Public License as published by
|
||||||
|
the Free Software Foundation; either version 3 of the License, or
|
||||||
|
(at your option) any later version.
|
||||||
|
|
||||||
|
0MQ is distributed in the hope that it will be useful,
|
||||||
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
GNU Lesser General Public License for more details.
|
||||||
|
|
||||||
|
You should have received a copy of the GNU Lesser General Public License
|
||||||
|
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include <stdio.h>
|
||||||
|
#include <unistd.h>
|
||||||
|
#include <time.h>
|
||||||
|
#include "testutil.hpp"
|
||||||
|
|
||||||
|
int main (void)
|
||||||
|
{
|
||||||
|
void *ctx = zmq_ctx_new ();
|
||||||
|
assert (ctx);
|
||||||
|
|
||||||
|
void *req = zmq_socket (ctx, ZMQ_REQ);
|
||||||
|
assert (req);
|
||||||
|
|
||||||
|
int enabled = 1;
|
||||||
|
int rc = zmq_setsockopt (req, ZMQ_REQ_SEND_RESETS, &enabled, sizeof (int));
|
||||||
|
assert (rc == 0);
|
||||||
|
|
||||||
|
rc = zmq_setsockopt (req, ZMQ_REQ_REQUEST_IDS, &enabled, sizeof (int));
|
||||||
|
assert (rc == 0);
|
||||||
|
|
||||||
|
rc = zmq_bind (req, "tcp://*:5555");
|
||||||
|
assert (rc == 0);
|
||||||
|
|
||||||
|
const size_t services = 5;
|
||||||
|
void *rep [services];
|
||||||
|
for (size_t peer = 0; peer < services; peer++) {
|
||||||
|
rep [peer] = zmq_socket (ctx, ZMQ_REP);
|
||||||
|
assert (rep [peer]);
|
||||||
|
|
||||||
|
int timeout = 100;
|
||||||
|
rc = zmq_setsockopt (rep [peer], ZMQ_RCVTIMEO, &timeout, sizeof (int));
|
||||||
|
assert (rc == 0);
|
||||||
|
|
||||||
|
rc = zmq_connect (rep [peer], "tcp://localhost:5555");
|
||||||
|
assert (rc == 0);
|
||||||
|
}
|
||||||
|
// We have to give the connects time to finish otherwise the requests
|
||||||
|
// will not properly round-robin. We could alternatively connect the
|
||||||
|
// REQ sockets to the REP sockets.
|
||||||
|
struct timespec t = { 0, 250 * 1000000 };
|
||||||
|
nanosleep (&t, NULL);
|
||||||
|
|
||||||
|
|
||||||
|
// Case 1: Second send() before a reply arrives in a pipe.
|
||||||
|
|
||||||
|
// Send a request, ensure it arrives, don't send a reply
|
||||||
|
s_send_seq (req, "A", "B", SEQ_END);
|
||||||
|
s_recv_seq (rep [0], "A", "B", SEQ_END);
|
||||||
|
|
||||||
|
// Send another request on the REQ socket
|
||||||
|
s_send_seq (req, "C", "D", SEQ_END);
|
||||||
|
s_recv_seq (rep [1], "C", "D", SEQ_END);
|
||||||
|
|
||||||
|
// Send a reply to the first request - that should be discarded by the REQ
|
||||||
|
s_send_seq (rep [0], "WRONG", SEQ_END);
|
||||||
|
|
||||||
|
// Send the expected reply
|
||||||
|
s_send_seq (rep [1], "OK", SEQ_END);
|
||||||
|
s_recv_seq (req, "OK", SEQ_END);
|
||||||
|
|
||||||
|
|
||||||
|
// Another standard req-rep cycle, just to check
|
||||||
|
s_send_seq (req, "E", SEQ_END);
|
||||||
|
s_recv_seq (rep [2], "E", SEQ_END);
|
||||||
|
s_send_seq (rep [2], "F", "G", SEQ_END);
|
||||||
|
s_recv_seq (req, "F", "G", SEQ_END);
|
||||||
|
|
||||||
|
|
||||||
|
// Case 2: Second send() after a reply is already in a pipe on the REQ.
|
||||||
|
|
||||||
|
// Send a request, ensure it arrives, send a reply
|
||||||
|
s_send_seq (req, "H", SEQ_END);
|
||||||
|
s_recv_seq (rep [3], "H", SEQ_END);
|
||||||
|
s_send_seq (rep [3], "BAD", SEQ_END);
|
||||||
|
|
||||||
|
// Wait for message to be there.
|
||||||
|
rc = zmq_poll (0, 0, 100);
|
||||||
|
assert (rc == 0);
|
||||||
|
|
||||||
|
// Without receiving that reply, send another request on the REQ socket
|
||||||
|
s_send_seq (req, "I", SEQ_END);
|
||||||
|
s_recv_seq (rep [4], "I", SEQ_END);
|
||||||
|
|
||||||
|
// Send the expected reply
|
||||||
|
s_send_seq (rep [4], "GOOD", SEQ_END);
|
||||||
|
s_recv_seq (req, "GOOD", SEQ_END);
|
||||||
|
|
||||||
|
|
||||||
|
close_zero_linger (req);
|
||||||
|
for (size_t peer = 0; peer < services; peer++)
|
||||||
|
close_zero_linger (rep [peer]);
|
||||||
|
|
||||||
|
// Wait for disconnects.
|
||||||
|
rc = zmq_poll (0, 0, 100);
|
||||||
|
assert (rc == 0);
|
||||||
|
|
||||||
|
rc = zmq_ctx_term (ctx);
|
||||||
|
assert (rc == 0);
|
||||||
|
|
||||||
|
return 0 ;
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user