diff --git a/tests/test_xpub_manual.cpp b/tests/test_xpub_manual.cpp index 6f581ab3..1aa53288 100644 --- a/tests/test_xpub_manual.cpp +++ b/tests/test_xpub_manual.cpp @@ -90,6 +90,112 @@ int test_basic() return 0 ; } + +int test_unsubscribe_manual() +{ + void *ctx = zmq_ctx_new (); + assert (ctx); + + // Create a publisher + void *pub = zmq_socket (ctx, ZMQ_XPUB); + assert (pub); + int rc = zmq_bind (pub, "inproc://soname"); + assert (rc == 0); + + // set pub socket options + int manual = 1; + rc = zmq_setsockopt(pub, ZMQ_XPUB_MANUAL, &manual, 4); + assert (rc == 0); + + // Create a subscriber + void *sub = zmq_socket (ctx, ZMQ_XSUB); + assert (sub); + rc = zmq_connect (sub, "inproc://soname"); + assert (rc == 0); + + // Subscribe for A + char subscription1[2] = { 1, 'A'}; + rc = zmq_send_const(sub, subscription1, 2, 0); + assert (rc == 2); + + // Subscribe for B + char subscription2[2] = { 1, 'B'}; + rc = zmq_send_const(sub, subscription2, 2, 0); + assert (rc == 2); + + char buffer[3]; + + // Receive subscription "A" from subscriber + rc = zmq_recv(pub, buffer, 2, 0); + assert(rc == 2); + assert(buffer[0] == 1); + assert(buffer[1] == 'A'); + + // Subscribe socket for XA instead + rc = zmq_setsockopt(pub, ZMQ_SUBSCRIBE, "XA", 2); + assert(rc == 0); + + // Receive subscription "B" from subscriber + rc = zmq_recv(pub, buffer, 2, 0); + assert(rc == 2); + assert(buffer[0] == 1); + assert(buffer[1] == 'B'); + + // Subscribe socket for XB instead + rc = zmq_setsockopt(pub, ZMQ_SUBSCRIBE, "XB", 2); + assert(rc == 0); + + // Unsubscribe from A + char unsubscription1[2] = { 0, 'A'}; + rc = zmq_send_const(sub, unsubscription1, 2, 0); + assert (rc == 2); + + // Receive unsubscription "A" from subscriber + rc = zmq_recv(pub, buffer, 2, 0); + assert(rc == 2); + assert(buffer[0] == 0); + assert(buffer[1] == 'A'); + + // Unsubscribe socket from XA instead + rc = zmq_setsockopt(pub, ZMQ_UNSUBSCRIBE, "XA", 2); + assert(rc == 0); + + // Sending messages XA, XB + rc = zmq_send_const(pub, "XA", 2, 0); + assert(rc == 2); + rc = zmq_send_const(pub, "XB", 2, 0); + assert(rc == 2); + + // Subscriber should receive XB only + rc = zmq_recv(sub, buffer, 2, ZMQ_DONTWAIT); + assert(rc == 2); + assert(buffer[0] == 'X'); + assert(buffer[1] == 'B'); + + // Close subscriber + rc = zmq_close (sub); + assert (rc == 0); + + // Receive unsubscription "B" + rc = zmq_recv(pub, buffer, 2, 0); + assert(rc == 2); + assert(buffer[0] == 0); + assert(buffer[1] == 'B'); + + // Unsubscribe socket from XB instead + rc = zmq_setsockopt(pub, ZMQ_UNSUBSCRIBE, "XB", 2); + assert(rc == 0); + + // Clean up. + rc = zmq_close (pub); + assert (rc == 0); + rc = zmq_ctx_term (ctx); + assert (rc == 0); + + return 0 ; +} + + int test_xpub_proxy_unsubscribe_on_disconnect(const char *frontend, const char *backend) { @@ -349,6 +455,7 @@ int main(void) { setup_test_environment (); test_basic (); + test_unsubscribe_manual (); const char *frontend; const char *backend;