mirror of
https://github.com/zeromq/libzmq.git
synced 2024-12-28 16:15:23 +08:00
Merge pull request #2042 from sheremetyev/xpub-unsubscribe-manual
Send manual unsubscriptions on pipe termination in XPUB
This commit is contained in:
commit
a1f691e799
17
src/xpub.cpp
17
src/xpub.cpp
@ -92,6 +92,12 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_)
|
|||||||
if (size > 0 && (*data == 0 || *data == 1)) {
|
if (size > 0 && (*data == 0 || *data == 1)) {
|
||||||
if (manual)
|
if (manual)
|
||||||
{
|
{
|
||||||
|
// Store manual subscription to use on termination
|
||||||
|
if (*data == 0)
|
||||||
|
manual_subscriptions.rm(data + 1, size - 1, pipe_);
|
||||||
|
else
|
||||||
|
manual_subscriptions.add(data + 1, size - 1, pipe_);
|
||||||
|
|
||||||
pending_pipes.push_back(pipe_);
|
pending_pipes.push_back(pipe_);
|
||||||
pending_data.push_back(blob_t(data, size));
|
pending_data.push_back(blob_t(data, size));
|
||||||
pending_metadata.push_back(sub.metadata());
|
pending_metadata.push_back(sub.metadata());
|
||||||
@ -190,11 +196,20 @@ int zmq::xpub_t::xsetsockopt (int option_, const void *optval_,
|
|||||||
}
|
}
|
||||||
|
|
||||||
void zmq::xpub_t::xpipe_terminated (pipe_t *pipe_)
|
void zmq::xpub_t::xpipe_terminated (pipe_t *pipe_)
|
||||||
|
{
|
||||||
|
if (manual)
|
||||||
|
{
|
||||||
|
// Remove the pipe from the trie and send corresponding manual
|
||||||
|
// unsubscriptions upstream.
|
||||||
|
manual_subscriptions.rm (pipe_, send_unsubscription, this, false);
|
||||||
|
}
|
||||||
|
else
|
||||||
{
|
{
|
||||||
// Remove the pipe from the trie. If there are topics that nobody
|
// Remove the pipe from the trie. If there are topics that nobody
|
||||||
// is interested in anymore, send corresponding unsubscriptions
|
// is interested in anymore, send corresponding unsubscriptions
|
||||||
// upstream.
|
// upstream.
|
||||||
subscriptions.rm (pipe_, send_unsubscription, this, !(verbose_unsubs || manual));
|
subscriptions.rm (pipe_, send_unsubscription, this, !verbose_unsubs);
|
||||||
|
}
|
||||||
|
|
||||||
dist.pipe_terminated (pipe_);
|
dist.pipe_terminated (pipe_);
|
||||||
}
|
}
|
||||||
|
@ -79,6 +79,9 @@ namespace zmq
|
|||||||
// List of all subscriptions mapped to corresponding pipes.
|
// List of all subscriptions mapped to corresponding pipes.
|
||||||
mtrie_t subscriptions;
|
mtrie_t subscriptions;
|
||||||
|
|
||||||
|
// List of manual subscriptions mapped to corresponding pipes.
|
||||||
|
mtrie_t manual_subscriptions;
|
||||||
|
|
||||||
// Distributor of messages holding the list of outbound pipes.
|
// Distributor of messages holding the list of outbound pipes.
|
||||||
dist_t dist;
|
dist_t dist;
|
||||||
|
|
||||||
|
@ -90,6 +90,112 @@ int test_basic()
|
|||||||
return 0 ;
|
return 0 ;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int test_unsubscribe_manual()
|
||||||
|
{
|
||||||
|
void *ctx = zmq_ctx_new ();
|
||||||
|
assert (ctx);
|
||||||
|
|
||||||
|
// Create a publisher
|
||||||
|
void *pub = zmq_socket (ctx, ZMQ_XPUB);
|
||||||
|
assert (pub);
|
||||||
|
int rc = zmq_bind (pub, "inproc://soname");
|
||||||
|
assert (rc == 0);
|
||||||
|
|
||||||
|
// set pub socket options
|
||||||
|
int manual = 1;
|
||||||
|
rc = zmq_setsockopt(pub, ZMQ_XPUB_MANUAL, &manual, 4);
|
||||||
|
assert (rc == 0);
|
||||||
|
|
||||||
|
// Create a subscriber
|
||||||
|
void *sub = zmq_socket (ctx, ZMQ_XSUB);
|
||||||
|
assert (sub);
|
||||||
|
rc = zmq_connect (sub, "inproc://soname");
|
||||||
|
assert (rc == 0);
|
||||||
|
|
||||||
|
// Subscribe for A
|
||||||
|
char subscription1[2] = { 1, 'A'};
|
||||||
|
rc = zmq_send_const(sub, subscription1, 2, 0);
|
||||||
|
assert (rc == 2);
|
||||||
|
|
||||||
|
// Subscribe for B
|
||||||
|
char subscription2[2] = { 1, 'B'};
|
||||||
|
rc = zmq_send_const(sub, subscription2, 2, 0);
|
||||||
|
assert (rc == 2);
|
||||||
|
|
||||||
|
char buffer[3];
|
||||||
|
|
||||||
|
// Receive subscription "A" from subscriber
|
||||||
|
rc = zmq_recv(pub, buffer, 2, 0);
|
||||||
|
assert(rc == 2);
|
||||||
|
assert(buffer[0] == 1);
|
||||||
|
assert(buffer[1] == 'A');
|
||||||
|
|
||||||
|
// Subscribe socket for XA instead
|
||||||
|
rc = zmq_setsockopt(pub, ZMQ_SUBSCRIBE, "XA", 2);
|
||||||
|
assert(rc == 0);
|
||||||
|
|
||||||
|
// Receive subscription "B" from subscriber
|
||||||
|
rc = zmq_recv(pub, buffer, 2, 0);
|
||||||
|
assert(rc == 2);
|
||||||
|
assert(buffer[0] == 1);
|
||||||
|
assert(buffer[1] == 'B');
|
||||||
|
|
||||||
|
// Subscribe socket for XB instead
|
||||||
|
rc = zmq_setsockopt(pub, ZMQ_SUBSCRIBE, "XB", 2);
|
||||||
|
assert(rc == 0);
|
||||||
|
|
||||||
|
// Unsubscribe from A
|
||||||
|
char unsubscription1[2] = { 0, 'A'};
|
||||||
|
rc = zmq_send_const(sub, unsubscription1, 2, 0);
|
||||||
|
assert (rc == 2);
|
||||||
|
|
||||||
|
// Receive unsubscription "A" from subscriber
|
||||||
|
rc = zmq_recv(pub, buffer, 2, 0);
|
||||||
|
assert(rc == 2);
|
||||||
|
assert(buffer[0] == 0);
|
||||||
|
assert(buffer[1] == 'A');
|
||||||
|
|
||||||
|
// Unsubscribe socket from XA instead
|
||||||
|
rc = zmq_setsockopt(pub, ZMQ_UNSUBSCRIBE, "XA", 2);
|
||||||
|
assert(rc == 0);
|
||||||
|
|
||||||
|
// Sending messages XA, XB
|
||||||
|
rc = zmq_send_const(pub, "XA", 2, 0);
|
||||||
|
assert(rc == 2);
|
||||||
|
rc = zmq_send_const(pub, "XB", 2, 0);
|
||||||
|
assert(rc == 2);
|
||||||
|
|
||||||
|
// Subscriber should receive XB only
|
||||||
|
rc = zmq_recv(sub, buffer, 2, ZMQ_DONTWAIT);
|
||||||
|
assert(rc == 2);
|
||||||
|
assert(buffer[0] == 'X');
|
||||||
|
assert(buffer[1] == 'B');
|
||||||
|
|
||||||
|
// Close subscriber
|
||||||
|
rc = zmq_close (sub);
|
||||||
|
assert (rc == 0);
|
||||||
|
|
||||||
|
// Receive unsubscription "B"
|
||||||
|
rc = zmq_recv(pub, buffer, 2, 0);
|
||||||
|
assert(rc == 2);
|
||||||
|
assert(buffer[0] == 0);
|
||||||
|
assert(buffer[1] == 'B');
|
||||||
|
|
||||||
|
// Unsubscribe socket from XB instead
|
||||||
|
rc = zmq_setsockopt(pub, ZMQ_UNSUBSCRIBE, "XB", 2);
|
||||||
|
assert(rc == 0);
|
||||||
|
|
||||||
|
// Clean up.
|
||||||
|
rc = zmq_close (pub);
|
||||||
|
assert (rc == 0);
|
||||||
|
rc = zmq_ctx_term (ctx);
|
||||||
|
assert (rc == 0);
|
||||||
|
|
||||||
|
return 0 ;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
int test_xpub_proxy_unsubscribe_on_disconnect(const char *frontend,
|
int test_xpub_proxy_unsubscribe_on_disconnect(const char *frontend,
|
||||||
const char *backend)
|
const char *backend)
|
||||||
{
|
{
|
||||||
@ -349,6 +455,7 @@ int main(void)
|
|||||||
{
|
{
|
||||||
setup_test_environment ();
|
setup_test_environment ();
|
||||||
test_basic ();
|
test_basic ();
|
||||||
|
test_unsubscribe_manual ();
|
||||||
|
|
||||||
const char *frontend;
|
const char *frontend;
|
||||||
const char *backend;
|
const char *backend;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user