mirror of
https://github.com/zeromq/libzmq.git
synced 2025-03-10 16:06:09 +00:00
commit
40cbbe3c9e
@ -22,7 +22,6 @@
|
|||||||
#include "windows.hpp"
|
#include "windows.hpp"
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#include <string.h>
|
|
||||||
#include <string>
|
#include <string>
|
||||||
|
|
||||||
#include "msg.hpp"
|
#include "msg.hpp"
|
||||||
|
@ -73,8 +73,8 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_,
|
|||||||
options (options_),
|
options (options_),
|
||||||
endpoint (endpoint_),
|
endpoint (endpoint_),
|
||||||
plugged (false),
|
plugged (false),
|
||||||
read_msg (&stream_engine_t::read_identity),
|
next_msg (&stream_engine_t::identity_msg),
|
||||||
write_msg (&stream_engine_t::write_identity),
|
process_msg (&stream_engine_t::process_identity_msg),
|
||||||
io_error (false),
|
io_error (false),
|
||||||
subscription_required (false),
|
subscription_required (false),
|
||||||
mechanism (NULL),
|
mechanism (NULL),
|
||||||
@ -185,14 +185,14 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
|
|||||||
// disable handshaking for raw socket
|
// disable handshaking for raw socket
|
||||||
handshaking = false;
|
handshaking = false;
|
||||||
|
|
||||||
read_msg = &stream_engine_t::pull_msg_from_session;
|
next_msg = &stream_engine_t::pull_msg_from_session;
|
||||||
write_msg = &stream_engine_t::push_msg_to_session;
|
process_msg = &stream_engine_t::push_msg_to_session;
|
||||||
|
|
||||||
// For raw sockets, send an initial 0-length message to the
|
// For raw sockets, send an initial 0-length message to the
|
||||||
// application so that it knows a peer has connected.
|
// application so that it knows a peer has connected.
|
||||||
msg_t connector;
|
msg_t connector;
|
||||||
connector.init();
|
connector.init();
|
||||||
(this->*write_msg) (&connector);
|
push_msg_to_session (&connector);
|
||||||
connector.close();
|
connector.close();
|
||||||
session->flush ();
|
session->flush ();
|
||||||
}
|
}
|
||||||
@ -286,7 +286,7 @@ void zmq::stream_engine_t::in_event ()
|
|||||||
insize -= processed;
|
insize -= processed;
|
||||||
if (rc == 0 || rc == -1)
|
if (rc == 0 || rc == -1)
|
||||||
break;
|
break;
|
||||||
rc = (this->*write_msg) (decoder->msg ());
|
rc = (this->*process_msg) (decoder->msg ());
|
||||||
if (rc == -1)
|
if (rc == -1)
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -324,7 +324,7 @@ void zmq::stream_engine_t::out_event ()
|
|||||||
outsize = encoder->encode (&outpos, 0);
|
outsize = encoder->encode (&outpos, 0);
|
||||||
|
|
||||||
while (outsize < out_batch_size) {
|
while (outsize < out_batch_size) {
|
||||||
if ((this->*read_msg) (&tx_msg) == -1)
|
if ((this->*next_msg) (&tx_msg) == -1)
|
||||||
break;
|
break;
|
||||||
encoder->load_msg (&tx_msg);
|
encoder->load_msg (&tx_msg);
|
||||||
unsigned char *bufptr = outpos + outsize;
|
unsigned char *bufptr = outpos + outsize;
|
||||||
@ -391,7 +391,7 @@ void zmq::stream_engine_t::restart_input ()
|
|||||||
zmq_assert (session != NULL);
|
zmq_assert (session != NULL);
|
||||||
zmq_assert (decoder != NULL);
|
zmq_assert (decoder != NULL);
|
||||||
|
|
||||||
int rc = (this->*write_msg) (decoder->msg ());
|
int rc = (this->*process_msg) (decoder->msg ());
|
||||||
if (rc == -1) {
|
if (rc == -1) {
|
||||||
if (errno == EAGAIN)
|
if (errno == EAGAIN)
|
||||||
session->flush ();
|
session->flush ();
|
||||||
@ -408,7 +408,7 @@ void zmq::stream_engine_t::restart_input ()
|
|||||||
insize -= processed;
|
insize -= processed;
|
||||||
if (rc == 0 || rc == -1)
|
if (rc == 0 || rc == -1)
|
||||||
break;
|
break;
|
||||||
rc = (this->*write_msg) (decoder->msg ());
|
rc = (this->*process_msg) (decoder->msg ());
|
||||||
if (rc == -1)
|
if (rc == -1)
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -550,10 +550,10 @@ bool zmq::stream_engine_t::handshake ()
|
|||||||
|
|
||||||
// We are sending our identity now and the next message
|
// We are sending our identity now and the next message
|
||||||
// will come from the socket.
|
// will come from the socket.
|
||||||
read_msg = &stream_engine_t::pull_msg_from_session;
|
next_msg = &stream_engine_t::pull_msg_from_session;
|
||||||
|
|
||||||
// We are expecting identity message.
|
// We are expecting identity message.
|
||||||
write_msg = &stream_engine_t::write_identity;
|
process_msg = &stream_engine_t::process_identity_msg;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
if (greeting_recv [revision_pos] == ZMTP_1_0) {
|
if (greeting_recv [revision_pos] == ZMTP_1_0) {
|
||||||
@ -619,8 +619,8 @@ bool zmq::stream_engine_t::handshake ()
|
|||||||
error ();
|
error ();
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
read_msg = &stream_engine_t::next_handshake_command;
|
next_msg = &stream_engine_t::next_handshake_command;
|
||||||
write_msg = &stream_engine_t::process_handshake_command;
|
process_msg = &stream_engine_t::process_handshake_command;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start polling for output if necessary.
|
// Start polling for output if necessary.
|
||||||
@ -634,17 +634,17 @@ bool zmq::stream_engine_t::handshake ()
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
int zmq::stream_engine_t::read_identity (msg_t *msg_)
|
int zmq::stream_engine_t::identity_msg (msg_t *msg_)
|
||||||
{
|
{
|
||||||
int rc = msg_->init_size (options.identity_size);
|
int rc = msg_->init_size (options.identity_size);
|
||||||
errno_assert (rc == 0);
|
errno_assert (rc == 0);
|
||||||
if (options.identity_size > 0)
|
if (options.identity_size > 0)
|
||||||
memcpy (msg_->data (), options.identity, options.identity_size);
|
memcpy (msg_->data (), options.identity, options.identity_size);
|
||||||
read_msg = &stream_engine_t::pull_msg_from_session;
|
next_msg = &stream_engine_t::pull_msg_from_session;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int zmq::stream_engine_t::write_identity (msg_t *msg_)
|
int zmq::stream_engine_t::process_identity_msg (msg_t *msg_)
|
||||||
{
|
{
|
||||||
if (options.recv_identity) {
|
if (options.recv_identity) {
|
||||||
msg_->set_flags (msg_t::identity);
|
msg_->set_flags (msg_t::identity);
|
||||||
@ -659,9 +659,9 @@ int zmq::stream_engine_t::write_identity (msg_t *msg_)
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (subscription_required)
|
if (subscription_required)
|
||||||
write_msg = &stream_engine_t::write_subscription_msg;
|
process_msg = &stream_engine_t::write_subscription_msg;
|
||||||
else
|
else
|
||||||
write_msg = &stream_engine_t::push_msg_to_session;
|
process_msg = &stream_engine_t::push_msg_to_session;
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
@ -734,8 +734,8 @@ void zmq::stream_engine_t::mechanism_ready ()
|
|||||||
session->flush ();
|
session->flush ();
|
||||||
}
|
}
|
||||||
|
|
||||||
read_msg = &stream_engine_t::pull_and_encode;
|
next_msg = &stream_engine_t::pull_and_encode;
|
||||||
write_msg = &stream_engine_t::write_credential;
|
process_msg = &stream_engine_t::write_credential;
|
||||||
|
|
||||||
// Compile metadata.
|
// Compile metadata.
|
||||||
typedef metadata_t::dict_t properties_t;
|
typedef metadata_t::dict_t properties_t;
|
||||||
@ -792,7 +792,7 @@ int zmq::stream_engine_t::write_credential (msg_t *msg_)
|
|||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
write_msg = &stream_engine_t::decode_and_push;
|
process_msg = &stream_engine_t::decode_and_push;
|
||||||
return decode_and_push (msg_);
|
return decode_and_push (msg_);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -817,7 +817,7 @@ int zmq::stream_engine_t::decode_and_push (msg_t *msg_)
|
|||||||
msg_->set_metadata (metadata);
|
msg_->set_metadata (metadata);
|
||||||
if (session->push_msg (msg_) == -1) {
|
if (session->push_msg (msg_) == -1) {
|
||||||
if (errno == EAGAIN)
|
if (errno == EAGAIN)
|
||||||
write_msg = &stream_engine_t::push_one_then_decode_and_push;
|
process_msg = &stream_engine_t::push_one_then_decode_and_push;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
@ -827,7 +827,7 @@ int zmq::stream_engine_t::push_one_then_decode_and_push (msg_t *msg_)
|
|||||||
{
|
{
|
||||||
const int rc = session->push_msg (msg_);
|
const int rc = session->push_msg (msg_);
|
||||||
if (rc == 0)
|
if (rc == 0)
|
||||||
write_msg = &stream_engine_t::decode_and_push;
|
process_msg = &stream_engine_t::decode_and_push;
|
||||||
return rc;
|
return rc;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -844,7 +844,7 @@ int zmq::stream_engine_t::write_subscription_msg (msg_t *msg_)
|
|||||||
if (rc == -1)
|
if (rc == -1)
|
||||||
return -1;
|
return -1;
|
||||||
|
|
||||||
write_msg = &stream_engine_t::push_msg_to_session;
|
process_msg = &stream_engine_t::push_msg_to_session;
|
||||||
return push_msg_to_session (msg_);
|
return push_msg_to_session (msg_);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -855,7 +855,7 @@ void zmq::stream_engine_t::error ()
|
|||||||
// so that it knows the peer has been disconnected.
|
// so that it knows the peer has been disconnected.
|
||||||
msg_t terminator;
|
msg_t terminator;
|
||||||
terminator.init();
|
terminator.init();
|
||||||
(this->*write_msg) (&terminator);
|
(this->*process_msg) (&terminator);
|
||||||
terminator.close();
|
terminator.close();
|
||||||
}
|
}
|
||||||
zmq_assert (session);
|
zmq_assert (session);
|
||||||
|
@ -93,8 +93,8 @@ namespace zmq
|
|||||||
// Zero indicates the peer has closed the connection.
|
// Zero indicates the peer has closed the connection.
|
||||||
int read (void *data_, size_t size_);
|
int read (void *data_, size_t size_);
|
||||||
|
|
||||||
int read_identity (msg_t *msg_);
|
int identity_msg (msg_t *msg_);
|
||||||
int write_identity (msg_t *msg_);
|
int process_identity_msg (msg_t *msg_);
|
||||||
|
|
||||||
int next_handshake_command (msg_t *msg);
|
int next_handshake_command (msg_t *msg);
|
||||||
int process_handshake_command (msg_t *msg);
|
int process_handshake_command (msg_t *msg);
|
||||||
@ -168,9 +168,9 @@ namespace zmq
|
|||||||
|
|
||||||
bool plugged;
|
bool plugged;
|
||||||
|
|
||||||
int (stream_engine_t::*read_msg) (msg_t *msg_);
|
int (stream_engine_t::*next_msg) (msg_t *msg_);
|
||||||
|
|
||||||
int (stream_engine_t::*write_msg) (msg_t *msg_);
|
int (stream_engine_t::*process_msg) (msg_t *msg_);
|
||||||
|
|
||||||
bool io_error;
|
bool io_error;
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user