From c7d52ec260e0168c53e795e389501d55f25aa708 Mon Sep 17 00:00:00 2001 From: somdoron Date: Thu, 28 Jan 2016 18:20:01 +0200 Subject: [PATCH] radio-dish join/leave are ZMTP commands --- src/dish.cpp | 90 +++++++++++++++++++++++++++++++++++---------------- src/dish.hpp | 1 + src/msg.cpp | 32 ++++++++++++++++++ src/msg.hpp | 13 ++++++-- src/radio.cpp | 60 +++++++++++++++++++++++++++++----- src/radio.hpp | 1 + 6 files changed, 160 insertions(+), 37 deletions(-) diff --git a/src/dish.cpp b/src/dish.cpp index acfe2988..45023d3f 100644 --- a/src/dish.cpp +++ b/src/dish.cpp @@ -90,13 +90,15 @@ void zmq::dish_t::xhiccuped (pipe_t *pipe_) int zmq::dish_t::xjoin (const char* group_) { - if (strlen (group_) > ZMQ_GROUP_MAX_LENGTH) { + std::string group = std::string (group_); + + if (group.length () > ZMQ_GROUP_MAX_LENGTH) { errno = EINVAL; return -1; } subscriptions_t::iterator it = - std::find (subscriptions.begin (), subscriptions.end (), std::string(group_)); + std::find (subscriptions.begin (), subscriptions.end (), group); // User cannot join same group twice if (it != subscriptions.end ()) { @@ -104,16 +106,14 @@ int zmq::dish_t::xjoin (const char* group_) return -1; } - subscriptions.push_back (std::string (group_)); + subscriptions.push_back (group); - size_t len = strlen (group_); msg_t msg; - int rc = msg.init_size (len + 1); + int rc = msg.init_join (); errno_assert (rc == 0); - char *data = (char*) msg.data (); - data[0] = 'J'; - memcpy (data + 1, group_, len); + rc = msg.set_group (group_); + errno_assert (rc == 0); int err = 0; rc = dist.send_to_all (&msg); @@ -128,12 +128,14 @@ int zmq::dish_t::xjoin (const char* group_) int zmq::dish_t::xleave (const char* group_) { - if (strlen (group_) > ZMQ_GROUP_MAX_LENGTH) { + std::string group = std::string (group_); + + if (group.length () > ZMQ_GROUP_MAX_LENGTH) { errno = EINVAL; return -1; } - subscriptions_t::iterator it = std::find (subscriptions.begin (), subscriptions.end (), std::string (group_)); + subscriptions_t::iterator it = std::find (subscriptions.begin (), subscriptions.end (), group); if (it == subscriptions.end ()) { errno = EINVAL; @@ -142,14 +144,12 @@ int zmq::dish_t::xleave (const char* group_) subscriptions.erase (it); - size_t len = strlen (group_); msg_t msg; - int rc = msg.init_size (len + 1); + int rc = msg.init_leave (); errno_assert (rc == 0); - char *data = (char*) msg.data (); - data[0] = 'L'; - memcpy (data + 1, group_, len); + rc = msg.set_group (group_); + errno_assert (rc == 0); int err = 0; rc = dist.send_to_all (&msg); @@ -226,21 +226,15 @@ void zmq::dish_t::send_subscriptions (pipe_t *pipe_) { for (subscriptions_t::iterator it = subscriptions.begin (); it != subscriptions.end (); ++it) { msg_t msg; - int rc = msg.init_size (it->length () + 1); + int rc = msg.init_join (); + errno_assert (rc == 0); + + rc = msg.set_group (it->c_str()); errno_assert (rc == 0); - char *data = (char*) msg.data (); - data [0] = 'J'; - it->copy (data + 1, it->length ()); // Send it to the pipe. - bool sent = pipe_->write (&msg); - - // If we reached the SNDHWM, and thus cannot send the subscription, drop - // the subscription message instead. This matches the behaviour of - // zmq_setsockopt(ZMQ_SUBSCRIBE, ...), which also drops subscriptions - // when the SNDHWM is reached. - if (!sent) - msg.close (); + pipe_->write (&msg); + msg.close (); } pipe_->flush (); @@ -303,6 +297,48 @@ int zmq::dish_session_t::push_msg (msg_t *msg_) } } +int zmq::dish_session_t::pull_msg (msg_t *msg_) +{ + int rc = session_base_t::pull_msg (msg_); + + if (rc != 0) + return rc; + + if (!msg_->is_join () && !msg_->is_leave ()) + return rc; + else { + int group_length = strlen (msg_->group ()); + + msg_t command; + int offset; + + if (msg_->is_join ()) { + command.init_size (group_length + 5); + offset = 5; + memcpy (command.data (), "\4JOIN", 5); + } + else { + command.init_size (group_length + 6); + offset = 6; + memcpy (command.data (), "\5LEAVE", 6); + } + + command.set_flags (msg_t::command); + char* command_data = (char*)command.data (); + + // Copy the group + memcpy (command_data + offset, msg_->group (), group_length); + + // Close the join message + int rc = msg_->close (); + errno_assert (rc == 0); + + *msg_ = command; + + return 0; + } +} + void zmq::dish_session_t::reset () { session_base_t::reset (); diff --git a/src/dish.hpp b/src/dish.hpp index 414083ee..7759a462 100644 --- a/src/dish.hpp +++ b/src/dish.hpp @@ -104,6 +104,7 @@ namespace zmq // Overrides of the functions from session_base_t. int push_msg (msg_t *msg_); + int pull_msg (msg_t *msg_); void reset (); private: diff --git a/src/msg.cpp b/src/msg.cpp index 263c8e93..a421756e 100644 --- a/src/msg.cpp +++ b/src/msg.cpp @@ -200,6 +200,28 @@ int zmq::msg_t::init_delimiter () return 0; } +int zmq::msg_t::init_join () +{ + u.base.metadata = NULL; + u.base.type = type_join; + u.base.flags = 0; + u.base.group[0] = '\0'; + u.base.routing_id = 0; + u.base.fd = retired_fd; + return 0; +} + +int zmq::msg_t::init_leave () +{ + u.base.metadata = NULL; + u.base.type = type_leave; + u.base.flags = 0; + u.base.group[0] = '\0'; + u.base.routing_id = 0; + u.base.fd = retired_fd; + return 0; +} + int zmq::msg_t::close () { // Check the validity of the message. @@ -440,6 +462,16 @@ bool zmq::msg_t::is_zcmsg() const return u.base.type == type_zclmsg; } +bool zmq::msg_t::is_join() const +{ + return u.base.type == type_join; +} + +bool zmq::msg_t::is_leave() const +{ + return u.base.type == type_leave; +} + void zmq::msg_t::add_refs (int refs_) { zmq_assert (refs_ >= 0); diff --git a/src/msg.hpp b/src/msg.hpp index 330c9984..22cda9ac 100644 --- a/src/msg.hpp +++ b/src/msg.hpp @@ -95,6 +95,8 @@ namespace zmq int init_external_storage(content_t* content_, void *data_, size_t size_, msg_free_fn *ffn_, void *hint_); int init_delimiter (); + int init_join (); + int init_leave (); int close (); int move (msg_t &src_); int copy (msg_t &src_); @@ -111,6 +113,8 @@ namespace zmq bool is_identity () const; bool is_credential () const; bool is_delimiter () const; + bool is_join () const; + bool is_leave () const; bool is_vsm () const; bool is_cmsg () const; bool is_zcmsg() const; @@ -137,7 +141,6 @@ namespace zmq 16 + sizeof (uint32_t) + sizeof (fd_t))}; - private: zmq::atomic_counter_t* refcnt(); @@ -157,7 +160,13 @@ namespace zmq // zero-copy LMSG message for v2_decoder type_zclmsg = 105, - type_max = 105 + // Join message for radio_dish + type_join = 106, + + // Leave message for radio_dish + type_leave = 107, + + type_max = 107 }; // Note that fields shared between different message types are not diff --git a/src/radio.cpp b/src/radio.cpp index 03c6ba41..51e654c9 100644 --- a/src/radio.cpp +++ b/src/radio.cpp @@ -65,15 +65,13 @@ void zmq::radio_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) void zmq::radio_t::xread_activated (pipe_t *pipe_) { // There are some subscriptions waiting. Let's process them. - msg_t sub; - while (pipe_->read (&sub)) { + msg_t msg; + while (pipe_->read (&msg)) { // Apply the subscription to the trie - const char * data = (char *) sub.data (); - const size_t size = sub.size (); - if (size > 0 && (*data == 'J' || *data == 'L')) { - std::string group = std::string (data + 1, sub. size() - 1); + if (msg.is_join () || msg.is_leave ()) { + std::string group = std::string (msg.group ()); - if (*data == 'J') + if (msg.is_join ()) subscriptions.insert (subscriptions_t::value_type (group, pipe_)); else { std::pair range = @@ -87,7 +85,7 @@ void zmq::radio_t::xread_activated (pipe_t *pipe_) } } } - sub.close (); + msg.close (); } } @@ -157,6 +155,52 @@ zmq::radio_session_t::~radio_session_t () { } +int zmq::radio_session_t::push_msg (msg_t *msg_) +{ + if (msg_->flags() & msg_t::command) { + char *command_data = + static_cast (msg_->data ()); + const size_t data_size = msg_->size (); + + int group_length; + char * group; + + msg_t join_leave_msg; + int rc; + + // Set the msg type to either JOIN or LEAVE + if (data_size >= 5 && memcmp (command_data, "\4JOIN", 5) == 0) { + group_length = data_size - 5; + group = command_data + 5; + rc = join_leave_msg.init_join (); + } + else if (data_size >= 6 && memcmp (command_data, "\5LEAVE", 6) == 0) { + group_length = data_size - 6; + group = command_data + 6; + rc = join_leave_msg.init_leave (); + } + // If it is not a JOIN or LEAVE just push the message + else + return session_base_t::push_msg (msg_); + + errno_assert (rc == 0); + + // Set the group + rc = join_leave_msg.set_group (group, group_length); + errno_assert (rc == 0); + + // Close the current command + rc = msg_->close (); + errno_assert (rc == 0); + + // Push the join or leave command + *msg_ = join_leave_msg; + return session_base_t::push_msg (msg_); + } + else + return session_base_t::push_msg (msg_); +} + int zmq::radio_session_t::pull_msg (msg_t *msg_) { if (state == group) { diff --git a/src/radio.hpp b/src/radio.hpp index 596a8cd4..8e76a550 100644 --- a/src/radio.hpp +++ b/src/radio.hpp @@ -87,6 +87,7 @@ namespace zmq ~radio_session_t (); // Overrides of the functions from session_base_t. + int push_msg (msg_t *msg_); int pull_msg (msg_t *msg_); void reset (); private: