diff --git a/src/command.cpp b/src/command.cpp index fcb5729b..7564fe20 100644 --- a/src/command.cpp +++ b/src/command.cpp @@ -17,8 +17,18 @@ along with this program. If not, see . */ +#include + #include "command.hpp" void zmq::deallocate_command (command_t *cmd_) { + switch (cmd_->type) { + case command_t::attach: + if (cmd_->args.attach.peer_identity) + free (cmd_->args.attach.peer_identity); + break; + default: + /* noop */; + } } diff --git a/src/command.hpp b/src/command.hpp index 976285e1..6187b72d 100644 --- a/src/command.hpp +++ b/src/command.hpp @@ -66,6 +66,8 @@ namespace zmq // Attach the engine to the session. struct { struct i_engine *engine; + unsigned char peer_identity_size; + unsigned char *peer_identity; } attach; // Sent from session to socket to establish pipe(s) between them. diff --git a/src/object.cpp b/src/object.cpp index faa922e8..73a17a37 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -17,6 +17,8 @@ along with this program. If not, see . */ +#include + #include "object.hpp" #include "dispatcher.hpp" #include "err.hpp" @@ -80,7 +82,9 @@ void zmq::object_t::process_command (command_t &cmd_) break; case command_t::attach: - process_attach (cmd_.args.attach.engine); + process_attach (cmd_.args.attach.engine, + cmd_.args.attach.peer_identity_size, + cmd_.args.attach.peer_identity); process_seqnum (); break; @@ -180,6 +184,7 @@ void zmq::object_t::send_own (socket_base_t *destination_, owned_t *object_) } void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_, + unsigned char peer_identity_size_, unsigned char *peer_identity_, bool inc_seqnum_) { if (inc_seqnum_) @@ -189,6 +194,18 @@ void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_, cmd.destination = destination_; cmd.type = command_t::attach; cmd.args.attach.engine = engine_; + if (!peer_identity_size_) { + cmd.args.attach.peer_identity_size = 0; + cmd.args.attach.peer_identity = NULL; + } + else { + cmd.args.attach.peer_identity_size = peer_identity_size_; + cmd.args.attach.peer_identity = + (unsigned char*) malloc (peer_identity_size_); + zmq_assert (cmd.args.attach.peer_identity_size); + memcpy (cmd.args.attach.peer_identity, peer_identity_, + peer_identity_size_); + } send_command (cmd); } @@ -271,7 +288,8 @@ void zmq::object_t::process_own (owned_t *object_) zmq_assert (false); } -void zmq::object_t::process_attach (i_engine *engine_) +void zmq::object_t::process_attach (i_engine *engine_, + unsigned char peer_identity_size_, unsigned char *peer_identity_) { zmq_assert (false); } diff --git a/src/object.hpp b/src/object.hpp index e6b23790..4c82a0d8 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -64,7 +64,8 @@ namespace zmq void send_own (class socket_base_t *destination_, class owned_t *object_); void send_attach (class session_t *destination_, - struct i_engine *engine_, bool inc_seqnum_ = true); + struct i_engine *engine_, unsigned char peer_identity_size_, + unsigned char *peer_identity_, bool inc_seqnum_ = true); void send_bind (class socket_base_t *destination_, class reader_t *in_pipe_, class writer_t *out_pipe_, bool inc_seqnum_ = true); @@ -81,7 +82,8 @@ namespace zmq virtual void process_stop (); virtual void process_plug (); virtual void process_own (class owned_t *object_); - virtual void process_attach (struct i_engine *engine_); + virtual void process_attach (struct i_engine *engine_, + unsigned char peer_identity_size_, unsigned char *peer_identity_); virtual void process_bind (class reader_t *in_pipe_, class writer_t *out_pipe_); virtual void process_revive (); diff --git a/src/session.cpp b/src/session.cpp index 1aece4da..07971e17 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -232,7 +232,8 @@ void zmq::session_t::process_unplug () } } -void zmq::session_t::process_attach (i_engine *engine_) +void zmq::session_t::process_attach (i_engine *engine_, + unsigned char peer_identity_size_, unsigned char *peer_identity_) { zmq_assert (!engine); zmq_assert (engine_); diff --git a/src/session.hpp b/src/session.hpp index 375d095a..2c6b4621 100644 --- a/src/session.hpp +++ b/src/session.hpp @@ -66,7 +66,8 @@ namespace zmq // Handlers for incoming commands. void process_plug (); void process_unplug (); - void process_attach (struct i_engine *engine_); + void process_attach (struct i_engine *engine_, + unsigned char peer_identity_size_, unsigned char *peer_identity_); // Inbound pipe, i.e. one the session is getting messages from. class reader_t *in_pipe; diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp index b49baa9d..6baa88f8 100644 --- a/src/zmq_init.cpp +++ b/src/zmq_init.cpp @@ -192,7 +192,8 @@ void zmq::zmq_init_t::finalise () } // No need to increment seqnum as it was laready incremented above. - send_attach (session, engine, false); + send_attach (session, engine, (unsigned char) peer_identity.size (), + (unsigned char*) peer_identity.data (), false); // Destroy the init object. engine = NULL;