diff --git a/src/Makefile.am b/src/Makefile.am index 2d7a248f..41fa3fa2 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -26,8 +26,6 @@ libzmq_la_SOURCES = \ i_encoder.hpp \ i_decoder.hpp \ i_engine.hpp \ - i_msg_sink.hpp \ - i_msg_source.hpp \ i_poll_events.hpp \ io_object.hpp \ io_thread.hpp \ diff --git a/src/decoder.hpp b/src/decoder.hpp index efb5c457..dca535bf 100644 --- a/src/decoder.hpp +++ b/src/decoder.hpp @@ -32,8 +32,6 @@ namespace zmq { - class i_msg_sink; - // Helper base class for decoders that know the amount of data to read // in advance at any moment. Knowing the amount in advance is a property // of the protocol used. 0MQ framing protocol is based size-prefixed @@ -89,106 +87,66 @@ namespace zmq // Processes the data in the buffer previously allocated using // get_buffer function. size_ argument specifies nemuber of bytes - // actually filled into the buffer. Function returns number of - // bytes actually processed. - inline size_t process_buffer (unsigned char *data_, size_t size_) + // actually filled into the buffer. Function returns 1 when the + // whole message was decoded or 0 when more data is required. + // On error, -1 is returned and errno set accordingly. + // Number of bytes processed is returned in byts_used_. + inline int decode (const unsigned char *data_, size_t size_, + size_t &bytes_used_) { - // Check if we had an error in previous attempt. - if (unlikely (!(static_cast (this)->next))) - return (size_t) -1; + bytes_used_ = 0; // In case of zero-copy simply adjust the pointers, no copying // is required. Also, run the state machine in case all the data // were processed. if (data_ == read_pos) { + zmq_assert (size_ <= to_read); read_pos += size_; to_read -= size_; + bytes_used_ = size_; while (!to_read) { - if (!(static_cast (this)->*next) ()) { - if (unlikely (!(static_cast (this)->next))) - return (size_t) -1; - return size_; - } + const int rc = (static_cast (this)->*next) (); + if (rc != 0) + return rc; } - return size_; + return 0; } - size_t pos = 0; - while (true) { - + while (bytes_used_ < size_) { + // Copy the data from buffer to the message. + const size_t to_copy = std::min (to_read, size_ - bytes_used_); + memcpy (read_pos, data_ + bytes_used_, to_copy); + read_pos += to_copy; + to_read -= to_copy; + bytes_used_ += to_copy; // Try to get more space in the message to fill in. // If none is available, return. - while (!to_read) { - if (!(static_cast (this)->*next) ()) { - if (unlikely (!(static_cast (this)->next))) - return (size_t) -1; - return pos; - } - } - - // If there are no more data in the buffer, return. - if (pos == size_) - return pos; - - // Copy the data from buffer to the message. - size_t to_copy = std::min (to_read, size_ - pos); - memcpy (read_pos, data_ + pos, to_copy); - read_pos += to_copy; - pos += to_copy; - to_read -= to_copy; - } - } - - // Returns true if the decoder has been fed all required data - // but cannot proceed with the next decoding step. - // False is returned if the decoder has encountered an error. - bool stalled () - { - // Check whether there was decoding error. - if (unlikely (!(static_cast (this)->next))) - return false; - - while (!to_read) { - if (!(static_cast (this)->*next) ()) { - if (unlikely (!(static_cast (this)->next))) - return false; - return true; + while (to_read == 0) { + const int rc = (static_cast (this)->*next) (); + if (rc != 0) + return rc; } } - return false; - } - - inline bool message_ready_size (size_t /* msg_sz */) - { - zmq_assert (false); - return false; + return 0; } protected: // Prototype of state machine action. Action should return false if // it is unable to push the data to the system. - typedef bool (T::*step_t) (); + typedef int (T::*step_t) (); // This function should be called from derived class to read data // from the buffer and schedule next state machine action. - inline void next_step (void *read_pos_, size_t to_read_, - step_t next_) + inline void next_step (void *read_pos_, size_t to_read_, step_t next_) { read_pos = (unsigned char*) read_pos_; to_read = to_read_; next = next_; } - // This function should be called from the derived class to - // abort decoder state machine. - inline void decoding_error () - { - next = NULL; - } - private: // Next step. If set to NULL, it means that associated data stream diff --git a/src/encoder.hpp b/src/encoder.hpp index 4dd435c0..beae44f3 100644 --- a/src/encoder.hpp +++ b/src/encoder.hpp @@ -38,8 +38,6 @@ namespace zmq { - class i_msg_source; - // Helper base class for encoders. It implements the state machine that // fills the outgoing buffer. Derived classes should implement individual // state machine actions. @@ -49,7 +47,8 @@ namespace zmq public: inline encoder_base_t (size_t bufsize_) : - bufsize (bufsize_) + bufsize (bufsize_), + in_progress (NULL) { buf = (unsigned char*) malloc (bufsize_); alloc_assert (buf); @@ -65,17 +64,13 @@ namespace zmq // The function returns a batch of binary data. The data // are filled to a supplied buffer. If no buffer is supplied (data_ // points to NULL) decoder object will provide buffer of its own. - // If offset is not NULL, it is filled by offset of the first message - // in the batch.If there's no beginning of a message in the batch, - // offset is set to -1. - inline void get_data (unsigned char **data_, size_t *size_, - int *offset_ = NULL) + inline size_t encode (unsigned char **data_, size_t size_) { unsigned char *buffer = !*data_ ? buf : *data_; - size_t buffersize = !*data_ ? bufsize : *size_; + size_t buffersize = !*data_ ? bufsize : size_; - if (offset_) - *offset_ = -1; + if (in_progress == NULL) + return 0; size_t pos = 0; while (pos < buffersize) { @@ -84,14 +79,15 @@ namespace zmq // If there are still no data, return what we already have // in the buffer. if (!to_write) { - // If we are to encode the beginning of a new message, - // adjust the message offset. - if (beginning) - if (offset_ && *offset_ == -1) - *offset_ = static_cast (pos); - - if (!(static_cast (this)->*next) ()) + if (new_msg_flag) { + int rc = in_progress->close (); + errno_assert (rc == 0); + rc = in_progress->init (); + errno_assert (rc == 0); + in_progress = NULL; break; + } + (static_cast (this)->*next) (); } // If there are no data in the buffer yet and we are able to @@ -106,10 +102,10 @@ namespace zmq // amounts of time. if (!pos && !*data_ && to_write >= buffersize) { *data_ = write_pos; - *size_ = to_write; + pos = to_write; write_pos = NULL; to_write = 0; - return; + return pos; } // Copy data to the buffer. If the buffer is full, return. @@ -121,7 +117,14 @@ namespace zmq } *data_ = buffer; - *size_ = pos; + return pos; + } + + void load_msg (msg_t *msg_) + { + zmq_assert (in_progress == NULL); + in_progress = msg_; + (static_cast (this)->*next) (); } inline bool has_data () @@ -132,18 +135,17 @@ namespace zmq protected: // Prototype of state machine action. - typedef bool (T::*step_t) (); + typedef void (T::*step_t) (); // This function should be called from derived class to write the data - // to the buffer and schedule next state machine action. Set beginning - // to true when you are writing first byte of a message. + // to the buffer and schedule next state machine action. inline void next_step (void *write_pos_, size_t to_write_, - step_t next_, bool beginning_) + step_t next_, bool new_msg_flag_) { write_pos = (unsigned char*) write_pos_; to_write = to_write_; next = next_; - beginning = beginning_; + new_msg_flag = new_msg_flag_; } private: @@ -158,8 +160,7 @@ namespace zmq // is dead. step_t next; - // If true, first byte of the message is being written. - bool beginning; + bool new_msg_flag; // The buffer for encoded data. size_t bufsize; @@ -167,6 +168,11 @@ namespace zmq encoder_base_t (const encoder_base_t&); void operator = (const encoder_base_t&); + + protected: + + msg_t *in_progress; + }; } diff --git a/src/i_decoder.hpp b/src/i_decoder.hpp index cf579b90..db62dabe 100644 --- a/src/i_decoder.hpp +++ b/src/i_decoder.hpp @@ -25,7 +25,7 @@ namespace zmq { - class i_msg_sink; + class msg_t; // Interface to be implemented by message decoder. @@ -34,15 +34,16 @@ namespace zmq public: virtual ~i_decoder () {} - virtual void set_msg_sink (i_msg_sink *msg_sink_) = 0; - virtual void get_buffer (unsigned char **data_, size_t *size_) = 0; - virtual size_t process_buffer (unsigned char *data_, size_t size_) = 0; + // Decodes data pointed to by data_. + // When a message is decoded, 1 is returned. + // When the decoder needs more data, 0 is returnd. + // On error, -1 is returned and errno is set accordingly. + virtual int decode (const unsigned char *data_, size_t size_, + size_t &processed) = 0; - virtual bool stalled () = 0; - - virtual bool message_ready_size (size_t msg_sz) = 0; + virtual msg_t *msg () = 0; }; } diff --git a/src/i_encoder.hpp b/src/i_encoder.hpp index fac16bb9..2cfa2a89 100644 --- a/src/i_encoder.hpp +++ b/src/i_encoder.hpp @@ -26,7 +26,7 @@ namespace zmq { // Forward declaration - class i_msg_source; + class msg_t; // Interface to be implemented by message encoder. @@ -34,17 +34,14 @@ namespace zmq { virtual ~i_encoder () {} - // Set message producer. - virtual void set_msg_source (i_msg_source *msg_source_) = 0; - // The function returns a batch of binary data. The data // are filled to a supplied buffer. If no buffer is supplied (data_ // is NULL) encoder will provide buffer of its own. - // If offset is not NULL, it is filled by offset of the first message - // in the batch.If there's no beginning of a message in the batch, - // offset is set to -1. - virtual void get_data (unsigned char **data_, size_t *size_, - int *offset_ = NULL) = 0; + // Function returns 0 when a new message is required. + virtual size_t encode (unsigned char **data_, size_t size) = 0; + + // Load a new message into encoder. + virtual void load_msg (msg_t *msg_) = 0; virtual bool has_data () = 0; }; diff --git a/src/i_msg_sink.hpp b/src/i_msg_sink.hpp deleted file mode 100644 index 4f980011..00000000 --- a/src/i_msg_sink.hpp +++ /dev/null @@ -1,43 +0,0 @@ -/* - Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see . -*/ - -#ifndef __ZMQ_I_MSG_SINK_HPP_INCLUDED__ -#define __ZMQ_I_MSG_SINK_HPP_INCLUDED__ - -namespace zmq -{ - - // Forward declaration - class msg_t; - - // Interface to be implemented by message sink. - - class i_msg_sink - { - public: - virtual ~i_msg_sink () {} - - // Delivers a message. Returns 0 if successful; -1 otherwise. - // The function takes ownership of the passed message. - virtual int push_msg (msg_t *msg_) = 0; - }; - -} - -#endif diff --git a/src/i_msg_source.hpp b/src/i_msg_source.hpp deleted file mode 100644 index 03eb54f3..00000000 --- a/src/i_msg_source.hpp +++ /dev/null @@ -1,44 +0,0 @@ -/* - Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see . -*/ - -#ifndef __ZMQ_I_MSG_SOURCE_HPP_INCLUDED__ -#define __ZMQ_I_MSG_SOURCE_HPP_INCLUDED__ - -namespace zmq -{ - - // Forward declaration - class msg_t; - - // Interface to be implemented by message source. - - class i_msg_source - { - public: - virtual ~i_msg_source () {} - - // Fetch a message. Returns 0 if successful; -1 otherwise. - // The caller is responsible for freeing the message when no - // longer used. - virtual int pull_msg (msg_t *msg_) = 0; - }; - -} - -#endif diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index 30777b03..89311350 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -41,8 +41,8 @@ zmq::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_, pgm_socket (true, options_), options (options_), session (NULL), - mru_decoder (NULL), - pending_bytes (0) + active_tsi (NULL), + insize (0) { } @@ -83,9 +83,7 @@ void zmq::pgm_receiver_t::unplug () delete it->second.decoder; } peers.clear (); - - mru_decoder = NULL; - pending_bytes = 0; + active_tsi = NULL; if (has_rx_timer) { cancel_timer (rx_timer_id); @@ -111,49 +109,47 @@ void zmq::pgm_receiver_t::activate_out () void zmq::pgm_receiver_t::activate_in () { - // It is possible that the most recently used decoder - // processed the whole buffer but failed to write - // the last message into the pipe. - if (pending_bytes == 0) { - if (mru_decoder != NULL) { - mru_decoder->process_buffer (NULL, 0); - session->flush (); + zmq_assert (session != NULL); + zmq_assert (active_tsi != NULL); + + const peers_t::iterator it = peers.find (*active_tsi); + zmq_assert (it != peers.end ()); + zmq_assert (it->second.joined); + + // Push the pending message into the session. + int rc = session->push_msg (it->second.decoder->msg ()); + errno_assert (rc == 0); + + if (insize > 0) { + rc = process_input (it->second.decoder); + if (rc == -1) { + // HWM reached; we will try later. + if (errno == EAGAIN) { + session->flush (); + return; + } + // Data error. Delete message decoder, mark the + // peer as not joined and drop remaining data. + it->second.joined = false; + delete it->second.decoder; + it->second.decoder = NULL; + insize = 0; } - - // Resume polling. - set_pollin (pipe_handle); - set_pollin (socket_handle); - - return; } - zmq_assert (mru_decoder != NULL); - zmq_assert (pending_ptr != NULL); - - // Ask the decoder to process remaining data. - size_t n = mru_decoder->process_buffer (pending_ptr, pending_bytes); - pending_bytes -= n; - session->flush (); - - if (pending_bytes > 0) - return; - // Resume polling. set_pollin (pipe_handle); set_pollin (socket_handle); + active_tsi = NULL; in_event (); } void zmq::pgm_receiver_t::in_event () { // Read data from the underlying pgm_socket. - unsigned char *data = NULL; const pgm_tsi_t *tsi = NULL; - if (pending_bytes > 0) - return; - if (has_rx_timer) { cancel_timer (rx_timer_id); has_rx_timer = false; @@ -167,7 +163,7 @@ void zmq::pgm_receiver_t::in_event () // Note the workaround made not to break strict-aliasing rules. void *tmp = NULL; ssize_t received = pgm_socket.receive (&tmp, &tsi); - data = (unsigned char*) tmp; + inpos = (unsigned char*) tmp; // No data to process. This may happen if the packet received is // neither ODATA nor ODATA. @@ -187,8 +183,6 @@ void zmq::pgm_receiver_t::in_event () if (received == -1) { if (it != peers.end ()) { it->second.joined = false; - if (it->second.decoder == mru_decoder) - mru_decoder = NULL; if (it->second.decoder != NULL) { delete it->second.decoder; it->second.decoder = NULL; @@ -203,11 +197,13 @@ void zmq::pgm_receiver_t::in_event () it = peers.insert (peers_t::value_type (*tsi, peer_info)).first; } + insize = static_cast (received); + // Read the offset of the fist message in the current packet. - zmq_assert ((size_t) received >= sizeof (uint16_t)); - uint16_t offset = get_uint16 (data); - data += sizeof (uint16_t); - received -= sizeof (uint16_t); + zmq_assert (insize >= sizeof (uint16_t)); + uint16_t offset = get_uint16 (inpos); + inpos += sizeof (uint16_t); + insize -= sizeof (uint16_t); // Join the stream if needed. if (!it->second.joined) { @@ -217,12 +213,12 @@ void zmq::pgm_receiver_t::in_event () if (offset == 0xffff) continue; - zmq_assert (offset <= received); + zmq_assert (offset <= insize); zmq_assert (it->second.decoder == NULL); // We have to move data to the begining of the first message. - data += offset; - received -= offset; + inpos += offset; + insize -= offset; // Mark the stream as joined. it->second.joined = true; @@ -231,28 +227,24 @@ void zmq::pgm_receiver_t::in_event () it->second.decoder = new (std::nothrow) v1_decoder_t (0, options.maxmsgsize); alloc_assert (it->second.decoder); - it->second.decoder->set_msg_sink (session); } - mru_decoder = it->second.decoder; + int rc = process_input (it->second.decoder); + if (rc == -1) { + if (errno == EAGAIN) { + active_tsi = tsi; - // Push all the data to the decoder. - ssize_t processed = it->second.decoder->process_buffer (data, received); - if (processed < received) { - // Save some state so we can resume the decoding process later. - pending_bytes = received - processed; - pending_ptr = data + processed; - // Stop polling. - reset_pollin (pipe_handle); - reset_pollin (socket_handle); + // Stop polling. + reset_pollin (pipe_handle); + reset_pollin (socket_handle); - // Reset outstanding timer. - if (has_rx_timer) { - cancel_timer (rx_timer_id); - has_rx_timer = false; + break; } - break; + it->second.joined = false; + delete it->second.decoder; + it->second.decoder = NULL; + insize = 0; } } @@ -260,6 +252,29 @@ void zmq::pgm_receiver_t::in_event () session->flush (); } +int zmq::pgm_receiver_t::process_input (v1_decoder_t *decoder) +{ + zmq_assert (session != NULL); + + while (insize > 0) { + size_t n = 0; + int rc = decoder->decode (inpos, insize, n); + if (rc == -1) + return -1; + inpos += n; + insize -= n; + if (rc == 0) + break; + rc = session->push_msg (decoder->msg ()); + if (rc == -1) { + errno_assert (errno == EAGAIN); + return -1; + } + } + return 0; +} + + void zmq::pgm_receiver_t::timer_event (int token) { zmq_assert (token == rx_timer_id); diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp index 7e1249f7..d6617860 100644 --- a/src/pgm_receiver.hpp +++ b/src/pgm_receiver.hpp @@ -69,6 +69,10 @@ namespace zmq // Unplug the engine from the session. void unplug (); + // Decode received data (inpos, insize) and forward decoded + // messages to the session. + int process_input (v1_decoder_t *decoder); + // PGM is not able to move subscriptions upstream. Thus, drop all // the pending subscriptions. void drop_subscriptions (); @@ -112,14 +116,13 @@ namespace zmq // Associated session. zmq::session_base_t *session; - // Most recently used decoder. - v1_decoder_t *mru_decoder; + const pgm_tsi_t *active_tsi; // Number of bytes not consumed by the decoder due to pipe overflow. - size_t pending_bytes; + size_t insize; // Pointer to data still waiting to be processed by the decoder. - unsigned char *pending_ptr; + const unsigned char *inpos; // Poll handle associated with PGM socket. handle_t socket_handle; diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp index 87ec4800..a0f2d0a2 100644 --- a/src/pgm_sender.cpp +++ b/src/pgm_sender.cpp @@ -39,13 +39,17 @@ zmq::pgm_sender_t::pgm_sender_t (io_thread_t *parent_, io_object_t (parent_), has_tx_timer (false), has_rx_timer (false), + session (NULL), encoder (0), + more_flag (false), pgm_socket (false, options_), options (options_), out_buffer (NULL), out_buffer_size (0), write_size (0) { + int rc = msg.init (); + errno_assert (rc == 0); } int zmq::pgm_sender_t::init (bool udp_encapsulation_, const char *network_) @@ -69,7 +73,7 @@ void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, session_base_t *session_) fd_t rdata_notify_fd = retired_fd; fd_t pending_notify_fd = retired_fd; - encoder.set_msg_source (session_); + session = session_; // Fill fds from PGM transport and add them to the poller. pgm_socket.get_sender_fds (&downlink_socket_fd, &uplink_socket_fd, @@ -106,7 +110,7 @@ void zmq::pgm_sender_t::unplug () rm_fd (uplink_handle); rm_fd (rdata_notify_handle); rm_fd (pending_notify_handle); - encoder.set_msg_source (NULL); + session = NULL; } void zmq::pgm_sender_t::terminate () @@ -128,6 +132,9 @@ void zmq::pgm_sender_t::activate_in () zmq::pgm_sender_t::~pgm_sender_t () { + int rc = msg.close (); + errno_assert (rc == 0); + if (out_buffer) { free (out_buffer); out_buffer = NULL; @@ -161,18 +168,31 @@ void zmq::pgm_sender_t::out_event () // the get data function we prevent it from returning its own buffer. unsigned char *bf = out_buffer + sizeof (uint16_t); size_t bfsz = out_buffer_size - sizeof (uint16_t); - int offset = -1; - encoder.get_data (&bf, &bfsz, &offset); + uint16_t offset = 0xffff; + + size_t bytes = encoder.encode (&bf, bfsz); + while (bytes < bfsz) { + if (!more_flag && offset == 0xffff) + offset = static_cast (bytes); + int rc = session->pull_msg (&msg); + if (rc == -1) + break; + more_flag = msg.flags () & msg_t::more; + encoder.load_msg (&msg); + bf = out_buffer + sizeof (uint16_t) + bytes; + bytes += encoder.encode (&bf, bfsz - bytes); + } // If there are no data to write stop polling for output. - if (!bfsz) { + if (bytes == 0) { reset_pollout (handle); return; } + write_size = sizeof (uint16_t) + bytes; + // Put offset information in the buffer. - write_size = bfsz + sizeof (uint16_t); - put_uint16 (out_buffer, offset == -1 ? 0xffff : (uint16_t) offset); + put_uint16 (out_buffer, offset); } if (has_tx_timer) { diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp index 2991327f..60dce6b8 100644 --- a/src/pgm_sender.hpp +++ b/src/pgm_sender.hpp @@ -34,6 +34,7 @@ #include "options.hpp" #include "pgm_socket.hpp" #include "v1_encoder.hpp" +#include "msg.hpp" namespace zmq { @@ -75,9 +76,16 @@ namespace zmq bool has_tx_timer; bool has_rx_timer; + session_base_t *session; + // Message encoder. v1_encoder_t encoder; + msg_t msg; + + // Keeps track of message boundaries. + bool more_flag; + // PGM socket. pgm_socket_t pgm_socket; diff --git a/src/raw_decoder.cpp b/src/raw_decoder.cpp index 702fda0c..fe05ac59 100644 --- a/src/raw_decoder.cpp +++ b/src/raw_decoder.cpp @@ -26,72 +26,38 @@ #endif #include "raw_decoder.hpp" -#include "likely.hpp" -#include "wire.hpp" #include "err.hpp" -zmq::raw_decoder_t::raw_decoder_t (size_t bufsize_, - int64_t maxmsgsize_, i_msg_sink *msg_sink_) : - decoder_base_t (bufsize_), - msg_sink (msg_sink_), - maxmsgsize (maxmsgsize_) +zmq::raw_decoder_t::raw_decoder_t (size_t bufsize_) : + bufsize (bufsize_) { int rc = in_progress.init (); errno_assert (rc == 0); + + buffer = (unsigned char *) malloc (bufsize); + alloc_assert (buffer); } zmq::raw_decoder_t::~raw_decoder_t () { int rc = in_progress.close (); errno_assert (rc == 0); + + free (buffer); } -void zmq::raw_decoder_t::set_msg_sink (i_msg_sink *msg_sink_) +void zmq::raw_decoder_t::get_buffer (unsigned char **data_, size_t *size_) { - msg_sink = msg_sink_; + *data_ = buffer; + *size_ = bufsize; } -bool zmq::raw_decoder_t::stalled () +int zmq::raw_decoder_t::decode (const uint8_t *data_, size_t size_, + size_t &bytes_used_) { - return false; -} - -bool zmq::raw_decoder_t::message_ready_size (size_t msg_sz) -{ - int rc = in_progress.init_size (msg_sz); - if (rc != 0) { - errno_assert (errno == ENOMEM); - rc = in_progress.init (); - errno_assert (rc == 0); - decoding_error (); - return false; - } - - next_step (in_progress.data (), in_progress.size (), - &raw_decoder_t::raw_message_ready); - - return true; -} - -bool zmq::raw_decoder_t::raw_message_ready () -{ - zmq_assert (in_progress.size ()); - // Message is completely read. Push it further and start reading - // new message. (in_progress is a 0-byte message after this point.) - if (unlikely (!msg_sink)) - return false; - int rc = msg_sink->push_msg (&in_progress); - if (unlikely (rc != 0)) { - if (errno != EAGAIN) - decoding_error (); - return false; - } - - // NOTE: This is just to break out of process_buffer - // raw_message_ready should never get called in state machine w/o - // message_ready_size from stream_engine. - next_step (in_progress.data (), 1, - &raw_decoder_t::raw_message_ready); - - return true; + int rc = in_progress.init_size (size_); + errno_assert (rc != -1); + memcpy (in_progress.data (), data_, size_); + bytes_used_ = size_; + return 1; } diff --git a/src/raw_decoder.hpp b/src/raw_decoder.hpp index 7db68e18..9f23156f 100644 --- a/src/raw_decoder.hpp +++ b/src/raw_decoder.hpp @@ -22,9 +22,7 @@ #include "err.hpp" #include "msg.hpp" -#include "decoder.hpp" -#include "raw_decoder.hpp" -#include "i_msg_sink.hpp" +#include "i_decoder.hpp" #include "stdint.hpp" namespace zmq @@ -32,30 +30,31 @@ namespace zmq // Decoder for 0MQ v1 framing protocol. Converts data stream into messages. - class raw_decoder_t : public decoder_base_t + class raw_decoder_t : public i_decoder { public: - raw_decoder_t (size_t bufsize_, - int64_t maxmsgsize_, i_msg_sink *msg_sink_); + raw_decoder_t (size_t bufsize_); virtual ~raw_decoder_t (); // i_decoder interface. - virtual void set_msg_sink (i_msg_sink *msg_sink_); - virtual bool stalled (); + virtual void get_buffer (unsigned char **data_, size_t *size_); + + virtual int decode (const unsigned char *data_, size_t size_, + size_t &processed); + + virtual msg_t *msg () { return &in_progress; } - virtual bool message_ready_size (size_t msg_sz); private: - bool raw_message_ready (); - - i_msg_sink *msg_sink; msg_t in_progress; - const int64_t maxmsgsize; + const int64_t bufsize; + + unsigned char *buffer; raw_decoder_t (const raw_decoder_t&); void operator = (const raw_decoder_t&); diff --git a/src/raw_encoder.cpp b/src/raw_encoder.cpp index c6df6257..dbf8545f 100644 --- a/src/raw_encoder.cpp +++ b/src/raw_encoder.cpp @@ -19,65 +19,22 @@ #include "encoder.hpp" #include "raw_encoder.hpp" -#include "i_msg_source.hpp" #include "likely.hpp" #include "wire.hpp" -zmq::raw_encoder_t::raw_encoder_t (size_t bufsize_, i_msg_source *msg_source_) : - encoder_base_t (bufsize_), - msg_source (msg_source_) +zmq::raw_encoder_t::raw_encoder_t (size_t bufsize_) : + encoder_base_t (bufsize_) { - int rc = in_progress.init (); - errno_assert (rc == 0); - // Write 0 bytes to the batch and go to message_ready state. next_step (NULL, 0, &raw_encoder_t::raw_message_ready, true); } zmq::raw_encoder_t::~raw_encoder_t () { - int rc = in_progress.close (); - errno_assert (rc == 0); } -void zmq::raw_encoder_t::set_msg_source (i_msg_source *msg_source_) +void zmq::raw_encoder_t::raw_message_ready () { - msg_source = msg_source_; -} - -bool zmq::raw_encoder_t::raw_message_size_ready () -{ - // Write message body into the buffer. - next_step (in_progress.data (), in_progress.size (), - &raw_encoder_t::raw_message_ready, !(in_progress.flags () & msg_t::more)); - return true; -} - -bool zmq::raw_encoder_t::raw_message_ready () -{ - // Destroy content of the old message. - int rc = in_progress.close (); - errno_assert (rc == 0); - - // Read new message. If there is none, return false. - // Note that new state is set only if write is successful. That way - // unsuccessful write will cause retry on the next state machine - // invocation. - if (unlikely (!msg_source)) { - rc = in_progress.init (); - errno_assert (rc == 0); - return false; - } - rc = msg_source->pull_msg (&in_progress); - if (unlikely (rc != 0)) { - errno_assert (errno == EAGAIN); - rc = in_progress.init (); - errno_assert (rc == 0); - return false; - } - - in_progress.reset_flags(0xff); - next_step (NULL, 0, &raw_encoder_t::raw_message_size_ready, true); - - return true; + next_step (in_progress->data (), in_progress->size (), + &raw_encoder_t::raw_message_ready, true); } diff --git a/src/raw_encoder.hpp b/src/raw_encoder.hpp index 65c5348d..fec36d31 100644 --- a/src/raw_encoder.hpp +++ b/src/raw_encoder.hpp @@ -44,19 +44,13 @@ namespace zmq { public: - raw_encoder_t (size_t bufsize_, i_msg_source *msg_source_); + raw_encoder_t (size_t bufsize_); ~raw_encoder_t (); - void set_msg_source (i_msg_source *msg_source_); - private: - bool raw_message_ready (); - bool raw_message_size_ready (); + void raw_message_ready (); - i_msg_source *msg_source; - msg_t in_progress; - unsigned char tmpbuf [4]; raw_encoder_t (const raw_encoder_t&); const raw_encoder_t &operator = (const raw_encoder_t&); }; diff --git a/src/req.cpp b/src/req.cpp index 242bf568..f4c2c3d7 100644 --- a/src/req.cpp +++ b/src/req.cpp @@ -138,7 +138,7 @@ zmq::req_session_t::req_session_t (io_thread_t *io_thread_, bool connect_, socket_base_t *socket_, const options_t &options_, const address_t *addr_) : dealer_session_t (io_thread_, connect_, socket_, options_, addr_), - state (identity) + state (bottom) { } @@ -163,12 +163,6 @@ int zmq::req_session_t::push_msg (msg_t *msg_) return dealer_session_t::push_msg (msg_); } break; - case identity: - if (msg_->flags () == 0) { - state = bottom; - return dealer_session_t::push_msg (msg_); - } - break; } errno = EFAULT; return -1; @@ -177,5 +171,5 @@ int zmq::req_session_t::push_msg (msg_t *msg_) void zmq::req_session_t::reset () { session_base_t::reset (); - state = identity; + state = bottom; } diff --git a/src/req.hpp b/src/req.hpp index 375df293..57216e03 100644 --- a/src/req.hpp +++ b/src/req.hpp @@ -74,7 +74,6 @@ namespace zmq private: enum { - identity, bottom, body } state; diff --git a/src/session_base.cpp b/src/session_base.cpp index 512f0091..c096d740 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -111,8 +111,6 @@ zmq::session_base_t::session_base_t (class io_thread_t *io_thread_, socket (socket_), io_thread (io_thread_), has_linger_timer (false), - identity_sent (false), - identity_received (false), addr (addr_) { } @@ -146,17 +144,6 @@ void zmq::session_base_t::attach_pipe (pipe_t *pipe_) int zmq::session_base_t::pull_msg (msg_t *msg_) { - // Unless the socket is in raw mode, the first - // message we send is its identity. - if (unlikely (!identity_sent && !options.raw_sock)) { - int rc = msg_->init_size (options.identity_size); - errno_assert (rc == 0); - memcpy (msg_->data (), options.identity, options.identity_size); - identity_sent = true; - incomplete_in = false; - return 0; - } - if (!pipe || !pipe->read (msg_)) { errno = EAGAIN; return -1; @@ -168,20 +155,6 @@ int zmq::session_base_t::pull_msg (msg_t *msg_) int zmq::session_base_t::push_msg (msg_t *msg_) { - // Unless the socket is in raw mode, the first - // message we receive is its identity. - if (unlikely (!identity_received && !options.raw_sock)) { - msg_->set_flags (msg_t::identity); - identity_received = true; - if (!options.recv_identity) { - int rc = msg_->close (); - errno_assert (rc == 0); - rc = msg_->init (); - errno_assert (rc == 0); - return 0; - } - } - if (pipe && pipe->write (msg_)) { int rc = msg_->init (); errno_assert (rc == 0); @@ -194,9 +167,6 @@ int zmq::session_base_t::push_msg (msg_t *msg_) void zmq::session_base_t::reset () { - // Restore identity flags. - identity_sent = false; - identity_received = false; } void zmq::session_base_t::flush () diff --git a/src/session_base.hpp b/src/session_base.hpp index cf0bf916..e907587e 100644 --- a/src/session_base.hpp +++ b/src/session_base.hpp @@ -26,8 +26,6 @@ #include "own.hpp" #include "io_object.hpp" #include "pipe.hpp" -#include "i_msg_source.hpp" -#include "i_msg_sink.hpp" #include "socket_base.hpp" namespace zmq @@ -42,9 +40,7 @@ namespace zmq class session_base_t : public own_t, public io_object_t, - public i_pipe_events, - public i_msg_source, - public i_msg_sink + public i_pipe_events { public: @@ -56,12 +52,6 @@ namespace zmq // To be used once only, when creating the session. void attach_pipe (zmq::pipe_t *pipe_); - // i_msg_source interface implementation. - virtual int pull_msg (msg_t *msg_); - - // i_msg_sink interface implementation. - virtual int push_msg (msg_t *msg_); - // Following functions are the interface exposed towards the engine. virtual void reset (); void flush (); @@ -73,6 +63,15 @@ namespace zmq void hiccuped (zmq::pipe_t *pipe_); void terminated (zmq::pipe_t *pipe_); + // Delivers a message. Returns 0 if successful; -1 otherwise. + // The function takes ownership of the message. + int push_msg (msg_t *msg_); + + // Fetches a message. Returns 0 if successful; -1 otherwise. + // The caller is responsible for freeing the message when no + // longer used. + int pull_msg (msg_t *msg_); + socket_base_t *get_socket (); protected: @@ -137,10 +136,6 @@ namespace zmq // True is linger timer is running. bool has_linger_timer; - // If true, identity has been sent/received from the network. - bool identity_sent; - bool identity_received; - // Protocol and address to use when connecting. const address_t *addr; diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index 0d1e7a77..6cac0b08 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -50,7 +50,6 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, const std::string &endpoint_) : s (fd_), - io_enabled (false), inpos (NULL), insize (0), decoder (NULL), @@ -64,13 +63,23 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, cons endpoint (endpoint_), plugged (false), terminating (false), + io_error (false), + congested (false), + identity_received (false), + identity_sent (false), + rx_initialized (false), + tx_initialized (false), + subscription_required (false), socket (NULL) { + int rc = tx_msg.init (); + errno_assert (rc == 0); + // Put the socket into non-blocking mode. unblock_socket (s); // Set the socket buffer limits for the underlying socket. if (options.sndbuf) { - int rc = setsockopt (s, SOL_SOCKET, SO_SNDBUF, + rc = setsockopt (s, SOL_SOCKET, SO_SNDBUF, (char*) &options.sndbuf, sizeof (int)); #ifdef ZMQ_HAVE_WINDOWS wsa_assert (rc != SOCKET_ERROR); @@ -79,7 +88,7 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, cons #endif } if (options.rcvbuf) { - int rc = setsockopt (s, SOL_SOCKET, SO_RCVBUF, + rc = setsockopt (s, SOL_SOCKET, SO_RCVBUF, (char*) &options.rcvbuf, sizeof (int)); #ifdef ZMQ_HAVE_WINDOWS wsa_assert (rc != SOCKET_ERROR); @@ -112,6 +121,9 @@ zmq::stream_engine_t::~stream_engine_t () s = retired_fd; } + int rc = tx_msg.close (); + errno_assert (rc == 0); + if (encoder != NULL) delete encoder; if (decoder != NULL) @@ -133,15 +145,14 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_, // Connect to I/O threads poller object. io_object_t::plug (io_thread_); handle = add_fd (s); - io_enabled = true; + io_error = false; if (options.raw_sock) { // no handshaking for raw sock, instantiate raw encoder and decoders - encoder = new (std::nothrow) raw_encoder_t (out_batch_size, session); + encoder = new (std::nothrow) raw_encoder_t (out_batch_size); alloc_assert (encoder); - decoder = new (std::nothrow) - raw_decoder_t (in_batch_size, options.maxmsgsize, session); + decoder = new (std::nothrow) raw_decoder_t (in_batch_size); alloc_assert (decoder); // disable handshaking for raw socket @@ -169,19 +180,12 @@ void zmq::stream_engine_t::unplug () plugged = false; // Cancel all fd subscriptions. - if (io_enabled) { + if (!io_error) rm_fd (handle); - io_enabled = false; - } // Disconnect from I/O threads poller object. io_object_t::unplug (); - // Disconnect from session object. - if (encoder) - encoder->set_msg_source (NULL); - if (decoder) - decoder->set_msg_sink (NULL); session = NULL; } @@ -198,14 +202,21 @@ void zmq::stream_engine_t::terminate () void zmq::stream_engine_t::in_event () { + assert (!io_error); + // If still handshaking, receive and process the greeting message. if (unlikely (handshaking)) if (!handshake ()) return; zmq_assert (decoder); - bool disconnection = false; - size_t processed; + + // If there has been an I/O error, stop polling. + if (congested) { + rm_fd (handle); + io_error = true; + return; + } // If there's no data to process in the buffer... if (!insize) { @@ -215,58 +226,51 @@ void zmq::stream_engine_t::in_event () // the underlying TCP layer has fixed buffer size and thus the // number of bytes read will be always limited. decoder->get_buffer (&inpos, &insize); - insize = read (inpos, insize); + const int bytes_read = read (inpos, insize); // Check whether the peer has closed the connection. - if (insize == (size_t) -1) { - insize = 0; - disconnection = true; + if (bytes_read == -1) { + error (); + return; } + + // Adjust input size + insize = static_cast (bytes_read); } - if (options.raw_sock) { - if (insize == 0 || !decoder->message_ready_size (insize)) - processed = 0; - else - processed = decoder->process_buffer (inpos, insize); - } - else - // Push the data to the decoder. - processed = decoder->process_buffer (inpos, insize); + int rc = 0; + size_t processed = 0; - if (unlikely (processed == (size_t) -1)) - disconnection = true; - else { - - // Stop polling for input if we got stuck. - if (processed < insize) - reset_pollin (handle); - - // Adjust the buffer. + while (insize > 0) { + rc = decoder->decode (inpos, insize, processed); + zmq_assert (processed <= insize); inpos += processed; insize -= processed; + if (rc == 0 || rc == -1) + break; + rc = write_msg (decoder->msg ()); + if (rc == -1) + break; } - // Flush all messages the decoder may have produced. - session->flush (); - - // Input error has occurred. If the last decoded - // message has already been accepted, we terminate - // the engine immediately. Otherwise, we stop - // waiting for input events and postpone the termination - // until after the session has accepted the message. - if (disconnection) { - if (decoder->stalled ()) { - rm_fd (handle); - io_enabled = false; - } - else + // Tear down the connection if we have failed to decode input data + // or the session has rejected the message. + if (rc == -1) { + if (errno != EAGAIN) { error (); + return; + } + congested = true; + reset_pollin (handle); } + + session->flush (); } void zmq::stream_engine_t::out_event () { + zmq_assert (!io_error); + // If write buffer is empty, try to read new data from the encoder. if (!outsize) { @@ -279,7 +283,19 @@ void zmq::stream_engine_t::out_event () } outpos = NULL; - encoder->get_data (&outpos, &outsize); + outsize = encoder->encode (&outpos, 0); + + while (outsize < out_batch_size) { + if (read_msg (&tx_msg) == -1) + break; + encoder->load_msg (&tx_msg); + unsigned char *bufptr = outpos + outsize; + size_t n = encoder->encode (&bufptr, out_batch_size - outsize); + zmq_assert (n > 0); + if (outpos == NULL) + outpos = bufptr; + outsize += n; + } // If there is no data to send, stop polling for output. if (outsize == 0) { @@ -321,6 +337,9 @@ void zmq::stream_engine_t::out_event () void zmq::stream_engine_t::activate_out () { + if (unlikely (io_error)) + return; + set_pollout (handle); // Speculative write: The assumption is that at the moment new message @@ -332,22 +351,45 @@ void zmq::stream_engine_t::activate_out () void zmq::stream_engine_t::activate_in () { - if (unlikely (!io_enabled)) { - // There was an input error but the engine could not - // be terminated (due to the stalled decoder). - // Flush the pending message and terminate the engine now. - zmq_assert (decoder); - decoder->process_buffer (inpos, 0); - zmq_assert (!decoder->stalled ()); - session->flush (); - error (); + zmq_assert (congested); + zmq_assert (session != NULL); + zmq_assert (decoder != NULL); + + int rc = write_msg (decoder->msg ()); + if (rc == -1) { + if (errno == EAGAIN) + session->flush (); + else + error (); return; } - set_pollin (handle); + while (insize > 0) { + size_t processed = 0; + rc = decoder->decode (inpos, insize, processed); + zmq_assert (processed <= insize); + inpos += processed; + insize -= processed; + if (rc == 0 || rc == -1) + break; + rc = write_msg (decoder->msg ()); + if (rc == -1) + break; + } - // Speculative read. - in_event (); + if (rc == -1 && errno == EAGAIN) + session->flush (); + else + if (rc == -1 || io_error) + error (); + else { + congested = false; + set_pollin (handle); + session->flush (); + + // Speculative read. + in_event (); + } } bool zmq::stream_engine_t::handshake () @@ -402,11 +444,9 @@ bool zmq::stream_engine_t::handshake () if (greeting_recv [0] != 0xff || !(greeting_recv [9] & 0x01)) { encoder = new (std::nothrow) v1_encoder_t (out_batch_size); alloc_assert (encoder); - encoder->set_msg_source (session); decoder = new (std::nothrow) v1_decoder_t (in_batch_size, options.maxmsgsize); alloc_assert (decoder); - decoder->set_msg_sink (session); // We have already sent the message header. // Since there is no way to tell the encoder to @@ -414,8 +454,7 @@ bool zmq::stream_engine_t::handshake () // header data away. const size_t header_size = options.identity_size + 1 >= 255 ? 10 : 2; unsigned char tmp [10], *bufferp = tmp; - size_t buffer_size = header_size; - encoder->get_data (&bufferp, &buffer_size); + size_t buffer_size = encoder->encode (&bufferp, header_size); zmq_assert (buffer_size == header_size); // Make sure the decoder sees the data we have already received. @@ -424,33 +463,28 @@ bool zmq::stream_engine_t::handshake () // To allow for interoperability with peers that do not forward // their subscriptions, we inject a phony subscription - // message into the incoming message stream. To put this - // message right after the identity message, we temporarily - // divert the message stream from session to ourselves. + // message into the incomming message stream. if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) - decoder->set_msg_sink (this); + subscription_required = true; } else if (greeting_recv [revision_pos] == ZMTP_1_0) { encoder = new (std::nothrow) v1_encoder_t ( out_batch_size); alloc_assert (encoder); - encoder->set_msg_source (session); decoder = new (std::nothrow) v1_decoder_t ( in_batch_size, options.maxmsgsize); alloc_assert (decoder); - decoder->set_msg_sink (session); } else if (greeting_recv [revision_pos] == ZMTP_2_0 || greeting_recv [revision_pos] == ZMTP_2_1) { - encoder = new (std::nothrow) v2_encoder_t ( - out_batch_size, session); + encoder = new (std::nothrow) v2_encoder_t (out_batch_size); alloc_assert (encoder); decoder = new (std::nothrow) v2_decoder_t ( - in_batch_size, options.maxmsgsize, session); + in_batch_size, options.maxmsgsize); alloc_assert (decoder); } @@ -465,35 +499,67 @@ bool zmq::stream_engine_t::handshake () return true; } -int zmq::stream_engine_t::push_msg (msg_t *msg_) +int zmq::stream_engine_t::read_msg (msg_t *msg_) { - zmq_assert (options.type == ZMQ_PUB || options.type == ZMQ_XPUB); + if (likely (tx_initialized || options.raw_sock)) + return session->pull_msg (msg_); - // The first message is identity. - // Let the session process it. - int rc = session->push_msg (msg_); - errno_assert (rc == 0); + if (!identity_sent) { + int rc = msg_->init_size (options.identity_size); + errno_assert (rc == 0); + memcpy (msg_->data (), options.identity, options.identity_size); + identity_sent = true; + tx_initialized = true; + return 0; + } - // Inject the subscription message so that the ZMQ 2.x peer - // receives our messages. - rc = msg_->init_size (1); - errno_assert (rc == 0); - *(unsigned char*) msg_->data () = 1; - rc = session->push_msg (msg_); - session->flush (); + tx_initialized = true; + return 0; +} - // Once we have injected the subscription message, we can - // Divert the message flow back to the session. - zmq_assert (decoder); - decoder->set_msg_sink (session); +int zmq::stream_engine_t::write_msg (msg_t *msg_) +{ + if (likely (rx_initialized || options.raw_sock)) + return session->push_msg (msg_); - return rc; + if (!identity_received) { + if (options.recv_identity) { + msg_->set_flags (msg_t::identity); + int rc = session->push_msg (msg_); + if (rc == -1) + return -1; + } + else { + int rc = msg_->close (); + errno_assert (rc == 0); + rc = msg_->init (); + errno_assert (rc == 0); + } + + identity_received = true; + } + + // Inject the subscription message, so that also + // ZMQ 2.x peers receive published messages. + if (subscription_required) { + int rc = msg_->init_size (1); + errno_assert (rc == 0); + *(unsigned char*) msg_->data () = 1; + rc = session->push_msg (msg_); + if (rc == -1) + return -1; + subscription_required = false; + } + + rx_initialized = true; + return 0; } void zmq::stream_engine_t::error () { zmq_assert (session); socket->event_disconnected (endpoint, s); + session->flush (); session->detach (); unplug (); delete this; diff --git a/src/stream_engine.hpp b/src/stream_engine.hpp index 41ed214f..3829f925 100644 --- a/src/stream_engine.hpp +++ b/src/stream_engine.hpp @@ -24,7 +24,6 @@ #include "fd.hpp" #include "i_engine.hpp" -#include "i_msg_sink.hpp" #include "io_object.hpp" #include "i_encoder.hpp" #include "i_decoder.hpp" @@ -43,12 +42,13 @@ namespace zmq }; class io_thread_t; + class msg_t; class session_base_t; // This engine handles any socket with SOCK_STREAM semantics, // e.g. TCP socket or an UNIX domain socket. - class stream_engine_t : public io_object_t, public i_engine, public i_msg_sink + class stream_engine_t : public io_object_t, public i_engine { public: @@ -62,9 +62,6 @@ namespace zmq void activate_in (); void activate_out (); - // i_msg_sink interface implementation. - virtual int push_msg (msg_t *msg_); - // i_poll_events interface implementation. void in_event (); void out_event (); @@ -94,11 +91,14 @@ namespace zmq // peer -1 is returned. int read (void *data_, size_t size_); + int read_msg (msg_t *msg_); + + int write_msg (msg_t *msg_); + // Underlying socket. fd_t s; - // True iff we are registered with an I/O poller. - bool io_enabled; + msg_t tx_msg; handle_t handle; @@ -137,6 +137,29 @@ namespace zmq bool plugged; bool terminating; + bool io_error; + + // True iff the session could not accept more + // messages due to flow control. + bool congested; + + // True iff the engine has received identity message. + bool identity_received; + + // True iff the engine has sent identity message. + bool identity_sent; + + // True iff the engine has received all ZMTP control messages. + bool rx_initialized; + + // True iff the engine has sent all ZMTP control messages. + bool tx_initialized; + + // Indicates whether the engine is to inject a phony + // subscription message into the incomming stream. + // Needed to support old peers. + bool subscription_required; + // Socket zmq::socket_base_t *socket; diff --git a/src/v1_decoder.cpp b/src/v1_decoder.cpp index 7dd3517c..b50f67f9 100644 --- a/src/v1_decoder.cpp +++ b/src/v1_decoder.cpp @@ -28,14 +28,12 @@ #include "decoder.hpp" #include "v1_decoder.hpp" -#include "i_msg_sink.hpp" #include "likely.hpp" #include "wire.hpp" #include "err.hpp" zmq::v1_decoder_t::v1_decoder_t (size_t bufsize_, int64_t maxmsgsize_) : decoder_base_t (bufsize_), - msg_sink (NULL), maxmsgsize (maxmsgsize_) { int rc = in_progress.init (); @@ -51,12 +49,7 @@ zmq::v1_decoder_t::~v1_decoder_t () errno_assert (rc == 0); } -void zmq::v1_decoder_t::set_msg_sink (i_msg_sink *msg_sink_) -{ - msg_sink = msg_sink_; -} - -bool zmq::v1_decoder_t::one_byte_size_ready () +int zmq::v1_decoder_t::one_byte_size_ready () { // First byte of size is read. If it is 0xff read 8-byte size. // Otherwise allocate the buffer for message data and read the @@ -67,34 +60,33 @@ bool zmq::v1_decoder_t::one_byte_size_ready () // There has to be at least one byte (the flags) in the message). if (!*tmpbuf) { - decoding_error (); - return false; + errno = EPROTO; + return -1; + } + + if (maxmsgsize >= 0 && (int64_t) (*tmpbuf - 1) > maxmsgsize) { + errno = EMSGSIZE; + return -1; } // in_progress is initialised at this point so in theory we should // close it before calling zmq_msg_init_size, however, it's a 0-byte // message and thus we can treat it as uninitialised... - int rc; - if (maxmsgsize >= 0 && (int64_t) (*tmpbuf - 1) > maxmsgsize) { - rc = -1; - errno = ENOMEM; - } - else - rc = in_progress.init_size (*tmpbuf - 1); - if (rc != 0 && errno == ENOMEM) { + int rc = in_progress.init_size (*tmpbuf - 1); + if (rc != 0) { + errno_assert (errno == ENOMEM); rc = in_progress.init (); errno_assert (rc == 0); - decoding_error (); - return false; + errno = ENOMEM; + return -1; } - errno_assert (rc == 0); next_step (tmpbuf, 1, &v1_decoder_t::flags_ready); } - return true; + return 0; } -bool zmq::v1_decoder_t::eight_byte_size_ready () +int zmq::v1_decoder_t::eight_byte_size_ready () { // 8-byte payload length is read. Allocate the buffer // for message body and read the message data into it. @@ -102,20 +94,20 @@ bool zmq::v1_decoder_t::eight_byte_size_ready () // There has to be at least one byte (the flags) in the message). if (payload_length == 0) { - decoding_error (); - return false; + errno = EPROTO; + return -1; } // Message size must not exceed the maximum allowed size. if (maxmsgsize >= 0 && payload_length - 1 > (uint64_t) maxmsgsize) { - decoding_error (); - return false; + errno = EMSGSIZE; + return -1; } // Message size must fit within range of size_t data type. if (payload_length - 1 > std::numeric_limits ::max ()) { - decoding_error (); - return false; + errno = EMSGSIZE; + return -1; } const size_t msg_size = static_cast (payload_length - 1); @@ -128,15 +120,15 @@ bool zmq::v1_decoder_t::eight_byte_size_ready () errno_assert (errno == ENOMEM); rc = in_progress.init (); errno_assert (rc == 0); - decoding_error (); - return false; + errno = ENOMEM; + return -1; } next_step (tmpbuf, 1, &v1_decoder_t::flags_ready); - return true; + return 0; } -bool zmq::v1_decoder_t::flags_ready () +int zmq::v1_decoder_t::flags_ready () { // Store the flags from the wire into the message structure. in_progress.set_flags (tmpbuf [0] & msg_t::more); @@ -144,22 +136,13 @@ bool zmq::v1_decoder_t::flags_ready () next_step (in_progress.data (), in_progress.size (), &v1_decoder_t::message_ready); - return true; + return 0; } -bool zmq::v1_decoder_t::message_ready () +int zmq::v1_decoder_t::message_ready () { // Message is completely read. Push it further and start reading // new message. (in_progress is a 0-byte message after this point.) - if (unlikely (!msg_sink)) - return false; - int rc = msg_sink->push_msg (&in_progress); - if (unlikely (rc != 0)) { - if (errno != EAGAIN) - decoding_error (); - return false; - } - next_step (tmpbuf, 1, &v1_decoder_t::one_byte_size_ready); - return true; + return 1; } diff --git a/src/v1_decoder.hpp b/src/v1_decoder.hpp index c7d9fe01..17081182 100644 --- a/src/v1_decoder.hpp +++ b/src/v1_decoder.hpp @@ -33,17 +33,15 @@ namespace zmq v1_decoder_t (size_t bufsize_, int64_t maxmsgsize_); ~v1_decoder_t (); - // Set the receiver of decoded messages. - void set_msg_sink (i_msg_sink *msg_sink_); + virtual msg_t *msg () { return &in_progress; } private: - bool one_byte_size_ready (); - bool eight_byte_size_ready (); - bool flags_ready (); - bool message_ready (); + int one_byte_size_ready (); + int eight_byte_size_ready (); + int flags_ready (); + int message_ready (); - i_msg_sink *msg_sink; unsigned char tmpbuf [8]; msg_t in_progress; diff --git a/src/v1_encoder.cpp b/src/v1_encoder.cpp index e35a2ef3..e2b6fd00 100644 --- a/src/v1_encoder.cpp +++ b/src/v1_encoder.cpp @@ -19,65 +19,31 @@ #include "encoder.hpp" #include "v1_encoder.hpp" -#include "i_msg_source.hpp" #include "likely.hpp" #include "wire.hpp" zmq::v1_encoder_t::v1_encoder_t (size_t bufsize_) : - encoder_base_t (bufsize_), - msg_source (NULL) + encoder_base_t (bufsize_) { - int rc = in_progress.init (); - errno_assert (rc == 0); - // Write 0 bytes to the batch and go to message_ready state. next_step (NULL, 0, &v1_encoder_t::message_ready, true); } zmq::v1_encoder_t::~v1_encoder_t () { - int rc = in_progress.close (); - errno_assert (rc == 0); } -void zmq::v1_encoder_t::set_msg_source (i_msg_source *msg_source_) -{ - msg_source = msg_source_; -} - -bool zmq::v1_encoder_t::size_ready () +void zmq::v1_encoder_t::size_ready () { // Write message body into the buffer. - next_step (in_progress.data (), in_progress.size (), - &v1_encoder_t::message_ready, !(in_progress.flags () & msg_t::more)); - return true; + next_step (in_progress->data (), in_progress->size (), + &v1_encoder_t::message_ready, true); } -bool zmq::v1_encoder_t::message_ready () +void zmq::v1_encoder_t::message_ready () { - // Destroy content of the old message. - int rc = in_progress.close (); - errno_assert (rc == 0); - - // Read new message. If there is none, return false. - // Note that new state is set only if write is successful. That way - // unsuccessful write will cause retry on the next state machine - // invocation. - if (unlikely (!msg_source)) { - rc = in_progress.init (); - errno_assert (rc == 0); - return false; - } - rc = msg_source->pull_msg (&in_progress); - if (unlikely (rc != 0)) { - errno_assert (errno == EAGAIN); - rc = in_progress.init (); - errno_assert (rc == 0); - return false; - } - // Get the message size. - size_t size = in_progress.size (); + size_t size = in_progress->size (); // Account for the 'flags' byte. size++; @@ -87,14 +53,13 @@ bool zmq::v1_encoder_t::message_ready () // message size. In both cases 'flags' field follows. if (size < 255) { tmpbuf [0] = (unsigned char) size; - tmpbuf [1] = (in_progress.flags () & msg_t::more); + tmpbuf [1] = (in_progress->flags () & msg_t::more); next_step (tmpbuf, 2, &v1_encoder_t::size_ready, false); } else { tmpbuf [0] = 0xff; put_uint64 (tmpbuf + 1, size); - tmpbuf [9] = (in_progress.flags () & msg_t::more); + tmpbuf [9] = (in_progress->flags () & msg_t::more); next_step (tmpbuf, 10, &v1_encoder_t::size_ready, false); } - return true; } diff --git a/src/v1_encoder.hpp b/src/v1_encoder.hpp index b93c28a6..7d5ab7dc 100644 --- a/src/v1_encoder.hpp +++ b/src/v1_encoder.hpp @@ -24,8 +24,6 @@ namespace zmq { - class i_msg_source; - // Encoder for ZMTP/1.0 protocol. Converts messages into data batches. class v1_encoder_t : public encoder_base_t @@ -35,15 +33,11 @@ namespace zmq v1_encoder_t (size_t bufsize_); ~v1_encoder_t (); - void set_msg_source (i_msg_source *msg_source_); - private: - bool size_ready (); - bool message_ready (); + void size_ready (); + void message_ready (); - i_msg_source *msg_source; - msg_t in_progress; unsigned char tmpbuf [10]; v1_encoder_t (const v1_encoder_t&); diff --git a/src/v2_decoder.cpp b/src/v2_decoder.cpp index 13570bbb..c5a44c95 100644 --- a/src/v2_decoder.cpp +++ b/src/v2_decoder.cpp @@ -31,10 +31,8 @@ #include "wire.hpp" #include "err.hpp" -zmq::v2_decoder_t::v2_decoder_t (size_t bufsize_, - int64_t maxmsgsize_, i_msg_sink *msg_sink_) : +zmq::v2_decoder_t::v2_decoder_t (size_t bufsize_, int64_t maxmsgsize_) : decoder_base_t (bufsize_), - msg_sink (msg_sink_), msg_flags (0), maxmsgsize (maxmsgsize_) { @@ -51,12 +49,7 @@ zmq::v2_decoder_t::~v2_decoder_t () errno_assert (rc == 0); } -void zmq::v2_decoder_t::set_msg_sink (i_msg_sink *msg_sink_) -{ - msg_sink = msg_sink_; -} - -bool zmq::v2_decoder_t::flags_ready () +int zmq::v2_decoder_t::flags_ready () { msg_flags = 0; if (tmpbuf [0] & v2_protocol_t::more_flag) @@ -69,92 +62,79 @@ bool zmq::v2_decoder_t::flags_ready () else next_step (tmpbuf, 1, &v2_decoder_t::one_byte_size_ready); - return true; + return 0; } -bool zmq::v2_decoder_t::one_byte_size_ready () +int zmq::v2_decoder_t::one_byte_size_ready () { - int rc = 0; - // Message size must not exceed the maximum allowed size. if (maxmsgsize >= 0) - if (unlikely (tmpbuf [0] > static_cast (maxmsgsize))) - goto error; + if (unlikely (tmpbuf [0] > static_cast (maxmsgsize))) { + errno = EMSGSIZE; + return -1; + } // in_progress is initialised at this point so in theory we should // close it before calling zmq_msg_init_size, however, it's a 0-byte // message and thus we can treat it as uninitialised... - rc = in_progress.init_size (tmpbuf [0]); + int rc = in_progress.init_size (tmpbuf [0]); if (unlikely (rc)) { errno_assert (errno == ENOMEM); - int rc = in_progress.init (); + rc = in_progress.init (); errno_assert (rc == 0); - goto error; + errno = ENOMEM; + return -1; } in_progress.set_flags (msg_flags); next_step (in_progress.data (), in_progress.size (), &v2_decoder_t::message_ready); - return true; - -error: - decoding_error (); - return false; + return 0; } -bool zmq::v2_decoder_t::eight_byte_size_ready () +int zmq::v2_decoder_t::eight_byte_size_ready () { - int rc = 0; - // The payload size is encoded as 64-bit unsigned integer. // The most significant byte comes first. const uint64_t msg_size = get_uint64 (tmpbuf); // Message size must not exceed the maximum allowed size. if (maxmsgsize >= 0) - if (unlikely (msg_size > static_cast (maxmsgsize))) - goto error; + if (unlikely (msg_size > static_cast (maxmsgsize))) { + errno = EMSGSIZE; + return -1; + } // Message size must fit into size_t data type. - if (unlikely (msg_size != static_cast (msg_size))) - goto error; + if (unlikely (msg_size != static_cast (msg_size))) { + errno = EMSGSIZE; + return -1; + } // in_progress is initialised at this point so in theory we should // close it before calling init_size, however, it's a 0-byte // message and thus we can treat it as uninitialised. - rc = in_progress.init_size (static_cast (msg_size)); + int rc = in_progress.init_size (static_cast (msg_size)); if (unlikely (rc)) { errno_assert (errno == ENOMEM); - int rc = in_progress.init (); + rc = in_progress.init (); errno_assert (rc == 0); - goto error; + errno = ENOMEM; + return -1; } in_progress.set_flags (msg_flags); next_step (in_progress.data (), in_progress.size (), &v2_decoder_t::message_ready); - return true; - -error: - decoding_error (); - return false; + return 0; } -bool zmq::v2_decoder_t::message_ready () +int zmq::v2_decoder_t::message_ready () { - // Message is completely read. Push it further and start reading - // new message. (in_progress is a 0-byte message after this point.) - if (unlikely (!msg_sink)) - return false; - int rc = msg_sink->push_msg (&in_progress); - if (unlikely (rc != 0)) { - if (errno != EAGAIN) - decoding_error (); - return false; - } - + // Message is completely read. Signal this to the caller + // and prepare to decode next message. next_step (tmpbuf, 1, &v2_decoder_t::flags_ready); - return true; + return 1; } diff --git a/src/v2_decoder.hpp b/src/v2_decoder.hpp index e7c637d6..a370dda1 100644 --- a/src/v2_decoder.hpp +++ b/src/v2_decoder.hpp @@ -21,7 +21,6 @@ #define __ZMQ_V2_DECODER_HPP_INCLUDED__ #include "decoder.hpp" -#include "i_msg_sink.hpp" namespace zmq { @@ -30,21 +29,19 @@ namespace zmq { public: - v2_decoder_t (size_t bufsize_, - int64_t maxmsgsize_, i_msg_sink *msg_sink_); + v2_decoder_t (size_t bufsize_, int64_t maxmsgsize_); virtual ~v2_decoder_t (); // i_decoder interface. - virtual void set_msg_sink (i_msg_sink *msg_sink_); + virtual msg_t *msg () { return &in_progress; } private: - bool flags_ready (); - bool one_byte_size_ready (); - bool eight_byte_size_ready (); - bool message_ready (); + int flags_ready (); + int one_byte_size_ready (); + int eight_byte_size_ready (); + int message_ready (); - i_msg_sink *msg_sink; unsigned char tmpbuf [8]; unsigned char msg_flags; msg_t in_progress; diff --git a/src/v2_encoder.cpp b/src/v2_encoder.cpp index 68e36a01..c08d59c8 100644 --- a/src/v2_encoder.cpp +++ b/src/v2_encoder.cpp @@ -22,64 +22,31 @@ #include "likely.hpp" #include "wire.hpp" -zmq::v2_encoder_t::v2_encoder_t (size_t bufsize_, i_msg_source *msg_source_) : - encoder_base_t (bufsize_), - msg_source (msg_source_) +zmq::v2_encoder_t::v2_encoder_t (size_t bufsize_) : + encoder_base_t (bufsize_) { - int rc = in_progress.init (); - errno_assert (rc == 0); - // Write 0 bytes to the batch and go to message_ready state. next_step (NULL, 0, &v2_encoder_t::message_ready, true); } zmq::v2_encoder_t::~v2_encoder_t () { - int rc = in_progress.close (); - errno_assert (rc == 0); } -void zmq::v2_encoder_t::set_msg_source (i_msg_source *msg_source_) +void zmq::v2_encoder_t::message_ready () { - msg_source = msg_source_; -} - -bool zmq::v2_encoder_t::message_ready () -{ - // Release the content of the old message. - int rc = in_progress.close (); - errno_assert (rc == 0); - - // Read new message. If there is none, return false. - // Note that new state is set only if write is successful. That way - // unsuccessful write will cause retry on the next state machine - // invocation. - if (unlikely (!msg_source)) { - rc = in_progress.init (); - errno_assert (rc == 0); - return false; - } - - rc = msg_source->pull_msg (&in_progress); - if (unlikely (rc)) { - errno_assert (errno == EAGAIN); - rc = in_progress.init (); - errno_assert (rc == 0); - return false; - } - // Encode flags. unsigned char &protocol_flags = tmpbuf [0]; protocol_flags = 0; - if (in_progress.flags () & msg_t::more) + if (in_progress->flags () & msg_t::more) protocol_flags |= v2_protocol_t::more_flag; - if (in_progress.size () > 255) + if (in_progress->size () > 255) protocol_flags |= v2_protocol_t::large_flag; // Encode the message length. For messages less then 256 bytes, // the length is encoded as 8-bit unsigned integer. For larger // messages, 64-bit unsigned integer in network byte order is used. - const size_t size = in_progress.size (); + const size_t size = in_progress->size (); if (unlikely (size > 255)) { put_uint64 (tmpbuf + 1, size); next_step (tmpbuf, 9, &v2_encoder_t::size_ready, false); @@ -88,13 +55,11 @@ bool zmq::v2_encoder_t::message_ready () tmpbuf [1] = static_cast (size); next_step (tmpbuf, 2, &v2_encoder_t::size_ready, false); } - return true; } -bool zmq::v2_encoder_t::size_ready () +void zmq::v2_encoder_t::size_ready () { // Write message body into the buffer. - next_step (in_progress.data (), in_progress.size (), - &v2_encoder_t::message_ready, !(in_progress.flags () & msg_t::more)); - return true; + next_step (in_progress->data (), in_progress->size (), + &v2_encoder_t::message_ready, true); } diff --git a/src/v2_encoder.hpp b/src/v2_encoder.hpp index de8db391..12ec1759 100644 --- a/src/v2_encoder.hpp +++ b/src/v2_encoder.hpp @@ -21,30 +21,23 @@ #define __ZMQ_V2_ENCODER_HPP_INCLUDED__ #include "encoder.hpp" -#include "i_msg_source.hpp" namespace zmq { - class i_msg_source; - // Encoder for 0MQ framing protocol. Converts messages into data stream. class v2_encoder_t : public encoder_base_t { public: - v2_encoder_t (size_t bufsize_, i_msg_source *msg_source_); + v2_encoder_t (size_t bufsize_); virtual ~v2_encoder_t (); - virtual void set_msg_source (i_msg_source *msg_source_); - private: - bool size_ready (); - bool message_ready (); + void size_ready (); + void message_ready (); - i_msg_source *msg_source; - msg_t in_progress; unsigned char tmpbuf [9]; v2_encoder_t (const v2_encoder_t&); diff --git a/src/wire.hpp b/src/wire.hpp index e06c00b1..7aec7b8b 100644 --- a/src/wire.hpp +++ b/src/wire.hpp @@ -33,7 +33,7 @@ namespace zmq *buffer_ = value; } - inline uint8_t get_uint8 (unsigned char *buffer_) + inline uint8_t get_uint8 (const unsigned char *buffer_) { return *buffer_; } @@ -44,7 +44,7 @@ namespace zmq buffer_ [1] = (unsigned char) (value & 0xff); } - inline uint16_t get_uint16 (unsigned char *buffer_) + inline uint16_t get_uint16 (const unsigned char *buffer_) { return (((uint16_t) buffer_ [0]) << 8) | @@ -59,7 +59,7 @@ namespace zmq buffer_ [3] = (unsigned char) (value & 0xff); } - inline uint32_t get_uint32 (unsigned char *buffer_) + inline uint32_t get_uint32 (const unsigned char *buffer_) { return (((uint32_t) buffer_ [0]) << 24) | @@ -80,7 +80,7 @@ namespace zmq buffer_ [7] = (unsigned char) (value & 0xff); } - inline uint64_t get_uint64 (unsigned char *buffer_) + inline uint64_t get_uint64 (const unsigned char *buffer_) { return (((uint64_t) buffer_ [0]) << 56) |