From 1bca4f6f033ed6bb56fcfd7762bbfea04ecf684b Mon Sep 17 00:00:00 2001 From: Martin Hurton Date: Sat, 1 Sep 2012 13:59:22 +0200 Subject: [PATCH 1/3] Extend ZTP/1.0 protocol The new protocol adds support for protocol version and exchanges the socket type, so that the library can reject a connection when the sockets do not match. The protocol was designed so that it's possible to detect and fully support ZTP/1.0 peers. When a new connection is set up, peers exchange greeting messages. The greeting message encodes both the protocol verion and the socket type. The format of the greeting message is as follows: greeting = tag1, adaptation, tag2, version, length, socket_type tag1 = BYTE / 0xff adaptation = 8 BYTES tag2 = BYTE / 0x7f version = BYTE / 1 length = BYTE / 1 socket_type = BYTE The protocol does not define the value of adaptation field. When interoperability with ZTP/1.0 peers is required, the adaptaion encodes, in network byte order, the length of identity message increased by 1. When adaptaion consists of eight zeros, the current implementatatio of 0MQ 2.x closes the connection. This patch supports both ZTP/1.0 and new protocol. --- src/stream_engine.cpp | 151 +++++++++++++++++++++++++++++++++++++++++- src/stream_engine.hpp | 36 ++++++++++ 2 files changed, 185 insertions(+), 2 deletions(-) diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index c411e708..d24de27e 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -41,6 +41,8 @@ #include "config.hpp" #include "err.hpp" #include "ip.hpp" +#include "likely.hpp" +#include "wire.hpp" zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, const std::string &endpoint_) : s (fd_), @@ -51,6 +53,9 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, cons outpos (NULL), outsize (0), encoder (out_batch_size), + handshaking (true), + greeting_bytes_read (0), + greeting_size (0), session (NULL), options (options_), endpoint (endpoint_), @@ -112,13 +117,27 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_, // Connect to session object. zmq_assert (!session); zmq_assert (session_); - encoder.set_session (session_); - decoder.set_session (session_); session = session_; // Connect to I/O threads poller object. io_object_t::plug (io_thread_); handle = add_fd (s); + + // We need to detect whether our peer is using the versioned + // protocol. The detection is done in two steps. First, we read + // first two bytes and check if the long format of length is in use. + // If so, we receive and check the 'flags' field. If the rightmost bit + // is 1, the peer is using versioned protocol. + greeting_size = 2; + + // Send the 'length' and 'flags' fields of the identity message. + // The 'length' field is encoded in the long format. + outpos = greeting_output_buffer; + outpos [outsize++] = 0xff; + put_uint64 (&outpos [outsize], options.identity_size + 1); + outsize += 8; + outpos [outsize++] = 0x7f; + set_pollin (handle); set_pollout (handle); // Flush all the data that may have been already received downstream. @@ -150,6 +169,11 @@ void zmq::stream_engine_t::terminate () void zmq::stream_engine_t::in_event () { + // If still handshaking, receive and prcess the greeting message. + if (unlikely (handshaking)) + if (!handshake ()) + return; + bool disconnection = false; // If there's no data to process in the buffer... @@ -235,6 +259,12 @@ void zmq::stream_engine_t::out_event () outpos += nbytes; outsize -= nbytes; + + // If we are still handshaking and there are no data + // to send, stop polling for output. + if (unlikely (handshaking)) + if (outsize == 0) + reset_pollout (handle); } void zmq::stream_engine_t::activate_out () @@ -267,6 +297,123 @@ void zmq::stream_engine_t::activate_in () in_event (); } +int zmq::stream_engine_t::receive_greeting () +{ + zmq_assert (greeting_bytes_read < greeting_size); + + while (greeting_bytes_read < greeting_size) { + const int n = read (greeting + greeting_bytes_read, + greeting_size - greeting_bytes_read); + if (n == -1) + return -1; + if (n == 0) + return 0; + + greeting_bytes_read += n; + + if (greeting_bytes_read < greeting_size) + continue; + + if (greeting_size == 2) { + // We have received the first two bytes from the peer. + // If the first byte is not 0xff, we know that the + // peer is using unversioned protocol. + if (greeting [0] != 0xff) + break; + + // This may still be a long identity message (either + // 254 or 255 bytes long). We need to receive 8 more + // bytes so we can inspect the potential 'flags' field. + greeting_size = 10; + } + else + if (greeting_size == 10) { + // Inspect the rightmost bit of the 10th byte (which coincides + // with the 'flags' field if a regular message was sent). + // Zero indicates this is a header of identity message + // (i.e. the peer is using the unversioned protocol). + if (!(greeting [9] & 0x01)) + break; + + // This is truly a handshake and we can now send the rest of + // the greeting message out. + + if (outsize == 0) + set_pollout (handle); + + zmq_assert (outpos != NULL); + + outpos [outsize++] = 1; // Protocol version + outpos [outsize++] = 1; // Remaining length (1 byte for v1) + outpos [outsize++] = options.type; // Socket type + + // Read the 'version' and 'remaining_length' fields. + greeting_size = 12; + } + else + if (greeting_size == 12) { + // We have received the greeting message up to + // the 'remaining_length' field. Receive the remaining + // bytes of the greeting. + greeting_size += greeting [11]; + } + } + + return 0; +} + +bool zmq::stream_engine_t::handshake () +{ + zmq_assert (handshaking); + zmq_assert (greeting_bytes_read < greeting_size); + + int rc = receive_greeting (); + if (rc == -1) { + error (); + return false; + } + + if (greeting_bytes_read < greeting_size) + return false; + + // We have received either a header of identity message + // or the whole greeting. + + encoder.set_session (session); + decoder.set_session (session); + + zmq_assert (greeting [0] != 0xff || greeting_bytes_read >= 10); + + // Is the peer using the unversioned protocol? + // If so, we send and receive rests of identity + // messages. + if (greeting [0] != 0xff || !(greeting [9] & 0x01)) { + // We have already sent the message header. + // Since there is no way to tell the encoder to + // skip the message header, we simply throw that + // header data away. + const size_t header_size = options.identity_size + 1 >= 255 ? 10 : 2; + unsigned char tmp [10], *bufferp = tmp; + size_t buffer_size = header_size; + encoder.get_data (&bufferp, &buffer_size); + zmq_assert (buffer_size == header_size); + + // Make sure the decoder sees the data we have already received. + inpos = greeting; + insize = greeting_bytes_read; + } + + // Start polling for output if necessary. + if (outsize == 0) + set_pollout (handle); + + // Handshaking was successful. + // Switch into the normal message flow. + handshaking = false; + + return true; +} + void zmq::stream_engine_t::error () { zmq_assert (session); diff --git a/src/stream_engine.hpp b/src/stream_engine.hpp index f4d15b6d..90b9221f 100644 --- a/src/stream_engine.hpp +++ b/src/stream_engine.hpp @@ -67,6 +67,12 @@ namespace zmq // Function to handle network disconnections. void error (); + // Receives the greeting message from the peer. + int receive_greeting (); + + // Detects the protocol used by the peer. + bool handshake (); + // Writes data to the socket. Returns the number of bytes actually // written (even zero is to be considered to be a success). In case // of error or orderly shutdown by the other peer -1 is returned. @@ -81,6 +87,16 @@ namespace zmq // Underlying socket. fd_t s; + // Maximum size of a greeting message: + // preamble (10 bytes) + version (1 byte) + remaining_length (1 byte) + + // up to 255 remaining bytes. + const static size_t maximum_greeting_size = 10 + 1 + 1 + 255; + + // Size of v1 greeting message: + // preamble (10 bytes) + version (1 byte) + remaining_length (1 byte) + + // socket_type (1) + const static size_t v1_greeting_size = 10 + 1 + 1 + 1; + handle_t handle; unsigned char *inpos; @@ -92,6 +108,26 @@ namespace zmq size_t outsize; encoder_t encoder; + // When true, we are still trying to determine whether + // the peer is using versioned protocol, and if so, which + // version. When false, normal message flow has started. + bool handshaking; + + // The receive buffer holding the greeting message + // that we are receiving from the peer. + unsigned char greeting [maximum_greeting_size]; + + // The number of bytes of the greeting message that + // we have already received. + unsigned int greeting_bytes_read; + + // The size of the greeting message. + unsigned int greeting_size; + + // The send buffer holding the greeting message + // that we are sending to the peer. + unsigned char greeting_output_buffer [v1_greeting_size]; + // The session this engine is attached to. zmq::session_base_t *session; From dfc0222ee6c67e2bd596468296f3ac271f8c33b3 Mon Sep 17 00:00:00 2001 From: Martin Hurton Date: Sun, 2 Sep 2012 18:03:38 +0200 Subject: [PATCH 2/3] Decouple encoder_t and decoder_t from session_base_t This patch introduces i_msg_sink and i_msg_source interfaces. This allows us to make message encoder and decoder more general. --- src/decoder.cpp | 12 ++++++------ src/decoder.hpp | 7 ++++--- src/encoder.cpp | 12 ++++++------ src/encoder.hpp | 6 +++--- src/i_msg_sink.hpp | 43 ++++++++++++++++++++++++++++++++++++++++++ src/i_msg_source.hpp | 44 +++++++++++++++++++++++++++++++++++++++++++ src/pgm_receiver.cpp | 2 +- src/pgm_sender.cpp | 2 +- src/req.cpp | 10 +++++----- src/req.hpp | 2 +- src/session_base.cpp | 6 +++--- src/session_base.hpp | 14 +++++++++++--- src/stream_engine.cpp | 8 ++++---- 13 files changed, 132 insertions(+), 36 deletions(-) create mode 100644 src/i_msg_sink.hpp create mode 100644 src/i_msg_source.hpp diff --git a/src/decoder.cpp b/src/decoder.cpp index 6368c77c..ee9c3abe 100644 --- a/src/decoder.cpp +++ b/src/decoder.cpp @@ -29,14 +29,14 @@ #endif #include "decoder.hpp" -#include "session_base.hpp" +#include "i_msg_sink.hpp" #include "likely.hpp" #include "wire.hpp" #include "err.hpp" zmq::decoder_t::decoder_t (size_t bufsize_, int64_t maxmsgsize_) : decoder_base_t (bufsize_), - session (NULL), + msg_sink (NULL), maxmsgsize (maxmsgsize_) { int rc = in_progress.init (); @@ -52,9 +52,9 @@ zmq::decoder_t::~decoder_t () errno_assert (rc == 0); } -void zmq::decoder_t::set_session (session_base_t *session_) +void zmq::decoder_t::set_msg_sink (i_msg_sink *msg_sink_) { - session = session_; + msg_sink = msg_sink_; } bool zmq::decoder_t::stalled () const @@ -157,9 +157,9 @@ bool zmq::decoder_t::message_ready () { // Message is completely read. Push it further and start reading // new message. (in_progress is a 0-byte message after this point.) - if (unlikely (!session)) + if (unlikely (!msg_sink)) return false; - int rc = session->write (&in_progress); + int rc = msg_sink->push_msg (&in_progress); if (unlikely (rc != 0)) { if (errno != EAGAIN) decoding_error (); diff --git a/src/decoder.hpp b/src/decoder.hpp index d648cda8..a6b524ad 100644 --- a/src/decoder.hpp +++ b/src/decoder.hpp @@ -34,7 +34,7 @@ namespace zmq { - class session_base_t; + class i_msg_sink; // Helper base class for decoders that know the amount of data to read // in advance at any moment. Knowing the amount in advance is a property @@ -195,7 +195,8 @@ namespace zmq decoder_t (size_t bufsize_, int64_t maxmsgsize_); ~decoder_t (); - void set_session (zmq::session_base_t *session_); + // Set the receiver of decoded messages. + void set_msg_sink (i_msg_sink *msg_sink_); // Returns true if there is a decoded message // waiting to be delivered to the session. @@ -208,7 +209,7 @@ namespace zmq bool flags_ready (); bool message_ready (); - zmq::session_base_t *session; + i_msg_sink *msg_sink; unsigned char tmpbuf [8]; msg_t in_progress; diff --git a/src/encoder.cpp b/src/encoder.cpp index ae998d3f..abecb396 100644 --- a/src/encoder.cpp +++ b/src/encoder.cpp @@ -21,13 +21,13 @@ */ #include "encoder.hpp" -#include "session_base.hpp" +#include "i_msg_source.hpp" #include "likely.hpp" #include "wire.hpp" zmq::encoder_t::encoder_t (size_t bufsize_) : encoder_base_t (bufsize_), - session (NULL) + msg_source (NULL) { int rc = in_progress.init (); errno_assert (rc == 0); @@ -42,9 +42,9 @@ zmq::encoder_t::~encoder_t () errno_assert (rc == 0); } -void zmq::encoder_t::set_session (session_base_t *session_) +void zmq::encoder_t::set_msg_source (i_msg_source *msg_source_) { - session = session_; + msg_source = msg_source_; } bool zmq::encoder_t::size_ready () @@ -65,12 +65,12 @@ bool zmq::encoder_t::message_ready () // Note that new state is set only if write is successful. That way // unsuccessful write will cause retry on the next state machine // invocation. - if (unlikely (!session)) { + if (unlikely (!msg_source)) { rc = in_progress.init (); errno_assert (rc == 0); return false; } - rc = session->read (&in_progress); + rc = msg_source->pull_msg (&in_progress); if (unlikely (rc != 0)) { errno_assert (errno == EAGAIN); rc = in_progress.init (); diff --git a/src/encoder.hpp b/src/encoder.hpp index c78b3e35..41a30de2 100644 --- a/src/encoder.hpp +++ b/src/encoder.hpp @@ -33,7 +33,7 @@ namespace zmq { - class session_base_t; + class i_msg_source; // Helper base class for encoders. It implements the state machine that // fills the outgoing buffer. Derived classes should implement individual @@ -168,14 +168,14 @@ namespace zmq encoder_t (size_t bufsize_); ~encoder_t (); - void set_session (zmq::session_base_t *session_); + void set_msg_source (i_msg_source *msg_source_); private: bool size_ready (); bool message_ready (); - zmq::session_base_t *session; + i_msg_source *msg_source; msg_t in_progress; unsigned char tmpbuf [10]; diff --git a/src/i_msg_sink.hpp b/src/i_msg_sink.hpp new file mode 100644 index 00000000..2b1a8c40 --- /dev/null +++ b/src/i_msg_sink.hpp @@ -0,0 +1,43 @@ +/* + Copyright (c) 2007-2012 iMatix Corporation + Copyright (c) 2007-2012 Other contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_I_MSG_SINK_HPP_INCLUDED__ +#define __ZMQ_I_MSG_SINK_HPP_INCLUDED__ + +namespace zmq +{ + + // Forward declaration + class msg_t; + + // Interface to be implemented by message sink. + + struct i_msg_sink + { + virtual ~i_msg_sink () {} + + // Delivers a message. Returns 0 if successful; -1 otherwise. + // The function takes ownership of the passed message. + virtual int push_msg (msg_t *msg_) = 0; + }; + +} + +#endif diff --git a/src/i_msg_source.hpp b/src/i_msg_source.hpp new file mode 100644 index 00000000..a6956c1c --- /dev/null +++ b/src/i_msg_source.hpp @@ -0,0 +1,44 @@ +/* + Copyright (c) 2007-2012 iMatix Corporation + Copyright (c) 2007-2012 Other contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_I_MSG_SOURCE_HPP_INCLUDED__ +#define __ZMQ_I_MSG_SOURCE_HPP_INCLUDED__ + +namespace zmq +{ + + // Forward declaration + class msg_t; + + // Interface to be implemented by message source. + + struct i_msg_source + { + virtual ~i_msg_source () {} + + // Fetch a message. Returns 0 if successful; -1 otherwise. + // The caller is responsible for freeing the message when no + // longer used. + virtual int pull_msg (msg_t *msg_) = 0; + }; + +} + +#endif diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index 99e882bd..451e0ca7 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -233,7 +233,7 @@ void zmq::pgm_receiver_t::in_event () it->second.decoder = new (std::nothrow) decoder_t (0, options.maxmsgsize); alloc_assert (it->second.decoder); - it->second.decoder->set_session (session); + it->second.decoder->set_msg_sink (session); } mru_decoder = it->second.decoder; diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp index 8cfb9da6..4535965d 100644 --- a/src/pgm_sender.cpp +++ b/src/pgm_sender.cpp @@ -72,7 +72,7 @@ void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, session_base_t *session_) fd_t rdata_notify_fd = retired_fd; fd_t pending_notify_fd = retired_fd; - encoder.set_session (session_); + encoder.set_msg_source (session_); // Fill fds from PGM transport and add them to the poller. pgm_socket.get_sender_fds (&downlink_socket_fd, &uplink_socket_fd, diff --git a/src/req.cpp b/src/req.cpp index 18811002..e2dbfab2 100644 --- a/src/req.cpp +++ b/src/req.cpp @@ -150,27 +150,27 @@ zmq::req_session_t::~req_session_t () state = options.recv_identity ? identity : bottom; } -int zmq::req_session_t::write (msg_t *msg_) +int zmq::req_session_t::push_msg (msg_t *msg_) { switch (state) { case bottom: if (msg_->flags () == msg_t::more && msg_->size () == 0) { state = body; - return dealer_session_t::write (msg_); + return dealer_session_t::push_msg (msg_); } break; case body: if (msg_->flags () == msg_t::more) - return dealer_session_t::write (msg_); + return dealer_session_t::push_msg (msg_); if (msg_->flags () == 0) { state = bottom; - return dealer_session_t::write (msg_); + return dealer_session_t::push_msg (msg_); } break; case identity: if (msg_->flags () == 0) { state = bottom; - return dealer_session_t::write (msg_); + return dealer_session_t::push_msg (msg_); } break; } diff --git a/src/req.hpp b/src/req.hpp index 038c49ef..023778e3 100644 --- a/src/req.hpp +++ b/src/req.hpp @@ -71,7 +71,7 @@ namespace zmq ~req_session_t (); // Overloads of the functions from session_base_t. - int write (msg_t *msg_); + int push_msg (msg_t *msg_); void reset (); private: diff --git a/src/session_base.cpp b/src/session_base.cpp index 9f277385..acbfd489 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -150,7 +150,7 @@ void zmq::session_base_t::attach_pipe (pipe_t *pipe_) pipe->set_event_sink (this); } -int zmq::session_base_t::read (msg_t *msg_) +int zmq::session_base_t::pull_msg (msg_t *msg_) { // First message to send is identity if (!identity_sent) { @@ -172,7 +172,7 @@ int zmq::session_base_t::read (msg_t *msg_) return 0; } -int zmq::session_base_t::write (msg_t *msg_) +int zmq::session_base_t::push_msg (msg_t *msg_) { // First message to receive is identity if (!identity_received) { @@ -224,7 +224,7 @@ void zmq::session_base_t::clean_pipes () msg_t msg; int rc = msg.init (); errno_assert (rc == 0); - if (!read (&msg)) { + if (!pull_msg (&msg)) { zmq_assert (!incomplete_in); break; } diff --git a/src/session_base.hpp b/src/session_base.hpp index 7d26ad89..5b5fd2b4 100644 --- a/src/session_base.hpp +++ b/src/session_base.hpp @@ -29,6 +29,8 @@ #include "own.hpp" #include "io_object.hpp" #include "pipe.hpp" +#include "i_msg_source.hpp" +#include "i_msg_sink.hpp" namespace zmq { @@ -42,7 +44,9 @@ namespace zmq class session_base_t : public own_t, public io_object_t, - public i_pipe_events + public i_pipe_events, + public i_msg_source, + public i_msg_sink { public: @@ -54,9 +58,13 @@ namespace zmq // To be used once only, when creating the session. void attach_pipe (zmq::pipe_t *pipe_); + // i_msg_source interface implementation. + virtual int pull_msg (msg_t *msg_); + + // i_msg_sink interface implementation. + virtual int push_msg (msg_t *msg_); + // Following functions are the interface exposed towards the engine. - virtual int read (msg_t *msg_); - virtual int write (msg_t *msg_); virtual void reset (); void flush (); void detach (); diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index d24de27e..a1a1551b 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -156,8 +156,8 @@ void zmq::stream_engine_t::unplug () io_object_t::unplug (); // Disconnect from session object. - encoder.set_session (NULL); - decoder.set_session (NULL); + encoder.set_msg_source (NULL); + decoder.set_msg_sink (NULL); session = NULL; } @@ -379,8 +379,8 @@ bool zmq::stream_engine_t::handshake () // We have received either a header of identity message // or the whole greeting. - encoder.set_session (session); - decoder.set_session (session); + encoder.set_msg_source (session); + decoder.set_msg_sink (session); zmq_assert (greeting [0] != 0xff || greeting_bytes_read >= 10); From d9307c9ff04b0d1d60aac1b201dfbe2684d3403f Mon Sep 17 00:00:00 2001 From: Martin Hurton Date: Sun, 2 Sep 2012 18:19:15 +0200 Subject: [PATCH 3/3] Make ZMQ interoperate with ZMQ 2.x SUB sockets Since ZMQ 2.x does not support subscription forwarding, it's not possible to use ZMQ 2.x SUB socket to receive messages from a PUB socket. This patch adds some compatibility layer so that ZMQ 2.x SUB socket receives messages from PUB socket. --- src/stream_engine.cpp | 32 ++++++++++++++++++++++++++++++++ src/stream_engine.hpp | 6 +++++- 2 files changed, 37 insertions(+), 1 deletion(-) diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index a1a1551b..1a1b5b2d 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -401,6 +401,14 @@ bool zmq::stream_engine_t::handshake () // Make sure the decoder sees the data we have already received. inpos = greeting; insize = greeting_bytes_read; + + // To allow for interoperability with peers that do not forward + // their subscriptions, we inject a phony subsription + // message into the incomming message stream. To put this + // message right after the identity message, we temporarily + // divert the message stream from session to ourselves. + if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) + decoder.set_msg_sink (this); } // Start polling for output if necessary. @@ -414,6 +422,30 @@ bool zmq::stream_engine_t::handshake () return true; } +int zmq::stream_engine_t::push_msg (msg_t *msg_) +{ + zmq_assert (options.type == ZMQ_PUB || options.type == ZMQ_XPUB); + + // The first message is identity. + // Let the session process it. + int rc = session->push_msg (msg_); + errno_assert (rc == 0); + + // Inject the subscription message so that the ZMQ 2.x peer + // receives our messages. + rc = msg_->init_size (1); + errno_assert (rc == 0); + *(unsigned char*) msg_->data () = 1; + rc = session->push_msg (msg_); + session->flush (); + + // Once we have injected the subscription message, we can + // Divert the message flow back to the session. + decoder.set_msg_sink (session); + + return rc; +} + void zmq::stream_engine_t::error () { zmq_assert (session); diff --git a/src/stream_engine.hpp b/src/stream_engine.hpp index 90b9221f..86b3fef4 100644 --- a/src/stream_engine.hpp +++ b/src/stream_engine.hpp @@ -26,6 +26,7 @@ #include "fd.hpp" #include "i_engine.hpp" +#include "i_msg_sink.hpp" #include "io_object.hpp" #include "encoder.hpp" #include "decoder.hpp" @@ -41,7 +42,7 @@ namespace zmq // This engine handles any socket with SOCK_STREAM semantics, // e.g. TCP socket or an UNIX domain socket. - class stream_engine_t : public io_object_t, public i_engine + class stream_engine_t : public io_object_t, public i_engine, public i_msg_sink { public: @@ -55,6 +56,9 @@ namespace zmq void activate_in (); void activate_out (); + // i_msg_sink interface implementation. + virtual int push_msg (msg_t *msg_); + // i_poll_events interface implementation. void in_event (); void out_event ();