mirror of
https://github.com/zeromq/libzmq.git
synced 2025-01-16 20:41:18 +08:00
Resolve LIBZMQ-465
This commit is contained in:
parent
99f714445f
commit
c543b2ce8c
@ -57,11 +57,6 @@ void zmq::decoder_t::set_msg_sink (i_msg_sink *msg_sink_)
|
||||
msg_sink = msg_sink_;
|
||||
}
|
||||
|
||||
bool zmq::decoder_t::stalled () const
|
||||
{
|
||||
return next == &decoder_t::message_ready;
|
||||
}
|
||||
|
||||
bool zmq::decoder_t::one_byte_size_ready ()
|
||||
{
|
||||
// First byte of size is read. If it is 0xff read 8-byte size.
|
||||
|
@ -143,6 +143,22 @@ namespace zmq
|
||||
}
|
||||
}
|
||||
|
||||
// Returns true if the decoder has been fed all required data
|
||||
// but cannot proceed with the next decoding step.
|
||||
// False is returned if the decoder has encountered an error.
|
||||
bool stalled ()
|
||||
{
|
||||
while (!to_read) {
|
||||
if (!(static_cast <T*> (this)->*next) ()) {
|
||||
if (unlikely (!(static_cast <T*> (this)->next)))
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
inline bool message_ready_size (size_t msg_sz)
|
||||
{
|
||||
zmq_assert (false);
|
||||
@ -172,13 +188,13 @@ namespace zmq
|
||||
next = NULL;
|
||||
}
|
||||
|
||||
private:
|
||||
|
||||
// Next step. If set to NULL, it means that associated data stream
|
||||
// is dead. Note that there can be still data in the process in such
|
||||
// case.
|
||||
step_t next;
|
||||
|
||||
private:
|
||||
|
||||
// Where to store the read data.
|
||||
unsigned char *read_pos;
|
||||
|
||||
@ -205,10 +221,6 @@ namespace zmq
|
||||
// Set the receiver of decoded messages.
|
||||
void set_msg_sink (i_msg_sink *msg_sink_);
|
||||
|
||||
// Returns true if there is a decoded message
|
||||
// waiting to be delivered to the session.
|
||||
bool stalled () const;
|
||||
|
||||
private:
|
||||
|
||||
bool one_byte_size_ready ();
|
||||
|
@ -40,7 +40,7 @@ namespace zmq
|
||||
|
||||
virtual size_t process_buffer (unsigned char *data_, size_t size_) = 0;
|
||||
|
||||
virtual bool stalled () const = 0;
|
||||
virtual bool stalled () = 0;
|
||||
|
||||
virtual bool message_ready_size (size_t msg_sz) = 0;
|
||||
};
|
||||
|
@ -53,7 +53,7 @@ void zmq::raw_decoder_t::set_msg_sink (i_msg_sink *msg_sink_)
|
||||
msg_sink = msg_sink_;
|
||||
}
|
||||
|
||||
bool zmq::raw_decoder_t::stalled () const
|
||||
bool zmq::raw_decoder_t::stalled ()
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
@ -45,7 +45,7 @@ namespace zmq
|
||||
// i_decoder interface.
|
||||
virtual void set_msg_sink (i_msg_sink *msg_sink_);
|
||||
|
||||
virtual bool stalled () const;
|
||||
virtual bool stalled ();
|
||||
|
||||
virtual bool message_ready_size (size_t msg_sz);
|
||||
|
||||
|
@ -52,10 +52,10 @@
|
||||
|
||||
zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, const std::string &endpoint_) :
|
||||
s (fd_),
|
||||
io_enabled (false),
|
||||
inpos (NULL),
|
||||
insize (0),
|
||||
decoder (NULL),
|
||||
input_error (false),
|
||||
outpos (NULL),
|
||||
outsize (0),
|
||||
encoder (NULL),
|
||||
@ -134,6 +134,7 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
|
||||
// Connect to I/O threads poller object.
|
||||
io_object_t::plug (io_thread_);
|
||||
handle = add_fd (s);
|
||||
io_enabled = true;
|
||||
|
||||
if (options.raw_sock) {
|
||||
// no handshaking for raw sock, instantiate raw encoder and decoders
|
||||
@ -169,7 +170,10 @@ void zmq::stream_engine_t::unplug ()
|
||||
plugged = false;
|
||||
|
||||
// Cancel all fd subscriptions.
|
||||
rm_fd (handle);
|
||||
if (io_enabled) {
|
||||
rm_fd (handle);
|
||||
io_enabled = false;
|
||||
}
|
||||
|
||||
// Disconnect from I/O threads poller object.
|
||||
io_object_t::unplug ();
|
||||
@ -250,9 +254,10 @@ void zmq::stream_engine_t::in_event ()
|
||||
// waiting for input events and postpone the termination
|
||||
// until after the session has accepted the message.
|
||||
if (disconnection) {
|
||||
input_error = true;
|
||||
if (decoder->stalled ())
|
||||
reset_pollin (handle);
|
||||
if (decoder->stalled ()) {
|
||||
rm_fd (handle);
|
||||
io_enabled = false;
|
||||
}
|
||||
else
|
||||
error ();
|
||||
}
|
||||
@ -319,7 +324,7 @@ void zmq::stream_engine_t::activate_out ()
|
||||
|
||||
void zmq::stream_engine_t::activate_in ()
|
||||
{
|
||||
if (input_error) {
|
||||
if (unlikely (!io_enabled)) {
|
||||
// There was an input error but the engine could not
|
||||
// be terminated (due to the stalled decoder).
|
||||
// Flush the pending message and terminate the engine now.
|
||||
|
@ -96,12 +96,14 @@ namespace zmq
|
||||
// Preamble (10 bytes) + version (1 byte) + socket type (1 byte).
|
||||
const static size_t greeting_size = 12;
|
||||
|
||||
// True iff we are registered with an I/O poller.
|
||||
bool io_enabled;
|
||||
|
||||
handle_t handle;
|
||||
|
||||
unsigned char *inpos;
|
||||
size_t insize;
|
||||
i_decoder *decoder;
|
||||
bool input_error;
|
||||
|
||||
unsigned char *outpos;
|
||||
size_t outsize;
|
||||
|
@ -58,11 +58,6 @@ void zmq::v1_decoder_t::set_msg_sink (i_msg_sink *msg_sink_)
|
||||
msg_sink = msg_sink_;
|
||||
}
|
||||
|
||||
bool zmq::v1_decoder_t::stalled () const
|
||||
{
|
||||
return next == &v1_decoder_t::message_ready;
|
||||
}
|
||||
|
||||
bool zmq::v1_decoder_t::flags_ready ()
|
||||
{
|
||||
msg_flags = 0;
|
||||
|
@ -44,8 +44,6 @@ namespace zmq
|
||||
// i_decoder interface.
|
||||
virtual void set_msg_sink (i_msg_sink *msg_sink_);
|
||||
|
||||
virtual bool stalled () const;
|
||||
|
||||
private:
|
||||
|
||||
bool flags_ready ();
|
||||
|
Loading…
x
Reference in New Issue
Block a user