diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index 2e57ec3c..584d9343 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -752,10 +752,19 @@ int zmq::stream_engine_t::process_identity_msg (msg_t *msg_) errno_assert (rc == 0); } - if (subscription_required) - process_msg = &stream_engine_t::write_subscription_msg; - else - process_msg = &stream_engine_t::push_msg_to_session; + if (subscription_required) { + msg_t subscription; + + // Inject the subscription message, so that also + // ZMQ 2.x peers receive published messages. + int rc = subscription.init_size (1); + errno_assert (rc == 0); + *(unsigned char*) subscription.data () = 1; + rc = session->push_msg (&subscription); + errno_assert (rc == 0); + } + + process_msg = &stream_engine_t::push_msg_to_session; return 0; } @@ -947,23 +956,6 @@ int zmq::stream_engine_t::push_one_then_decode_and_push (msg_t *msg_) return rc; } -int zmq::stream_engine_t::write_subscription_msg (msg_t *msg_) -{ - msg_t subscription; - - // Inject the subscription message, so that also - // ZMQ 2.x peers receive published messages. - int rc = subscription.init_size (1); - errno_assert (rc == 0); - *(unsigned char*) subscription.data () = 1; - rc = session->push_msg (&subscription); - if (rc == -1) - return -1; - - process_msg = &stream_engine_t::push_msg_to_session; - return push_msg_to_session (msg_); -} - void zmq::stream_engine_t::error (error_reason_t reason) { if (options.raw_socket && options.raw_notify) { diff --git a/src/stream_engine.hpp b/src/stream_engine.hpp index 0fab265e..68292c53 100644 --- a/src/stream_engine.hpp +++ b/src/stream_engine.hpp @@ -116,8 +116,6 @@ namespace zmq void mechanism_ready (); - int write_subscription_msg (msg_t *msg_); - size_t add_property (unsigned char *ptr, const char *name, const void *value, size_t value_len);