From 8ba097f3da8d1a752755a52b1d3ec967c23f0643 Mon Sep 17 00:00:00 2001 From: Pieter Hintjens Date: Fri, 23 Nov 2012 17:25:46 +0900 Subject: [PATCH 1/9] Test cases were failing on bind --- tests/test_connect_delay.cpp | 2 +- tests/test_raw_sock.cpp | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/test_connect_delay.cpp b/tests/test_connect_delay.cpp index f8778d43..ad419929 100644 --- a/tests/test_connect_delay.cpp +++ b/tests/test_connect_delay.cpp @@ -53,7 +53,7 @@ int main (void) val = 0; rc = zmq_setsockopt(to, ZMQ_LINGER, &val, sizeof(val)); assert (rc == 0); - rc = zmq_bind(to, "tcp://*:5555"); + rc = zmq_bind(to, "tcp://*:6555"); assert (rc == 0); // Create a socket pushing to two endpoints - only 1 message should arrive. diff --git a/tests/test_raw_sock.cpp b/tests/test_raw_sock.cpp index 954339bf..f9a05c11 100644 --- a/tests/test_raw_sock.cpp +++ b/tests/test_raw_sock.cpp @@ -42,7 +42,7 @@ int tcp_client () struct sockaddr_in serv_addr; struct hostent *server; - const int portno = 5555; + const int portno = 7555; int sockfd = socket (AF_INET, SOCK_STREAM, 0); assert (sockfd >= 0); @@ -77,7 +77,7 @@ int tcp_server () memset (&serv_addr, 0, sizeof serv_addr); serv_addr.sin_family = AF_INET; serv_addr.sin_addr.s_addr = htonl (INADDR_ANY); - serv_addr.sin_port = htons (5555); + serv_addr.sin_port = htons (7555); rc = bind (listenfd, (struct sockaddr *) &serv_addr, sizeof serv_addr); assert (rc == 0); @@ -165,7 +165,7 @@ void test_zmq_connect () rc = zmq_setsockopt (zs, ZMQ_ROUTER_RAW, &raw_sock, sizeof raw_sock); assert (rc == 0); - rc = zmq_connect (zs, "tcp://127.0.0.1:5555"); + rc = zmq_connect (zs, "tcp://127.0.0.1:7555"); assert (rc == 0); int i; @@ -213,7 +213,7 @@ int main () int raw_sock = 1; int rc = zmq_setsockopt (sb, ZMQ_ROUTER_RAW, &raw_sock, sizeof raw_sock); assert (rc == 0); - rc = zmq_bind (sb, "tcp://127.0.0.1:5555"); + rc = zmq_bind (sb, "tcp://127.0.0.1:7555"); assert (rc == 0); int sock_fd = tcp_client (); From af934f85caf5ec1e56514270c81ec57a92e106d9 Mon Sep 17 00:00:00 2001 From: Pieter Hintjens Date: Sun, 23 Dec 2012 17:47:32 +0100 Subject: [PATCH 2/9] Removed use of deprecated API methods --- doc/zmq_msg_get.txt | 2 +- doc/zmq_msg_more.txt | 2 +- doc/zmq_recv.txt | 2 -- doc/zmq_recvmsg.txt | 1 - doc/zmq_send.txt | 2 -- doc/zmq_sendmsg.txt | 2 -- doc/zmq_socket_monitor.txt | 6 +++--- 7 files changed, 5 insertions(+), 12 deletions(-) diff --git a/doc/zmq_msg_get.txt b/doc/zmq_msg_get.txt index d5fd9186..cfbe950f 100644 --- a/doc/zmq_msg_get.txt +++ b/doc/zmq_msg_get.txt @@ -45,7 +45,7 @@ while (true) { int rc = zmq_msg_init (&frame); assert (rc == 0); // Block until a message is available to be received from socket - rc = zmq_recvmsg (socket, &frame, 0); + rc = zmq_msg_recv (socket, &frame, 0); assert (rc != -1); if (zmq_msg_get (&frame, ZMQ_MORE)) fprintf (stderr, "more\n"); diff --git a/doc/zmq_msg_more.txt b/doc/zmq_msg_more.txt index 7836872a..53f6618e 100644 --- a/doc/zmq_msg_more.txt +++ b/doc/zmq_msg_more.txt @@ -35,7 +35,7 @@ while (true) { int rc = zmq_msg_init (&part); assert (rc == 0); // Block until a message is available to be received from socket - rc = zmq_recvmsg (socket, &part, 0); + rc = zmq_msg_recv (socket, &part, 0); assert (rc != -1); if (zmq_msg_more (&part)) fprintf (stderr, "more\n"); diff --git a/doc/zmq_recv.txt b/doc/zmq_recv.txt index 923baddf..536ba104 100644 --- a/doc/zmq_recv.txt +++ b/doc/zmq_recv.txt @@ -78,9 +78,7 @@ assert (nbytes != -1); SEE ALSO -------- -linkzmq:zmq_recvmsg[3] linkzmq:zmq_send[3] -linkzmq:zmq_sendmsg[3] linkzmq:zmq_getsockopt[3] linkzmq:zmq_socket[7] linkzmq:zmq[7] diff --git a/doc/zmq_recvmsg.txt b/doc/zmq_recvmsg.txt index 9aededda..1260946a 100644 --- a/doc/zmq_recvmsg.txt +++ b/doc/zmq_recvmsg.txt @@ -110,7 +110,6 @@ SEE ALSO -------- linkzmq:zmq_recv[3] linkzmq:zmq_send[3] -linkzmq:zmq_sendmsg[3] linkzmq:zmq_getsockopt[3] linkzmq:zmq_socket[7] linkzmq:zmq[7] diff --git a/doc/zmq_send.txt b/doc/zmq_send.txt index 9409f1f8..44bf4d3f 100644 --- a/doc/zmq_send.txt +++ b/doc/zmq_send.txt @@ -87,9 +87,7 @@ assert (rc == 2); SEE ALSO -------- -linkzmq:zmq_sendmsg[3] linkzmq:zmq_recv[3] -linkzmq:zmq_recvmsg[3] linkzmq:zmq_socket[7] linkzmq:zmq[7] diff --git a/doc/zmq_sendmsg.txt b/doc/zmq_sendmsg.txt index aacd7aec..e9b6534e 100644 --- a/doc/zmq_sendmsg.txt +++ b/doc/zmq_sendmsg.txt @@ -106,8 +106,6 @@ rc = zmq_sendmsg (socket, &part3, 0); SEE ALSO -------- linkzmq:zmq_recv[3] -linkzmq:zmq_recv[3] -linkzmq:zmq_recvmsg[3] linkzmq:zmq_socket[7] linkzmq:zmq[7] diff --git a/doc/zmq_socket_monitor.txt b/doc/zmq_socket_monitor.txt index f289c908..321ab6ee 100644 --- a/doc/zmq_socket_monitor.txt +++ b/doc/zmq_socket_monitor.txt @@ -37,7 +37,7 @@ static void *req_socket_monitor (void *ctx) while (true) { zmq_msg_t msg; zmq_msg_init (&msg); - rc = zmq_recvmsg (s, &msg, 0); + rc = zmq_msg_recv (s, &msg, 0); if (rc == -1 && zmq_errno() == ETERM) break; assert (rc != -1); memcpy (&event, zmq_msg_data (&msg), sizeof (event)); @@ -217,7 +217,7 @@ static void *rep_socket_monitor (void *ctx) while (true) { zmq_msg_t msg; zmq_msg_init (&msg); - rc = zmq_recvmsg (s, &msg, 0); + rc = zmq_msg_recv (s, &msg, 0); if (rc == -1 && zmq_errno() == ETERM) break; assert (rc != -1); memcpy (&event, zmq_msg_data (&msg), sizeof (event)); @@ -285,4 +285,4 @@ linkzmq:zmq[7] AUTHORS ------- -This 0MQ manual page was written by Lourens Naudé \ No newline at end of file +This 0MQ manual page was written by Lourens Naudé From ef186fe15bfc7344fca015bc212eb2e98d8d3ed7 Mon Sep 17 00:00:00 2001 From: Pieter Hintjens Date: Sun, 23 Dec 2012 17:47:44 +0100 Subject: [PATCH 3/9] Spelling fixes --- src/stream_engine.cpp | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index 883b0e8d..80b2afd3 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -194,7 +194,7 @@ void zmq::stream_engine_t::terminate () void zmq::stream_engine_t::in_event () { - // If still handshaking, receive and prcess the greeting message. + // If still handshaking, receive and process the greeting message. if (unlikely (handshaking)) if (!handshake ()) return; @@ -390,9 +390,8 @@ bool zmq::stream_engine_t::handshake () // Position of the version field in the greeting. const size_t version_pos = 10; - // Is the peer using the unversioned protocol? - // If so, we send and receive rests of identity - // messages. + // Is the peer using ZMTP/1.0 with no version number? + // If so, we send and receive rests of identity messages if (greeting [0] != 0xff || !(greeting [9] & 0x01)) { encoder = new (std::nothrow) encoder_t (out_batch_size); alloc_assert (encoder); @@ -417,8 +416,8 @@ bool zmq::stream_engine_t::handshake () insize = greeting_bytes_read; // To allow for interoperability with peers that do not forward - // their subscriptions, we inject a phony subsription - // message into the incomming message stream. To put this + // their subscriptions, we inject a phony subscription + // message into the incoming message stream. To put this // message right after the identity message, we temporarily // divert the message stream from session to ourselves. if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) From 12a648db0773960e8b374a3b0ed57de36c62174a Mon Sep 17 00:00:00 2001 From: Pieter Hintjens Date: Sun, 23 Dec 2012 17:57:35 +0100 Subject: [PATCH 4/9] Specified that it's safe to call zmq_msg_more after zmq_msg_close --- doc/zmq_msg_more.txt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/doc/zmq_msg_more.txt b/doc/zmq_msg_more.txt index 53f6618e..da77f1a5 100644 --- a/doc/zmq_msg_more.txt +++ b/doc/zmq_msg_more.txt @@ -15,7 +15,8 @@ SYNOPSIS DESCRIPTION ----------- The _zmq_msg_more()_ function indicates whether this is part of a multi-part -message, and there are further parts to receive. +message, and there are further parts to receive. This method can safely be +called after _zmq_msg_close()_. RETURN VALUE From b0f0d3fcb49de6f1ebb9227ae664b9f426ece290 Mon Sep 17 00:00:00 2001 From: Pieter Hintjens Date: Sun, 23 Dec 2012 17:59:34 +0100 Subject: [PATCH 5/9] Clarification on zmq_msg_more --- doc/zmq_msg_more.txt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/doc/zmq_msg_more.txt b/doc/zmq_msg_more.txt index da77f1a5..1cad4e83 100644 --- a/doc/zmq_msg_more.txt +++ b/doc/zmq_msg_more.txt @@ -16,7 +16,8 @@ DESCRIPTION ----------- The _zmq_msg_more()_ function indicates whether this is part of a multi-part message, and there are further parts to receive. This method can safely be -called after _zmq_msg_close()_. +called after _zmq_msg_close()_. This method is identical to _zmq_msg_get()_ +with an argument of ZMQ_MORE. RETURN VALUE From da0efaa81771aba497a961d70f2166f500a52afd Mon Sep 17 00:00:00 2001 From: Pieter Hintjens Date: Tue, 8 Jan 2013 09:09:27 +0100 Subject: [PATCH 6/9] Old change to move ports off 5555 (was conflicting with other stuff) --- tests/test_connect_delay.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_connect_delay.cpp b/tests/test_connect_delay.cpp index ad419929..f8778d43 100644 --- a/tests/test_connect_delay.cpp +++ b/tests/test_connect_delay.cpp @@ -53,7 +53,7 @@ int main (void) val = 0; rc = zmq_setsockopt(to, ZMQ_LINGER, &val, sizeof(val)); assert (rc == 0); - rc = zmq_bind(to, "tcp://*:6555"); + rc = zmq_bind(to, "tcp://*:5555"); assert (rc == 0); // Create a socket pushing to two endpoints - only 1 message should arrive. From a4bedc52552b6e4f8a903c3781b7e9897b310741 Mon Sep 17 00:00:00 2001 From: Pieter Hintjens Date: Tue, 8 Jan 2013 09:16:50 +0100 Subject: [PATCH 7/9] Whitespace and comment fixes --- src/xpub.cpp | 17 ++++++++--------- src/xsub.cpp | 20 +++++++++++--------- 2 files changed, 19 insertions(+), 18 deletions(-) diff --git a/src/xpub.cpp b/src/xpub.cpp index 0f2a3266..5d1c3104 100644 --- a/src/xpub.cpp +++ b/src/xpub.cpp @@ -74,9 +74,9 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_) if (options.type == ZMQ_XPUB && (unique || (*data && verbose))) pending.push_back (blob_t (data, size)); } - else /*process message unrelated to sub/unsub*/ { + else + // Process user message coming upstream from xsub socket pending.push_back (blob_t (data, size)); - } sub.close (); } @@ -177,13 +177,12 @@ void zmq::xpub_t::send_unsubscription (unsigned char *data_, size_t size_, xpub_t *self = (xpub_t*) arg_; if (self->options.type != ZMQ_PUB) { - - // Place the unsubscription to the queue of pending (un)sunscriptions - // to be retrived by the user later on. - blob_t unsub (size_ + 1, 0); - unsub [0] = 0; - memcpy (&unsub [1], data_, size_); - self->pending.push_back (unsub); + // Place the unsubscription to the queue of pending (un)sunscriptions + // to be retrived by the user later on. + blob_t unsub (size_ + 1, 0); + unsub [0] = 0; + memcpy (&unsub [1], data_, size_); + self->pending.push_back (unsub); } } diff --git a/src/xsub.cpp b/src/xsub.cpp index 8f74b960..8e316b6a 100644 --- a/src/xsub.cpp +++ b/src/xsub.cpp @@ -48,7 +48,7 @@ zmq::xsub_t::~xsub_t () void zmq::xsub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) { // icanhasall_ is unused - (void)icanhasall_; + (void) icanhasall_; zmq_assert (pipe_); fq.attach (pipe_); @@ -87,22 +87,24 @@ int zmq::xsub_t::xsend (msg_t *msg_) size_t size = msg_->size (); unsigned char *data = (unsigned char*) msg_->data (); - // Process the subscription. if (*data == 1) { - // this used to filter out duplicate subscriptions, - // however this is alread done on the XPUB side and - // doing it here as well breaks ZMQ_XPUB_VERBOSE - // when there are forwarding devices involved + // Process subscribe message + // This used to filter out duplicate subscriptions, + // however this is alread done on the XPUB side and + // doing it here as well breaks ZMQ_XPUB_VERBOSE + // when there are forwarding devices involved. subscriptions.add (data + 1, size - 1); return dist.send_to_all (msg_); } - else if (*data == 0) { + else + if (*data == 0) { + // Process unsubscribe message if (subscriptions.rm (data + 1, size - 1)) return dist.send_to_all (msg_); } - else /*upstream message unrelated to sub/unsub*/ { + else + // User message sent upstream to XPUB socket return dist.send_to_all (msg_); - } int rc = msg_->close (); errno_assert (rc == 0); From aff14067135f66aa9ce7816e08bac3a9b661b3e2 Mon Sep 17 00:00:00 2001 From: Pieter Hintjens Date: Tue, 8 Jan 2013 09:18:23 +0100 Subject: [PATCH 8/9] Revert "Test cases were failing on bind" This reverts commit 8ba097f3da8d1a752755a52b1d3ec967c23f0643. --- tests/test_raw_sock.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/test_raw_sock.cpp b/tests/test_raw_sock.cpp index f9a05c11..954339bf 100644 --- a/tests/test_raw_sock.cpp +++ b/tests/test_raw_sock.cpp @@ -42,7 +42,7 @@ int tcp_client () struct sockaddr_in serv_addr; struct hostent *server; - const int portno = 7555; + const int portno = 5555; int sockfd = socket (AF_INET, SOCK_STREAM, 0); assert (sockfd >= 0); @@ -77,7 +77,7 @@ int tcp_server () memset (&serv_addr, 0, sizeof serv_addr); serv_addr.sin_family = AF_INET; serv_addr.sin_addr.s_addr = htonl (INADDR_ANY); - serv_addr.sin_port = htons (7555); + serv_addr.sin_port = htons (5555); rc = bind (listenfd, (struct sockaddr *) &serv_addr, sizeof serv_addr); assert (rc == 0); @@ -165,7 +165,7 @@ void test_zmq_connect () rc = zmq_setsockopt (zs, ZMQ_ROUTER_RAW, &raw_sock, sizeof raw_sock); assert (rc == 0); - rc = zmq_connect (zs, "tcp://127.0.0.1:7555"); + rc = zmq_connect (zs, "tcp://127.0.0.1:5555"); assert (rc == 0); int i; @@ -213,7 +213,7 @@ int main () int raw_sock = 1; int rc = zmq_setsockopt (sb, ZMQ_ROUTER_RAW, &raw_sock, sizeof raw_sock); assert (rc == 0); - rc = zmq_bind (sb, "tcp://127.0.0.1:7555"); + rc = zmq_bind (sb, "tcp://127.0.0.1:5555"); assert (rc == 0); int sock_fd = tcp_client (); From d997d88096f8588c0d80712a7fd2c212d36241d6 Mon Sep 17 00:00:00 2001 From: Pieter Hintjens Date: Tue, 8 Jan 2013 09:18:38 +0100 Subject: [PATCH 9/9] Revert "Old change to move ports off 5555 (was conflicting with other stuff)" This reverts commit da0efaa81771aba497a961d70f2166f500a52afd. --- tests/test_connect_delay.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_connect_delay.cpp b/tests/test_connect_delay.cpp index f8778d43..ad419929 100644 --- a/tests/test_connect_delay.cpp +++ b/tests/test_connect_delay.cpp @@ -53,7 +53,7 @@ int main (void) val = 0; rc = zmq_setsockopt(to, ZMQ_LINGER, &val, sizeof(val)); assert (rc == 0); - rc = zmq_bind(to, "tcp://*:5555"); + rc = zmq_bind(to, "tcp://*:6555"); assert (rc == 0); // Create a socket pushing to two endpoints - only 1 message should arrive.