mirror of
https://github.com/zeromq/libzmq.git
synced 2025-01-15 02:07:59 +08:00
Merge pull request #1569 from djelenc/xpub_manual_lost_subscriptions
Fixed missing subscriptions on XPUB with manual subscriptions
This commit is contained in:
commit
39a0d33912
@ -41,6 +41,7 @@ zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
|
||||
more (false),
|
||||
lossy (true),
|
||||
manual(false),
|
||||
pending_pipes (),
|
||||
welcome_msg ()
|
||||
{
|
||||
last_pipe = NULL;
|
||||
@ -90,7 +91,7 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_)
|
||||
if (size > 0 && (*data == 0 || *data == 1)) {
|
||||
if (manual)
|
||||
{
|
||||
last_pipe = pipe_;
|
||||
pending_pipes.push_back(pipe_);
|
||||
pending_data.push_back(blob_t(data, size));
|
||||
pending_metadata.push_back(sub.metadata());
|
||||
pending_flags.push_back(0);
|
||||
@ -243,6 +244,10 @@ int zmq::xpub_t::xrecv (msg_t *msg_)
|
||||
return -1;
|
||||
}
|
||||
|
||||
// User is reading a message, set last_pipe and remove it from the deque
|
||||
last_pipe = pending_pipes.front ();
|
||||
pending_pipes.pop_front ();
|
||||
|
||||
int rc = msg_->close ();
|
||||
errno_assert (rc == 0);
|
||||
rc = msg_->init_size (pending_data.front ().size ());
|
||||
@ -281,6 +286,7 @@ void zmq::xpub_t::send_unsubscription (unsigned char *data_, size_t size_,
|
||||
if (size_ > 0)
|
||||
memcpy (&unsub [1], data_, size_);
|
||||
self->last_pipe = NULL;
|
||||
self->pending_pipes.push_back (NULL);
|
||||
self->pending_data.push_back (unsub);
|
||||
self->pending_metadata.push_back (NULL);
|
||||
self->pending_flags.push_back (0);
|
||||
|
@ -99,9 +99,12 @@ namespace zmq
|
||||
// Subscriptions will not bed added automatically, only after calling set option with ZMQ_SUBSCRIBE or ZMQ_UNSUBSCRIBE
|
||||
bool manual;
|
||||
|
||||
// Last pipe send subscription message, only used if xpub is on manual
|
||||
// Last pipe that sent subscription message, only used if xpub is on manual
|
||||
pipe_t *last_pipe;
|
||||
|
||||
// Pipes that sent subscriptions messages that have not yet been processed, only used if xpub is on manual
|
||||
std::deque <pipe_t*> pending_pipes;
|
||||
|
||||
// Welcome message to send to pipe when attached
|
||||
msg_t welcome_msg;
|
||||
|
||||
|
@ -228,11 +228,128 @@ int test_xpub_proxy_unsubscribe_on_disconnect()
|
||||
return 0;
|
||||
}
|
||||
|
||||
int test_missing_subscriptions()
|
||||
{
|
||||
const char* frontend = "ipc://frontend";
|
||||
const char* backend = "ipc://backend";
|
||||
const char* topic1 = "1";
|
||||
const char* topic2 = "2";
|
||||
const char* payload = "X";
|
||||
|
||||
int manual = 1;
|
||||
|
||||
void *ctx = zmq_ctx_new ();
|
||||
assert (ctx);
|
||||
|
||||
// proxy frontend
|
||||
void *xsub_proxy = zmq_socket (ctx, ZMQ_XSUB);
|
||||
assert (xsub_proxy);
|
||||
assert (zmq_bind (xsub_proxy, frontend) == 0);
|
||||
|
||||
// proxy backend
|
||||
void *xpub_proxy = zmq_socket (ctx, ZMQ_XPUB);
|
||||
assert (xpub_proxy);
|
||||
assert (zmq_setsockopt (xpub_proxy, ZMQ_XPUB_MANUAL, &manual, 4) == 0);
|
||||
assert (zmq_bind (xpub_proxy, backend) == 0);
|
||||
|
||||
// publisher
|
||||
void *pub = zmq_socket (ctx, ZMQ_PUB);
|
||||
assert (zmq_connect (pub, frontend) == 0);
|
||||
|
||||
// Here's the problem: because subscribers subscribe in quick succession,
|
||||
// the proxy is unable to confirm the first subscription before receiving
|
||||
// the second. This causes the first subscription to get lost.
|
||||
|
||||
// first subscriber
|
||||
void *sub1 = zmq_socket (ctx, ZMQ_SUB);
|
||||
assert (sub1);
|
||||
assert (zmq_connect (sub1, backend) == 0);
|
||||
assert (zmq_setsockopt (sub1, ZMQ_SUBSCRIBE, topic1, 1) == 0);
|
||||
|
||||
// second subscriber
|
||||
void *sub2 = zmq_socket (ctx, ZMQ_SUB);
|
||||
assert (sub2);
|
||||
assert (zmq_connect (sub2, backend) == 0);
|
||||
assert (zmq_setsockopt (sub2, ZMQ_SUBSCRIBE, topic2, 1) == 0);
|
||||
|
||||
// wait
|
||||
assert (zmq_poll (0, 0, 100) == 0);
|
||||
|
||||
// proxy now reroutes and confirms subscriptions
|
||||
char buffer[2];
|
||||
assert (zmq_recv (xpub_proxy, buffer, 2, ZMQ_DONTWAIT) == 2);
|
||||
assert (buffer [0] == 1);
|
||||
assert (buffer [1] == *topic1);
|
||||
assert (zmq_setsockopt (xpub_proxy, ZMQ_SUBSCRIBE, topic1, 1) == 0);
|
||||
assert (zmq_send (xsub_proxy, buffer, 2, 0) == 2);
|
||||
|
||||
assert (zmq_recv (xpub_proxy, buffer, 2, ZMQ_DONTWAIT) == 2);
|
||||
assert (buffer [0] == 1);
|
||||
assert (buffer [1] == *topic2);
|
||||
assert (zmq_setsockopt (xpub_proxy, ZMQ_SUBSCRIBE, topic2, 1) == 0);
|
||||
assert (zmq_send (xsub_proxy, buffer, 2, 0) == 2);
|
||||
|
||||
// wait
|
||||
assert (zmq_poll (0, 0, 100) == 0);
|
||||
|
||||
// let publisher send 2 msgs, each with its own topic
|
||||
assert (zmq_send (pub, topic1, 1, ZMQ_SNDMORE) == 1);
|
||||
assert (zmq_send (pub, payload, 1, 0) == 1);
|
||||
assert (zmq_send (pub, topic2, 1, ZMQ_SNDMORE) == 1);
|
||||
assert (zmq_send (pub, payload, 1, 0) == 1);
|
||||
|
||||
// wait
|
||||
assert (zmq_poll (0, 0, 100) == 0);
|
||||
|
||||
// proxy reroutes data messages to subscribers
|
||||
char topic_buff [1];
|
||||
char data_buff [1];
|
||||
assert (zmq_recv (xsub_proxy, topic_buff, 1, ZMQ_DONTWAIT) == 1);
|
||||
assert (topic_buff [0] == *topic1);
|
||||
assert (zmq_recv (xsub_proxy, data_buff, 1, ZMQ_DONTWAIT) == 1);
|
||||
assert (data_buff [0] == *payload);
|
||||
assert (zmq_send (xpub_proxy, topic_buff, 1, ZMQ_SNDMORE) == 1);
|
||||
assert (zmq_send (xpub_proxy, data_buff, 1, 0) == 1);
|
||||
|
||||
assert (zmq_recv (xsub_proxy, topic_buff, 1, ZMQ_DONTWAIT) == 1);
|
||||
assert (topic_buff [0] == *topic2);
|
||||
assert (zmq_recv (xsub_proxy, data_buff, 1, ZMQ_DONTWAIT) == 1);
|
||||
assert (data_buff [0] == *payload);
|
||||
assert (zmq_send (xpub_proxy, topic_buff, 1, ZMQ_SNDMORE) == 1);
|
||||
assert (zmq_send (xpub_proxy, data_buff, 1, 0) == 1);
|
||||
|
||||
// wait
|
||||
assert (zmq_poll (0, 0, 100) == 0);
|
||||
|
||||
// each subscriber should now get a message
|
||||
assert (zmq_recv (sub2, topic_buff, 1, ZMQ_DONTWAIT) == 1);
|
||||
assert (topic_buff [0] == *topic2);
|
||||
assert (zmq_recv (sub2, data_buff, 1, ZMQ_DONTWAIT) == 1);
|
||||
assert (data_buff [0] == *payload);
|
||||
|
||||
assert (zmq_recv (sub1, topic_buff, 1, ZMQ_DONTWAIT) == 1);
|
||||
assert (topic_buff [0] == *topic1);
|
||||
assert (zmq_recv (sub1, data_buff, 1, ZMQ_DONTWAIT) == 1);
|
||||
assert (data_buff [0] == *payload);
|
||||
|
||||
// Clean up
|
||||
assert (zmq_close (sub1) == 0);
|
||||
assert (zmq_close (sub2) == 0);
|
||||
assert (zmq_close (pub) == 0);
|
||||
assert (zmq_close (xpub_proxy) == 0);
|
||||
assert (zmq_close (xsub_proxy) == 0);
|
||||
assert (zmq_ctx_term (ctx) == 0);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
int main(void)
|
||||
{
|
||||
setup_test_environment ();
|
||||
test_basic ();
|
||||
test_xpub_proxy_unsubscribe_on_disconnect ();
|
||||
test_missing_subscriptions ();
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user