mirror of
https://github.com/zeromq/libzmq.git
synced 2025-01-17 04:50:57 +08:00
Merge pull request #564 from hurtonm/master
Rename pipe states so they are more mnemonic
This commit is contained in:
commit
9d63ebf6d6
76
src/pipe.cpp
76
src/pipe.cpp
@ -96,7 +96,9 @@ zmq::blob_t zmq::pipe_t::get_identity ()
|
|||||||
|
|
||||||
bool zmq::pipe_t::check_read ()
|
bool zmq::pipe_t::check_read ()
|
||||||
{
|
{
|
||||||
if (unlikely (!in_active || (state != active && state != pending)))
|
if (unlikely (!in_active))
|
||||||
|
return false;
|
||||||
|
if (unlikely (state != active && state != waiting_for_delimiter))
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
// Check if there's an item in the pipe.
|
// Check if there's an item in the pipe.
|
||||||
@ -120,7 +122,9 @@ bool zmq::pipe_t::check_read ()
|
|||||||
|
|
||||||
bool zmq::pipe_t::read (msg_t *msg_)
|
bool zmq::pipe_t::read (msg_t *msg_)
|
||||||
{
|
{
|
||||||
if (unlikely (!in_active || (state != active && state != pending)))
|
if (unlikely (!in_active))
|
||||||
|
return false;
|
||||||
|
if (unlikely (state != active && state != waiting_for_delimiter))
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
if (!inpipe->read (msg_)) {
|
if (!inpipe->read (msg_)) {
|
||||||
@ -187,7 +191,7 @@ void zmq::pipe_t::rollback ()
|
|||||||
void zmq::pipe_t::flush ()
|
void zmq::pipe_t::flush ()
|
||||||
{
|
{
|
||||||
// The peer does not exist anymore at this point.
|
// The peer does not exist anymore at this point.
|
||||||
if (state == terminating)
|
if (state == term_ack_sent)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
if (outpipe && !outpipe->flush ())
|
if (outpipe && !outpipe->flush ())
|
||||||
@ -196,7 +200,7 @@ void zmq::pipe_t::flush ()
|
|||||||
|
|
||||||
void zmq::pipe_t::process_activate_read ()
|
void zmq::pipe_t::process_activate_read ()
|
||||||
{
|
{
|
||||||
if (!in_active && (state == active || state == pending)) {
|
if (!in_active && (state == active || state == waiting_for_delimiter)) {
|
||||||
in_active = true;
|
in_active = true;
|
||||||
sink->read_activated (this);
|
sink->read_activated (this);
|
||||||
}
|
}
|
||||||
@ -240,24 +244,24 @@ void zmq::pipe_t::process_pipe_term ()
|
|||||||
{
|
{
|
||||||
// This is the simple case of peer-induced termination. If there are no
|
// This is the simple case of peer-induced termination. If there are no
|
||||||
// more pending messages to read, or if the pipe was configured to drop
|
// more pending messages to read, or if the pipe was configured to drop
|
||||||
// pending messages, we can move directly to the terminating state.
|
// pending messages, we can move directly to the term_ack_sent state.
|
||||||
// Otherwise we'll hang up in pending state till all the pending messages
|
// Otherwise we'll hang up in waiting_for_delimiter state till all
|
||||||
// are sent.
|
// pending messages are read.
|
||||||
if (state == active) {
|
if (state == active) {
|
||||||
if (!delay) {
|
if (!delay) {
|
||||||
state = terminating;
|
state = term_ack_sent;
|
||||||
outpipe = NULL;
|
outpipe = NULL;
|
||||||
send_pipe_term_ack (peer);
|
send_pipe_term_ack (peer);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
state = pending;
|
state = waiting_for_delimiter;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delimiter happened to arrive before the term command. Now we have the
|
// Delimiter happened to arrive before the term command. Now we have the
|
||||||
// term command as well, so we can move straight to terminating state.
|
// term command as well, so we can move straight to term_ack_sent state.
|
||||||
if (state == delimited) {
|
if (state == delimiter_received) {
|
||||||
state = terminating;
|
state = term_ack_sent;
|
||||||
outpipe = NULL;
|
outpipe = NULL;
|
||||||
send_pipe_term_ack (peer);
|
send_pipe_term_ack (peer);
|
||||||
return;
|
return;
|
||||||
@ -266,8 +270,8 @@ void zmq::pipe_t::process_pipe_term ()
|
|||||||
// This is the case where both ends of the pipe are closed in parallel.
|
// This is the case where both ends of the pipe are closed in parallel.
|
||||||
// We simply reply to the request by ack and continue waiting for our
|
// We simply reply to the request by ack and continue waiting for our
|
||||||
// own ack.
|
// own ack.
|
||||||
if (state == terminated) {
|
if (state == term_req_sent1) {
|
||||||
state = double_terminated;
|
state = term_req_sent2;
|
||||||
outpipe = NULL;
|
outpipe = NULL;
|
||||||
send_pipe_term_ack (peer);
|
send_pipe_term_ack (peer);
|
||||||
return;
|
return;
|
||||||
@ -283,16 +287,16 @@ void zmq::pipe_t::process_pipe_term_ack ()
|
|||||||
zmq_assert (sink);
|
zmq_assert (sink);
|
||||||
sink->terminated (this);
|
sink->terminated (this);
|
||||||
|
|
||||||
// In terminating and double_terminated states there's nothing to do.
|
// In term_ack_sent and term_req_sent2 states there's nothing to do.
|
||||||
// Simply deallocate the pipe. In terminated state we have to ack the
|
// Simply deallocate the pipe. In term_req_sent1 state we have to ack
|
||||||
// peer before deallocating this side of the pipe. All the other states
|
// the peer before deallocating this side of the pipe.
|
||||||
// are invalid.
|
// All the other states are invalid.
|
||||||
if (state == terminated) {
|
if (state == term_req_sent1) {
|
||||||
outpipe = NULL;
|
outpipe = NULL;
|
||||||
send_pipe_term_ack (peer);
|
send_pipe_term_ack (peer);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
zmq_assert (state == terminating || state == double_terminated);
|
zmq_assert (state == term_ack_sent || state == term_req_sent2);
|
||||||
|
|
||||||
// We'll deallocate the inbound pipe, the peer will deallocate the outbound
|
// We'll deallocate the inbound pipe, the peer will deallocate the outbound
|
||||||
// pipe (which is an inbound pipe from its point of view).
|
// pipe (which is an inbound pipe from its point of view).
|
||||||
@ -316,44 +320,44 @@ void zmq::pipe_t::terminate (bool delay_)
|
|||||||
delay = delay_;
|
delay = delay_;
|
||||||
|
|
||||||
// If terminate was already called, we can ignore the duplicit invocation.
|
// If terminate was already called, we can ignore the duplicit invocation.
|
||||||
if (state == terminated || state == double_terminated)
|
if (state == term_req_sent1 || state == term_req_sent2)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
// If the pipe is in the final phase of async termination, it's going to
|
// If the pipe is in the final phase of async termination, it's going to
|
||||||
// closed anyway. No need to do anything special here.
|
// closed anyway. No need to do anything special here.
|
||||||
else
|
else
|
||||||
if (state == terminating)
|
if (state == term_ack_sent)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
// The simple sync termination case. Ask the peer to terminate and wait
|
// The simple sync termination case. Ask the peer to terminate and wait
|
||||||
// for the ack.
|
// for the ack.
|
||||||
else
|
else
|
||||||
if (state == active) {
|
if (state == active) {
|
||||||
send_pipe_term (peer);
|
send_pipe_term (peer);
|
||||||
state = terminated;
|
state = term_req_sent1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// There are still pending messages available, but the user calls
|
// There are still pending messages available, but the user calls
|
||||||
// 'terminate'. We can act as if all the pending messages were read.
|
// 'terminate'. We can act as if all the pending messages were read.
|
||||||
else
|
else
|
||||||
if (state == pending && !delay) {
|
if (state == waiting_for_delimiter && delay == 0) {
|
||||||
outpipe = NULL;
|
outpipe = NULL;
|
||||||
send_pipe_term_ack (peer);
|
send_pipe_term_ack (peer);
|
||||||
state = terminating;
|
state = term_ack_sent;
|
||||||
}
|
}
|
||||||
|
|
||||||
// If there are pending messages still availabe, do nothing.
|
// If there are pending messages still availabe, do nothing.
|
||||||
else
|
else
|
||||||
if (state == pending) {
|
if (state == waiting_for_delimiter) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// We've already got delimiter, but not term command yet. We can ignore
|
// We've already got delimiter, but not term command yet. We can ignore
|
||||||
// the delimiter and ack synchronously terminate as if we were in
|
// the delimiter and ack synchronously terminate as if we were in
|
||||||
// active state.
|
// active state.
|
||||||
else
|
else
|
||||||
if (state == delimited) {
|
if (state == delimiter_received) {
|
||||||
send_pipe_term (peer);
|
send_pipe_term (peer);
|
||||||
state = terminated;
|
state = term_req_sent1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// There are no other states.
|
// There are no other states.
|
||||||
@ -413,14 +417,14 @@ int zmq::pipe_t::compute_lwm (int hwm_)
|
|||||||
void zmq::pipe_t::delimit ()
|
void zmq::pipe_t::delimit ()
|
||||||
{
|
{
|
||||||
if (state == active) {
|
if (state == active) {
|
||||||
state = delimited;
|
state = delimiter_received;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (state == pending) {
|
if (state == waiting_for_delimiter) {
|
||||||
outpipe = NULL;
|
outpipe = NULL;
|
||||||
send_pipe_term_ack (peer);
|
send_pipe_term_ack (peer);
|
||||||
state = terminating;
|
state = term_ack_sent;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
30
src/pipe.hpp
30
src/pipe.hpp
@ -162,22 +162,24 @@ namespace zmq
|
|||||||
// Sink to send events to.
|
// Sink to send events to.
|
||||||
i_pipe_events *sink;
|
i_pipe_events *sink;
|
||||||
|
|
||||||
// State of the pipe endpoint. Active is common state before any
|
// States of the pipe endpoint:
|
||||||
// termination begins. Delimited means that delimiter was read from
|
// active: common state before any termination begins,
|
||||||
// pipe before term command was received. Pending means that term
|
// delimiter_received: delimiter was read from pipe before
|
||||||
// command was already received from the peer but there are still
|
// term command was received,
|
||||||
// pending messages to read. Terminating means that all pending
|
// waiting_fo_delimiter: term command was already received
|
||||||
// messages were already read and all we are waiting for is ack from
|
// from the peer but there are still pending messages to read,
|
||||||
// the peer. Terminated means that 'terminate' was explicitly called
|
// term_ack_sent: all pending messages were already read and
|
||||||
// by the user. Double_terminated means that user called 'terminate'
|
// all we are waiting for is ack from the peer,
|
||||||
// and then we've got term command from the peer as well.
|
// term_req_sent1: 'terminate' was explicitly called by the user,
|
||||||
|
// term_req_sent2: user called 'terminate' and then we've got
|
||||||
|
// term command from the peer as well.
|
||||||
enum {
|
enum {
|
||||||
active,
|
active,
|
||||||
delimited,
|
delimiter_received,
|
||||||
pending,
|
waiting_for_delimiter,
|
||||||
terminating,
|
term_ack_sent,
|
||||||
terminated,
|
term_req_sent1,
|
||||||
double_terminated
|
term_req_sent2
|
||||||
} state;
|
} state;
|
||||||
|
|
||||||
// If true, we receive all the pending inbound messages before
|
// If true, we receive all the pending inbound messages before
|
||||||
|
Loading…
x
Reference in New Issue
Block a user