From 15ad6f805143d1faf1229da7913e30f0ab54f458 Mon Sep 17 00:00:00 2001 From: somdoron Date: Thu, 28 Jan 2016 11:25:26 +0200 Subject: [PATCH 1/6] save some bytes in msg class --- src/msg.cpp | 20 +++++++++-------- src/msg.hpp | 63 ++++++++++++++++++++++++++++++++++------------------- 2 files changed, 52 insertions(+), 31 deletions(-) diff --git a/src/msg.cpp b/src/msg.cpp index 8d24b267..9ddabaf8 100644 --- a/src/msg.cpp +++ b/src/msg.cpp @@ -85,25 +85,26 @@ int zmq::msg_t::init () u.vsm.flags = 0; u.vsm.size = 0; u.vsm.routing_id = 0; - file_desc = -1; + u.vsm.fd = retired_fd; return 0; } int zmq::msg_t::init_size (size_t size_) { - file_desc = -1; if (size_ <= max_vsm_size) { u.vsm.metadata = NULL; u.vsm.type = type_vsm; u.vsm.flags = 0; u.vsm.size = (unsigned char) size_; u.vsm.routing_id = 0; + u.vsm.fd = retired_fd; } else { u.lmsg.metadata = NULL; u.lmsg.type = type_lmsg; u.lmsg.flags = 0; u.lmsg.routing_id = 0; + u.lmsg.fd = retired_fd; u.lmsg.content = NULL; if (sizeof (content_t) + size_ > size_) u.lmsg.content = (content_t*) malloc (sizeof (content_t) + size_); @@ -127,12 +128,12 @@ int zmq::msg_t::init_external_storage(void *data_, size_t size_, zmq::atomic_cou zmq_assert(NULL != data_); zmq_assert(NULL != ctr); - file_desc = -1; u.zclmsg.metadata = NULL; u.zclmsg.type = type_zclmsg; u.zclmsg.flags = 0; u.zclmsg.routing_id = 0; + u.zclmsg.fd = retired_fd; u.zclmsg.data = data_; u.zclmsg.size = size_; @@ -151,8 +152,6 @@ int zmq::msg_t::init_data (void *data_, size_t size_, // would occur once the data is accessed zmq_assert (data_ != NULL || size_ == 0); - file_desc = -1; - // Initialize constant message if there's no need to deallocate if (ffn_ == NULL) { u.cmsg.metadata = NULL; @@ -161,12 +160,14 @@ int zmq::msg_t::init_data (void *data_, size_t size_, u.cmsg.data = data_; u.cmsg.size = size_; u.cmsg.routing_id = 0; + u.cmsg.fd = retired_fd; } else { u.lmsg.metadata = NULL; u.lmsg.type = type_lmsg; u.lmsg.flags = 0; u.lmsg.routing_id = 0; + u.lmsg.fd = retired_fd; u.lmsg.content = (content_t*) malloc (sizeof (content_t)); if (!u.lmsg.content) { errno = ENOMEM; @@ -189,6 +190,7 @@ int zmq::msg_t::init_delimiter () u.delimiter.type = type_delimiter; u.delimiter.flags = 0; u.delimiter.routing_id = 0; + u.delimiter.fd = retired_fd; return 0; } @@ -369,14 +371,14 @@ void zmq::msg_t::reset_flags (unsigned char flags_) u.base.flags &= ~flags_; } -int64_t zmq::msg_t::fd () +zmq::fd_t zmq::msg_t::fd () { - return file_desc; + return u.base.fd; } -void zmq::msg_t::set_fd (int64_t fd_) +void zmq::msg_t::set_fd (fd_t fd_) { - file_desc = fd_; + u.base.fd = fd_; } zmq::metadata_t *zmq::msg_t::metadata () const diff --git a/src/msg.hpp b/src/msg.hpp index b93ea7ac..e420b337 100644 --- a/src/msg.hpp +++ b/src/msg.hpp @@ -34,6 +34,7 @@ #include #include "config.hpp" +#include "fd.hpp" #include "atomic_counter.hpp" #include "metadata.hpp" @@ -86,8 +87,8 @@ namespace zmq unsigned char flags (); void set_flags (unsigned char flags_); void reset_flags (unsigned char flags_); - int64_t fd (); - void set_fd (int64_t fd_); + fd_t fd (); + void set_fd (fd_t fd_); metadata_t *metadata () const; void set_metadata (metadata_t *metadata_); void reset_metadata (); @@ -112,7 +113,10 @@ namespace zmq // Size in bytes of the largest message that is still copied around // rather than being reference-counted. enum { msg_t_size = 64 }; - enum { max_vsm_size = msg_t_size - (8 + sizeof (metadata_t *) + 3 + sizeof(uint32_t)) }; + enum { max_vsm_size = msg_t_size - (sizeof (metadata_t *) + + 3 + + sizeof (uint32_t) + + sizeof (fd_t)) }; private: zmq::atomic_counter_t* refcnt(); @@ -152,9 +156,6 @@ namespace zmq type_max = 105 }; - // the file descriptor where this message originated, needs to be 64bit due to alignment - int64_t file_desc; - // Note that fields shared between different message types are not // moved to the parent class (msg_t). This way we get tighter packing // of the data. Shared fields can be accessed via 'base' member of @@ -162,10 +163,14 @@ namespace zmq union { struct { metadata_t *metadata; - unsigned char unused [msg_t_size - (8 + sizeof (metadata_t *) + 2 + sizeof(uint32_t))]; + unsigned char unused [msg_t_size - (sizeof (metadata_t *) + + 2 + + sizeof (uint32_t) + + sizeof (fd_t))]; unsigned char type; unsigned char flags; uint32_t routing_id; + fd_t fd; } base; struct { metadata_t *metadata; @@ -174,17 +179,20 @@ namespace zmq unsigned char type; unsigned char flags; uint32_t routing_id; + fd_t fd; } vsm; struct { metadata_t *metadata; content_t *content; - unsigned char unused [msg_t_size - (8 + sizeof (metadata_t *) - + sizeof (content_t*) - + 2 - + sizeof(uint32_t))]; + unsigned char unused [msg_t_size - (sizeof (metadata_t *) + + sizeof (content_t*) + + 2 + + sizeof (uint32_t) + + sizeof (fd_t))]; unsigned char type; unsigned char flags; uint32_t routing_id; + fd_t fd; } lmsg; struct { metadata_t *metadata; @@ -193,34 +201,45 @@ namespace zmq msg_free_fn *ffn; void *hint; zmq::atomic_counter_t* refcnt; - unsigned char unused [msg_t_size - (8 + sizeof (metadata_t *) - + sizeof (void*) - + sizeof (size_t) - + sizeof (msg_free_fn*) - + sizeof (void*) - + sizeof (zmq::atomic_counter_t*) - + 2 - + sizeof(uint32_t))]; + unsigned char unused [msg_t_size - (sizeof (metadata_t *) + + sizeof (void*) + + sizeof (size_t) + + sizeof (msg_free_fn*) + + sizeof (void*) + + sizeof (zmq::atomic_counter_t*) + + 2 + + sizeof (uint32_t) + + sizeof (fd_t))]; unsigned char type; unsigned char flags; uint32_t routing_id; + fd_t fd; } zclmsg; struct { metadata_t *metadata; void* data; size_t size; - unsigned char unused - [msg_t_size - (8 + sizeof (metadata_t *) + sizeof (void*) + sizeof (size_t) + 2 + sizeof(uint32_t))]; + unsigned char unused [msg_t_size - (sizeof (metadata_t *) + + sizeof (void*) + + sizeof (size_t) + + 2 + + sizeof (uint32_t) + + sizeof (fd_t))]; unsigned char type; unsigned char flags; uint32_t routing_id; + fd_t fd; } cmsg; struct { metadata_t *metadata; - unsigned char unused [msg_t_size - (8 + sizeof (metadata_t *) + 2 + sizeof(uint32_t))]; + unsigned char unused [msg_t_size - (sizeof (metadata_t *) + + 2 + + sizeof (uint32_t) + + sizeof (fd_t))]; unsigned char type; unsigned char flags; uint32_t routing_id; + fd_t fd; } delimiter; } u; }; From b2718149e07e4f31aeefca38fe577ab94b9bbecd Mon Sep 17 00:00:00 2001 From: somdoron Date: Fri, 29 Jan 2016 10:45:44 +0200 Subject: [PATCH 2/6] msg external storage is using content_t --- src/decoder_allocators.cpp | 10 ++++---- src/decoder_allocators.hpp | 11 +++++---- src/msg.cpp | 47 ++++++++++++++++++------------------- src/msg.hpp | 48 ++++++++++++++++---------------------- src/raw_decoder.cpp | 8 +++---- src/v2_decoder.cpp | 4 ++-- 6 files changed, 60 insertions(+), 68 deletions(-) diff --git a/src/decoder_allocators.cpp b/src/decoder_allocators.cpp index 2359f015..8014bc51 100644 --- a/src/decoder_allocators.cpp +++ b/src/decoder_allocators.cpp @@ -37,7 +37,7 @@ zmq::shared_message_memory_allocator::shared_message_memory_allocator (std::size buf(NULL), bufsize(0), max_size(bufsize_), - msg_refcnt(NULL), + msg_content(NULL), maxCounters (static_cast (std::ceil (static_cast (max_size) / static_cast (msg_t::max_vsm_size)))) { } @@ -46,7 +46,7 @@ zmq::shared_message_memory_allocator::shared_message_memory_allocator (std::size buf(NULL), bufsize(0), max_size(bufsize_), - msg_refcnt(NULL), + msg_content(NULL), maxCounters(maxMessages) { } @@ -77,7 +77,7 @@ unsigned char* zmq::shared_message_memory_allocator::allocate () // allocate memory for reference counters together with reception buffer std::size_t const allocationsize = max_size + sizeof (zmq::atomic_counter_t) + - maxCounters * sizeof (zmq::atomic_counter_t); + maxCounters * sizeof (zmq::msg_t::content_t); buf = static_cast (std::malloc (allocationsize)); alloc_assert (buf); @@ -90,7 +90,7 @@ unsigned char* zmq::shared_message_memory_allocator::allocate () } bufsize = max_size; - msg_refcnt = reinterpret_cast (buf + sizeof (atomic_counter_t) + max_size); + msg_content = reinterpret_cast (buf + sizeof (atomic_counter_t) + max_size); return buf + sizeof (zmq::atomic_counter_t); } @@ -108,7 +108,7 @@ unsigned char* zmq::shared_message_memory_allocator::release () unsigned char* b = buf; buf = NULL; bufsize = 0; - msg_refcnt = NULL; + msg_content = NULL; return b; } diff --git a/src/decoder_allocators.hpp b/src/decoder_allocators.hpp index 380452e8..30cfaf1a 100644 --- a/src/decoder_allocators.hpp +++ b/src/decoder_allocators.hpp @@ -34,6 +34,7 @@ #include #include "atomic_counter.hpp" +#include "msg.hpp" #include "err.hpp" namespace zmq @@ -132,21 +133,21 @@ namespace zmq bufsize = new_size; } - zmq::atomic_counter_t* provide_refcnt () + zmq::msg_t::content_t* provide_content () { - return msg_refcnt; + return msg_content; } - void advance_refcnt () + void advance_content () { - msg_refcnt++; + msg_content++; } private: unsigned char* buf; std::size_t bufsize; std::size_t max_size; - zmq::atomic_counter_t* msg_refcnt; + zmq::msg_t::content_t* msg_content; std::size_t maxCounters; }; } diff --git a/src/msg.cpp b/src/msg.cpp index 9ddabaf8..37379fcb 100644 --- a/src/msg.cpp +++ b/src/msg.cpp @@ -53,7 +53,7 @@ bool zmq::msg_t::check () int zmq::msg_t::init (void* data_, size_t size_, msg_free_fn* ffn_, void* hint, - zmq::atomic_counter_t* refcnt_) + content_t* content_) { if (size_ < max_vsm_size) { int const rc = init_size(size_); @@ -68,9 +68,9 @@ int zmq::msg_t::init (void* data_, size_t size_, return -1; } } - else if(refcnt_) + else if(content_) { - return init_external_storage(data_, size_, refcnt_, ffn_, hint); + return init_external_storage(content_, data_, size_, ffn_, hint); } else { @@ -122,12 +122,11 @@ int zmq::msg_t::init_size (size_t size_) return 0; } -int zmq::msg_t::init_external_storage(void *data_, size_t size_, zmq::atomic_counter_t* ctr, - msg_free_fn *ffn_, void *hint_) +int zmq::msg_t::init_external_storage(content_t* content_, void* data_, size_t size_, + msg_free_fn *ffn_, void* hint_) { zmq_assert(NULL != data_); - zmq_assert(NULL != ctr); - + zmq_assert(NULL != content_); u.zclmsg.metadata = NULL; u.zclmsg.type = type_zclmsg; @@ -135,12 +134,12 @@ int zmq::msg_t::init_external_storage(void *data_, size_t size_, zmq::atomic_cou u.zclmsg.routing_id = 0; u.zclmsg.fd = retired_fd; - u.zclmsg.data = data_; - u.zclmsg.size = size_; - u.zclmsg.ffn = ffn_; - u.zclmsg.hint = hint_; - u.zclmsg.refcnt = ctr; - new (u.zclmsg.refcnt) zmq::atomic_counter_t(); + u.zclmsg.content = content_; + u.zclmsg.content->data = data_; + u.zclmsg.content->size = size_; + u.zclmsg.content->ffn = ffn_; + u.zclmsg.content->hint = hint_; + new (&u.zclmsg.content->refcnt) zmq::atomic_counter_t(); return 0; } @@ -222,19 +221,19 @@ int zmq::msg_t::close () if (is_zcmsg()) { - zmq_assert( u.zclmsg.ffn ); + zmq_assert( u.zclmsg.content->ffn ); // If the content is not shared, or if it is shared and the reference // count has dropped to zero, deallocate it. if (!(u.zclmsg.flags & msg_t::shared) || - !u.zclmsg.refcnt->sub (1)) { + !u.zclmsg.content->refcnt.sub (1)) { // We used "placement new" operator to initialize the reference // counter so we call the destructor explicitly now. - u.zclmsg.refcnt->~atomic_counter_t (); + u.zclmsg.content->refcnt.~atomic_counter_t (); - u.zclmsg.ffn (u.zclmsg.data, - u.zclmsg.hint); + u.zclmsg.content->ffn (u.zclmsg.content->data, + u.zclmsg.content->hint); } } @@ -329,7 +328,7 @@ void *zmq::msg_t::data () case type_cmsg: return u.cmsg.data; case type_zclmsg: - return u.zclmsg.data; + return u.zclmsg.content->data; default: zmq_assert (false); return NULL; @@ -347,7 +346,7 @@ size_t zmq::msg_t::size () case type_lmsg: return u.lmsg.content->size; case type_zclmsg: - return u.zclmsg.size; + return u.zclmsg.content->size; case type_cmsg: return u.cmsg.size; default: @@ -487,10 +486,10 @@ bool zmq::msg_t::rm_refs (int refs_) return false; } - if (is_zcmsg() && !u.zclmsg.refcnt->sub(refs_)) { + if (is_zcmsg() && !u.zclmsg.content->refcnt.sub(refs_)) { // storage for rfcnt is provided externally - if (u.zclmsg.ffn) { - u.zclmsg.ffn(u.zclmsg.data, u.zclmsg.hint); + if (u.zclmsg.content->ffn) { + u.zclmsg.content->ffn(u.zclmsg.content->data, u.zclmsg.content->hint); } return false; @@ -527,7 +526,7 @@ zmq::atomic_counter_t *zmq::msg_t::refcnt() case type_lmsg: return &u.lmsg.content->refcnt; case type_zclmsg: - return u.zclmsg.refcnt; + return &u.zclmsg.content->refcnt; default: zmq_assert(false); return NULL; diff --git a/src/msg.hpp b/src/msg.hpp index e420b337..4a14ad82 100644 --- a/src/msg.hpp +++ b/src/msg.hpp @@ -56,6 +56,22 @@ namespace zmq { public: + // Shared message buffer. Message data are either allocated in one + // continuous block along with this structure - thus avoiding one + // malloc/free pair or they are stored in used-supplied memory. + // In the latter case, ffn member stores pointer to the function to be + // used to deallocate the data. If the buffer is actually shared (there + // are at least 2 references to it) refcount member contains number of + // references. + struct content_t + { + void *data; + size_t size; + msg_free_fn *ffn; + void *hint; + zmq::atomic_counter_t refcnt; + }; + // Message flags. enum { @@ -71,12 +87,12 @@ namespace zmq int init (void* data, size_t size_, msg_free_fn* ffn_, void* hint, - zmq::atomic_counter_t* refcnt_ = NULL); + content_t* content_ = NULL); int init_size (size_t size_); int init_data (void *data_, size_t size_, msg_free_fn *ffn_, void *hint_); - int init_external_storage(void *data_, size_t size_, zmq::atomic_counter_t* ctr, + int init_external_storage(content_t* content_, void *data_, size_t size_, msg_free_fn *ffn_, void *hint_); int init_delimiter (); int close (); @@ -121,22 +137,6 @@ namespace zmq private: zmq::atomic_counter_t* refcnt(); - // Shared message buffer. Message data are either allocated in one - // continuous block along with this structure - thus avoiding one - // malloc/free pair or they are stored in used-supplied memory. - // In the latter case, ffn member stores pointer to the function to be - // used to deallocate the data. If the buffer is actually shared (there - // are at least 2 references to it) refcount member contains number of - // references. - struct content_t - { - void *data; - size_t size; - msg_free_fn *ffn; - void *hint; - zmq::atomic_counter_t refcnt; - }; - // Different message types. enum type_t { @@ -196,17 +196,9 @@ namespace zmq } lmsg; struct { metadata_t *metadata; - void *data; - size_t size; - msg_free_fn *ffn; - void *hint; - zmq::atomic_counter_t* refcnt; + content_t *content; unsigned char unused [msg_t_size - (sizeof (metadata_t *) + - sizeof (void*) + - sizeof (size_t) + - sizeof (msg_free_fn*) + - sizeof (void*) + - sizeof (zmq::atomic_counter_t*) + + sizeof (content_t*) + 2 + sizeof (uint32_t) + sizeof (fd_t))]; diff --git a/src/raw_decoder.cpp b/src/raw_decoder.cpp index 53b9a330..633530ef 100644 --- a/src/raw_decoder.cpp +++ b/src/raw_decoder.cpp @@ -62,13 +62,13 @@ int zmq::raw_decoder_t::decode (const uint8_t *data_, size_t size_, { int rc = in_progress.init ((unsigned char*)data_, size_, shared_message_memory_allocator::call_dec_ref, - allocator.buffer(), - allocator.provide_refcnt() ); + allocator.buffer (), + allocator.provide_content ()); // if the buffer serves as memory for a zero-copy message, release it // and allocate a new buffer in get_buffer for the next decode - if (in_progress.is_zcmsg()) { - allocator.advance_refcnt(); + if (in_progress.is_zcmsg ()) { + allocator.advance_content(); allocator.release(); } diff --git a/src/v2_decoder.cpp b/src/v2_decoder.cpp index 09154144..6af74fd2 100644 --- a/src/v2_decoder.cpp +++ b/src/v2_decoder.cpp @@ -127,12 +127,12 @@ int zmq::v2_decoder_t::size_ready(uint64_t msg_size, unsigned char const* read_p // if the message will be a large message, pass a valid refcnt memory location as well rc = in_progress.init ((unsigned char *) read_pos, static_cast (msg_size), shared_message_memory_allocator::call_dec_ref, buffer(), - provide_refcnt ()); + provide_content ()); // For small messages, data has been copied and refcount does not have to be increased if (in_progress.is_zcmsg()) { - advance_refcnt(); + advance_content(); inc_ref(); } } From 68675e23d987b013a70a15c35bf6d9f5235361aa Mon Sep 17 00:00:00 2001 From: somdoron Date: Thu, 28 Jan 2016 13:43:23 +0200 Subject: [PATCH 3/6] adds group to zmq_msg --- include/zmq.h | 4 +- src/msg.cpp | 25 +++++++++ src/msg.hpp | 16 +++++- src/radio.cpp | 25 +-------- src/zmq.cpp | 10 ++++ tests/test_radio_dish.cpp | 111 +++++++++++++++++++++++++++----------- 6 files changed, 134 insertions(+), 57 deletions(-) diff --git a/include/zmq.h b/include/zmq.h index ac34b167..0b53af67 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -239,6 +239,8 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int property, int optval); ZMQ_EXPORT const char *zmq_msg_gets (zmq_msg_t *msg, const char *property); ZMQ_EXPORT int zmq_msg_set_routing_id (zmq_msg_t *msg, uint32_t routing_id); ZMQ_EXPORT uint32_t zmq_msg_routing_id (zmq_msg_t *msg); +ZMQ_EXPORT int zmq_msg_set_group (zmq_msg_t *msg, const char *group); +ZMQ_EXPORT const char *zmq_msg_group (zmq_msg_t *msg); /******************************************************************************/ @@ -359,7 +361,7 @@ ZMQ_EXPORT uint32_t zmq_msg_routing_id (zmq_msg_t *msg); #define ZMQ_GSSAPI 3 /* RADIO-DISH protocol */ -#define ZMQ_GROUP_MAX_LENGTH 255 +#define ZMQ_GROUP_MAX_LENGTH 15 /* Deprecated options and aliases */ #define ZMQ_TCP_ACCEPT_FILTER 38 diff --git a/src/msg.cpp b/src/msg.cpp index 37379fcb..013d30b3 100644 --- a/src/msg.cpp +++ b/src/msg.cpp @@ -84,6 +84,7 @@ int zmq::msg_t::init () u.vsm.type = type_vsm; u.vsm.flags = 0; u.vsm.size = 0; + u.vsm.group[0] = '\0'; u.vsm.routing_id = 0; u.vsm.fd = retired_fd; return 0; @@ -96,6 +97,7 @@ int zmq::msg_t::init_size (size_t size_) u.vsm.type = type_vsm; u.vsm.flags = 0; u.vsm.size = (unsigned char) size_; + u.vsm.group[0] = '\0'; u.vsm.routing_id = 0; u.vsm.fd = retired_fd; } @@ -103,6 +105,7 @@ int zmq::msg_t::init_size (size_t size_) u.lmsg.metadata = NULL; u.lmsg.type = type_lmsg; u.lmsg.flags = 0; + u.lmsg.group[0] = '\0'; u.lmsg.routing_id = 0; u.lmsg.fd = retired_fd; u.lmsg.content = NULL; @@ -131,6 +134,7 @@ int zmq::msg_t::init_external_storage(content_t* content_, void* data_, size_t s u.zclmsg.metadata = NULL; u.zclmsg.type = type_zclmsg; u.zclmsg.flags = 0; + u.zclmsg.group[0] = '\0'; u.zclmsg.routing_id = 0; u.zclmsg.fd = retired_fd; @@ -158,6 +162,7 @@ int zmq::msg_t::init_data (void *data_, size_t size_, u.cmsg.flags = 0; u.cmsg.data = data_; u.cmsg.size = size_; + u.cmsg.group[0] = '\0'; u.cmsg.routing_id = 0; u.cmsg.fd = retired_fd; } @@ -165,6 +170,7 @@ int zmq::msg_t::init_data (void *data_, size_t size_, u.lmsg.metadata = NULL; u.lmsg.type = type_lmsg; u.lmsg.flags = 0; + u.lmsg.group[0] = '\0'; u.lmsg.routing_id = 0; u.lmsg.fd = retired_fd; u.lmsg.content = (content_t*) malloc (sizeof (content_t)); @@ -188,6 +194,7 @@ int zmq::msg_t::init_delimiter () u.delimiter.metadata = NULL; u.delimiter.type = type_delimiter; u.delimiter.flags = 0; + u.delimiter.group[0] = '\0'; u.delimiter.routing_id = 0; u.delimiter.fd = retired_fd; return 0; @@ -519,6 +526,24 @@ int zmq::msg_t::reset_routing_id () return 0; } +const char * zmq::msg_t::group () +{ + return u.base.group; +} + +int zmq::msg_t::set_group (const char * group_) +{ + if (strlen (group_) > ZMQ_GROUP_MAX_LENGTH) + { + errno = EINVAL; + return -1; + } + + strcpy (u.base.group, group_); + + return 0; +} + zmq::atomic_counter_t *zmq::msg_t::refcnt() { switch(u.base.type) diff --git a/src/msg.hpp b/src/msg.hpp index 4a14ad82..3cad5ac0 100644 --- a/src/msg.hpp +++ b/src/msg.hpp @@ -117,6 +117,8 @@ namespace zmq uint32_t get_routing_id (); int set_routing_id (uint32_t routing_id_); int reset_routing_id (); + const char * group (); + int set_group (const char* group_); // After calling this function you can copy the message in POD-style // refs_ times. No need to call copy. @@ -131,8 +133,9 @@ namespace zmq enum { msg_t_size = 64 }; enum { max_vsm_size = msg_t_size - (sizeof (metadata_t *) + 3 + + 16 + sizeof (uint32_t) + - sizeof (fd_t)) }; + sizeof (fd_t))}; private: zmq::atomic_counter_t* refcnt(); @@ -165,10 +168,12 @@ namespace zmq metadata_t *metadata; unsigned char unused [msg_t_size - (sizeof (metadata_t *) + 2 + + 16 + sizeof (uint32_t) + sizeof (fd_t))]; unsigned char type; unsigned char flags; + char group [16]; uint32_t routing_id; fd_t fd; } base; @@ -178,6 +183,7 @@ namespace zmq unsigned char size; unsigned char type; unsigned char flags; + char group [16]; uint32_t routing_id; fd_t fd; } vsm; @@ -187,10 +193,12 @@ namespace zmq unsigned char unused [msg_t_size - (sizeof (metadata_t *) + sizeof (content_t*) + 2 + + 16 + sizeof (uint32_t) + sizeof (fd_t))]; unsigned char type; unsigned char flags; + char group [16]; uint32_t routing_id; fd_t fd; } lmsg; @@ -200,10 +208,12 @@ namespace zmq unsigned char unused [msg_t_size - (sizeof (metadata_t *) + sizeof (content_t*) + 2 + + 16 + sizeof (uint32_t) + sizeof (fd_t))]; unsigned char type; unsigned char flags; + char group [16]; uint32_t routing_id; fd_t fd; } zclmsg; @@ -215,10 +225,12 @@ namespace zmq sizeof (void*) + sizeof (size_t) + 2 + + 16 + sizeof (uint32_t) + sizeof (fd_t))]; unsigned char type; unsigned char flags; + char group [16]; uint32_t routing_id; fd_t fd; } cmsg; @@ -226,10 +238,12 @@ namespace zmq metadata_t *metadata; unsigned char unused [msg_t_size - (sizeof (metadata_t *) + 2 + + 16 + sizeof (uint32_t) + sizeof (fd_t))]; unsigned char type; unsigned char flags; + char group [16]; uint32_t routing_id; fd_t fd; } delimiter; diff --git a/src/radio.cpp b/src/radio.cpp index 14441d6a..ee9e0697 100644 --- a/src/radio.cpp +++ b/src/radio.cpp @@ -115,33 +115,10 @@ int zmq::radio_t::xsend (msg_t *msg_) return -1; } - size_t size = msg_->size (); - char *group = (char*) msg_->data(); - - // Maximum allowed group length is 255 - if (size > ZMQ_GROUP_MAX_LENGTH) - size = ZMQ_GROUP_MAX_LENGTH; - - // Check if NULL terminated - bool terminated = false; - - for (size_t index = 0; index < size; index++) { - if (group[index] == '\0') { - terminated = true; - break; - } - } - - if (!terminated) { - // User didn't include a group in the message - errno = EINVAL; - return -1; - } - dist.unmatch (); std::pair range = - subscriptions.equal_range (std::string(group)); + subscriptions.equal_range (std::string(msg_->group ())); for (subscriptions_t::iterator it = range.first; it != range.second; ++it) { dist.match (it-> second); diff --git a/src/zmq.cpp b/src/zmq.cpp index 4d508496..a041538d 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -695,6 +695,16 @@ uint32_t zmq_msg_routing_id (zmq_msg_t *msg_) return ((zmq::msg_t *) msg_)->get_routing_id (); } +int zmq_msg_set_group (zmq_msg_t *msg_, const char *group_) +{ + return ((zmq::msg_t *) msg_)->set_group (group_); +} + +const char *zmq_msg_group (zmq_msg_t *msg_) +{ + return ((zmq::msg_t *) msg_)->group (); +} + // Get message metadata string const char *zmq_msg_gets (zmq_msg_t *msg_, const char *property_) diff --git a/tests/test_radio_dish.cpp b/tests/test_radio_dish.cpp index 80db94ad..43618093 100644 --- a/tests/test_radio_dish.cpp +++ b/tests/test_radio_dish.cpp @@ -29,6 +29,57 @@ #include "testutil.hpp" +int msg_send (zmq_msg_t *msg_, void *s_, const char* group_, const char* body_) +{ + int rc = zmq_msg_init_size (msg_, strlen (body_)); + if (rc != 0) + return rc; + + memcpy (zmq_msg_data (msg_), body_, strlen (body_)); + + rc = zmq_msg_set_group (msg_, group_); + if (rc != 0) { + zmq_msg_close (msg_); + return rc; + } + + rc = zmq_msg_send (msg_, s_, 0); + + zmq_msg_close (msg_); + + return rc; +} + +int msg_recv_cmp (zmq_msg_t *msg_, void *s_, const char* group_, const char* body_) +{ + int rc = zmq_msg_init (msg_); + if (rc != 0) + return -1; + + int recv_rc = zmq_msg_recv (msg_, s_, 0); + if (recv_rc == -1) + return -1; + + if (strcmp (zmq_msg_group (msg_), group_) != 0) + { + zmq_msg_close (msg_); + return -1; + } + + char * body = (char*) malloc (sizeof(char) * (zmq_msg_size (msg_) + 1)); + memcpy (body, zmq_msg_data (msg_), zmq_msg_size (msg_)); + body [zmq_msg_size (msg_)] = '\0'; + + if (strcmp (body, body_) != 0) + { + zmq_msg_close (msg_); + return -1; + } + + zmq_msg_close (msg_); + return recv_rc; +} + int main (void) { setup_test_environment (); @@ -42,7 +93,7 @@ int main (void) assert (rc == 0); // Leaving a group which we didn't join - rc = zmq_leave (dish, "World"); + rc = zmq_leave (dish, "Movies"); assert (rc == -1); // Joining too long group @@ -54,11 +105,11 @@ int main (void) assert (rc == -1); // Joining - rc = zmq_join (dish, "World"); + rc = zmq_join (dish, "Movies"); assert (rc == 0); // Duplicate Joining - rc = zmq_join (dish, "World"); + rc = zmq_join (dish, "Movies"); assert (rc == -1); // Connecting @@ -67,53 +118,51 @@ int main (void) zmq_sleep (1); - // This is not going to be sent as dish only subscribe to "World" - rc = zmq_send (radio, "Hello\0Message", 13, 0); - assert (rc == 13); + zmq_msg_t msg; + + // This is not going to be sent as dish only subscribe to "Movies" + rc = msg_send (&msg, radio, "TV", "Friends"); + assert (rc == 7); // This is going to be sent to the dish - rc = zmq_send (radio, "World\0Message", 13, 0); - assert (rc == 13); + rc = msg_send (&msg, radio, "Movies", "Godfather"); + assert (rc == 9); - char* data = (char*) malloc (sizeof(char) * 13); - - rc = zmq_recv (dish, data, 13, 0); - assert (rc == 13); - assert (strcmp (data, "World") == 0); + // Check the correct message arrived + rc = msg_recv_cmp (&msg, dish, "Movies", "Godfather"); + assert (rc == 9); // Join group during connection optvallen - rc = zmq_join (dish, "Hello"); + rc = zmq_join (dish, "TV"); assert (rc == 0); zmq_sleep (1); // This should arrive now as we joined the group - rc = zmq_send (radio, "Hello\0Message", 13, 0); - assert (rc == 13); + rc = msg_send (&msg, radio, "TV", "Friends"); + assert (rc == 7); - rc = zmq_recv (dish, data, 13, 0); - assert (rc == 13); - assert (strcmp (data, "Hello") == 0); + // Check the correct message arrived + rc = msg_recv_cmp (&msg, dish, "TV", "Friends"); + assert (rc == 7); - // Leaving group - rc = zmq_leave (dish, "Hello"); + // Leaving groupr + rc = zmq_leave (dish, "TV"); assert (rc == 0); zmq_sleep (1); - // This is not going to be sent as dish only subscribe to "World" - rc = zmq_send (radio, "Hello\0Message", 13, 0); - assert (rc == 13); + // This is not going to be sent as dish only subscribe to "Movies" + rc = msg_send (&msg, radio, "TV", "Friends"); + assert (rc == 7); // This is going to be sent to the dish - rc = zmq_send (radio, "World\0Message", 13, 0); - assert (rc == 13); + rc = msg_send (&msg, radio, "Movies", "Godfather"); + assert (rc == 9); - rc = zmq_recv (dish, data, 13, 0); - assert (rc == 13); - assert (strcmp (data, "World") == 0); - - free (data); + // Check the correct message arrived + rc = msg_recv_cmp (&msg, dish, "Movies", "Godfather"); + assert (rc == 9); rc = zmq_close (dish); assert (rc == 0); From 5054f2eb6163dd448d475b539ee75df781da972e Mon Sep 17 00:00:00 2001 From: somdoron Date: Thu, 28 Jan 2016 15:26:07 +0200 Subject: [PATCH 4/6] radio-dish is sending the group as first frame --- src/dish.cpp | 64 +++++++++++++++++++++++++++++++++++++++ src/dish.hpp | 28 ++++++++++++++++- src/msg.cpp | 10 ++++-- src/msg.hpp | 1 + src/radio.cpp | 47 ++++++++++++++++++++++++++-- src/radio.hpp | 24 +++++++++++++++ src/session_base.cpp | 12 ++++++-- tests/test_radio_dish.cpp | 4 +-- 8 files changed, 181 insertions(+), 9 deletions(-) diff --git a/src/dish.cpp b/src/dish.cpp index fe8ac42d..acfe2988 100644 --- a/src/dish.cpp +++ b/src/dish.cpp @@ -29,6 +29,7 @@ #include +#include "../include/zmq.h" #include "macros.hpp" #include "dish.hpp" #include "err.hpp" @@ -244,3 +245,66 @@ void zmq::dish_t::send_subscriptions (pipe_t *pipe_) pipe_->flush (); } + +zmq::dish_session_t::dish_session_t (io_thread_t *io_thread_, bool connect_, + socket_base_t *socket_, const options_t &options_, + address_t *addr_) : + session_base_t (io_thread_, connect_, socket_, options_, addr_), + state (group) +{ +} + +zmq::dish_session_t::~dish_session_t () +{ +} + +int zmq::dish_session_t::push_msg (msg_t *msg_) +{ + if (state == group) { + if ((msg_->flags() & msg_t::more) != msg_t::more) { + errno = EFAULT; + return -1; + } + + if (msg_->size() > ZMQ_GROUP_MAX_LENGTH) { + errno = EFAULT; + return -1; + } + + group_msg = *msg_; + state = body; + + int rc = msg_->init (); + errno_assert (rc == 0); + return 0; + } + else { + // Set the message group + int rc = msg_->set_group ((char*)group_msg.data (), group_msg. size()); + errno_assert (rc == 0); + + // We set the group, so we don't need the group_msg anymore + rc = group_msg.close (); + errno_assert (rc == 0); + + // Thread safe socket doesn't support multipart messages + if ((msg_->flags() & msg_t::more) == msg_t::more) { + errno = EFAULT; + return -1; + } + + // Push message to dish socket + rc = session_base_t::push_msg (msg_); + + if (rc == 0) + state = group; + + return rc; + } +} + +void zmq::dish_session_t::reset () +{ + session_base_t::reset (); + state = group; +} diff --git a/src/dish.hpp b/src/dish.hpp index d0f42d70..414083ee 100644 --- a/src/dish.hpp +++ b/src/dish.hpp @@ -71,7 +71,7 @@ namespace zmq int xleave (const char *group_); private: - // Send subscriptions to a pipe + // Send subscriptions to a pipe void send_subscriptions (pipe_t *pipe_); // Fair queueing object for inbound pipes. @@ -93,6 +93,32 @@ namespace zmq const dish_t &operator = (const dish_t&); }; + class dish_session_t : public session_base_t + { + public: + + dish_session_t (zmq::io_thread_t *io_thread_, bool connect_, + zmq::socket_base_t *socket_, const options_t &options_, + address_t *addr_); + ~dish_session_t (); + + // Overrides of the functions from session_base_t. + int push_msg (msg_t *msg_); + void reset (); + + private: + + enum { + group, + body + } state; + + msg_t group_msg; + + dish_session_t (const dish_session_t&); + const dish_session_t &operator = (const dish_session_t&); + }; + } #endif diff --git a/src/msg.cpp b/src/msg.cpp index 013d30b3..263c8e93 100644 --- a/src/msg.cpp +++ b/src/msg.cpp @@ -533,13 +533,19 @@ const char * zmq::msg_t::group () int zmq::msg_t::set_group (const char * group_) { - if (strlen (group_) > ZMQ_GROUP_MAX_LENGTH) + return set_group (group_, strlen (group_)); +} + +int zmq::msg_t::set_group (const char * group_, size_t length_) +{ + if (length_> ZMQ_GROUP_MAX_LENGTH) { errno = EINVAL; return -1; } - strcpy (u.base.group, group_); + strncpy (u.base.group, group_, length_); + u.base.group[length_] = '\0'; return 0; } diff --git a/src/msg.hpp b/src/msg.hpp index 3cad5ac0..330c9984 100644 --- a/src/msg.hpp +++ b/src/msg.hpp @@ -119,6 +119,7 @@ namespace zmq int reset_routing_id (); const char * group (); int set_group (const char* group_); + int set_group (const char*, size_t length); // After calling this function you can copy the message in POD-style // refs_ times. No need to call copy. diff --git a/src/radio.cpp b/src/radio.cpp index ee9e0697..03c6ba41 100644 --- a/src/radio.cpp +++ b/src/radio.cpp @@ -120,9 +120,8 @@ int zmq::radio_t::xsend (msg_t *msg_) std::pair range = subscriptions.equal_range (std::string(msg_->group ())); - for (subscriptions_t::iterator it = range.first; it != range.second; ++it) { + for (subscriptions_t::iterator it = range.first; it != range.second; ++it) dist.match (it-> second); - } int rc = dist.send_to_matching (msg_); @@ -145,3 +144,47 @@ bool zmq::radio_t::xhas_in () { return false; } + +zmq::radio_session_t::radio_session_t (io_thread_t *io_thread_, bool connect_, + socket_base_t *socket_, const options_t &options_, + address_t *addr_) : + session_base_t (io_thread_, connect_, socket_, options_, addr_), + state (group) +{ +} + +zmq::radio_session_t::~radio_session_t () +{ +} + +int zmq::radio_session_t::pull_msg (msg_t *msg_) +{ + if (state == group) { + int rc = session_base_t::pull_msg (&pending_msg); + if (rc != 0) + return rc; + + const char *group = pending_msg.group (); + int length = strlen (group); + + // First frame is the group + msg_->init_size (length); + msg_->set_flags (msg_t::more); + memcpy (msg_->data (), group, length); + + // Next status is the body + state = body; + return 0; + } + else { + *msg_ = pending_msg; + state = group; + return 0; + } +} + +void zmq::radio_session_t::reset () +{ + session_base_t::reset (); + state = group; +} diff --git a/src/radio.hpp b/src/radio.hpp index 52157660..596a8cd4 100644 --- a/src/radio.hpp +++ b/src/radio.hpp @@ -77,6 +77,30 @@ namespace zmq const radio_t &operator = (const radio_t&); }; + class radio_session_t : public session_base_t + { + public: + + radio_session_t (zmq::io_thread_t *io_thread_, bool connect_, + zmq::socket_base_t *socket_, const options_t &options_, + address_t *addr_); + ~radio_session_t (); + + // Overrides of the functions from session_base_t. + int pull_msg (msg_t *msg_); + void reset (); + private: + + enum { + group, + body + } state; + + msg_t pending_msg; + + radio_session_t (const radio_session_t&); + const radio_session_t &operator = (const radio_session_t&); + }; } #endif diff --git a/src/session_base.cpp b/src/session_base.cpp index 6d6ea5bd..515f2947 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -45,6 +45,8 @@ #include "ctx.hpp" #include "req.hpp" +#include "radio.hpp" +#include "dish.hpp" zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_, bool active_, class socket_base_t *socket_, const options_t &options_, @@ -56,6 +58,14 @@ zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_, s = new (std::nothrow) req_session_t (io_thread_, active_, socket_, options_, addr_); break; + case ZMQ_RADIO: + s = new (std::nothrow) radio_session_t (io_thread_, active_, + socket_, options_, addr_); + break; + case ZMQ_DISH: + s = new (std::nothrow) dish_session_t (io_thread_, active_, + socket_, options_, addr_); + break; case ZMQ_DEALER: case ZMQ_REP: case ZMQ_ROUTER: @@ -69,8 +79,6 @@ zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_, case ZMQ_STREAM: case ZMQ_SERVER: case ZMQ_CLIENT: - case ZMQ_RADIO: - case ZMQ_DISH: s = new (std::nothrow) session_base_t (io_thread_, active_, socket_, options_, addr_); break; diff --git a/tests/test_radio_dish.cpp b/tests/test_radio_dish.cpp index 43618093..5cc93c33 100644 --- a/tests/test_radio_dish.cpp +++ b/tests/test_radio_dish.cpp @@ -89,7 +89,7 @@ int main (void) void *radio = zmq_socket (ctx, ZMQ_RADIO); void *dish = zmq_socket (ctx, ZMQ_DISH); - int rc = zmq_bind (radio, "inproc://test-radio-dish"); + int rc = zmq_bind (radio, "tcp://127.0.0.1:5556"); assert (rc == 0); // Leaving a group which we didn't join @@ -113,7 +113,7 @@ int main (void) assert (rc == -1); // Connecting - rc = zmq_connect (dish, "inproc://test-radio-dish"); + rc = zmq_connect (dish, "tcp://127.0.0.1:5556"); assert (rc == 0); zmq_sleep (1); From c7d52ec260e0168c53e795e389501d55f25aa708 Mon Sep 17 00:00:00 2001 From: somdoron Date: Thu, 28 Jan 2016 18:20:01 +0200 Subject: [PATCH 5/6] radio-dish join/leave are ZMTP commands --- src/dish.cpp | 90 +++++++++++++++++++++++++++++++++++---------------- src/dish.hpp | 1 + src/msg.cpp | 32 ++++++++++++++++++ src/msg.hpp | 13 ++++++-- src/radio.cpp | 60 +++++++++++++++++++++++++++++----- src/radio.hpp | 1 + 6 files changed, 160 insertions(+), 37 deletions(-) diff --git a/src/dish.cpp b/src/dish.cpp index acfe2988..45023d3f 100644 --- a/src/dish.cpp +++ b/src/dish.cpp @@ -90,13 +90,15 @@ void zmq::dish_t::xhiccuped (pipe_t *pipe_) int zmq::dish_t::xjoin (const char* group_) { - if (strlen (group_) > ZMQ_GROUP_MAX_LENGTH) { + std::string group = std::string (group_); + + if (group.length () > ZMQ_GROUP_MAX_LENGTH) { errno = EINVAL; return -1; } subscriptions_t::iterator it = - std::find (subscriptions.begin (), subscriptions.end (), std::string(group_)); + std::find (subscriptions.begin (), subscriptions.end (), group); // User cannot join same group twice if (it != subscriptions.end ()) { @@ -104,16 +106,14 @@ int zmq::dish_t::xjoin (const char* group_) return -1; } - subscriptions.push_back (std::string (group_)); + subscriptions.push_back (group); - size_t len = strlen (group_); msg_t msg; - int rc = msg.init_size (len + 1); + int rc = msg.init_join (); errno_assert (rc == 0); - char *data = (char*) msg.data (); - data[0] = 'J'; - memcpy (data + 1, group_, len); + rc = msg.set_group (group_); + errno_assert (rc == 0); int err = 0; rc = dist.send_to_all (&msg); @@ -128,12 +128,14 @@ int zmq::dish_t::xjoin (const char* group_) int zmq::dish_t::xleave (const char* group_) { - if (strlen (group_) > ZMQ_GROUP_MAX_LENGTH) { + std::string group = std::string (group_); + + if (group.length () > ZMQ_GROUP_MAX_LENGTH) { errno = EINVAL; return -1; } - subscriptions_t::iterator it = std::find (subscriptions.begin (), subscriptions.end (), std::string (group_)); + subscriptions_t::iterator it = std::find (subscriptions.begin (), subscriptions.end (), group); if (it == subscriptions.end ()) { errno = EINVAL; @@ -142,14 +144,12 @@ int zmq::dish_t::xleave (const char* group_) subscriptions.erase (it); - size_t len = strlen (group_); msg_t msg; - int rc = msg.init_size (len + 1); + int rc = msg.init_leave (); errno_assert (rc == 0); - char *data = (char*) msg.data (); - data[0] = 'L'; - memcpy (data + 1, group_, len); + rc = msg.set_group (group_); + errno_assert (rc == 0); int err = 0; rc = dist.send_to_all (&msg); @@ -226,21 +226,15 @@ void zmq::dish_t::send_subscriptions (pipe_t *pipe_) { for (subscriptions_t::iterator it = subscriptions.begin (); it != subscriptions.end (); ++it) { msg_t msg; - int rc = msg.init_size (it->length () + 1); + int rc = msg.init_join (); + errno_assert (rc == 0); + + rc = msg.set_group (it->c_str()); errno_assert (rc == 0); - char *data = (char*) msg.data (); - data [0] = 'J'; - it->copy (data + 1, it->length ()); // Send it to the pipe. - bool sent = pipe_->write (&msg); - - // If we reached the SNDHWM, and thus cannot send the subscription, drop - // the subscription message instead. This matches the behaviour of - // zmq_setsockopt(ZMQ_SUBSCRIBE, ...), which also drops subscriptions - // when the SNDHWM is reached. - if (!sent) - msg.close (); + pipe_->write (&msg); + msg.close (); } pipe_->flush (); @@ -303,6 +297,48 @@ int zmq::dish_session_t::push_msg (msg_t *msg_) } } +int zmq::dish_session_t::pull_msg (msg_t *msg_) +{ + int rc = session_base_t::pull_msg (msg_); + + if (rc != 0) + return rc; + + if (!msg_->is_join () && !msg_->is_leave ()) + return rc; + else { + int group_length = strlen (msg_->group ()); + + msg_t command; + int offset; + + if (msg_->is_join ()) { + command.init_size (group_length + 5); + offset = 5; + memcpy (command.data (), "\4JOIN", 5); + } + else { + command.init_size (group_length + 6); + offset = 6; + memcpy (command.data (), "\5LEAVE", 6); + } + + command.set_flags (msg_t::command); + char* command_data = (char*)command.data (); + + // Copy the group + memcpy (command_data + offset, msg_->group (), group_length); + + // Close the join message + int rc = msg_->close (); + errno_assert (rc == 0); + + *msg_ = command; + + return 0; + } +} + void zmq::dish_session_t::reset () { session_base_t::reset (); diff --git a/src/dish.hpp b/src/dish.hpp index 414083ee..7759a462 100644 --- a/src/dish.hpp +++ b/src/dish.hpp @@ -104,6 +104,7 @@ namespace zmq // Overrides of the functions from session_base_t. int push_msg (msg_t *msg_); + int pull_msg (msg_t *msg_); void reset (); private: diff --git a/src/msg.cpp b/src/msg.cpp index 263c8e93..a421756e 100644 --- a/src/msg.cpp +++ b/src/msg.cpp @@ -200,6 +200,28 @@ int zmq::msg_t::init_delimiter () return 0; } +int zmq::msg_t::init_join () +{ + u.base.metadata = NULL; + u.base.type = type_join; + u.base.flags = 0; + u.base.group[0] = '\0'; + u.base.routing_id = 0; + u.base.fd = retired_fd; + return 0; +} + +int zmq::msg_t::init_leave () +{ + u.base.metadata = NULL; + u.base.type = type_leave; + u.base.flags = 0; + u.base.group[0] = '\0'; + u.base.routing_id = 0; + u.base.fd = retired_fd; + return 0; +} + int zmq::msg_t::close () { // Check the validity of the message. @@ -440,6 +462,16 @@ bool zmq::msg_t::is_zcmsg() const return u.base.type == type_zclmsg; } +bool zmq::msg_t::is_join() const +{ + return u.base.type == type_join; +} + +bool zmq::msg_t::is_leave() const +{ + return u.base.type == type_leave; +} + void zmq::msg_t::add_refs (int refs_) { zmq_assert (refs_ >= 0); diff --git a/src/msg.hpp b/src/msg.hpp index 330c9984..22cda9ac 100644 --- a/src/msg.hpp +++ b/src/msg.hpp @@ -95,6 +95,8 @@ namespace zmq int init_external_storage(content_t* content_, void *data_, size_t size_, msg_free_fn *ffn_, void *hint_); int init_delimiter (); + int init_join (); + int init_leave (); int close (); int move (msg_t &src_); int copy (msg_t &src_); @@ -111,6 +113,8 @@ namespace zmq bool is_identity () const; bool is_credential () const; bool is_delimiter () const; + bool is_join () const; + bool is_leave () const; bool is_vsm () const; bool is_cmsg () const; bool is_zcmsg() const; @@ -137,7 +141,6 @@ namespace zmq 16 + sizeof (uint32_t) + sizeof (fd_t))}; - private: zmq::atomic_counter_t* refcnt(); @@ -157,7 +160,13 @@ namespace zmq // zero-copy LMSG message for v2_decoder type_zclmsg = 105, - type_max = 105 + // Join message for radio_dish + type_join = 106, + + // Leave message for radio_dish + type_leave = 107, + + type_max = 107 }; // Note that fields shared between different message types are not diff --git a/src/radio.cpp b/src/radio.cpp index 03c6ba41..51e654c9 100644 --- a/src/radio.cpp +++ b/src/radio.cpp @@ -65,15 +65,13 @@ void zmq::radio_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) void zmq::radio_t::xread_activated (pipe_t *pipe_) { // There are some subscriptions waiting. Let's process them. - msg_t sub; - while (pipe_->read (&sub)) { + msg_t msg; + while (pipe_->read (&msg)) { // Apply the subscription to the trie - const char * data = (char *) sub.data (); - const size_t size = sub.size (); - if (size > 0 && (*data == 'J' || *data == 'L')) { - std::string group = std::string (data + 1, sub. size() - 1); + if (msg.is_join () || msg.is_leave ()) { + std::string group = std::string (msg.group ()); - if (*data == 'J') + if (msg.is_join ()) subscriptions.insert (subscriptions_t::value_type (group, pipe_)); else { std::pair range = @@ -87,7 +85,7 @@ void zmq::radio_t::xread_activated (pipe_t *pipe_) } } } - sub.close (); + msg.close (); } } @@ -157,6 +155,52 @@ zmq::radio_session_t::~radio_session_t () { } +int zmq::radio_session_t::push_msg (msg_t *msg_) +{ + if (msg_->flags() & msg_t::command) { + char *command_data = + static_cast (msg_->data ()); + const size_t data_size = msg_->size (); + + int group_length; + char * group; + + msg_t join_leave_msg; + int rc; + + // Set the msg type to either JOIN or LEAVE + if (data_size >= 5 && memcmp (command_data, "\4JOIN", 5) == 0) { + group_length = data_size - 5; + group = command_data + 5; + rc = join_leave_msg.init_join (); + } + else if (data_size >= 6 && memcmp (command_data, "\5LEAVE", 6) == 0) { + group_length = data_size - 6; + group = command_data + 6; + rc = join_leave_msg.init_leave (); + } + // If it is not a JOIN or LEAVE just push the message + else + return session_base_t::push_msg (msg_); + + errno_assert (rc == 0); + + // Set the group + rc = join_leave_msg.set_group (group, group_length); + errno_assert (rc == 0); + + // Close the current command + rc = msg_->close (); + errno_assert (rc == 0); + + // Push the join or leave command + *msg_ = join_leave_msg; + return session_base_t::push_msg (msg_); + } + else + return session_base_t::push_msg (msg_); +} + int zmq::radio_session_t::pull_msg (msg_t *msg_) { if (state == group) { diff --git a/src/radio.hpp b/src/radio.hpp index 596a8cd4..8e76a550 100644 --- a/src/radio.hpp +++ b/src/radio.hpp @@ -87,6 +87,7 @@ namespace zmq ~radio_session_t (); // Overrides of the functions from session_base_t. + int push_msg (msg_t *msg_); int pull_msg (msg_t *msg_); void reset (); private: From 1960b4e8a9ec151b924d8189fea415c3e35ec550 Mon Sep 17 00:00:00 2001 From: somdoron Date: Thu, 28 Jan 2016 19:29:06 +0200 Subject: [PATCH 6/6] Filtering messages on dish side --- src/dish.cpp | 51 ++++++++++++++++++++++++--------------- src/dish.hpp | 2 +- tests/test_radio_dish.cpp | 2 +- 3 files changed, 33 insertions(+), 22 deletions(-) diff --git a/src/dish.cpp b/src/dish.cpp index 45023d3f..2bd60f52 100644 --- a/src/dish.cpp +++ b/src/dish.cpp @@ -97,8 +97,7 @@ int zmq::dish_t::xjoin (const char* group_) return -1; } - subscriptions_t::iterator it = - std::find (subscriptions.begin (), subscriptions.end (), group); + subscriptions_t::iterator it = subscriptions.find (group); // User cannot join same group twice if (it != subscriptions.end ()) { @@ -106,7 +105,7 @@ int zmq::dish_t::xjoin (const char* group_) return -1; } - subscriptions.push_back (group); + subscriptions.insert (group); msg_t msg; int rc = msg.init_join (); @@ -185,15 +184,21 @@ int zmq::dish_t::xrecv (msg_t *msg_) return 0; } - // Get a message using fair queueing algorithm. - int rc = fq.recv (msg_); + while (true) { - // If there's no message available, return immediately. - // The same when error occurs. - if (rc != 0) - return -1; + // Get a message using fair queueing algorithm. + int rc = fq.recv (msg_); - return 0; + // If there's no message available, return immediately. + // The same when error occurs. + if (rc != 0) + return -1; + + // Filtering non matching messages + subscriptions_t::iterator it = subscriptions.find (std::string(msg_->group ())); + if (it != subscriptions.end ()) + return 0; + } } bool zmq::dish_t::xhas_in () @@ -203,18 +208,24 @@ bool zmq::dish_t::xhas_in () if (has_message) return true; - // Get a message using fair queueing algorithm. - int rc = fq.recv (&message); + while (true) { + // Get a message using fair queueing algorithm. + int rc = fq.recv (&message); - // If there's no message available, return immediately. - // The same when error occurs. - if (rc != 0) { - errno_assert (errno == EAGAIN); - return false; + // If there's no message available, return immediately. + // The same when error occurs. + if (rc != 0) { + errno_assert (errno == EAGAIN); + return false; + } + + // Filtering non matching messages + subscriptions_t::iterator it = subscriptions.find (std::string(message.group ())); + if (it != subscriptions.end ()) { + has_message = true; + return true; + } } - - has_message = true; - return true; } zmq::blob_t zmq::dish_t::get_credential () const diff --git a/src/dish.hpp b/src/dish.hpp index 7759a462..bb805ab9 100644 --- a/src/dish.hpp +++ b/src/dish.hpp @@ -81,7 +81,7 @@ namespace zmq dist_t dist; // The repository of subscriptions. - typedef std::vector subscriptions_t; + typedef std::set subscriptions_t; subscriptions_t subscriptions; // If true, 'message' contains a matching message to return on the diff --git a/tests/test_radio_dish.cpp b/tests/test_radio_dish.cpp index 5cc93c33..560d330d 100644 --- a/tests/test_radio_dish.cpp +++ b/tests/test_radio_dish.cpp @@ -60,7 +60,7 @@ int msg_recv_cmp (zmq_msg_t *msg_, void *s_, const char* group_, const char* bod if (recv_rc == -1) return -1; - if (strcmp (zmq_msg_group (msg_), group_) != 0) + if (strcmp (zmq_msg_group (msg_), group_) != 0) { zmq_msg_close (msg_); return -1;