diff --git a/src/curve_server.cpp b/src/curve_server.cpp index c8888a8c..77a56c72 100644 --- a/src/curve_server.cpp +++ b/src/curve_server.cpp @@ -580,7 +580,7 @@ int zmq::curve_server_t::send_zap_request (const uint8_t *key) msg.set_flags (msg_t::more); rc = session->write_zap_msg (&msg); if (rc != 0) - return send_failure (&msg); + return close_and_return (&msg, -1); // Version frame rc = msg.init_size (3); @@ -589,7 +589,7 @@ int zmq::curve_server_t::send_zap_request (const uint8_t *key) msg.set_flags (msg_t::more); rc = session->write_zap_msg (&msg); if (rc != 0) - return send_failure (&msg); + return close_and_return (&msg, -1); // Request ID frame rc = msg.init_size (1); @@ -598,7 +598,7 @@ int zmq::curve_server_t::send_zap_request (const uint8_t *key) msg.set_flags (msg_t::more); rc = session->write_zap_msg (&msg); if (rc != 0) - return send_failure (&msg); + return close_and_return (&msg, -1); // Domain frame rc = msg.init_size (options.zap_domain.length ()); @@ -607,7 +607,7 @@ int zmq::curve_server_t::send_zap_request (const uint8_t *key) msg.set_flags (msg_t::more); rc = session->write_zap_msg (&msg); if (rc != 0) - return send_failure (&msg); + return close_and_return (&msg, -1); // Address frame rc = msg.init_size (peer_address.length ()); @@ -616,7 +616,7 @@ int zmq::curve_server_t::send_zap_request (const uint8_t *key) msg.set_flags (msg_t::more); rc = session->write_zap_msg (&msg); if (rc != 0) - return send_failure (&msg); + return close_and_return (&msg, -1); // Identity frame rc = msg.init_size (options.identity_size); @@ -625,7 +625,7 @@ int zmq::curve_server_t::send_zap_request (const uint8_t *key) msg.set_flags (msg_t::more); rc = session->write_zap_msg (&msg); if (rc != 0) - return send_failure (&msg); + return close_and_return (&msg, -1); // Mechanism frame rc = msg.init_size (5); @@ -634,7 +634,7 @@ int zmq::curve_server_t::send_zap_request (const uint8_t *key) msg.set_flags (msg_t::more); rc = session->write_zap_msg (&msg); if (rc != 0) - return send_failure (&msg); + return close_and_return (&msg, -1); // Credentials frame rc = msg.init_size (crypto_box_PUBLICKEYBYTES); @@ -642,7 +642,7 @@ int zmq::curve_server_t::send_zap_request (const uint8_t *key) memcpy (msg.data (), key, crypto_box_PUBLICKEYBYTES); rc = session->write_zap_msg (&msg); if (rc != 0) - return send_failure (&msg); + return close_and_return (&msg, -1); return 0; } @@ -661,12 +661,12 @@ int zmq::curve_server_t::receive_and_process_zap_reply () for (int i = 0; i < 7; i++) { rc = session->read_zap_msg (&msg [i]); if (rc == -1) - return send_failure (msg); + return close_and_return (msg, -1); if ((msg [i].flags () & msg_t::more) == (i < 6? 0: msg_t::more)) { // Temporary support for security debugging puts ("CURVE I: ZAP handler sent incomplete reply message"); errno = EPROTO; - return send_failure (msg); + return close_and_return (msg, -1); } } @@ -675,7 +675,7 @@ int zmq::curve_server_t::receive_and_process_zap_reply () // Temporary support for security debugging puts ("CURVE I: ZAP handler sent malformed reply message"); errno = EPROTO; - return send_failure (msg); + return close_and_return (msg, -1); } // Version frame @@ -683,7 +683,7 @@ int zmq::curve_server_t::receive_and_process_zap_reply () // Temporary support for security debugging puts ("CURVE I: ZAP handler sent bad version number"); errno = EPROTO; - return send_failure (msg); + return close_and_return (msg, -1); } // Request id frame @@ -691,7 +691,7 @@ int zmq::curve_server_t::receive_and_process_zap_reply () // Temporary support for security debugging puts ("CURVE I: ZAP handler sent bad request ID"); errno = EPROTO; - return send_failure (msg); + return close_and_return (msg, -1); } // Status code frame @@ -699,7 +699,7 @@ int zmq::curve_server_t::receive_and_process_zap_reply () // Temporary support for security debugging puts ("CURVE I: ZAP handler rejected client authentication"); errno = EACCES; - return send_failure (msg); + return close_and_return (msg, -1); } // Save status code @@ -713,7 +713,7 @@ int zmq::curve_server_t::receive_and_process_zap_reply () msg [6].size (), true); if (rc != 0) - return send_failure (msg); + return close_and_return (msg, -1); // Close all reply frames for (int i = 0; i < 7; i++) { diff --git a/src/gssapi_server.cpp b/src/gssapi_server.cpp index dfa6685c..a7b90b74 100644 --- a/src/gssapi_server.cpp +++ b/src/gssapi_server.cpp @@ -161,7 +161,7 @@ int zmq::gssapi_server_t::send_zap_request () msg.set_flags (msg_t::more); rc = session->write_zap_msg (&msg); if (rc != 0) - return send_failure (&msg); + return close_and_return (&msg, -1); // Version frame rc = msg.init_size (3); @@ -170,7 +170,7 @@ int zmq::gssapi_server_t::send_zap_request () msg.set_flags (msg_t::more); rc = session->write_zap_msg (&msg); if (rc != 0) - return send_failure (&msg); + return close_and_return (&msg, -1); // Request ID frame rc = msg.init_size (1); @@ -179,7 +179,7 @@ int zmq::gssapi_server_t::send_zap_request () msg.set_flags (msg_t::more); rc = session->write_zap_msg (&msg); if (rc != 0) - return send_failure (&msg); + return close_and_return (&msg, -1); // Domain frame rc = msg.init_size (options.zap_domain.length ()); @@ -188,7 +188,7 @@ int zmq::gssapi_server_t::send_zap_request () msg.set_flags (msg_t::more); rc = session->write_zap_msg (&msg); if (rc != 0) - return send_failure (&msg); + return close_and_return (&msg, -1); // Address frame rc = msg.init_size (peer_address.length ()); @@ -197,7 +197,7 @@ int zmq::gssapi_server_t::send_zap_request () msg.set_flags (msg_t::more); rc = session->write_zap_msg (&msg); if (rc != 0) - return send_failure (&msg); + return close_and_return (&msg, -1); // Identity frame rc = msg.init_size (options.identity_size); @@ -206,7 +206,7 @@ int zmq::gssapi_server_t::send_zap_request () msg.set_flags (msg_t::more); rc = session->write_zap_msg (&msg); if (rc != 0) - return send_failure (&msg); + return close_and_return (&msg, -1); // Mechanism frame rc = msg.init_size (6); @@ -215,7 +215,7 @@ int zmq::gssapi_server_t::send_zap_request () msg.set_flags (msg_t::more); rc = session->write_zap_msg (&msg); if (rc != 0) - return send_failure (&msg); + return close_and_return (&msg, -1); // Principal frame gss_buffer_desc principal; @@ -227,7 +227,7 @@ int zmq::gssapi_server_t::send_zap_request () rc = session->write_zap_msg (&msg); gss_release_buffer(&min_stat, &principal); if (rc != 0) - return send_failure (&msg); + return close_and_return (&msg, -1); return 0; } @@ -246,35 +246,35 @@ int zmq::gssapi_server_t::receive_and_process_zap_reply () for (int i = 0; i < 7; i++) { rc = session->read_zap_msg (&msg [i]); if (rc == -1) - return send_failure (msg); + return close_and_return (msg, -1); if ((msg [i].flags () & msg_t::more) == (i < 6? 0: msg_t::more)) { errno = EPROTO; - return send_failure (msg); + return close_and_return (msg, -1); } } // Address delimiter frame if (msg [0].size () > 0) { errno = EPROTO; - return send_failure (msg); + return close_and_return (msg, -1); } // Version frame if (msg [1].size () != 3 || memcmp (msg [1].data (), "1.0", 3)) { errno = EPROTO; - return send_failure (msg); + return close_and_return (msg, -1); } // Request id frame if (msg [2].size () != 1 || memcmp (msg [2].data (), "1", 1)) { errno = EPROTO; - return send_failure (msg); + return close_and_return (msg, -1); } // Status code frame if (msg [3].size () != 3 || memcmp (msg [3].data (), "200", 3)) { errno = EACCES; - return send_failure (msg); + return close_and_return (msg, -1); } // Save user id @@ -285,7 +285,7 @@ int zmq::gssapi_server_t::receive_and_process_zap_reply () msg [6].size (), true); if (rc != 0) - return send_failure (msg); + return close_and_return (msg, -1); // Close all reply frames for (int i = 0; i < 7; i++) { diff --git a/src/msg.cpp b/src/msg.cpp index 90e8a084..81076403 100644 --- a/src/msg.cpp +++ b/src/msg.cpp @@ -241,7 +241,7 @@ int zmq::msg_t::close () if (is_zcmsg()) { - zmq_assert( u.zclmsg.content->ffn ); + zmq_assert(u.zclmsg.content->ffn); // If the content is not shared, or if it is shared and the reference // count has dropped to zero, deallocate it. diff --git a/src/msg.hpp b/src/msg.hpp index ec567587..1da6466d 100644 --- a/src/msg.hpp +++ b/src/msg.hpp @@ -247,19 +247,21 @@ namespace zmq } u; }; - inline int send_failure (zmq::msg_t *msg) + inline int close_and_return (zmq::msg_t *msg, int echo) { + // Since we abort on close failure we preserve errno for success case. + int err = errno; const int rc = msg->close (); errno_assert (rc == 0); - return -1; + errno = err; + return echo; } - inline int send_failure (zmq::msg_t msg[], int count) + inline int close_and_return (zmq::msg_t msg [], int count, int echo) { for (int i = 0; i < count; i++) - send_failure (&msg [i]); - - return -1; + close_and_return (&msg [i], 0); + return echo; } } diff --git a/src/null_mechanism.cpp b/src/null_mechanism.cpp index 57ef4217..f9f73e76 100644 --- a/src/null_mechanism.cpp +++ b/src/null_mechanism.cpp @@ -225,7 +225,7 @@ int zmq::null_mechanism_t::send_zap_request () msg.set_flags (msg_t::more); rc = session->write_zap_msg (&msg); if (rc != 0) - return send_failure (&msg); + return close_and_return (&msg, -1); // Version frame rc = msg.init_size (3); @@ -234,7 +234,7 @@ int zmq::null_mechanism_t::send_zap_request () msg.set_flags (msg_t::more); rc = session->write_zap_msg (&msg); if (rc != 0) - return send_failure (&msg); + return close_and_return (&msg, -1); // Request id frame rc = msg.init_size (1); @@ -243,7 +243,7 @@ int zmq::null_mechanism_t::send_zap_request () msg.set_flags (msg_t::more); rc = session->write_zap_msg (&msg); if (rc != 0) - return send_failure (&msg); + return close_and_return (&msg, -1); // Domain frame rc = msg.init_size (options.zap_domain.length ()); @@ -252,7 +252,7 @@ int zmq::null_mechanism_t::send_zap_request () msg.set_flags (msg_t::more); rc = session->write_zap_msg (&msg); if (rc != 0) - return send_failure (&msg); + return close_and_return (&msg, -1); // Address frame rc = msg.init_size (peer_address.length ()); @@ -261,7 +261,7 @@ int zmq::null_mechanism_t::send_zap_request () msg.set_flags (msg_t::more); rc = session->write_zap_msg (&msg); if (rc != 0) - return send_failure (&msg); + return close_and_return (&msg, -1); // Identity frame rc = msg.init_size (options.identity_size); @@ -270,7 +270,7 @@ int zmq::null_mechanism_t::send_zap_request () msg.set_flags (msg_t::more); rc = session->write_zap_msg (&msg); if (rc != 0) - return send_failure (&msg); + return close_and_return (&msg, -1); // Mechanism frame rc = msg.init_size (4); @@ -278,7 +278,7 @@ int zmq::null_mechanism_t::send_zap_request () memcpy (msg.data (), "NULL", 4); rc = session->write_zap_msg (&msg); if (rc != 0) - return send_failure (&msg); + return close_and_return (&msg, -1); return 0; } @@ -297,12 +297,12 @@ int zmq::null_mechanism_t::receive_and_process_zap_reply () for (int i = 0; i < 7; i++) { rc = session->read_zap_msg (&msg [i]); if (rc == -1) - return send_failure (msg); + return close_and_return (msg, -1); if ((msg [i].flags () & msg_t::more) == (i < 6? 0: msg_t::more)) { // Temporary support for security debugging puts ("NULL I: ZAP handler sent incomplete reply message"); errno = EPROTO; - return send_failure (msg); + return close_and_return (msg, -1); } } @@ -311,7 +311,7 @@ int zmq::null_mechanism_t::receive_and_process_zap_reply () // Temporary support for security debugging puts ("NULL I: ZAP handler sent malformed reply message"); errno = EPROTO; - return send_failure (msg); + return close_and_return (msg, -1); } // Version frame @@ -319,7 +319,7 @@ int zmq::null_mechanism_t::receive_and_process_zap_reply () // Temporary support for security debugging puts ("NULL I: ZAP handler sent bad version number"); errno = EPROTO; - return send_failure (msg); + return close_and_return (msg, -1); } // Request id frame @@ -327,7 +327,7 @@ int zmq::null_mechanism_t::receive_and_process_zap_reply () // Temporary support for security debugging puts ("NULL I: ZAP handler sent bad request ID"); errno = EPROTO; - return send_failure (msg); + return close_and_return (msg, -1); } // Status code frame @@ -335,7 +335,7 @@ int zmq::null_mechanism_t::receive_and_process_zap_reply () // Temporary support for security debugging puts ("NULL I: ZAP handler rejected client authentication"); errno = EPROTO; - return send_failure (msg); + return close_and_return (msg, -1); } // Save status code @@ -349,7 +349,7 @@ int zmq::null_mechanism_t::receive_and_process_zap_reply () msg [6].size (), true); if (rc != 0) - return send_failure (msg); + return close_and_return (msg, -1); // Close all reply frames for (int i = 0; i < 7; i++) { diff --git a/src/plain_server.cpp b/src/plain_server.cpp index cb2caa7d..4d946b56 100644 --- a/src/plain_server.cpp +++ b/src/plain_server.cpp @@ -289,7 +289,7 @@ int zmq::plain_server_t::send_zap_request (const std::string &username, msg.set_flags (msg_t::more); rc = session->write_zap_msg (&msg); if (rc != 0) - return send_failure (&msg); + return close_and_return (&msg, -1); // Version frame rc = msg.init_size (3); @@ -298,7 +298,7 @@ int zmq::plain_server_t::send_zap_request (const std::string &username, msg.set_flags (msg_t::more); rc = session->write_zap_msg (&msg); if (rc != 0) - return send_failure (&msg); + return close_and_return (&msg, -1); // Request id frame rc = msg.init_size (1); @@ -307,7 +307,7 @@ int zmq::plain_server_t::send_zap_request (const std::string &username, msg.set_flags (msg_t::more); rc = session->write_zap_msg (&msg); if (rc != 0) - return send_failure (&msg); + return close_and_return (&msg, -1); // Domain frame rc = msg.init_size (options.zap_domain.length ()); @@ -316,7 +316,7 @@ int zmq::plain_server_t::send_zap_request (const std::string &username, msg.set_flags (msg_t::more); rc = session->write_zap_msg (&msg); if (rc != 0) - return send_failure (&msg); + return close_and_return (&msg, -1); // Address frame rc = msg.init_size (peer_address.length ()); @@ -325,7 +325,7 @@ int zmq::plain_server_t::send_zap_request (const std::string &username, msg.set_flags (msg_t::more); rc = session->write_zap_msg (&msg); if (rc != 0) - return send_failure (&msg); + return close_and_return (&msg, -1); // Identity frame rc = msg.init_size (options.identity_size); @@ -334,7 +334,7 @@ int zmq::plain_server_t::send_zap_request (const std::string &username, msg.set_flags (msg_t::more); rc = session->write_zap_msg (&msg); if (rc != 0) - return send_failure (&msg); + return close_and_return (&msg, -1); // Mechanism frame rc = msg.init_size (5); @@ -343,7 +343,7 @@ int zmq::plain_server_t::send_zap_request (const std::string &username, msg.set_flags (msg_t::more); rc = session->write_zap_msg (&msg); if (rc != 0) - return send_failure (&msg); + return close_and_return (&msg, -1); // Username frame rc = msg.init_size (username.length ()); @@ -352,7 +352,7 @@ int zmq::plain_server_t::send_zap_request (const std::string &username, msg.set_flags (msg_t::more); rc = session->write_zap_msg (&msg); if (rc != 0) - return send_failure (&msg); + return close_and_return (&msg, -1); // Password frame rc = msg.init_size (password.length ()); @@ -360,7 +360,7 @@ int zmq::plain_server_t::send_zap_request (const std::string &username, memcpy (msg.data (), password.c_str (), password.length ()); rc = session->write_zap_msg (&msg); if (rc != 0) - return send_failure (&msg); + return close_and_return (&msg, -1); return 0; } @@ -379,12 +379,12 @@ int zmq::plain_server_t::receive_and_process_zap_reply () for (int i = 0; i < 7; i++) { rc = session->read_zap_msg (&msg [i]); if (rc == -1) - return send_failure (msg); + return close_and_return (msg, -1); if ((msg [i].flags () & msg_t::more) == (i < 6? 0: msg_t::more)) { // Temporary support for security debugging puts ("PLAIN I: ZAP handler sent incomplete reply message"); errno = EPROTO; - return send_failure (msg); + return close_and_return (msg, -1); } } @@ -393,7 +393,7 @@ int zmq::plain_server_t::receive_and_process_zap_reply () // Temporary support for security debugging puts ("PLAIN I: ZAP handler sent malformed reply message"); errno = EPROTO; - return send_failure (msg); + return close_and_return (msg, -1); } // Version frame @@ -401,7 +401,7 @@ int zmq::plain_server_t::receive_and_process_zap_reply () // Temporary support for security debugging puts ("PLAIN I: ZAP handler sent bad version number"); errno = EPROTO; - return send_failure (msg); + return close_and_return (msg, -1); } // Request id frame @@ -409,7 +409,7 @@ int zmq::plain_server_t::receive_and_process_zap_reply () // Temporary support for security debugging puts ("PLAIN I: ZAP handler sent bad request ID"); errno = EPROTO; - return send_failure (msg); + return close_and_return (msg, -1); } // Status code frame @@ -417,7 +417,7 @@ int zmq::plain_server_t::receive_and_process_zap_reply () // Temporary support for security debugging puts ("PLAIN I: ZAP handler rejected client authentication"); errno = EACCES; - return send_failure (msg); + return close_and_return (msg, -1); } // Save status code @@ -431,7 +431,7 @@ int zmq::plain_server_t::receive_and_process_zap_reply () msg [6].size (), true); if (rc != 0) - return send_failure (msg); + return close_and_return (msg, -1); // Close all reply frames for (int i = 0; i < 7; i++) { diff --git a/src/proxy.cpp b/src/proxy.cpp index 3753af0a..2ca98ef4 100644 --- a/src/proxy.cpp +++ b/src/proxy.cpp @@ -143,7 +143,7 @@ int zmq::proxy ( // Wait while there are either requests or replies to process. rc = zmq_poll (&items [0], qt_poll_items, -1); if (unlikely (rc < 0)) - return -1; + return close_and_return (&msg, -1); // Get the pollout separately because when combining this with pollin it maxes the CPU // because pollout shall most of the time return directly. @@ -151,7 +151,7 @@ int zmq::proxy ( if (frontend_ != backend_) { rc = zmq_poll (&itemsout [0], 2, 0); if (unlikely (rc < 0)) { - return -1; + return close_and_return (&msg, -1); } } @@ -159,17 +159,17 @@ int zmq::proxy ( if (control_ && items [2].revents & ZMQ_POLLIN) { rc = control_->recv (&msg, 0); if (unlikely (rc < 0)) - return -1; + return close_and_return (&msg, -1); moresz = sizeof more; rc = control_->getsockopt (ZMQ_RCVMORE, &more, &moresz); if (unlikely (rc < 0) || more) - return -1; + return close_and_return (&msg, -1); // Copy message to capture socket if any - rc = capture(capture_, msg); + rc = capture (capture_, msg); if (unlikely (rc < 0)) - return -1; + return close_and_return (&msg, -1); if (msg.size () == 5 && memcmp (msg.data (), "PAUSE", 5) == 0) state = paused; @@ -180,7 +180,7 @@ int zmq::proxy ( if (msg.size () == 9 && memcmp (msg.data (), "TERMINATE", 9) == 0) state = terminated; else { - // This is an API error, we should assert + // This is an API error, so we assert puts ("E: invalid command sent to proxy"); zmq_assert (false); } @@ -189,19 +189,20 @@ int zmq::proxy ( if (state == active && items [0].revents & ZMQ_POLLIN && (frontend_ == backend_ || itemsout [1].revents & ZMQ_POLLOUT)) { - rc = forward(frontend_, backend_, capture_,msg); + rc = forward (frontend_, backend_, capture_, msg); if (unlikely (rc < 0)) - return -1; + return close_and_return (&msg, -1); } // Process a reply if (state == active && frontend_ != backend_ && items [1].revents & ZMQ_POLLIN && itemsout [0].revents & ZMQ_POLLOUT) { - rc = forward(backend_, frontend_, capture_,msg); + rc = forward (backend_, frontend_, capture_, msg); if (unlikely (rc < 0)) - return -1; + return close_and_return (&msg, -1); } } - return 0; + + return close_and_return (&msg, 0); } diff --git a/src/sub.cpp b/src/sub.cpp index 4ff07346..e8718e58 100644 --- a/src/sub.cpp +++ b/src/sub.cpp @@ -69,15 +69,8 @@ int zmq::sub_t::xsetsockopt (int option_, const void *optval_, memcpy (data + 1, optval_, optvallen_); } // Pass it further on in the stack. - int err = 0; rc = xsub_t::xsend (&msg); - if (rc != 0) - err = errno; - int rc2 = msg.close (); - errno_assert (rc2 == 0); - if (rc != 0) - errno = err; - return rc; + return close_and_return (&msg, rc); } int zmq::sub_t::xsend (msg_t *) diff --git a/src/xpub.cpp b/src/xpub.cpp index 9c4f1e2f..e30ada58 100644 --- a/src/xpub.cpp +++ b/src/xpub.cpp @@ -41,18 +41,18 @@ zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) : verbose_unsubs (false), more (false), lossy (true), - manual(false), + manual (false), pending_pipes (), welcome_msg () { last_pipe = NULL; options.type = ZMQ_XPUB; - welcome_msg.init(); + welcome_msg.init (); } zmq::xpub_t::~xpub_t () { - welcome_msg.close(); + welcome_msg.close (); } void zmq::xpub_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) @@ -65,15 +65,16 @@ void zmq::xpub_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) if (subscribe_to_all_) subscriptions.add (NULL, 0, pipe_); - // if welcome message exist - if (welcome_msg.size() > 0) + // if welcome message exists, send a copy of it + if (welcome_msg.size () > 0) { msg_t copy; - copy.init(); - copy.copy(welcome_msg); - - pipe_->write(©); - pipe_->flush(); + copy.init (); + int rc = copy.copy (welcome_msg); + errno_assert (rc == 0); + bool ok = pipe_->write (©); + zmq_assert (ok); + pipe_->flush (); } // The pipe is active when attached. Let's read the subscriptions from @@ -95,35 +96,35 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_) { // Store manual subscription to use on termination if (*data == 0) - manual_subscriptions.rm(data + 1, size - 1, pipe_); + manual_subscriptions.rm (data + 1, size - 1, pipe_); else - manual_subscriptions.add(data + 1, size - 1, pipe_); + manual_subscriptions.add (data + 1, size - 1, pipe_); - pending_pipes.push_back(pipe_); - pending_data.push_back(blob_t(data, size)); + pending_pipes.push_back (pipe_); + pending_data.push_back (blob_t (data, size)); if (metadata) - metadata->add_ref(); - pending_metadata.push_back(metadata); - pending_flags.push_back(0); + metadata->add_ref (); + pending_metadata.push_back (metadata); + pending_flags.push_back (0); } else { bool unique; if (*data == 0) - unique = subscriptions.rm(data + 1, size - 1, pipe_); + unique = subscriptions.rm (data + 1, size - 1, pipe_); else - unique = subscriptions.add(data + 1, size - 1, pipe_); + unique = subscriptions.add (data + 1, size - 1, pipe_); // If the (un)subscription is not a duplicate store it so that it can be // passed to the user on next recv call unless verbose mode is enabled // which makes to pass always these messages. if (options.type == ZMQ_XPUB && (unique || (*data == 1 && verbose_subs) || (*data == 0 && verbose_unsubs && verbose_subs))) { - pending_data.push_back(blob_t(data, size)); + pending_data.push_back (blob_t(data, size)); if (metadata) - metadata->add_ref(); - pending_metadata.push_back(metadata); - pending_flags.push_back(0); + metadata->add_ref (); + pending_metadata.push_back (metadata); + pending_flags.push_back (0); } } } @@ -131,7 +132,7 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_) // Process user message coming upstream from xsub socket pending_data.push_back (blob_t (data, size)); if (metadata) - metadata->add_ref(); + metadata->add_ref (); pending_metadata.push_back (metadata); pending_flags.push_back (sub.flags ()); } @@ -151,7 +152,7 @@ int zmq::xpub_t::xsetsockopt (int option_, const void *optval_, || option_ == ZMQ_XPUB_VERBOSER || option_ == ZMQ_XPUB_NODROP || option_ == ZMQ_XPUB_MANUAL) { - if (optvallen_ != sizeof(int) || *static_cast (optval_) < 0) { + if (optvallen_ != sizeof (int) || *static_cast (optval_) < 0) { errno = EINVAL; return -1; } @@ -174,26 +175,26 @@ int zmq::xpub_t::xsetsockopt (int option_, const void *optval_, else if (option_ == ZMQ_SUBSCRIBE && manual) { if (last_pipe != NULL) - subscriptions.add ((unsigned char *)optval_, optvallen_, last_pipe); + subscriptions.add ((unsigned char *) optval_, optvallen_, last_pipe); } else if (option_ == ZMQ_UNSUBSCRIBE && manual) { if (last_pipe != NULL) - subscriptions.rm ((unsigned char *)optval_, optvallen_, last_pipe); + subscriptions.rm ((unsigned char *) optval_, optvallen_, last_pipe); } else if (option_ == ZMQ_XPUB_WELCOME_MSG) { - welcome_msg.close(); + welcome_msg.close (); if (optvallen_ > 0) { - int rc = welcome_msg.init_size(optvallen_); + int rc = welcome_msg.init_size (optvallen_); errno_assert(rc == 0); - unsigned char *data = (unsigned char*)welcome_msg.data(); - memcpy(data, optval_, optvallen_); + unsigned char *data = (unsigned char*) welcome_msg.data (); + memcpy (data, optval_, optvallen_); } else - welcome_msg.init(); + welcome_msg.init (); } else { errno = EINVAL; diff --git a/tests/test_xpub_welcome_msg.cpp b/tests/test_xpub_welcome_msg.cpp index 6c8fc34a..01b6dda1 100644 --- a/tests/test_xpub_welcome_msg.cpp +++ b/tests/test_xpub_welcome_msg.cpp @@ -31,7 +31,7 @@ int main (void) { - setup_test_environment(); + setup_test_environment (); void *ctx = zmq_ctx_new (); assert (ctx); @@ -42,15 +42,15 @@ int main (void) assert (rc == 0); // set pub socket options - rc = zmq_setsockopt(pub, ZMQ_XPUB_WELCOME_MSG, "W", 1); + rc = zmq_setsockopt (pub, ZMQ_XPUB_WELCOME_MSG, "W", 1); assert (rc == 0); // Create a subscriber void *sub = zmq_socket (ctx, ZMQ_SUB); // Subscribe to the welcome message - rc = zmq_setsockopt(sub, ZMQ_SUBSCRIBE, "W", 1); - assert(rc == 0); + rc = zmq_setsockopt (sub, ZMQ_SUBSCRIBE, "W", 1); + assert (rc == 0); assert (sub); rc = zmq_connect (sub, "inproc://soname"); @@ -60,14 +60,14 @@ int main (void) // Receive the welcome subscription rc = zmq_recv(pub, buffer, 2, 0); - assert(rc == 2); - assert(buffer[0] == 1); - assert(buffer[1] == 'W'); + assert (rc == 2); + assert (buffer [0] == 1); + assert (buffer [1] == 'W'); // Receive the welcome message - rc = zmq_recv(sub, buffer, 1, 0); - assert(rc == 1); - assert(buffer[0] == 'W'); + rc = zmq_recv (sub, buffer, 1, 0); + assert (rc == 1); + assert (buffer [0] == 'W'); // Clean up. rc = zmq_close (pub);