0
0
mirror of https://github.com/zeromq/libzmq.git synced 2025-01-14 09:47:56 +08:00

Merge pull request #274 from Kobolog/master

An options to choose the behavior for unroutable messages in ZMQ_ROUTER sockets
This commit is contained in:
Pieter Hintjens 2012-03-15 09:01:15 -07:00
commit 32c85e0ea3
4 changed files with 51 additions and 5 deletions

View File

@ -12,8 +12,9 @@ SYNOPSIS
--------
*int zmq_setsockopt (void '*socket', int 'option_name', const void '*option_value', size_t 'option_len');*
Caution: All options, with the exception of ZMQ_SUBSCRIBE, ZMQ_UNSUBSCRIBE and
ZMQ_LINGER, only take effect for subsequent socket bind/connects.
Caution: All options, with the exception of ZMQ_SUBSCRIBE, ZMQ_UNSUBSCRIBE,
ZMQ_LINGER and ZMQ_FAIL_UNROUTABLE only take effect for subsequent socket
bind/connects.
DESCRIPTION
-----------
@ -348,6 +349,21 @@ Default value:: 1 (true)
Applicable socket types:: all, when using TCP transports.
ZMQ_FAIL_UNROUTABLE: Set unroutable message behavior
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Sets the behavior when an unroutable message is encountered in a 'ZMQ_ROUTER'
socket. A value of `0` is the default behavior when the message is silently
dropped, while a value of `1` forces the sending to fail with a 'EHOSTUNREACH'
error code.
[horizontal]
Option value type:: int
Option value unit:: boolean
Default value:: 0 (false)
Applicable socket types:: ZMQ_ROUTER
RETURN VALUE
------------
The _zmq_setsockopt()_ function shall return zero if successful. Otherwise it

View File

@ -230,6 +230,7 @@ ZMQ_EXPORT int zmq_term (zmq_ctx_t context);
#define ZMQ_SNDTIMEO 28
#define ZMQ_IPV4ONLY 31
#define ZMQ_LAST_ENDPOINT 32
#define ZMQ_FAIL_UNROUTABLE 33
/* Message options */
#define ZMQ_MORE 1

View File

@ -33,7 +33,8 @@ zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) :
more_in (false),
current_out (NULL),
more_out (false),
next_peer_id (generate_random ())
next_peer_id (generate_random ()),
fail_unroutable(false)
{
options.type = ZMQ_XREP;
@ -77,6 +78,24 @@ void zmq::xrep_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
fq.attach (pipe_);
}
int zmq::xrep_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_)
{
if (option_ != ZMQ_FAIL_UNROUTABLE) {
errno = EINVAL;
return -1;
}
if(sizeof(optvallen_) != sizeof(int)) {
errno = EINVAL;
return -1;
}
fail_unroutable = *((const int*) optval_);
return 0;
}
void zmq::xrep_t::xterminated (pipe_t *pipe_)
{
fq.terminated (pipe_);
@ -118,6 +137,8 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_)
if (!more_out) {
zmq_assert (!current_out);
int retval = 0;
// If we have malformed message (prefix with no subsequent message)
// then just silently ignore it.
// TODO: The connections should be killed instead.
@ -126,7 +147,8 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_)
more_out = true;
// Find the pipe associated with the identity stored in the prefix.
// If there's no such pipe just silently ignore the message.
// If there's no such pipe just silently ignore the message, unless
// fail_unreachable is set.
blob_t identity ((unsigned char*) msg_->data (), msg_->size ());
outpipes_t::iterator it = outpipes.find (identity);
@ -142,6 +164,9 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_)
}
rc = empty.close ();
errno_assert (rc == 0);
} else if(fail_unroutable) {
more_out = false;
retval = EHOSTUNREACH;
}
}
@ -150,7 +175,7 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_)
errno_assert (rc == 0);
rc = msg_->init ();
errno_assert (rc == 0);
return 0;
return retval;
}
// Check whether this is the last part of the message.

View File

@ -49,6 +49,7 @@ namespace zmq
// Overloads of functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_, bool icanhasall_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (msg_t *msg_, int flags_);
int xrecv (msg_t *msg_, int flags_);
bool xhas_in ();
@ -100,6 +101,9 @@ namespace zmq
// algorithm. This value is the next ID to use (if not used already).
uint32_t next_peer_id;
// If true, fail on unroutable messages instead of silently dropping them.
bool fail_unroutable;
xrep_t (const xrep_t&);
const xrep_t &operator = (const xrep_t&);
};