diff --git a/doc/zmq_msg_get.txt b/doc/zmq_msg_get.txt index d5fd9186..cfbe950f 100644 --- a/doc/zmq_msg_get.txt +++ b/doc/zmq_msg_get.txt @@ -45,7 +45,7 @@ while (true) { int rc = zmq_msg_init (&frame); assert (rc == 0); // Block until a message is available to be received from socket - rc = zmq_recvmsg (socket, &frame, 0); + rc = zmq_msg_recv (socket, &frame, 0); assert (rc != -1); if (zmq_msg_get (&frame, ZMQ_MORE)) fprintf (stderr, "more\n"); diff --git a/doc/zmq_msg_more.txt b/doc/zmq_msg_more.txt index 7836872a..1cad4e83 100644 --- a/doc/zmq_msg_more.txt +++ b/doc/zmq_msg_more.txt @@ -15,7 +15,9 @@ SYNOPSIS DESCRIPTION ----------- The _zmq_msg_more()_ function indicates whether this is part of a multi-part -message, and there are further parts to receive. +message, and there are further parts to receive. This method can safely be +called after _zmq_msg_close()_. This method is identical to _zmq_msg_get()_ +with an argument of ZMQ_MORE. RETURN VALUE @@ -35,7 +37,7 @@ while (true) { int rc = zmq_msg_init (&part); assert (rc == 0); // Block until a message is available to be received from socket - rc = zmq_recvmsg (socket, &part, 0); + rc = zmq_msg_recv (socket, &part, 0); assert (rc != -1); if (zmq_msg_more (&part)) fprintf (stderr, "more\n"); diff --git a/doc/zmq_recv.txt b/doc/zmq_recv.txt index 923baddf..536ba104 100644 --- a/doc/zmq_recv.txt +++ b/doc/zmq_recv.txt @@ -78,9 +78,7 @@ assert (nbytes != -1); SEE ALSO -------- -linkzmq:zmq_recvmsg[3] linkzmq:zmq_send[3] -linkzmq:zmq_sendmsg[3] linkzmq:zmq_getsockopt[3] linkzmq:zmq_socket[7] linkzmq:zmq[7] diff --git a/doc/zmq_recvmsg.txt b/doc/zmq_recvmsg.txt index 9aededda..1260946a 100644 --- a/doc/zmq_recvmsg.txt +++ b/doc/zmq_recvmsg.txt @@ -110,7 +110,6 @@ SEE ALSO -------- linkzmq:zmq_recv[3] linkzmq:zmq_send[3] -linkzmq:zmq_sendmsg[3] linkzmq:zmq_getsockopt[3] linkzmq:zmq_socket[7] linkzmq:zmq[7] diff --git a/doc/zmq_send.txt b/doc/zmq_send.txt index 9409f1f8..44bf4d3f 100644 --- a/doc/zmq_send.txt +++ b/doc/zmq_send.txt @@ -87,9 +87,7 @@ assert (rc == 2); SEE ALSO -------- -linkzmq:zmq_sendmsg[3] linkzmq:zmq_recv[3] -linkzmq:zmq_recvmsg[3] linkzmq:zmq_socket[7] linkzmq:zmq[7] diff --git a/doc/zmq_sendmsg.txt b/doc/zmq_sendmsg.txt index aacd7aec..e9b6534e 100644 --- a/doc/zmq_sendmsg.txt +++ b/doc/zmq_sendmsg.txt @@ -106,8 +106,6 @@ rc = zmq_sendmsg (socket, &part3, 0); SEE ALSO -------- linkzmq:zmq_recv[3] -linkzmq:zmq_recv[3] -linkzmq:zmq_recvmsg[3] linkzmq:zmq_socket[7] linkzmq:zmq[7] diff --git a/doc/zmq_socket_monitor.txt b/doc/zmq_socket_monitor.txt index f289c908..321ab6ee 100644 --- a/doc/zmq_socket_monitor.txt +++ b/doc/zmq_socket_monitor.txt @@ -37,7 +37,7 @@ static void *req_socket_monitor (void *ctx) while (true) { zmq_msg_t msg; zmq_msg_init (&msg); - rc = zmq_recvmsg (s, &msg, 0); + rc = zmq_msg_recv (s, &msg, 0); if (rc == -1 && zmq_errno() == ETERM) break; assert (rc != -1); memcpy (&event, zmq_msg_data (&msg), sizeof (event)); @@ -217,7 +217,7 @@ static void *rep_socket_monitor (void *ctx) while (true) { zmq_msg_t msg; zmq_msg_init (&msg); - rc = zmq_recvmsg (s, &msg, 0); + rc = zmq_msg_recv (s, &msg, 0); if (rc == -1 && zmq_errno() == ETERM) break; assert (rc != -1); memcpy (&event, zmq_msg_data (&msg), sizeof (event)); @@ -285,4 +285,4 @@ linkzmq:zmq[7] AUTHORS ------- -This 0MQ manual page was written by Lourens Naudé \ No newline at end of file +This 0MQ manual page was written by Lourens Naudé diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index 883b0e8d..80b2afd3 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -194,7 +194,7 @@ void zmq::stream_engine_t::terminate () void zmq::stream_engine_t::in_event () { - // If still handshaking, receive and prcess the greeting message. + // If still handshaking, receive and process the greeting message. if (unlikely (handshaking)) if (!handshake ()) return; @@ -390,9 +390,8 @@ bool zmq::stream_engine_t::handshake () // Position of the version field in the greeting. const size_t version_pos = 10; - // Is the peer using the unversioned protocol? - // If so, we send and receive rests of identity - // messages. + // Is the peer using ZMTP/1.0 with no version number? + // If so, we send and receive rests of identity messages if (greeting [0] != 0xff || !(greeting [9] & 0x01)) { encoder = new (std::nothrow) encoder_t (out_batch_size); alloc_assert (encoder); @@ -417,8 +416,8 @@ bool zmq::stream_engine_t::handshake () insize = greeting_bytes_read; // To allow for interoperability with peers that do not forward - // their subscriptions, we inject a phony subsription - // message into the incomming message stream. To put this + // their subscriptions, we inject a phony subscription + // message into the incoming message stream. To put this // message right after the identity message, we temporarily // divert the message stream from session to ourselves. if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) diff --git a/src/xpub.cpp b/src/xpub.cpp index 0f2a3266..5d1c3104 100644 --- a/src/xpub.cpp +++ b/src/xpub.cpp @@ -74,9 +74,9 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_) if (options.type == ZMQ_XPUB && (unique || (*data && verbose))) pending.push_back (blob_t (data, size)); } - else /*process message unrelated to sub/unsub*/ { + else + // Process user message coming upstream from xsub socket pending.push_back (blob_t (data, size)); - } sub.close (); } @@ -177,13 +177,12 @@ void zmq::xpub_t::send_unsubscription (unsigned char *data_, size_t size_, xpub_t *self = (xpub_t*) arg_; if (self->options.type != ZMQ_PUB) { - - // Place the unsubscription to the queue of pending (un)sunscriptions - // to be retrived by the user later on. - blob_t unsub (size_ + 1, 0); - unsub [0] = 0; - memcpy (&unsub [1], data_, size_); - self->pending.push_back (unsub); + // Place the unsubscription to the queue of pending (un)sunscriptions + // to be retrived by the user later on. + blob_t unsub (size_ + 1, 0); + unsub [0] = 0; + memcpy (&unsub [1], data_, size_); + self->pending.push_back (unsub); } } diff --git a/src/xsub.cpp b/src/xsub.cpp index 8f74b960..8e316b6a 100644 --- a/src/xsub.cpp +++ b/src/xsub.cpp @@ -48,7 +48,7 @@ zmq::xsub_t::~xsub_t () void zmq::xsub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) { // icanhasall_ is unused - (void)icanhasall_; + (void) icanhasall_; zmq_assert (pipe_); fq.attach (pipe_); @@ -87,22 +87,24 @@ int zmq::xsub_t::xsend (msg_t *msg_) size_t size = msg_->size (); unsigned char *data = (unsigned char*) msg_->data (); - // Process the subscription. if (*data == 1) { - // this used to filter out duplicate subscriptions, - // however this is alread done on the XPUB side and - // doing it here as well breaks ZMQ_XPUB_VERBOSE - // when there are forwarding devices involved + // Process subscribe message + // This used to filter out duplicate subscriptions, + // however this is alread done on the XPUB side and + // doing it here as well breaks ZMQ_XPUB_VERBOSE + // when there are forwarding devices involved. subscriptions.add (data + 1, size - 1); return dist.send_to_all (msg_); } - else if (*data == 0) { + else + if (*data == 0) { + // Process unsubscribe message if (subscriptions.rm (data + 1, size - 1)) return dist.send_to_all (msg_); } - else /*upstream message unrelated to sub/unsub*/ { + else + // User message sent upstream to XPUB socket return dist.send_to_all (msg_); - } int rc = msg_->close (); errno_assert (rc == 0); diff --git a/tests/test_connect_delay.cpp b/tests/test_connect_delay.cpp index f8778d43..ad419929 100644 --- a/tests/test_connect_delay.cpp +++ b/tests/test_connect_delay.cpp @@ -53,7 +53,7 @@ int main (void) val = 0; rc = zmq_setsockopt(to, ZMQ_LINGER, &val, sizeof(val)); assert (rc == 0); - rc = zmq_bind(to, "tcp://*:5555"); + rc = zmq_bind(to, "tcp://*:6555"); assert (rc == 0); // Create a socket pushing to two endpoints - only 1 message should arrive.