mirror of
https://github.com/zeromq/libzmq.git
synced 2025-01-07 21:05:34 +08:00
Merge pull request #2603 from bluca/xpub_manual_subs
Problem: XPUB_MANUAL subscriptions not removed on peer term
This commit is contained in:
commit
f5da4b1c6c
12
src/xpub.cpp
12
src/xpub.cpp
@ -34,6 +34,7 @@
|
||||
#include "pipe.hpp"
|
||||
#include "err.hpp"
|
||||
#include "msg.hpp"
|
||||
#include "macros.hpp"
|
||||
|
||||
zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
|
||||
socket_base_t (parent_, tid_, sid_),
|
||||
@ -203,6 +204,13 @@ int zmq::xpub_t::xsetsockopt (int option_, const void *optval_,
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void stub (unsigned char *data_, size_t size_, void *arg_)
|
||||
{
|
||||
LIBZMQ_UNUSED(data_);
|
||||
LIBZMQ_UNUSED(size_);
|
||||
LIBZMQ_UNUSED(arg_);
|
||||
}
|
||||
|
||||
void zmq::xpub_t::xpipe_terminated (pipe_t *pipe_)
|
||||
{
|
||||
if (manual)
|
||||
@ -210,6 +218,10 @@ void zmq::xpub_t::xpipe_terminated (pipe_t *pipe_)
|
||||
// Remove the pipe from the trie and send corresponding manual
|
||||
// unsubscriptions upstream.
|
||||
manual_subscriptions.rm (pipe_, send_unsubscription, this, false);
|
||||
// Remove pipe without actually sending the message as it was taken
|
||||
// care of by the manual call above. subscriptions is the real mtrie,
|
||||
// so the pipe must be removed from there or it will be left over.
|
||||
subscriptions.rm (pipe_, stub, NULL, false);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -37,12 +37,10 @@ int test_basic()
|
||||
// Create a publisher
|
||||
void *pub = zmq_socket (ctx, ZMQ_XPUB);
|
||||
assert (pub);
|
||||
int rc = zmq_bind (pub, "inproc://soname");
|
||||
assert (rc == 0);
|
||||
|
||||
// set pub socket options
|
||||
int manual = 1;
|
||||
rc = zmq_setsockopt(pub, ZMQ_XPUB_MANUAL, &manual, 4);
|
||||
int rc = zmq_setsockopt(pub, ZMQ_XPUB_MANUAL, &manual, 4);
|
||||
assert (rc == 0);
|
||||
rc = zmq_bind (pub, "inproc://soname");
|
||||
assert (rc == 0);
|
||||
|
||||
// Create a subscriber
|
||||
@ -468,6 +466,124 @@ int test_missing_subscriptions(void)
|
||||
}
|
||||
|
||||
|
||||
int test_unsubscribe_cleanup (void)
|
||||
{
|
||||
size_t len = MAX_SOCKET_STRING;
|
||||
char my_endpoint[MAX_SOCKET_STRING];
|
||||
|
||||
void *ctx = zmq_ctx_new ();
|
||||
assert (ctx);
|
||||
|
||||
// Create a publisher
|
||||
void *pub = zmq_socket (ctx, ZMQ_XPUB);
|
||||
assert (pub);
|
||||
int manual = 1;
|
||||
int rc = zmq_setsockopt (pub, ZMQ_XPUB_MANUAL, &manual, 4);
|
||||
assert (rc == 0);
|
||||
rc = zmq_bind (pub, "tcp://127.0.0.1:*");
|
||||
assert (rc == 0);
|
||||
rc = zmq_getsockopt (pub, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
|
||||
assert (rc == 0);
|
||||
|
||||
// Create a subscriber
|
||||
void *sub = zmq_socket (ctx, ZMQ_XSUB);
|
||||
assert (sub);
|
||||
rc = zmq_connect (sub, my_endpoint);
|
||||
assert (rc == 0);
|
||||
|
||||
// Subscribe for A
|
||||
char subscription[2] = { 1, 'A'};
|
||||
rc = zmq_send_const (sub, subscription, 2, 0);
|
||||
assert (rc == 2);
|
||||
|
||||
char buffer[2];
|
||||
|
||||
// Receive subscriptions from subscriber
|
||||
rc = zmq_recv(pub, buffer, 2, 0);
|
||||
assert (rc == 2);
|
||||
assert (buffer[0] == 1);
|
||||
assert (buffer[1] == 'A');
|
||||
rc = zmq_setsockopt (pub, ZMQ_SUBSCRIBE, "XA", 2);
|
||||
assert (rc == 0);
|
||||
|
||||
// send 2 messages
|
||||
rc = zmq_send_const (pub, "XA", 2, 0);
|
||||
assert (rc == 2);
|
||||
rc = zmq_send_const (pub, "XB", 2, 0);
|
||||
assert (rc == 2);
|
||||
|
||||
// receive the single message
|
||||
rc = zmq_recv (sub, buffer, 2, 0);
|
||||
assert (rc == 2);
|
||||
assert (buffer[0] == 'X');
|
||||
assert (buffer[1] == 'A');
|
||||
|
||||
// should be nothing left in the queue
|
||||
rc = zmq_recv (sub, buffer, 2, ZMQ_DONTWAIT);
|
||||
assert (rc == -1);
|
||||
|
||||
// close the socket
|
||||
rc = zmq_close (sub);
|
||||
assert (rc == 0);
|
||||
|
||||
// closing the socket will result in an unsubscribe event
|
||||
rc = zmq_recv (pub, buffer, 2, 0);
|
||||
assert (rc == 2);
|
||||
assert (buffer[0] == 0);
|
||||
assert (buffer[1] == 'A');
|
||||
|
||||
// this doesn't really do anything
|
||||
// there is no last_pipe set it will just fail silently
|
||||
rc = zmq_setsockopt (pub, ZMQ_UNSUBSCRIBE, "XA", 2);
|
||||
assert (rc == 0);
|
||||
|
||||
// reconnect
|
||||
sub = zmq_socket (ctx, ZMQ_XSUB);
|
||||
rc = zmq_connect (sub, my_endpoint);
|
||||
assert (rc == 0);
|
||||
|
||||
// send a subscription for B
|
||||
subscription[0] = 1;
|
||||
subscription[1] = 'B';
|
||||
rc = zmq_send (sub, subscription, 2, 0);
|
||||
assert (rc == 2);
|
||||
|
||||
// receive the subscription, overwrite it to XB
|
||||
rc = zmq_recv (pub, buffer, 2, 0);
|
||||
assert (rc == 2);
|
||||
assert (buffer[0] == 1);
|
||||
assert(buffer[1] == 'B');
|
||||
rc = zmq_setsockopt (pub, ZMQ_SUBSCRIBE, "XB", 2);
|
||||
assert (rc == 0);
|
||||
|
||||
// send 2 messages
|
||||
rc = zmq_send_const (pub, "XA", 2, 0);
|
||||
assert (rc == 2);
|
||||
rc = zmq_send_const (pub, "XB", 2, 0);
|
||||
assert (rc == 2);
|
||||
|
||||
// receive the single message
|
||||
rc = zmq_recv (sub, buffer, 2, 0);
|
||||
assert (rc == 2);
|
||||
assert (buffer[0] == 'X');
|
||||
assert (buffer[1] == 'B'); // this assertion will fail
|
||||
|
||||
// should be nothing left in the queue
|
||||
rc = zmq_recv (sub, buffer, 2, ZMQ_DONTWAIT);
|
||||
assert (rc == -1);
|
||||
|
||||
// Clean up.
|
||||
rc = zmq_close (pub);
|
||||
assert (rc == 0);
|
||||
rc = zmq_close (sub);
|
||||
assert (rc == 0);
|
||||
rc = zmq_ctx_term (ctx);
|
||||
assert (rc == 0);
|
||||
|
||||
return 0 ;
|
||||
}
|
||||
|
||||
|
||||
int main(void)
|
||||
{
|
||||
setup_test_environment ();
|
||||
@ -475,6 +591,7 @@ int main(void)
|
||||
test_unsubscribe_manual ();
|
||||
test_xpub_proxy_unsubscribe_on_disconnect ();
|
||||
test_missing_subscriptions ();
|
||||
test_unsubscribe_cleanup ();
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user