diff --git a/src/null_mechanism.cpp b/src/null_mechanism.cpp index 070f76eb..de546de4 100644 --- a/src/null_mechanism.cpp +++ b/src/null_mechanism.cpp @@ -28,13 +28,25 @@ #include "err.hpp" #include "msg.hpp" +#include "session_base.hpp" #include "wire.hpp" #include "null_mechanism.hpp" -zmq::null_mechanism_t::null_mechanism_t (const options_t &options_) : mechanism_t (options_), - ready_command_sent (false), - ready_command_received (false) +zmq::null_mechanism_t::null_mechanism_t (session_base_t *session_, + const std::string &peer_address_, + const options_t &options_) : + mechanism_t (options_), + session (session_), + peer_address (peer_address_), + ready_command_sent (false), + ready_command_received (false), + zap_connected (false), + zap_request_sent (false), + zap_reply_received (false) { + const int rc = session->zap_connect (); + if (rc == 0) + zap_connected = true; } zmq::null_mechanism_t::~null_mechanism_t () @@ -47,6 +59,18 @@ int zmq::null_mechanism_t::next_handshake_message (msg_t *msg_) errno = EAGAIN; return -1; } + if (zap_connected && !zap_reply_received) { + if (zap_request_sent) { + errno = EAGAIN; + return -1; + } + send_zap_request (); + zap_request_sent = true; + const int rc = receive_and_process_zap_reply (); + if (rc != 0) + return -1; + zap_reply_received = true; + } unsigned char * const command_buffer = (unsigned char *) malloc (512); alloc_assert (command_buffer); @@ -112,7 +136,132 @@ int zmq::null_mechanism_t::process_handshake_message (msg_t *msg_) return rc; } +int zmq::null_mechanism_t::zap_msg_available () +{ + if (zap_reply_received) { + errno = EFSM; + return -1; + } + const int rc = receive_and_process_zap_reply (); + if (rc == 0) + zap_reply_received = true; + return rc; +} + bool zmq::null_mechanism_t::is_handshake_complete () const { return ready_command_received && ready_command_sent; } + +void zmq::null_mechanism_t::send_zap_request () +{ + int rc; + msg_t msg; + + // Address delimiter frame + rc = msg.init (); + errno_assert (rc == 0); + msg.set_flags (msg_t::more); + rc = session->write_zap_msg (&msg); + errno_assert (rc == 0); + + // Version frame + rc = msg.init_size (3); + errno_assert (rc == 0); + memcpy (msg.data (), "1.0", 3); + msg.set_flags (msg_t::more); + rc = session->write_zap_msg (&msg); + errno_assert (rc == 0); + + // Request id frame + rc = msg.init_size (1); + errno_assert (rc == 0); + memcpy (msg.data (), "1", 1); + msg.set_flags (msg_t::more); + rc = session->write_zap_msg (&msg); + errno_assert (rc == 0); + + // Domain frame + rc = msg.init (); + errno_assert (rc == 0); + msg.set_flags (msg_t::more); + rc = session->write_zap_msg (&msg); + errno_assert (rc == 0); + + // Address frame + rc = msg.init_size (peer_address.length ()); + errno_assert (rc == 0); + memcpy (msg.data (), peer_address.c_str (), peer_address.length ()); + msg.set_flags (msg_t::more); + rc = session->write_zap_msg (&msg); + errno_assert (rc == 0); + + // Mechanism frame + rc = msg.init_size (5); + errno_assert (rc == 0); + memcpy (msg.data (), "NULL", 5); + rc = session->write_zap_msg (&msg); + errno_assert (rc == 0); +} + +int zmq::null_mechanism_t::receive_and_process_zap_reply () +{ + int rc = 0; + msg_t msg [7]; // ZAP reply consists of 7 frames + + // Initialize all reply frames + for (int i = 0; i < 7; i++) { + rc = msg [i].init (); + errno_assert (rc == 0); + } + + for (int i = 0; i < 7; i++) { + rc = session->read_zap_msg (&msg [i]); + if (rc == -1) + break; + if ((msg [i].flags () & msg_t::more) == (i < 6? 0: msg_t::more)) { + errno = EPROTO; + rc = -1; + break; + } + } + + if (rc != 0) + goto error; + + // Address delimiter frame + if (msg [0].size () > 0) { + errno = EPROTO; + goto error; + } + + // Version frame + if (msg [1].size () != 3 || memcmp (msg [1].data (), "1.0", 3)) { + errno = EPROTO; + goto error; + } + + // Request id frame + if (msg [2].size () != 1 || memcmp (msg [2].data (), "1", 1)) { + errno = EPROTO; + goto error; + } + + // Status code frame + if (msg [3].size () != 3 || memcmp (msg [3].data (), "200", 3)) { + errno = EACCES; + goto error; + } + + // Process metadata frame + rc = parse_metadata (static_cast (msg [6].data ()), + msg [6].size ()); + +error: + for (int i = 0; i < 7; i++) { + const int rc2 = msg [i].close (); + errno_assert (rc2 == 0); + } + + return rc; +} diff --git a/src/null_mechanism.hpp b/src/null_mechanism.hpp index 2c5d9c27..cf98d172 100644 --- a/src/null_mechanism.hpp +++ b/src/null_mechanism.hpp @@ -27,23 +27,37 @@ namespace zmq { class msg_t; + class session_base_t; class null_mechanism_t : public mechanism_t { public: - null_mechanism_t (const options_t &options_); + null_mechanism_t (session_base_t *session_, + const std::string &peer_address, + const options_t &options_); virtual ~null_mechanism_t (); // mechanism implementation virtual int next_handshake_message (msg_t *msg_); virtual int process_handshake_message (msg_t *msg_); + virtual int zap_msg_available (); virtual bool is_handshake_complete () const; private: + session_base_t * const session; + + const std::string peer_address; + bool ready_command_sent; bool ready_command_received; + bool zap_connected; + bool zap_request_sent; + bool zap_reply_received; + + void send_zap_request (); + int receive_and_process_zap_reply (); }; } diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index 5392262e..5900b8e7 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -525,7 +525,8 @@ bool zmq::stream_engine_t::handshake () alloc_assert (decoder); if (memcmp (greeting_recv + 12, "NULL\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) { - mechanism = new (std::nothrow) null_mechanism_t (options); + mechanism = new (std::nothrow) + null_mechanism_t (session, peer_address, options); alloc_assert (mechanism); } else