0
0
mirror of https://github.com/zeromq/libzmq.git synced 2024-12-31 01:43:02 +08:00

Use state functions for message flow

The patch makes the code somewhat simpler and prepares it for
more complex initialization handshakes.
This commit is contained in:
Martin Hurton 2013-04-12 11:59:49 +02:00
parent 7942db7606
commit 9d79ac2830
2 changed files with 65 additions and 65 deletions

View File

@ -63,12 +63,10 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, cons
endpoint (endpoint_), endpoint (endpoint_),
plugged (false), plugged (false),
terminating (false), terminating (false),
read_msg (&stream_engine_t::read_identity),
write_msg (&stream_engine_t::write_identity),
io_error (false), io_error (false),
congested (false), congested (false),
identity_received (false),
identity_sent (false),
rx_initialized (false),
tx_initialized (false),
subscription_required (false), subscription_required (false),
socket (NULL) socket (NULL)
{ {
@ -157,6 +155,9 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
// disable handshaking for raw socket // disable handshaking for raw socket
handshaking = false; handshaking = false;
read_msg = &stream_engine_t::pull_msg_from_session;
write_msg = &stream_engine_t::push_msg_to_session;
} }
else { else {
// Send the 'length' and 'flags' fields of the identity message. // Send the 'length' and 'flags' fields of the identity message.
@ -248,7 +249,7 @@ void zmq::stream_engine_t::in_event ()
insize -= processed; insize -= processed;
if (rc == 0 || rc == -1) if (rc == 0 || rc == -1)
break; break;
rc = write_msg (decoder->msg ()); rc = (this->*write_msg) (decoder->msg ());
if (rc == -1) if (rc == -1)
break; break;
} }
@ -286,7 +287,7 @@ void zmq::stream_engine_t::out_event ()
outsize = encoder->encode (&outpos, 0); outsize = encoder->encode (&outpos, 0);
while (outsize < out_batch_size) { while (outsize < out_batch_size) {
if (read_msg (&tx_msg) == -1) if ((this->*read_msg) (&tx_msg) == -1)
break; break;
encoder->load_msg (&tx_msg); encoder->load_msg (&tx_msg);
unsigned char *bufptr = outpos + outsize; unsigned char *bufptr = outpos + outsize;
@ -355,7 +356,7 @@ void zmq::stream_engine_t::activate_in ()
zmq_assert (session != NULL); zmq_assert (session != NULL);
zmq_assert (decoder != NULL); zmq_assert (decoder != NULL);
int rc = write_msg (decoder->msg ()); int rc = (this->*write_msg) (decoder->msg ());
if (rc == -1) { if (rc == -1) {
if (errno == EAGAIN) if (errno == EAGAIN)
session->flush (); session->flush ();
@ -372,7 +373,7 @@ void zmq::stream_engine_t::activate_in ()
insize -= processed; insize -= processed;
if (rc == 0 || rc == -1) if (rc == 0 || rc == -1)
break; break;
rc = write_msg (decoder->msg ()); rc = (this->*write_msg) (decoder->msg ());
if (rc == -1) if (rc == -1)
break; break;
} }
@ -499,35 +500,22 @@ bool zmq::stream_engine_t::handshake ()
return true; return true;
} }
int zmq::stream_engine_t::read_msg (msg_t *msg_) int zmq::stream_engine_t::read_identity (msg_t *msg_)
{ {
if (likely (tx_initialized || options.raw_sock))
return session->pull_msg (msg_);
if (!identity_sent) {
int rc = msg_->init_size (options.identity_size); int rc = msg_->init_size (options.identity_size);
errno_assert (rc == 0); errno_assert (rc == 0);
if (options.identity_size > 0)
memcpy (msg_->data (), options.identity, options.identity_size); memcpy (msg_->data (), options.identity, options.identity_size);
identity_sent = true; read_msg = &stream_engine_t::pull_msg_from_session;
tx_initialized = true;
return 0; return 0;
} }
tx_initialized = true; int zmq::stream_engine_t::write_identity (msg_t *msg_)
return 0;
}
int zmq::stream_engine_t::write_msg (msg_t *msg_)
{ {
if (likely (rx_initialized || options.raw_sock))
return session->push_msg (msg_);
if (!identity_received) {
if (options.recv_identity) { if (options.recv_identity) {
msg_->set_flags (msg_t::identity); msg_->set_flags (msg_t::identity);
int rc = session->push_msg (msg_); int rc = session->push_msg (msg_);
if (rc == -1) errno_assert (rc == 0);
return -1;
} }
else { else {
int rc = msg_->close (); int rc = msg_->close ();
@ -536,23 +524,39 @@ int zmq::stream_engine_t::write_msg (msg_t *msg_)
errno_assert (rc == 0); errno_assert (rc == 0);
} }
identity_received = true; if (subscription_required)
write_msg = &stream_engine_t::write_subscription_msg;
else
write_msg = &stream_engine_t::push_msg_to_session;
return 0;
} }
int zmq::stream_engine_t::pull_msg_from_session (msg_t *msg_)
{
return session->pull_msg (msg_);
}
int zmq::stream_engine_t::push_msg_to_session (msg_t *msg_)
{
return session->push_msg (msg_);
}
int zmq::stream_engine_t::write_subscription_msg (msg_t *msg_)
{
msg_t subscription;
// Inject the subscription message, so that also // Inject the subscription message, so that also
// ZMQ 2.x peers receive published messages. // ZMQ 2.x peers receive published messages.
if (subscription_required) { int rc = subscription.init_size (1);
int rc = msg_->init_size (1);
errno_assert (rc == 0); errno_assert (rc == 0);
*(unsigned char*) msg_->data () = 1; *(unsigned char*) subscription.data () = 1;
rc = session->push_msg (msg_); rc = session->push_msg (&subscription);
if (rc == -1) if (rc == -1)
return -1; return -1;
subscription_required = false;
}
rx_initialized = true; write_msg = &stream_engine_t::push_msg_to_session;
return 0; return push_msg_to_session (msg_);
} }
void zmq::stream_engine_t::error () void zmq::stream_engine_t::error ()

View File

@ -91,9 +91,13 @@ namespace zmq
// peer -1 is returned. // peer -1 is returned.
int read (void *data_, size_t size_); int read (void *data_, size_t size_);
int read_msg (msg_t *msg_); int read_identity (msg_t *msg_);
int write_identity (msg_t *msg_);
int write_msg (msg_t *msg_); int pull_msg_from_session (msg_t *msg_);
int push_msg_to_session (msg_t *msg);
int write_subscription_msg (msg_t *msg_);
// Underlying socket. // Underlying socket.
fd_t s; fd_t s;
@ -137,24 +141,16 @@ namespace zmq
bool plugged; bool plugged;
bool terminating; bool terminating;
int (stream_engine_t::*read_msg) (msg_t *msg_);
int (stream_engine_t::*write_msg) (msg_t *msg_);
bool io_error; bool io_error;
// True iff the session could not accept more // True iff the session could not accept more
// messages due to flow control. // messages due to flow control.
bool congested; bool congested;
// True iff the engine has received identity message.
bool identity_received;
// True iff the engine has sent identity message.
bool identity_sent;
// True iff the engine has received all ZMTP control messages.
bool rx_initialized;
// True iff the engine has sent all ZMTP control messages.
bool tx_initialized;
// Indicates whether the engine is to inject a phony // Indicates whether the engine is to inject a phony
// subscription message into the incomming stream. // subscription message into the incomming stream.
// Needed to support old peers. // Needed to support old peers.