diff --git a/doc/zmq_msg_get.txt b/doc/zmq_msg_get.txt index 8dc89946..11c934ec 100644 --- a/doc/zmq_msg_get.txt +++ b/doc/zmq_msg_get.txt @@ -24,14 +24,14 @@ The following properties can be retrieved with the _zmq_msg_get()_ function: Indicates that there are more message frames to follow after the 'message'. *ZMQ_SRCFD*:: -Returns the file descriptor of the socket the 'message' was read from. This -allows application to retrieve the remote endpoint via 'getpeername(2)'. Be -aware that the respective socket might be closed already, reused even. +Returns the file descriptor of the socket the 'message' was read from. This +allows application to retrieve the remote endpoint via 'getpeername(2)'. Be +aware that the respective socket might be closed already, reused even. Currently only implemented for TCP sockets. *ZMQ_SHARED*:: -Indicates that a message has been copied and MAY share underlying storage -with another copy of this message. +Indicates that a message MAY share underlying storage with another copy of +this message. RETURN VALUE ------------ diff --git a/src/zmq.cpp b/src/zmq.cpp index 93ebc491..5c85fda6 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -214,7 +214,7 @@ void *zmq_init (int io_threads_) return ctx; } errno = EINVAL; - return NULL; + return NULL; } int zmq_term (void *ctx_) @@ -366,7 +366,7 @@ int zmq_send (void *s_, const void *buf_, size_t len_, int flags_) errno = err; return -1; } - + // Note the optimisation here. We don't close the msg object as it is // empty anyway. This may change when implementation of zmq_msg_t changes. return rc; @@ -392,7 +392,7 @@ int zmq_send_const (void *s_, const void *buf_, size_t len_, int flags_) errno = err; return -1; } - + // Note the optimisation here. We don't close the msg object as it is // empty anyway. This may change when implementation of zmq_msg_t changes. return rc; @@ -415,7 +415,7 @@ int zmq_sendiov (void *s_, iovec *a_, size_t count_, int flags_) int rc = 0; zmq_msg_t msg; zmq::socket_base_t *s = (zmq::socket_base_t *) s_; - + for (size_t i = 0; i < count_; ++i) { rc = zmq_msg_init_size (&msg, a_[i].iov_len); if (rc != 0) { @@ -435,7 +435,7 @@ int zmq_sendiov (void *s_, iovec *a_, size_t count_, int flags_) break; } } - return rc; + return rc; } // Receiving functions. @@ -488,7 +488,7 @@ int zmq_recv (void *s_, void *buf_, size_t len_, int flags_) } // Receive a multi-part message -// +// // Receives up to *count_ parts of a multi-part message. // Sets *count_ to the actual number of parts read. // ZMQ_RCVMORE is set to indicate if a complete multi-part message was read. @@ -499,7 +499,7 @@ int zmq_recv (void *s_, void *buf_, size_t len_, int flags_) // *count_ to retrieve message parts successfully read, // even if -1 is returned. // -// The iov_base* buffers of each iovec *a_ filled in by this +// The iov_base* buffers of each iovec *a_ filled in by this // function may be freed using free(). // TODO: this function has no man page // @@ -514,11 +514,11 @@ int zmq_recviov (void *s_, iovec *a_, size_t *count_, int flags_) size_t count = *count_; int nread = 0; bool recvmore = true; - + *count_ = 0; for (size_t i = 0; recvmore && i < count; ++i) { - + zmq_msg_t msg; int rc = zmq_msg_init (&msg); errno_assert (rc == 0); @@ -630,7 +630,8 @@ int zmq_msg_get (zmq_msg_t *msg_, int property_) // warning: int64_t to int return ((zmq::msg_t*) msg_)->fd (); case ZMQ_SHARED: - return (((zmq::msg_t*) msg_)->flags () & zmq::msg_t::shared)? 1: 0; + return (((zmq::msg_t*) msg_)->is_cmsg ()) || + (((zmq::msg_t*) msg_)->flags () & zmq::msg_t::shared)? 1: 0; default: errno = EINVAL; return -1; diff --git a/tests/test_msg_flags.cpp b/tests/test_msg_flags.cpp index 936bbf8c..4fdd7a1e 100644 --- a/tests/test_msg_flags.cpp +++ b/tests/test_msg_flags.cpp @@ -25,19 +25,19 @@ int main (void) // Create the infrastructure void *ctx = zmq_ctx_new (); assert (ctx); - + void *sb = zmq_socket (ctx, ZMQ_ROUTER); assert (sb); - + int rc = zmq_bind (sb, "inproc://a"); assert (rc == 0); - + void *sc = zmq_socket (ctx, ZMQ_DEALER); assert (sc); - + rc = zmq_connect (sc, "inproc://a"); assert (rc == 0); - + // Send 2-part message. rc = zmq_send (sc, "A", 1, ZMQ_SNDMORE); assert (rc == 1); @@ -65,7 +65,7 @@ int main (void) more = zmq_msg_more (&msg); assert (more == 0); - // Test ZMQ_SHARED property + // Test ZMQ_SHARED property (case 1, refcounted messages) zmq_msg_t msg_a; rc = zmq_msg_init_size(&msg_a, 1024); // large enough to be a type_lmsg assert (rc == 0); @@ -85,13 +85,31 @@ int main (void) rc = zmq_msg_get(&msg_b, ZMQ_SHARED); assert (rc == 1); + // cleanup + rc = zmq_msg_close(&msg_a); + assert (rc == 0); + rc = zmq_msg_close(&msg_b); + assert (rc == 0); + + // Test ZMQ_SHARED property (case 2, constant data messages) + rc = zmq_msg_init_data(&msg_a, (void*) "TEST", 5, 0, 0); + assert (rc == 0); + + // Message reports as shared + rc = zmq_msg_get(&msg_a, ZMQ_SHARED); + assert (rc == 1); + + // cleanup + rc = zmq_msg_close(&msg_a); + assert (rc == 0); + // Deallocate the infrastructure. rc = zmq_close (sc); assert (rc == 0); - + rc = zmq_close (sb); assert (rc == 0); - + rc = zmq_ctx_term (ctx); assert (rc == 0); return 0 ;