mirror of
https://github.com/zeromq/libzmq.git
synced 2025-03-10 07:56:09 +00:00
Merge pull request #1324 from jruffin/invert-matching
Added socket option ZMQ_INVERT_MATCHING.
This commit is contained in:
commit
96a27d11c9
@ -266,6 +266,29 @@ Default value:: 0 (false)
|
||||
Applicable socket types:: all, primarily when using TCP/IPC transports.
|
||||
|
||||
|
||||
ZMQ_INVERT_MATCHING: Retrieve inverted filtering status
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
Returns the value of the 'ZMQ_INVERT_MATCHING' option. A value of `1`
|
||||
means the socket uses inverted prefix matching.
|
||||
|
||||
On 'PUB' and 'XPUB' sockets, this causes messages to be sent to all
|
||||
connected sockets 'except' those subscribed to a prefix that matches
|
||||
the message. On 'SUB' sockets, this causes only incoming messages that
|
||||
do 'not' match any of the socket's subscriptions to be received by the user.
|
||||
|
||||
Whenever 'ZMQ_INVERT_MATCHING' is set to 1 on a 'PUB' socket, all 'SUB'
|
||||
sockets connecting to it must also have the option set to 1. Failure to
|
||||
do so will have the 'SUB' sockets reject everything the 'PUB' socket sends
|
||||
them. 'XSUB' sockets do not need to do this because they do not filter
|
||||
incoming messages.
|
||||
|
||||
[horizontal]
|
||||
Option value type:: int
|
||||
Option value unit:: 0,1
|
||||
Default value:: 0
|
||||
Applicable socket types:: ZMQ_PUB, ZMQ_XPUB, ZMQ_SUB
|
||||
|
||||
|
||||
ZMQ_IPV4ONLY: Retrieve IPv4-only socket override status
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
Retrieve the IPv4-only option for the socket. This option is deprecated.
|
||||
|
@ -937,6 +937,29 @@ Option value unit:: boolean
|
||||
Default value:: 1 (true)
|
||||
Applicable socket types:: all, when using TCP transports.
|
||||
|
||||
|
||||
ZMQ_INVERT_MATCHING: Invert message filtering
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
Reverses the filtering behavior of PUB-SUB sockets, when set to 1.
|
||||
|
||||
On 'PUB' and 'XPUB' sockets, this causes messages to be sent to all
|
||||
connected sockets 'except' those subscribed to a prefix that matches
|
||||
the message. On 'SUB' sockets, this causes only incoming messages that
|
||||
do 'not' match any of the socket's subscriptions to be received by the user.
|
||||
|
||||
Whenever 'ZMQ_INVERT_MATCHING' is set to 1 on a 'PUB' socket, all 'SUB'
|
||||
sockets connecting to it must also have the option set to 1. Failure to
|
||||
do so will have the 'SUB' sockets reject everything the 'PUB' socket sends
|
||||
them. 'XSUB' sockets do not need to do this because they do not filter
|
||||
incoming messages.
|
||||
|
||||
[horizontal]
|
||||
Option value type:: int
|
||||
Option value unit:: 0,1
|
||||
Default value:: 0
|
||||
Applicable socket types:: ZMQ_PUB, ZMQ_XPUB, ZMQ_SUB
|
||||
|
||||
|
||||
RETURN VALUE
|
||||
------------
|
||||
The _zmq_setsockopt()_ function shall return zero if successful. Otherwise it
|
||||
|
@ -298,6 +298,7 @@ ZMQ_EXPORT const char *zmq_msg_gets (zmq_msg_t *msg, const char *property);
|
||||
#define ZMQ_XPUB_MANUAL 71
|
||||
#define ZMQ_XPUB_WELCOME_MSG 72
|
||||
#define ZMQ_STREAM_NOTIFY 73
|
||||
#define ZMQ_INVERT_MATCHING 74
|
||||
|
||||
/* Message options */
|
||||
#define ZMQ_MORE 1
|
||||
|
16
src/dist.cpp
16
src/dist.cpp
@ -69,6 +69,22 @@ void zmq::dist_t::match (pipe_t *pipe_)
|
||||
matching++;
|
||||
}
|
||||
|
||||
void zmq::dist_t::reverse_match ()
|
||||
{
|
||||
pipes_t::size_type prev_matching = matching;
|
||||
|
||||
// Reset matching to 0
|
||||
unmatch();
|
||||
|
||||
// Mark all matching pipes as not matching and vice-versa.
|
||||
// To do this, push all pipes that are eligible but not
|
||||
// matched - i.e. between "matching" and "eligible" -
|
||||
// to the beginning of the queue.
|
||||
for (pipes_t::size_type i = prev_matching; i < eligible; ++i) {
|
||||
pipes.swap(i, matching++);
|
||||
}
|
||||
}
|
||||
|
||||
void zmq::dist_t::unmatch ()
|
||||
{
|
||||
matching = 0;
|
||||
|
@ -50,6 +50,9 @@ namespace zmq
|
||||
// will send message also to this pipe.
|
||||
void match (zmq::pipe_t *pipe_);
|
||||
|
||||
// Marks all pipes that are not matched as matched and vice-versa.
|
||||
void reverse_match();
|
||||
|
||||
// Mark all pipes as non-matching.
|
||||
void unmatch ();
|
||||
|
||||
|
@ -45,6 +45,7 @@ zmq::options_t::options_t () :
|
||||
ipv6 (0),
|
||||
immediate (0),
|
||||
filter (false),
|
||||
invert_matching(false),
|
||||
recv_identity (false),
|
||||
raw_socket (false),
|
||||
raw_notify (false),
|
||||
@ -500,6 +501,13 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
|
||||
}
|
||||
break;
|
||||
|
||||
case ZMQ_INVERT_MATCHING:
|
||||
if (is_int) {
|
||||
invert_matching = (value != 0);
|
||||
return 0;
|
||||
}
|
||||
break;
|
||||
|
||||
default:
|
||||
#if defined (ZMQ_ACT_MILITANT)
|
||||
// There are valid scenarios for probing with unknown socket option
|
||||
@ -846,6 +854,13 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
|
||||
}
|
||||
break;
|
||||
|
||||
case ZMQ_INVERT_MATCHING:
|
||||
if (is_int) {
|
||||
*value = invert_matching;
|
||||
return 0;
|
||||
}
|
||||
break;
|
||||
|
||||
default:
|
||||
#if defined (ZMQ_ACT_MILITANT)
|
||||
malformed = false;
|
||||
|
@ -108,6 +108,11 @@ namespace zmq
|
||||
// If 1, (X)SUB socket should filter the messages. If 0, it should not.
|
||||
bool filter;
|
||||
|
||||
// If true, the subscription matching on (X)PUB and (X)SUB sockets
|
||||
// is reversed. Messages are sent to and received by non-matching
|
||||
// sockets.
|
||||
bool invert_matching;
|
||||
|
||||
// If true, the identity message is forwarded to the socket.
|
||||
bool recv_identity;
|
||||
|
||||
|
@ -179,9 +179,14 @@ int zmq::xpub_t::xsend (msg_t *msg_)
|
||||
bool msg_more = msg_->flags () & msg_t::more ? true : false;
|
||||
|
||||
// For the first part of multi-part message, find the matching pipes.
|
||||
if (!more)
|
||||
if (!more) {
|
||||
subscriptions.match ((unsigned char*) msg_->data (), msg_->size (),
|
||||
mark_as_matching, this);
|
||||
// If inverted matching is used, reverse the selection now
|
||||
if (options.invert_matching) {
|
||||
dist.reverse_match();
|
||||
}
|
||||
}
|
||||
|
||||
int rc = -1; // Assume we fail
|
||||
if (lossy || dist.check_hwm ()) {
|
||||
|
@ -206,7 +206,9 @@ zmq::blob_t zmq::xsub_t::get_credential () const
|
||||
|
||||
bool zmq::xsub_t::match (msg_t *msg_)
|
||||
{
|
||||
return subscriptions.check ((unsigned char*) msg_->data (), msg_->size ());
|
||||
bool matching = subscriptions.check ((unsigned char*) msg_->data (), msg_->size ());
|
||||
|
||||
return matching ^ options.invert_matching;
|
||||
}
|
||||
|
||||
void zmq::xsub_t::send_subscription (unsigned char *data_, size_t size_,
|
||||
|
@ -44,6 +44,7 @@ set(tests
|
||||
test_diffserv
|
||||
test_connect_rid
|
||||
test_xpub_nodrop
|
||||
test_pub_invert_matching
|
||||
)
|
||||
if(NOT WIN32)
|
||||
list(APPEND tests
|
||||
|
126
tests/test_pub_invert_matching.cpp
Normal file
126
tests/test_pub_invert_matching.cpp
Normal file
@ -0,0 +1,126 @@
|
||||
/*
|
||||
Copyright (c) 2007-2014 Contributors as noted in the AUTHORS file
|
||||
|
||||
This file is part of 0MQ.
|
||||
|
||||
0MQ is free software; you can redistribute it and/or modify it under
|
||||
the terms of the GNU Lesser General Public License as published by
|
||||
the Free Software Foundation; either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
0MQ is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU Lesser General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU Lesser General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "testutil.hpp"
|
||||
|
||||
int main (void)
|
||||
{
|
||||
setup_test_environment();
|
||||
void *ctx = zmq_ctx_new ();
|
||||
assert (ctx);
|
||||
|
||||
// Create a publisher
|
||||
void *pub = zmq_socket (ctx, ZMQ_PUB);
|
||||
assert (pub);
|
||||
int rc = zmq_bind (pub, "inproc://soname");
|
||||
assert (rc == 0);
|
||||
|
||||
// Create two subscribers
|
||||
void *sub1 = zmq_socket (ctx, ZMQ_SUB);
|
||||
assert (sub1);
|
||||
rc = zmq_connect (sub1, "inproc://soname");
|
||||
assert (rc == 0);
|
||||
|
||||
void *sub2 = zmq_socket (ctx, ZMQ_SUB);
|
||||
assert (sub2);
|
||||
rc = zmq_connect (sub2, "inproc://soname");
|
||||
assert (rc == 0);
|
||||
|
||||
// Subscribe pub1 to one prefix
|
||||
// and pub2 to another prefix.
|
||||
const char PREFIX1[] = "prefix1";
|
||||
const char PREFIX2[] = "p2";
|
||||
|
||||
rc = zmq_setsockopt (sub1, ZMQ_SUBSCRIBE, PREFIX1, sizeof(PREFIX1));
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_setsockopt (sub2, ZMQ_SUBSCRIBE, PREFIX2, sizeof(PREFIX2));
|
||||
assert (rc == 0);
|
||||
|
||||
// Send a message with the first prefix
|
||||
rc = zmq_send_const(pub, PREFIX1, sizeof(PREFIX1), 0);
|
||||
assert (rc == sizeof(PREFIX1));
|
||||
|
||||
// sub1 should receive it, but not sub2
|
||||
rc = zmq_recv (sub1, NULL, 0, ZMQ_DONTWAIT);
|
||||
assert (rc == sizeof(PREFIX1));
|
||||
|
||||
rc = zmq_recv (sub2, NULL, 0, ZMQ_DONTWAIT);
|
||||
assert (rc == -1);
|
||||
assert (errno == EAGAIN);
|
||||
|
||||
// Send a message with the second prefix
|
||||
rc = zmq_send_const(pub, PREFIX2, sizeof(PREFIX2), 0);
|
||||
assert (rc == sizeof(PREFIX2));
|
||||
|
||||
// sub2 should receive it, but not sub1
|
||||
rc = zmq_recv (sub2, NULL, 0, ZMQ_DONTWAIT);
|
||||
assert (rc == sizeof(PREFIX2));
|
||||
|
||||
rc = zmq_recv (sub1, NULL, 0, ZMQ_DONTWAIT);
|
||||
assert (rc == -1);
|
||||
assert (errno == EAGAIN);
|
||||
|
||||
// Now invert the matching
|
||||
int invert = 1;
|
||||
rc = zmq_setsockopt (pub, ZMQ_INVERT_MATCHING, &invert, sizeof(invert));
|
||||
assert (rc == 0);
|
||||
|
||||
// ... on both sides, otherwise the SUB socket will filter the messages out
|
||||
rc = zmq_setsockopt (sub1, ZMQ_INVERT_MATCHING, &invert, sizeof(invert));
|
||||
rc = zmq_setsockopt (sub2, ZMQ_INVERT_MATCHING, &invert, sizeof(invert));
|
||||
assert (rc == 0);
|
||||
|
||||
// Send a message with the first prefix
|
||||
rc = zmq_send_const(pub, PREFIX1, sizeof(PREFIX1), 0);
|
||||
assert (rc == sizeof(PREFIX1));
|
||||
|
||||
// sub2 should receive it, but not sub1
|
||||
rc = zmq_recv (sub2, NULL, 0, ZMQ_DONTWAIT);
|
||||
assert (rc == sizeof(PREFIX1));
|
||||
|
||||
rc = zmq_recv (sub1, NULL, 0, ZMQ_DONTWAIT);
|
||||
assert (rc == -1);
|
||||
assert (errno == EAGAIN);
|
||||
|
||||
// Send a message with the second prefix
|
||||
rc = zmq_send_const(pub, PREFIX2, sizeof(PREFIX2), 0);
|
||||
assert (rc == sizeof(PREFIX2));
|
||||
|
||||
// sub1 should receive it, but not sub2
|
||||
rc = zmq_recv (sub1, NULL, 0, ZMQ_DONTWAIT);
|
||||
assert (rc == sizeof(PREFIX2));
|
||||
|
||||
rc = zmq_recv (sub2, NULL, 0, ZMQ_DONTWAIT);
|
||||
assert (rc == -1);
|
||||
assert (errno == EAGAIN);
|
||||
|
||||
|
||||
// Clean up.
|
||||
rc = zmq_close (pub);
|
||||
assert (rc == 0);
|
||||
rc = zmq_close (sub1);
|
||||
assert (rc == 0);
|
||||
rc = zmq_close (sub2);
|
||||
assert (rc == 0);
|
||||
rc = zmq_ctx_term (ctx);
|
||||
assert (rc == 0);
|
||||
|
||||
return 0 ;
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user