diff --git a/src/stream.cpp b/src/stream.cpp index be53d9de..7b88d4e2 100644 --- a/src/stream.cpp +++ b/src/stream.cpp @@ -176,7 +176,7 @@ int zmq::stream_t::xsetsockopt (int option_, const void *optval_, return 0; } break; - + case ZMQ_STREAM_NOTIFY: if (is_int && (value == 0 || value == 1)) { options.raw_notify = (value != 0); @@ -221,6 +221,12 @@ int zmq::stream_t::xrecv (msg_t *msg_) blob_t identity = pipe->get_identity (); rc = msg_->init_size (identity.size ()); errno_assert (rc == 0); + + // forward metadata (if any) + metadata_t *metadata = prefetched_msg.metadata(); + if (metadata) + msg_->set_metadata(metadata); + memcpy (msg_->data (), identity.data (), identity.size ()); msg_->set_flags (msg_t::more); @@ -277,7 +283,7 @@ void zmq::stream_t::identify_peer (pipe_t *pipe_) connect_rid.length ()); connect_rid.clear (); outpipes_t::iterator it = outpipes.find (identity); - if (it != outpipes.end ()) + if (it != outpipes.end ()) zmq_assert(false); } else { diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index 82f5ad0a..800476b0 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -192,14 +192,23 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_, handshaking = false; next_msg = &stream_engine_t::pull_msg_from_session; - process_msg = &stream_engine_t::push_msg_to_session; + process_msg = &stream_engine_t::push_raw_msg_to_session; + + if (!peer_address.empty()) { + // Compile metadata. + typedef metadata_t::dict_t properties_t; + properties_t properties; + properties.insert(std::make_pair("Peer-Address", peer_address)); + zmq_assert (metadata == NULL); + metadata = new (std::nothrow) metadata_t (properties); + } if (options.raw_notify) { // For raw sockets, send an initial 0-length message to the // application so that it knows a peer has connected. msg_t connector; connector.init(); - push_msg_to_session (&connector); + push_raw_msg_to_session (&connector); connector.close(); session->flush (); } @@ -835,6 +844,12 @@ int zmq::stream_engine_t::push_msg_to_session (msg_t *msg_) return session->push_msg (msg_); } +int zmq::stream_engine_t::push_raw_msg_to_session (msg_t *msg_) { + if (metadata) + msg_->set_metadata(metadata); + return push_msg_to_session(msg_); +} + int zmq::stream_engine_t::write_credential (msg_t *msg_) { zmq_assert (mechanism != NULL); diff --git a/src/stream_engine.hpp b/src/stream_engine.hpp index 140f13a6..fd73bd60 100644 --- a/src/stream_engine.hpp +++ b/src/stream_engine.hpp @@ -59,7 +59,7 @@ namespace zmq timeout_error }; - stream_engine_t (fd_t fd_, const options_t &options_, + stream_engine_t (fd_t fd_, const options_t &options_, const std::string &endpoint); ~stream_engine_t (); @@ -99,6 +99,8 @@ namespace zmq int pull_msg_from_session (msg_t *msg_); int push_msg_to_session (msg_t *msg); + int push_raw_msg_to_session (msg_t *msg); + int write_credential (msg_t *msg_); int pull_and_encode (msg_t *msg_); int decode_and_push (msg_t *msg_); diff --git a/tests/test_stream.cpp b/tests/test_stream.cpp index 20a03ca5..862a3f1e 100644 --- a/tests/test_stream.cpp +++ b/tests/test_stream.cpp @@ -80,11 +80,14 @@ test_stream_to_dealer (void) assert (rc > 0); assert (zmq_msg_more (&identity)); + // Verify the existence of Peer-Address metadata + assert (streq (zmq_msg_gets (&identity, "Peer-Address"), "127.0.0.1")); + // Second frame is zero byte buffer [255]; rc = zmq_recv (stream, buffer, 255, 0); assert (rc == 0); - + // Real data follows // First frame is identity rc = zmq_msg_recv (&identity, stream, 0); @@ -92,6 +95,9 @@ test_stream_to_dealer (void) assert (zmq_msg_more (&identity)); // Second frame is greeting signature + // Verify the existence of Peer-Address metadata + assert (streq (zmq_msg_gets (&identity, "Peer-Address"), "127.0.0.1")); + rc = zmq_recv (stream, buffer, 255, 0); assert (rc == 10); assert (memcmp (buffer, greeting.signature, 10) == 0); @@ -182,7 +188,7 @@ test_stream_to_stream (void) // Set-up our context and sockets void *ctx = zmq_ctx_new (); assert (ctx); - + void *server = zmq_socket (ctx, ZMQ_STREAM); assert (server); int enabled = 1; @@ -200,7 +206,7 @@ test_stream_to_stream (void) uint8_t id [256]; size_t id_size = 256; uint8_t buffer [256]; - + // Connecting sends a zero message // Server: First frame is identity, second frame is zero id_size = zmq_recv (server, id, 256, 0); @@ -223,19 +229,19 @@ test_stream_to_stream (void) // Second frame is HTTP GET request rc = zmq_send (client, "GET /\n\n", 7, 0); assert (rc == 7); - + // Get HTTP request; ID frame and then request id_size = zmq_recv (server, id, 256, 0); assert (id_size > 0); rc = zmq_recv (server, buffer, 256, 0); assert (rc != -1); assert (memcmp (buffer, "GET /\n\n", 7) == 0); - + // Send reply back to client char http_response [] = - "HTTP/1.0 200 OK\r\n" - "Content-Type: text/plain\r\n" - "\r\n" + "HTTP/1.0 200 OK\r\n" + "Content-Type: text/plain\r\n" + "\r\n" "Hello, World!"; rc = zmq_send (server, id, id_size, ZMQ_SNDMORE); assert (rc != -1);