From 8d82cc2a0b91a4883e3643f7f22dee16d5d4978f Mon Sep 17 00:00:00 2001 From: Martin Hurton Date: Fri, 2 May 2014 12:33:26 +0200 Subject: [PATCH] Include ZMTP properties in message metadata Metadata are built in stream_engine now. This makes it easy to extend metadata with user-defined properties. --- src/mechanism.cpp | 22 ++++++++-------------- src/mechanism.hpp | 16 ++++++++++++---- src/metadata.cpp | 2 +- src/metadata.hpp | 5 +++-- src/stream_engine.cpp | 33 ++++++++++++++++++++++++++++++++- src/stream_engine.hpp | 4 ++++ 6 files changed, 60 insertions(+), 22 deletions(-) diff --git a/src/mechanism.cpp b/src/mechanism.cpp index 17920e3d..06247add 100644 --- a/src/mechanism.cpp +++ b/src/mechanism.cpp @@ -18,7 +18,6 @@ */ #include -#include #include "mechanism.hpp" #include "options.hpp" @@ -27,16 +26,12 @@ #include "wire.hpp" zmq::mechanism_t::mechanism_t (const options_t &options_) : - metadata (NULL), options (options_) { } zmq::mechanism_t::~mechanism_t () { - if (metadata != NULL) - if (metadata->drop_ref ()) - delete metadata; } void zmq::mechanism_t::set_peer_identity (const void *id_ptr, size_t id_size) @@ -90,7 +85,6 @@ size_t zmq::mechanism_t::add_property (unsigned char *ptr, const char *name, int zmq::mechanism_t::parse_metadata (const unsigned char *ptr_, size_t length_, bool zap_flag) { - std::map dict; size_t bytes_left = length_; while (bytes_left > 1) { @@ -132,19 +126,19 @@ int zmq::mechanism_t::parse_metadata (const unsigned char *ptr_, return -1; } - dict.insert ( - std::map ::value_type ( - name, std::string ((char *) value, value_length))); + if (zap_flag) + zap_properties.insert ( + metadata_t::dict_t::value_type ( + name, std::string ((char *) value, value_length))); + else + zmtp_properties.insert ( + metadata_t::dict_t::value_type ( + name, std::string ((char *) value, value_length))); } if (bytes_left > 0) { errno = EPROTO; return -1; } - if (zap_flag) { - assert (metadata == NULL); - if (!dict.empty ()) - metadata = new (std::nothrow) metadata_t (dict); - } return 0; } diff --git a/src/mechanism.hpp b/src/mechanism.hpp index 3b3f5b7c..8c8e3ad5 100644 --- a/src/mechanism.hpp +++ b/src/mechanism.hpp @@ -65,7 +65,13 @@ namespace zmq blob_t get_user_id () const; - metadata_t *get_metadata () { return metadata; } + const metadata_t::dict_t& get_zmtp_properties () { + return zmtp_properties; + } + + const metadata_t::dict_t& get_zap_properties () { + return zap_properties; + } protected: @@ -93,9 +99,11 @@ namespace zmq virtual int property (const std::string& name_, const void *value_, size_t length_); - // Metadada as returned by ZAP protocol. - // NULL if no metadata received. - metadata_t *metadata; + // Properties received from ZMTP peer. + metadata_t::dict_t zmtp_properties; + + // Properties received from ZAP server. + metadata_t::dict_t zap_properties; options_t options; diff --git a/src/metadata.cpp b/src/metadata.cpp index 357d87cb..db8f0940 100644 --- a/src/metadata.cpp +++ b/src/metadata.cpp @@ -19,7 +19,7 @@ #include "metadata.hpp" -zmq::metadata_t::metadata_t (dict_t &dict) : +zmq::metadata_t::metadata_t (const dict_t &dict) : ref_cnt (1), dict (dict) { diff --git a/src/metadata.hpp b/src/metadata.hpp index 72ab6fa9..f00149ae 100644 --- a/src/metadata.hpp +++ b/src/metadata.hpp @@ -31,7 +31,9 @@ namespace zmq { public: - metadata_t (std::map &dict); + typedef std::map dict_t; + + metadata_t (const dict_t &dict); virtual ~metadata_t (); // Returns pointer to property value or NULL if @@ -50,7 +52,6 @@ namespace zmq atomic_counter_t ref_cnt; // Dictionary holding metadata. - typedef std::map dict_t; const dict_t dict; }; diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index aac93da6..1d5c62e0 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -64,6 +64,7 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, outpos (NULL), outsize (0), encoder (NULL), + metadata (NULL), handshaking (true), greeting_size (v2_greeting_size), greeting_bytes_read (0), @@ -144,6 +145,12 @@ zmq::stream_engine_t::~stream_engine_t () int rc = tx_msg.close (); errno_assert (rc == 0); + // Drop reference to metadata and destroy it if we are + // the only user. + if (metadata != NULL) + if (metadata->drop_ref ()) + delete metadata; + delete encoder; delete decoder; delete mechanism; @@ -728,6 +735,31 @@ void zmq::stream_engine_t::mechanism_ready () read_msg = &stream_engine_t::pull_and_encode; write_msg = &stream_engine_t::write_credential; + + // Compile metadata. + typedef metadata_t::dict_t properties_t; + properties_t properties; + properties_t::const_iterator it; + + // Add ZAP properties. + const properties_t& zap_properties = mechanism->get_zap_properties (); + it = zap_properties.begin (); + while (it != zap_properties.end ()) { + properties.insert (properties_t::value_type (it->first, it->second)); + it++; + } + + // Add ZMTP properties. + const properties_t& zmtp_properties = mechanism->get_zmtp_properties (); + it = zmtp_properties.begin (); + while (it != zmtp_properties.end ()) { + properties.insert (properties_t::value_type (it->first, it->second)); + it++; + } + + zmq_assert (metadata == NULL); + if (!properties.empty ()) + metadata = new (std::nothrow) metadata_t (properties); } int zmq::stream_engine_t::pull_msg_from_session (msg_t *msg_) @@ -780,7 +812,6 @@ int zmq::stream_engine_t::decode_and_push (msg_t *msg_) if (mechanism->decode (msg_) == -1) return -1; - metadata_t *metadata = mechanism->get_metadata (); if (metadata) msg_->set_properties (metadata); if (session->push_msg (msg_) == -1) { diff --git a/src/stream_engine.hpp b/src/stream_engine.hpp index 4e9e7915..552bcd8d 100644 --- a/src/stream_engine.hpp +++ b/src/stream_engine.hpp @@ -30,6 +30,7 @@ #include "options.hpp" #include "socket_base.hpp" #include "../include/zmq.h" +#include "metadata.hpp" namespace zmq { @@ -131,6 +132,9 @@ namespace zmq size_t outsize; i_encoder *encoder; + // Metadata to be attached to received messages. May be NULL. + metadata_t *metadata; + // When true, we are still trying to determine whether // the peer is using versioned protocol, and if so, which // version. When false, normal message flow has started.