diff --git a/include/zmq.h b/include/zmq.h index ac34b167..0b53af67 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -239,6 +239,8 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int property, int optval); ZMQ_EXPORT const char *zmq_msg_gets (zmq_msg_t *msg, const char *property); ZMQ_EXPORT int zmq_msg_set_routing_id (zmq_msg_t *msg, uint32_t routing_id); ZMQ_EXPORT uint32_t zmq_msg_routing_id (zmq_msg_t *msg); +ZMQ_EXPORT int zmq_msg_set_group (zmq_msg_t *msg, const char *group); +ZMQ_EXPORT const char *zmq_msg_group (zmq_msg_t *msg); /******************************************************************************/ @@ -359,7 +361,7 @@ ZMQ_EXPORT uint32_t zmq_msg_routing_id (zmq_msg_t *msg); #define ZMQ_GSSAPI 3 /* RADIO-DISH protocol */ -#define ZMQ_GROUP_MAX_LENGTH 255 +#define ZMQ_GROUP_MAX_LENGTH 15 /* Deprecated options and aliases */ #define ZMQ_TCP_ACCEPT_FILTER 38 diff --git a/src/decoder_allocators.cpp b/src/decoder_allocators.cpp index 2359f015..8014bc51 100644 --- a/src/decoder_allocators.cpp +++ b/src/decoder_allocators.cpp @@ -37,7 +37,7 @@ zmq::shared_message_memory_allocator::shared_message_memory_allocator (std::size buf(NULL), bufsize(0), max_size(bufsize_), - msg_refcnt(NULL), + msg_content(NULL), maxCounters (static_cast (std::ceil (static_cast (max_size) / static_cast (msg_t::max_vsm_size)))) { } @@ -46,7 +46,7 @@ zmq::shared_message_memory_allocator::shared_message_memory_allocator (std::size buf(NULL), bufsize(0), max_size(bufsize_), - msg_refcnt(NULL), + msg_content(NULL), maxCounters(maxMessages) { } @@ -77,7 +77,7 @@ unsigned char* zmq::shared_message_memory_allocator::allocate () // allocate memory for reference counters together with reception buffer std::size_t const allocationsize = max_size + sizeof (zmq::atomic_counter_t) + - maxCounters * sizeof (zmq::atomic_counter_t); + maxCounters * sizeof (zmq::msg_t::content_t); buf = static_cast (std::malloc (allocationsize)); alloc_assert (buf); @@ -90,7 +90,7 @@ unsigned char* zmq::shared_message_memory_allocator::allocate () } bufsize = max_size; - msg_refcnt = reinterpret_cast (buf + sizeof (atomic_counter_t) + max_size); + msg_content = reinterpret_cast (buf + sizeof (atomic_counter_t) + max_size); return buf + sizeof (zmq::atomic_counter_t); } @@ -108,7 +108,7 @@ unsigned char* zmq::shared_message_memory_allocator::release () unsigned char* b = buf; buf = NULL; bufsize = 0; - msg_refcnt = NULL; + msg_content = NULL; return b; } diff --git a/src/decoder_allocators.hpp b/src/decoder_allocators.hpp index 380452e8..30cfaf1a 100644 --- a/src/decoder_allocators.hpp +++ b/src/decoder_allocators.hpp @@ -34,6 +34,7 @@ #include #include "atomic_counter.hpp" +#include "msg.hpp" #include "err.hpp" namespace zmq @@ -132,21 +133,21 @@ namespace zmq bufsize = new_size; } - zmq::atomic_counter_t* provide_refcnt () + zmq::msg_t::content_t* provide_content () { - return msg_refcnt; + return msg_content; } - void advance_refcnt () + void advance_content () { - msg_refcnt++; + msg_content++; } private: unsigned char* buf; std::size_t bufsize; std::size_t max_size; - zmq::atomic_counter_t* msg_refcnt; + zmq::msg_t::content_t* msg_content; std::size_t maxCounters; }; } diff --git a/src/dish.cpp b/src/dish.cpp index fe8ac42d..2bd60f52 100644 --- a/src/dish.cpp +++ b/src/dish.cpp @@ -29,6 +29,7 @@ #include +#include "../include/zmq.h" #include "macros.hpp" #include "dish.hpp" #include "err.hpp" @@ -89,13 +90,14 @@ void zmq::dish_t::xhiccuped (pipe_t *pipe_) int zmq::dish_t::xjoin (const char* group_) { - if (strlen (group_) > ZMQ_GROUP_MAX_LENGTH) { + std::string group = std::string (group_); + + if (group.length () > ZMQ_GROUP_MAX_LENGTH) { errno = EINVAL; return -1; } - subscriptions_t::iterator it = - std::find (subscriptions.begin (), subscriptions.end (), std::string(group_)); + subscriptions_t::iterator it = subscriptions.find (group); // User cannot join same group twice if (it != subscriptions.end ()) { @@ -103,16 +105,14 @@ int zmq::dish_t::xjoin (const char* group_) return -1; } - subscriptions.push_back (std::string (group_)); + subscriptions.insert (group); - size_t len = strlen (group_); msg_t msg; - int rc = msg.init_size (len + 1); + int rc = msg.init_join (); errno_assert (rc == 0); - char *data = (char*) msg.data (); - data[0] = 'J'; - memcpy (data + 1, group_, len); + rc = msg.set_group (group_); + errno_assert (rc == 0); int err = 0; rc = dist.send_to_all (&msg); @@ -127,12 +127,14 @@ int zmq::dish_t::xjoin (const char* group_) int zmq::dish_t::xleave (const char* group_) { - if (strlen (group_) > ZMQ_GROUP_MAX_LENGTH) { + std::string group = std::string (group_); + + if (group.length () > ZMQ_GROUP_MAX_LENGTH) { errno = EINVAL; return -1; } - subscriptions_t::iterator it = std::find (subscriptions.begin (), subscriptions.end (), std::string (group_)); + subscriptions_t::iterator it = std::find (subscriptions.begin (), subscriptions.end (), group); if (it == subscriptions.end ()) { errno = EINVAL; @@ -141,14 +143,12 @@ int zmq::dish_t::xleave (const char* group_) subscriptions.erase (it); - size_t len = strlen (group_); msg_t msg; - int rc = msg.init_size (len + 1); + int rc = msg.init_leave (); errno_assert (rc == 0); - char *data = (char*) msg.data (); - data[0] = 'L'; - memcpy (data + 1, group_, len); + rc = msg.set_group (group_); + errno_assert (rc == 0); int err = 0; rc = dist.send_to_all (&msg); @@ -184,15 +184,21 @@ int zmq::dish_t::xrecv (msg_t *msg_) return 0; } - // Get a message using fair queueing algorithm. - int rc = fq.recv (msg_); + while (true) { - // If there's no message available, return immediately. - // The same when error occurs. - if (rc != 0) - return -1; + // Get a message using fair queueing algorithm. + int rc = fq.recv (msg_); - return 0; + // If there's no message available, return immediately. + // The same when error occurs. + if (rc != 0) + return -1; + + // Filtering non matching messages + subscriptions_t::iterator it = subscriptions.find (std::string(msg_->group ())); + if (it != subscriptions.end ()) + return 0; + } } bool zmq::dish_t::xhas_in () @@ -202,18 +208,24 @@ bool zmq::dish_t::xhas_in () if (has_message) return true; - // Get a message using fair queueing algorithm. - int rc = fq.recv (&message); + while (true) { + // Get a message using fair queueing algorithm. + int rc = fq.recv (&message); - // If there's no message available, return immediately. - // The same when error occurs. - if (rc != 0) { - errno_assert (errno == EAGAIN); - return false; + // If there's no message available, return immediately. + // The same when error occurs. + if (rc != 0) { + errno_assert (errno == EAGAIN); + return false; + } + + // Filtering non matching messages + subscriptions_t::iterator it = subscriptions.find (std::string(message.group ())); + if (it != subscriptions.end ()) { + has_message = true; + return true; + } } - - has_message = true; - return true; } zmq::blob_t zmq::dish_t::get_credential () const @@ -225,22 +237,121 @@ void zmq::dish_t::send_subscriptions (pipe_t *pipe_) { for (subscriptions_t::iterator it = subscriptions.begin (); it != subscriptions.end (); ++it) { msg_t msg; - int rc = msg.init_size (it->length () + 1); + int rc = msg.init_join (); + errno_assert (rc == 0); + + rc = msg.set_group (it->c_str()); errno_assert (rc == 0); - char *data = (char*) msg.data (); - data [0] = 'J'; - it->copy (data + 1, it->length ()); // Send it to the pipe. - bool sent = pipe_->write (&msg); - - // If we reached the SNDHWM, and thus cannot send the subscription, drop - // the subscription message instead. This matches the behaviour of - // zmq_setsockopt(ZMQ_SUBSCRIBE, ...), which also drops subscriptions - // when the SNDHWM is reached. - if (!sent) - msg.close (); + pipe_->write (&msg); + msg.close (); } pipe_->flush (); } + +zmq::dish_session_t::dish_session_t (io_thread_t *io_thread_, bool connect_, + socket_base_t *socket_, const options_t &options_, + address_t *addr_) : + session_base_t (io_thread_, connect_, socket_, options_, addr_), + state (group) +{ +} + +zmq::dish_session_t::~dish_session_t () +{ +} + +int zmq::dish_session_t::push_msg (msg_t *msg_) +{ + if (state == group) { + if ((msg_->flags() & msg_t::more) != msg_t::more) { + errno = EFAULT; + return -1; + } + + if (msg_->size() > ZMQ_GROUP_MAX_LENGTH) { + errno = EFAULT; + return -1; + } + + group_msg = *msg_; + state = body; + + int rc = msg_->init (); + errno_assert (rc == 0); + return 0; + } + else { + // Set the message group + int rc = msg_->set_group ((char*)group_msg.data (), group_msg. size()); + errno_assert (rc == 0); + + // We set the group, so we don't need the group_msg anymore + rc = group_msg.close (); + errno_assert (rc == 0); + + // Thread safe socket doesn't support multipart messages + if ((msg_->flags() & msg_t::more) == msg_t::more) { + errno = EFAULT; + return -1; + } + + // Push message to dish socket + rc = session_base_t::push_msg (msg_); + + if (rc == 0) + state = group; + + return rc; + } +} + +int zmq::dish_session_t::pull_msg (msg_t *msg_) +{ + int rc = session_base_t::pull_msg (msg_); + + if (rc != 0) + return rc; + + if (!msg_->is_join () && !msg_->is_leave ()) + return rc; + else { + int group_length = strlen (msg_->group ()); + + msg_t command; + int offset; + + if (msg_->is_join ()) { + command.init_size (group_length + 5); + offset = 5; + memcpy (command.data (), "\4JOIN", 5); + } + else { + command.init_size (group_length + 6); + offset = 6; + memcpy (command.data (), "\5LEAVE", 6); + } + + command.set_flags (msg_t::command); + char* command_data = (char*)command.data (); + + // Copy the group + memcpy (command_data + offset, msg_->group (), group_length); + + // Close the join message + int rc = msg_->close (); + errno_assert (rc == 0); + + *msg_ = command; + + return 0; + } +} + +void zmq::dish_session_t::reset () +{ + session_base_t::reset (); + state = group; +} diff --git a/src/dish.hpp b/src/dish.hpp index d0f42d70..bb805ab9 100644 --- a/src/dish.hpp +++ b/src/dish.hpp @@ -71,7 +71,7 @@ namespace zmq int xleave (const char *group_); private: - // Send subscriptions to a pipe + // Send subscriptions to a pipe void send_subscriptions (pipe_t *pipe_); // Fair queueing object for inbound pipes. @@ -81,7 +81,7 @@ namespace zmq dist_t dist; // The repository of subscriptions. - typedef std::vector subscriptions_t; + typedef std::set subscriptions_t; subscriptions_t subscriptions; // If true, 'message' contains a matching message to return on the @@ -93,6 +93,33 @@ namespace zmq const dish_t &operator = (const dish_t&); }; + class dish_session_t : public session_base_t + { + public: + + dish_session_t (zmq::io_thread_t *io_thread_, bool connect_, + zmq::socket_base_t *socket_, const options_t &options_, + address_t *addr_); + ~dish_session_t (); + + // Overrides of the functions from session_base_t. + int push_msg (msg_t *msg_); + int pull_msg (msg_t *msg_); + void reset (); + + private: + + enum { + group, + body + } state; + + msg_t group_msg; + + dish_session_t (const dish_session_t&); + const dish_session_t &operator = (const dish_session_t&); + }; + } #endif diff --git a/src/msg.cpp b/src/msg.cpp index 8d24b267..a421756e 100644 --- a/src/msg.cpp +++ b/src/msg.cpp @@ -53,7 +53,7 @@ bool zmq::msg_t::check () int zmq::msg_t::init (void* data_, size_t size_, msg_free_fn* ffn_, void* hint, - zmq::atomic_counter_t* refcnt_) + content_t* content_) { if (size_ < max_vsm_size) { int const rc = init_size(size_); @@ -68,9 +68,9 @@ int zmq::msg_t::init (void* data_, size_t size_, return -1; } } - else if(refcnt_) + else if(content_) { - return init_external_storage(data_, size_, refcnt_, ffn_, hint); + return init_external_storage(content_, data_, size_, ffn_, hint); } else { @@ -84,26 +84,30 @@ int zmq::msg_t::init () u.vsm.type = type_vsm; u.vsm.flags = 0; u.vsm.size = 0; + u.vsm.group[0] = '\0'; u.vsm.routing_id = 0; - file_desc = -1; + u.vsm.fd = retired_fd; return 0; } int zmq::msg_t::init_size (size_t size_) { - file_desc = -1; if (size_ <= max_vsm_size) { u.vsm.metadata = NULL; u.vsm.type = type_vsm; u.vsm.flags = 0; u.vsm.size = (unsigned char) size_; + u.vsm.group[0] = '\0'; u.vsm.routing_id = 0; + u.vsm.fd = retired_fd; } else { u.lmsg.metadata = NULL; u.lmsg.type = type_lmsg; u.lmsg.flags = 0; + u.lmsg.group[0] = '\0'; u.lmsg.routing_id = 0; + u.lmsg.fd = retired_fd; u.lmsg.content = NULL; if (sizeof (content_t) + size_ > size_) u.lmsg.content = (content_t*) malloc (sizeof (content_t) + size_); @@ -121,25 +125,25 @@ int zmq::msg_t::init_size (size_t size_) return 0; } -int zmq::msg_t::init_external_storage(void *data_, size_t size_, zmq::atomic_counter_t* ctr, - msg_free_fn *ffn_, void *hint_) +int zmq::msg_t::init_external_storage(content_t* content_, void* data_, size_t size_, + msg_free_fn *ffn_, void* hint_) { zmq_assert(NULL != data_); - zmq_assert(NULL != ctr); - - file_desc = -1; + zmq_assert(NULL != content_); u.zclmsg.metadata = NULL; u.zclmsg.type = type_zclmsg; u.zclmsg.flags = 0; + u.zclmsg.group[0] = '\0'; u.zclmsg.routing_id = 0; + u.zclmsg.fd = retired_fd; - u.zclmsg.data = data_; - u.zclmsg.size = size_; - u.zclmsg.ffn = ffn_; - u.zclmsg.hint = hint_; - u.zclmsg.refcnt = ctr; - new (u.zclmsg.refcnt) zmq::atomic_counter_t(); + u.zclmsg.content = content_; + u.zclmsg.content->data = data_; + u.zclmsg.content->size = size_; + u.zclmsg.content->ffn = ffn_; + u.zclmsg.content->hint = hint_; + new (&u.zclmsg.content->refcnt) zmq::atomic_counter_t(); return 0; } @@ -151,8 +155,6 @@ int zmq::msg_t::init_data (void *data_, size_t size_, // would occur once the data is accessed zmq_assert (data_ != NULL || size_ == 0); - file_desc = -1; - // Initialize constant message if there's no need to deallocate if (ffn_ == NULL) { u.cmsg.metadata = NULL; @@ -160,13 +162,17 @@ int zmq::msg_t::init_data (void *data_, size_t size_, u.cmsg.flags = 0; u.cmsg.data = data_; u.cmsg.size = size_; + u.cmsg.group[0] = '\0'; u.cmsg.routing_id = 0; + u.cmsg.fd = retired_fd; } else { u.lmsg.metadata = NULL; u.lmsg.type = type_lmsg; u.lmsg.flags = 0; + u.lmsg.group[0] = '\0'; u.lmsg.routing_id = 0; + u.lmsg.fd = retired_fd; u.lmsg.content = (content_t*) malloc (sizeof (content_t)); if (!u.lmsg.content) { errno = ENOMEM; @@ -188,7 +194,31 @@ int zmq::msg_t::init_delimiter () u.delimiter.metadata = NULL; u.delimiter.type = type_delimiter; u.delimiter.flags = 0; + u.delimiter.group[0] = '\0'; u.delimiter.routing_id = 0; + u.delimiter.fd = retired_fd; + return 0; +} + +int zmq::msg_t::init_join () +{ + u.base.metadata = NULL; + u.base.type = type_join; + u.base.flags = 0; + u.base.group[0] = '\0'; + u.base.routing_id = 0; + u.base.fd = retired_fd; + return 0; +} + +int zmq::msg_t::init_leave () +{ + u.base.metadata = NULL; + u.base.type = type_leave; + u.base.flags = 0; + u.base.group[0] = '\0'; + u.base.routing_id = 0; + u.base.fd = retired_fd; return 0; } @@ -220,19 +250,19 @@ int zmq::msg_t::close () if (is_zcmsg()) { - zmq_assert( u.zclmsg.ffn ); + zmq_assert( u.zclmsg.content->ffn ); // If the content is not shared, or if it is shared and the reference // count has dropped to zero, deallocate it. if (!(u.zclmsg.flags & msg_t::shared) || - !u.zclmsg.refcnt->sub (1)) { + !u.zclmsg.content->refcnt.sub (1)) { // We used "placement new" operator to initialize the reference // counter so we call the destructor explicitly now. - u.zclmsg.refcnt->~atomic_counter_t (); + u.zclmsg.content->refcnt.~atomic_counter_t (); - u.zclmsg.ffn (u.zclmsg.data, - u.zclmsg.hint); + u.zclmsg.content->ffn (u.zclmsg.content->data, + u.zclmsg.content->hint); } } @@ -327,7 +357,7 @@ void *zmq::msg_t::data () case type_cmsg: return u.cmsg.data; case type_zclmsg: - return u.zclmsg.data; + return u.zclmsg.content->data; default: zmq_assert (false); return NULL; @@ -345,7 +375,7 @@ size_t zmq::msg_t::size () case type_lmsg: return u.lmsg.content->size; case type_zclmsg: - return u.zclmsg.size; + return u.zclmsg.content->size; case type_cmsg: return u.cmsg.size; default: @@ -369,14 +399,14 @@ void zmq::msg_t::reset_flags (unsigned char flags_) u.base.flags &= ~flags_; } -int64_t zmq::msg_t::fd () +zmq::fd_t zmq::msg_t::fd () { - return file_desc; + return u.base.fd; } -void zmq::msg_t::set_fd (int64_t fd_) +void zmq::msg_t::set_fd (fd_t fd_) { - file_desc = fd_; + u.base.fd = fd_; } zmq::metadata_t *zmq::msg_t::metadata () const @@ -432,6 +462,16 @@ bool zmq::msg_t::is_zcmsg() const return u.base.type == type_zclmsg; } +bool zmq::msg_t::is_join() const +{ + return u.base.type == type_join; +} + +bool zmq::msg_t::is_leave() const +{ + return u.base.type == type_leave; +} + void zmq::msg_t::add_refs (int refs_) { zmq_assert (refs_ >= 0); @@ -485,10 +525,10 @@ bool zmq::msg_t::rm_refs (int refs_) return false; } - if (is_zcmsg() && !u.zclmsg.refcnt->sub(refs_)) { + if (is_zcmsg() && !u.zclmsg.content->refcnt.sub(refs_)) { // storage for rfcnt is provided externally - if (u.zclmsg.ffn) { - u.zclmsg.ffn(u.zclmsg.data, u.zclmsg.hint); + if (u.zclmsg.content->ffn) { + u.zclmsg.content->ffn(u.zclmsg.content->data, u.zclmsg.content->hint); } return false; @@ -518,6 +558,30 @@ int zmq::msg_t::reset_routing_id () return 0; } +const char * zmq::msg_t::group () +{ + return u.base.group; +} + +int zmq::msg_t::set_group (const char * group_) +{ + return set_group (group_, strlen (group_)); +} + +int zmq::msg_t::set_group (const char * group_, size_t length_) +{ + if (length_> ZMQ_GROUP_MAX_LENGTH) + { + errno = EINVAL; + return -1; + } + + strncpy (u.base.group, group_, length_); + u.base.group[length_] = '\0'; + + return 0; +} + zmq::atomic_counter_t *zmq::msg_t::refcnt() { switch(u.base.type) @@ -525,7 +589,7 @@ zmq::atomic_counter_t *zmq::msg_t::refcnt() case type_lmsg: return &u.lmsg.content->refcnt; case type_zclmsg: - return u.zclmsg.refcnt; + return &u.zclmsg.content->refcnt; default: zmq_assert(false); return NULL; diff --git a/src/msg.hpp b/src/msg.hpp index b93ea7ac..22cda9ac 100644 --- a/src/msg.hpp +++ b/src/msg.hpp @@ -34,6 +34,7 @@ #include #include "config.hpp" +#include "fd.hpp" #include "atomic_counter.hpp" #include "metadata.hpp" @@ -55,68 +56,6 @@ namespace zmq { public: - // Message flags. - enum - { - more = 1, // Followed by more parts - command = 2, // Command frame (see ZMTP spec) - credential = 32, - identity = 64, - shared = 128 - }; - - bool check (); - int init(); - - int init (void* data, size_t size_, - msg_free_fn* ffn_, void* hint, - zmq::atomic_counter_t* refcnt_ = NULL); - - int init_size (size_t size_); - int init_data (void *data_, size_t size_, msg_free_fn *ffn_, - void *hint_); - int init_external_storage(void *data_, size_t size_, zmq::atomic_counter_t* ctr, - msg_free_fn *ffn_, void *hint_); - int init_delimiter (); - int close (); - int move (msg_t &src_); - int copy (msg_t &src_); - void *data (); - size_t size (); - unsigned char flags (); - void set_flags (unsigned char flags_); - void reset_flags (unsigned char flags_); - int64_t fd (); - void set_fd (int64_t fd_); - metadata_t *metadata () const; - void set_metadata (metadata_t *metadata_); - void reset_metadata (); - bool is_identity () const; - bool is_credential () const; - bool is_delimiter () const; - bool is_vsm () const; - bool is_cmsg () const; - bool is_zcmsg() const; - uint32_t get_routing_id (); - int set_routing_id (uint32_t routing_id_); - int reset_routing_id (); - - // After calling this function you can copy the message in POD-style - // refs_ times. No need to call copy. - void add_refs (int refs_); - - // Removes references previously added by add_refs. If the number of - // references drops to 0, the message is closed and false is returned. - bool rm_refs (int refs_); - - // Size in bytes of the largest message that is still copied around - // rather than being reference-counted. - enum { msg_t_size = 64 }; - enum { max_vsm_size = msg_t_size - (8 + sizeof (metadata_t *) + 3 + sizeof(uint32_t)) }; - - private: - zmq::atomic_counter_t* refcnt(); - // Shared message buffer. Message data are either allocated in one // continuous block along with this structure - thus avoiding one // malloc/free pair or they are stored in used-supplied memory. @@ -133,6 +72,78 @@ namespace zmq zmq::atomic_counter_t refcnt; }; + // Message flags. + enum + { + more = 1, // Followed by more parts + command = 2, // Command frame (see ZMTP spec) + credential = 32, + identity = 64, + shared = 128 + }; + + bool check (); + int init(); + + int init (void* data, size_t size_, + msg_free_fn* ffn_, void* hint, + content_t* content_ = NULL); + + int init_size (size_t size_); + int init_data (void *data_, size_t size_, msg_free_fn *ffn_, + void *hint_); + int init_external_storage(content_t* content_, void *data_, size_t size_, + msg_free_fn *ffn_, void *hint_); + int init_delimiter (); + int init_join (); + int init_leave (); + int close (); + int move (msg_t &src_); + int copy (msg_t &src_); + void *data (); + size_t size (); + unsigned char flags (); + void set_flags (unsigned char flags_); + void reset_flags (unsigned char flags_); + fd_t fd (); + void set_fd (fd_t fd_); + metadata_t *metadata () const; + void set_metadata (metadata_t *metadata_); + void reset_metadata (); + bool is_identity () const; + bool is_credential () const; + bool is_delimiter () const; + bool is_join () const; + bool is_leave () const; + bool is_vsm () const; + bool is_cmsg () const; + bool is_zcmsg() const; + uint32_t get_routing_id (); + int set_routing_id (uint32_t routing_id_); + int reset_routing_id (); + const char * group (); + int set_group (const char* group_); + int set_group (const char*, size_t length); + + // After calling this function you can copy the message in POD-style + // refs_ times. No need to call copy. + void add_refs (int refs_); + + // Removes references previously added by add_refs. If the number of + // references drops to 0, the message is closed and false is returned. + bool rm_refs (int refs_); + + // Size in bytes of the largest message that is still copied around + // rather than being reference-counted. + enum { msg_t_size = 64 }; + enum { max_vsm_size = msg_t_size - (sizeof (metadata_t *) + + 3 + + 16 + + sizeof (uint32_t) + + sizeof (fd_t))}; + private: + zmq::atomic_counter_t* refcnt(); + // Different message types. enum type_t { @@ -149,11 +160,14 @@ namespace zmq // zero-copy LMSG message for v2_decoder type_zclmsg = 105, - type_max = 105 - }; + // Join message for radio_dish + type_join = 106, - // the file descriptor where this message originated, needs to be 64bit due to alignment - int64_t file_desc; + // Leave message for radio_dish + type_leave = 107, + + type_max = 107 + }; // Note that fields shared between different message types are not // moved to the parent class (msg_t). This way we get tighter packing @@ -162,10 +176,16 @@ namespace zmq union { struct { metadata_t *metadata; - unsigned char unused [msg_t_size - (8 + sizeof (metadata_t *) + 2 + sizeof(uint32_t))]; + unsigned char unused [msg_t_size - (sizeof (metadata_t *) + + 2 + + 16 + + sizeof (uint32_t) + + sizeof (fd_t))]; unsigned char type; unsigned char flags; + char group [16]; uint32_t routing_id; + fd_t fd; } base; struct { metadata_t *metadata; @@ -173,54 +193,69 @@ namespace zmq unsigned char size; unsigned char type; unsigned char flags; + char group [16]; uint32_t routing_id; + fd_t fd; } vsm; struct { metadata_t *metadata; content_t *content; - unsigned char unused [msg_t_size - (8 + sizeof (metadata_t *) - + sizeof (content_t*) - + 2 - + sizeof(uint32_t))]; + unsigned char unused [msg_t_size - (sizeof (metadata_t *) + + sizeof (content_t*) + + 2 + + 16 + + sizeof (uint32_t) + + sizeof (fd_t))]; unsigned char type; unsigned char flags; + char group [16]; uint32_t routing_id; + fd_t fd; } lmsg; struct { metadata_t *metadata; - void *data; - size_t size; - msg_free_fn *ffn; - void *hint; - zmq::atomic_counter_t* refcnt; - unsigned char unused [msg_t_size - (8 + sizeof (metadata_t *) - + sizeof (void*) - + sizeof (size_t) - + sizeof (msg_free_fn*) - + sizeof (void*) - + sizeof (zmq::atomic_counter_t*) - + 2 - + sizeof(uint32_t))]; + content_t *content; + unsigned char unused [msg_t_size - (sizeof (metadata_t *) + + sizeof (content_t*) + + 2 + + 16 + + sizeof (uint32_t) + + sizeof (fd_t))]; unsigned char type; unsigned char flags; + char group [16]; uint32_t routing_id; + fd_t fd; } zclmsg; struct { metadata_t *metadata; void* data; size_t size; - unsigned char unused - [msg_t_size - (8 + sizeof (metadata_t *) + sizeof (void*) + sizeof (size_t) + 2 + sizeof(uint32_t))]; + unsigned char unused [msg_t_size - (sizeof (metadata_t *) + + sizeof (void*) + + sizeof (size_t) + + 2 + + 16 + + sizeof (uint32_t) + + sizeof (fd_t))]; unsigned char type; unsigned char flags; + char group [16]; uint32_t routing_id; + fd_t fd; } cmsg; struct { metadata_t *metadata; - unsigned char unused [msg_t_size - (8 + sizeof (metadata_t *) + 2 + sizeof(uint32_t))]; + unsigned char unused [msg_t_size - (sizeof (metadata_t *) + + 2 + + 16 + + sizeof (uint32_t) + + sizeof (fd_t))]; unsigned char type; unsigned char flags; + char group [16]; uint32_t routing_id; + fd_t fd; } delimiter; } u; }; diff --git a/src/radio.cpp b/src/radio.cpp index 14441d6a..51e654c9 100644 --- a/src/radio.cpp +++ b/src/radio.cpp @@ -65,15 +65,13 @@ void zmq::radio_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) void zmq::radio_t::xread_activated (pipe_t *pipe_) { // There are some subscriptions waiting. Let's process them. - msg_t sub; - while (pipe_->read (&sub)) { + msg_t msg; + while (pipe_->read (&msg)) { // Apply the subscription to the trie - const char * data = (char *) sub.data (); - const size_t size = sub.size (); - if (size > 0 && (*data == 'J' || *data == 'L')) { - std::string group = std::string (data + 1, sub. size() - 1); + if (msg.is_join () || msg.is_leave ()) { + std::string group = std::string (msg.group ()); - if (*data == 'J') + if (msg.is_join ()) subscriptions.insert (subscriptions_t::value_type (group, pipe_)); else { std::pair range = @@ -87,7 +85,7 @@ void zmq::radio_t::xread_activated (pipe_t *pipe_) } } } - sub.close (); + msg.close (); } } @@ -115,37 +113,13 @@ int zmq::radio_t::xsend (msg_t *msg_) return -1; } - size_t size = msg_->size (); - char *group = (char*) msg_->data(); - - // Maximum allowed group length is 255 - if (size > ZMQ_GROUP_MAX_LENGTH) - size = ZMQ_GROUP_MAX_LENGTH; - - // Check if NULL terminated - bool terminated = false; - - for (size_t index = 0; index < size; index++) { - if (group[index] == '\0') { - terminated = true; - break; - } - } - - if (!terminated) { - // User didn't include a group in the message - errno = EINVAL; - return -1; - } - dist.unmatch (); std::pair range = - subscriptions.equal_range (std::string(group)); + subscriptions.equal_range (std::string(msg_->group ())); - for (subscriptions_t::iterator it = range.first; it != range.second; ++it) { + for (subscriptions_t::iterator it = range.first; it != range.second; ++it) dist.match (it-> second); - } int rc = dist.send_to_matching (msg_); @@ -168,3 +142,93 @@ bool zmq::radio_t::xhas_in () { return false; } + +zmq::radio_session_t::radio_session_t (io_thread_t *io_thread_, bool connect_, + socket_base_t *socket_, const options_t &options_, + address_t *addr_) : + session_base_t (io_thread_, connect_, socket_, options_, addr_), + state (group) +{ +} + +zmq::radio_session_t::~radio_session_t () +{ +} + +int zmq::radio_session_t::push_msg (msg_t *msg_) +{ + if (msg_->flags() & msg_t::command) { + char *command_data = + static_cast (msg_->data ()); + const size_t data_size = msg_->size (); + + int group_length; + char * group; + + msg_t join_leave_msg; + int rc; + + // Set the msg type to either JOIN or LEAVE + if (data_size >= 5 && memcmp (command_data, "\4JOIN", 5) == 0) { + group_length = data_size - 5; + group = command_data + 5; + rc = join_leave_msg.init_join (); + } + else if (data_size >= 6 && memcmp (command_data, "\5LEAVE", 6) == 0) { + group_length = data_size - 6; + group = command_data + 6; + rc = join_leave_msg.init_leave (); + } + // If it is not a JOIN or LEAVE just push the message + else + return session_base_t::push_msg (msg_); + + errno_assert (rc == 0); + + // Set the group + rc = join_leave_msg.set_group (group, group_length); + errno_assert (rc == 0); + + // Close the current command + rc = msg_->close (); + errno_assert (rc == 0); + + // Push the join or leave command + *msg_ = join_leave_msg; + return session_base_t::push_msg (msg_); + } + else + return session_base_t::push_msg (msg_); +} + +int zmq::radio_session_t::pull_msg (msg_t *msg_) +{ + if (state == group) { + int rc = session_base_t::pull_msg (&pending_msg); + if (rc != 0) + return rc; + + const char *group = pending_msg.group (); + int length = strlen (group); + + // First frame is the group + msg_->init_size (length); + msg_->set_flags (msg_t::more); + memcpy (msg_->data (), group, length); + + // Next status is the body + state = body; + return 0; + } + else { + *msg_ = pending_msg; + state = group; + return 0; + } +} + +void zmq::radio_session_t::reset () +{ + session_base_t::reset (); + state = group; +} diff --git a/src/radio.hpp b/src/radio.hpp index 52157660..8e76a550 100644 --- a/src/radio.hpp +++ b/src/radio.hpp @@ -77,6 +77,31 @@ namespace zmq const radio_t &operator = (const radio_t&); }; + class radio_session_t : public session_base_t + { + public: + + radio_session_t (zmq::io_thread_t *io_thread_, bool connect_, + zmq::socket_base_t *socket_, const options_t &options_, + address_t *addr_); + ~radio_session_t (); + + // Overrides of the functions from session_base_t. + int push_msg (msg_t *msg_); + int pull_msg (msg_t *msg_); + void reset (); + private: + + enum { + group, + body + } state; + + msg_t pending_msg; + + radio_session_t (const radio_session_t&); + const radio_session_t &operator = (const radio_session_t&); + }; } #endif diff --git a/src/raw_decoder.cpp b/src/raw_decoder.cpp index 53b9a330..633530ef 100644 --- a/src/raw_decoder.cpp +++ b/src/raw_decoder.cpp @@ -62,13 +62,13 @@ int zmq::raw_decoder_t::decode (const uint8_t *data_, size_t size_, { int rc = in_progress.init ((unsigned char*)data_, size_, shared_message_memory_allocator::call_dec_ref, - allocator.buffer(), - allocator.provide_refcnt() ); + allocator.buffer (), + allocator.provide_content ()); // if the buffer serves as memory for a zero-copy message, release it // and allocate a new buffer in get_buffer for the next decode - if (in_progress.is_zcmsg()) { - allocator.advance_refcnt(); + if (in_progress.is_zcmsg ()) { + allocator.advance_content(); allocator.release(); } diff --git a/src/session_base.cpp b/src/session_base.cpp index 6d6ea5bd..515f2947 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -45,6 +45,8 @@ #include "ctx.hpp" #include "req.hpp" +#include "radio.hpp" +#include "dish.hpp" zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_, bool active_, class socket_base_t *socket_, const options_t &options_, @@ -56,6 +58,14 @@ zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_, s = new (std::nothrow) req_session_t (io_thread_, active_, socket_, options_, addr_); break; + case ZMQ_RADIO: + s = new (std::nothrow) radio_session_t (io_thread_, active_, + socket_, options_, addr_); + break; + case ZMQ_DISH: + s = new (std::nothrow) dish_session_t (io_thread_, active_, + socket_, options_, addr_); + break; case ZMQ_DEALER: case ZMQ_REP: case ZMQ_ROUTER: @@ -69,8 +79,6 @@ zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_, case ZMQ_STREAM: case ZMQ_SERVER: case ZMQ_CLIENT: - case ZMQ_RADIO: - case ZMQ_DISH: s = new (std::nothrow) session_base_t (io_thread_, active_, socket_, options_, addr_); break; diff --git a/src/v2_decoder.cpp b/src/v2_decoder.cpp index 09154144..6af74fd2 100644 --- a/src/v2_decoder.cpp +++ b/src/v2_decoder.cpp @@ -127,12 +127,12 @@ int zmq::v2_decoder_t::size_ready(uint64_t msg_size, unsigned char const* read_p // if the message will be a large message, pass a valid refcnt memory location as well rc = in_progress.init ((unsigned char *) read_pos, static_cast (msg_size), shared_message_memory_allocator::call_dec_ref, buffer(), - provide_refcnt ()); + provide_content ()); // For small messages, data has been copied and refcount does not have to be increased if (in_progress.is_zcmsg()) { - advance_refcnt(); + advance_content(); inc_ref(); } } diff --git a/src/zmq.cpp b/src/zmq.cpp index 4d508496..a041538d 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -695,6 +695,16 @@ uint32_t zmq_msg_routing_id (zmq_msg_t *msg_) return ((zmq::msg_t *) msg_)->get_routing_id (); } +int zmq_msg_set_group (zmq_msg_t *msg_, const char *group_) +{ + return ((zmq::msg_t *) msg_)->set_group (group_); +} + +const char *zmq_msg_group (zmq_msg_t *msg_) +{ + return ((zmq::msg_t *) msg_)->group (); +} + // Get message metadata string const char *zmq_msg_gets (zmq_msg_t *msg_, const char *property_) diff --git a/tests/test_radio_dish.cpp b/tests/test_radio_dish.cpp index 80db94ad..560d330d 100644 --- a/tests/test_radio_dish.cpp +++ b/tests/test_radio_dish.cpp @@ -29,6 +29,57 @@ #include "testutil.hpp" +int msg_send (zmq_msg_t *msg_, void *s_, const char* group_, const char* body_) +{ + int rc = zmq_msg_init_size (msg_, strlen (body_)); + if (rc != 0) + return rc; + + memcpy (zmq_msg_data (msg_), body_, strlen (body_)); + + rc = zmq_msg_set_group (msg_, group_); + if (rc != 0) { + zmq_msg_close (msg_); + return rc; + } + + rc = zmq_msg_send (msg_, s_, 0); + + zmq_msg_close (msg_); + + return rc; +} + +int msg_recv_cmp (zmq_msg_t *msg_, void *s_, const char* group_, const char* body_) +{ + int rc = zmq_msg_init (msg_); + if (rc != 0) + return -1; + + int recv_rc = zmq_msg_recv (msg_, s_, 0); + if (recv_rc == -1) + return -1; + + if (strcmp (zmq_msg_group (msg_), group_) != 0) + { + zmq_msg_close (msg_); + return -1; + } + + char * body = (char*) malloc (sizeof(char) * (zmq_msg_size (msg_) + 1)); + memcpy (body, zmq_msg_data (msg_), zmq_msg_size (msg_)); + body [zmq_msg_size (msg_)] = '\0'; + + if (strcmp (body, body_) != 0) + { + zmq_msg_close (msg_); + return -1; + } + + zmq_msg_close (msg_); + return recv_rc; +} + int main (void) { setup_test_environment (); @@ -38,11 +89,11 @@ int main (void) void *radio = zmq_socket (ctx, ZMQ_RADIO); void *dish = zmq_socket (ctx, ZMQ_DISH); - int rc = zmq_bind (radio, "inproc://test-radio-dish"); + int rc = zmq_bind (radio, "tcp://127.0.0.1:5556"); assert (rc == 0); // Leaving a group which we didn't join - rc = zmq_leave (dish, "World"); + rc = zmq_leave (dish, "Movies"); assert (rc == -1); // Joining too long group @@ -54,66 +105,64 @@ int main (void) assert (rc == -1); // Joining - rc = zmq_join (dish, "World"); + rc = zmq_join (dish, "Movies"); assert (rc == 0); // Duplicate Joining - rc = zmq_join (dish, "World"); + rc = zmq_join (dish, "Movies"); assert (rc == -1); // Connecting - rc = zmq_connect (dish, "inproc://test-radio-dish"); + rc = zmq_connect (dish, "tcp://127.0.0.1:5556"); assert (rc == 0); zmq_sleep (1); - // This is not going to be sent as dish only subscribe to "World" - rc = zmq_send (radio, "Hello\0Message", 13, 0); - assert (rc == 13); + zmq_msg_t msg; + + // This is not going to be sent as dish only subscribe to "Movies" + rc = msg_send (&msg, radio, "TV", "Friends"); + assert (rc == 7); // This is going to be sent to the dish - rc = zmq_send (radio, "World\0Message", 13, 0); - assert (rc == 13); + rc = msg_send (&msg, radio, "Movies", "Godfather"); + assert (rc == 9); - char* data = (char*) malloc (sizeof(char) * 13); - - rc = zmq_recv (dish, data, 13, 0); - assert (rc == 13); - assert (strcmp (data, "World") == 0); + // Check the correct message arrived + rc = msg_recv_cmp (&msg, dish, "Movies", "Godfather"); + assert (rc == 9); // Join group during connection optvallen - rc = zmq_join (dish, "Hello"); + rc = zmq_join (dish, "TV"); assert (rc == 0); zmq_sleep (1); // This should arrive now as we joined the group - rc = zmq_send (radio, "Hello\0Message", 13, 0); - assert (rc == 13); + rc = msg_send (&msg, radio, "TV", "Friends"); + assert (rc == 7); - rc = zmq_recv (dish, data, 13, 0); - assert (rc == 13); - assert (strcmp (data, "Hello") == 0); + // Check the correct message arrived + rc = msg_recv_cmp (&msg, dish, "TV", "Friends"); + assert (rc == 7); - // Leaving group - rc = zmq_leave (dish, "Hello"); + // Leaving groupr + rc = zmq_leave (dish, "TV"); assert (rc == 0); zmq_sleep (1); - // This is not going to be sent as dish only subscribe to "World" - rc = zmq_send (radio, "Hello\0Message", 13, 0); - assert (rc == 13); + // This is not going to be sent as dish only subscribe to "Movies" + rc = msg_send (&msg, radio, "TV", "Friends"); + assert (rc == 7); // This is going to be sent to the dish - rc = zmq_send (radio, "World\0Message", 13, 0); - assert (rc == 13); + rc = msg_send (&msg, radio, "Movies", "Godfather"); + assert (rc == 9); - rc = zmq_recv (dish, data, 13, 0); - assert (rc == 13); - assert (strcmp (data, "World") == 0); - - free (data); + // Check the correct message arrived + rc = msg_recv_cmp (&msg, dish, "Movies", "Godfather"); + assert (rc == 9); rc = zmq_close (dish); assert (rc == 0);