diff --git a/src/xpub.cpp b/src/xpub.cpp index 9c4f1e2f..e30ada58 100644 --- a/src/xpub.cpp +++ b/src/xpub.cpp @@ -41,18 +41,18 @@ zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) : verbose_unsubs (false), more (false), lossy (true), - manual(false), + manual (false), pending_pipes (), welcome_msg () { last_pipe = NULL; options.type = ZMQ_XPUB; - welcome_msg.init(); + welcome_msg.init (); } zmq::xpub_t::~xpub_t () { - welcome_msg.close(); + welcome_msg.close (); } void zmq::xpub_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) @@ -65,15 +65,16 @@ void zmq::xpub_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) if (subscribe_to_all_) subscriptions.add (NULL, 0, pipe_); - // if welcome message exist - if (welcome_msg.size() > 0) + // if welcome message exists, send a copy of it + if (welcome_msg.size () > 0) { msg_t copy; - copy.init(); - copy.copy(welcome_msg); - - pipe_->write(©); - pipe_->flush(); + copy.init (); + int rc = copy.copy (welcome_msg); + errno_assert (rc == 0); + bool ok = pipe_->write (©); + zmq_assert (ok); + pipe_->flush (); } // The pipe is active when attached. Let's read the subscriptions from @@ -95,35 +96,35 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_) { // Store manual subscription to use on termination if (*data == 0) - manual_subscriptions.rm(data + 1, size - 1, pipe_); + manual_subscriptions.rm (data + 1, size - 1, pipe_); else - manual_subscriptions.add(data + 1, size - 1, pipe_); + manual_subscriptions.add (data + 1, size - 1, pipe_); - pending_pipes.push_back(pipe_); - pending_data.push_back(blob_t(data, size)); + pending_pipes.push_back (pipe_); + pending_data.push_back (blob_t (data, size)); if (metadata) - metadata->add_ref(); - pending_metadata.push_back(metadata); - pending_flags.push_back(0); + metadata->add_ref (); + pending_metadata.push_back (metadata); + pending_flags.push_back (0); } else { bool unique; if (*data == 0) - unique = subscriptions.rm(data + 1, size - 1, pipe_); + unique = subscriptions.rm (data + 1, size - 1, pipe_); else - unique = subscriptions.add(data + 1, size - 1, pipe_); + unique = subscriptions.add (data + 1, size - 1, pipe_); // If the (un)subscription is not a duplicate store it so that it can be // passed to the user on next recv call unless verbose mode is enabled // which makes to pass always these messages. if (options.type == ZMQ_XPUB && (unique || (*data == 1 && verbose_subs) || (*data == 0 && verbose_unsubs && verbose_subs))) { - pending_data.push_back(blob_t(data, size)); + pending_data.push_back (blob_t(data, size)); if (metadata) - metadata->add_ref(); - pending_metadata.push_back(metadata); - pending_flags.push_back(0); + metadata->add_ref (); + pending_metadata.push_back (metadata); + pending_flags.push_back (0); } } } @@ -131,7 +132,7 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_) // Process user message coming upstream from xsub socket pending_data.push_back (blob_t (data, size)); if (metadata) - metadata->add_ref(); + metadata->add_ref (); pending_metadata.push_back (metadata); pending_flags.push_back (sub.flags ()); } @@ -151,7 +152,7 @@ int zmq::xpub_t::xsetsockopt (int option_, const void *optval_, || option_ == ZMQ_XPUB_VERBOSER || option_ == ZMQ_XPUB_NODROP || option_ == ZMQ_XPUB_MANUAL) { - if (optvallen_ != sizeof(int) || *static_cast (optval_) < 0) { + if (optvallen_ != sizeof (int) || *static_cast (optval_) < 0) { errno = EINVAL; return -1; } @@ -174,26 +175,26 @@ int zmq::xpub_t::xsetsockopt (int option_, const void *optval_, else if (option_ == ZMQ_SUBSCRIBE && manual) { if (last_pipe != NULL) - subscriptions.add ((unsigned char *)optval_, optvallen_, last_pipe); + subscriptions.add ((unsigned char *) optval_, optvallen_, last_pipe); } else if (option_ == ZMQ_UNSUBSCRIBE && manual) { if (last_pipe != NULL) - subscriptions.rm ((unsigned char *)optval_, optvallen_, last_pipe); + subscriptions.rm ((unsigned char *) optval_, optvallen_, last_pipe); } else if (option_ == ZMQ_XPUB_WELCOME_MSG) { - welcome_msg.close(); + welcome_msg.close (); if (optvallen_ > 0) { - int rc = welcome_msg.init_size(optvallen_); + int rc = welcome_msg.init_size (optvallen_); errno_assert(rc == 0); - unsigned char *data = (unsigned char*)welcome_msg.data(); - memcpy(data, optval_, optvallen_); + unsigned char *data = (unsigned char*) welcome_msg.data (); + memcpy (data, optval_, optvallen_); } else - welcome_msg.init(); + welcome_msg.init (); } else { errno = EINVAL;