0
0
mirror of https://github.com/zeromq/libzmq.git synced 2025-01-15 10:18:01 +08:00

STREAM socket support for limited metadata

WIP - STREAM socket support for limited metadata

STREAM socket support for limited metadata
This commit is contained in:
Thomas Rodgers 2015-02-18 09:35:04 -06:00
parent ea3b9c28ae
commit 5afd4e16ed
5 changed files with 46 additions and 17 deletions

View File

@ -484,9 +484,6 @@ test_bind_src_address_LDADD = libzmq.la
test_metadata_SOURCES = tests/test_metadata.cpp
test_metadata_LDADD = libzmq.la
test_id2fd_SOURCES = tests/test_id2fd.cpp
test_id2fd_LDADD = libzmq.la
test_capabilities_SOURCES = tests/test_capabilities.cpp
test_capabilities_LDADD = libzmq.la

View File

@ -208,9 +208,16 @@ int zmq::stream_t::xrecv (msg_t *msg_)
// We have received a frame with TCP data.
// Rather than sendig this frame, we keep it in prefetched
// buffer and send a frame with peer's ID.
blob_t identity = pipe->get_identity ();
rc = msg_->init_size (identity.size ());
errno_assert (rc == 0);
// forward metadata (if any)
metadata_t *metadata = prefetched_msg.metadata();
if (metadata)
msg_->set_metadata(metadata);
memcpy (msg_->data (), identity.data (), identity.size ());
msg_->set_flags (msg_t::more);
@ -267,7 +274,7 @@ void zmq::stream_t::identify_peer (pipe_t *pipe_)
connect_rid.length ());
connect_rid.clear ();
outpipes_t::iterator it = outpipes.find (identity);
if (it != outpipes.end ())
if (it != outpipes.end ())
zmq_assert(false);
}
else {

View File

@ -36,7 +36,6 @@
#include <string.h>
#include <new>
#include <sstream>
#include <iostream>
#include "stream_engine.hpp"
#include "io_thread.hpp"
@ -61,6 +60,8 @@
#include "likely.hpp"
#include "wire.hpp"
#include <iostream>
zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_,
const std::string &endpoint_) :
s (fd_),
@ -192,14 +193,24 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
handshaking = false;
next_msg = &stream_engine_t::pull_msg_from_session;
process_msg = &stream_engine_t::push_msg_to_session;
process_msg = &stream_engine_t::push_raw_msg_to_session;
if (!peer_address.empty()) {
// Compile metadata.
typedef metadata_t::dict_t properties_t;
properties_t properties;
properties.insert(std::make_pair("Peer-Address", peer_address));
zmq_assert (metadata == NULL);
metadata = new (std::nothrow) metadata_t (properties);
}
// For raw sockets, send an initial 0-length message to the
// application so that it knows a peer has connected.
msg_t connector;
connector.init();
if (metadata)
connector.set_metadata(metadata);
push_msg_to_session (&connector);
connector.close();
session->flush ();
}
else {
@ -792,7 +803,6 @@ void zmq::stream_engine_t::mechanism_ready ()
// Compile metadata.
typedef metadata_t::dict_t properties_t;
properties_t properties;
properties_t::const_iterator it;
// If we have a peer_address, add it to metadata
if (!peer_address.empty()) {
@ -822,6 +832,12 @@ int zmq::stream_engine_t::push_msg_to_session (msg_t *msg_)
return session->push_msg (msg_);
}
int zmq::stream_engine_t::push_raw_msg_to_session (msg_t *msg_) {
if (metadata)
msg_->set_metadata(metadata);
return push_msg_to_session(msg_);
}
int zmq::stream_engine_t::write_credential (msg_t *msg_)
{
zmq_assert (mechanism != NULL);

View File

@ -59,7 +59,7 @@ namespace zmq
timeout_error
};
stream_engine_t (fd_t fd_, const options_t &options_,
stream_engine_t (fd_t fd_, const options_t &options_,
const std::string &endpoint);
~stream_engine_t ();
@ -99,6 +99,8 @@ namespace zmq
int pull_msg_from_session (msg_t *msg_);
int push_msg_to_session (msg_t *msg);
int push_raw_msg_to_session (msg_t *msg);
int write_credential (msg_t *msg_);
int pull_and_encode (msg_t *msg_);
int decode_and_push (msg_t *msg_);

View File

@ -74,6 +74,10 @@ test_stream_to_dealer (void)
rc = zmq_msg_init (&identity);
assert (rc == 0);
rc = zmq_msg_recv (&identity, stream, 0);
// Verify the existence of Peer-Address metadata
assert (streq (zmq_msg_gets (&identity, "Peer-Address"), "127.0.0.1"));
assert (rc > 0);
assert (zmq_msg_more (&identity));
@ -81,13 +85,16 @@ test_stream_to_dealer (void)
byte buffer [255];
rc = zmq_recv (stream, buffer, 255, 0);
assert (rc == 0);
// Real data follows
// First frame is identity
rc = zmq_msg_recv (&identity, stream, 0);
assert (rc > 0);
assert (zmq_msg_more (&identity));
// Verify the existence of Peer-Address metadata
assert (streq (zmq_msg_gets (&identity, "Peer-Address"), "127.0.0.1"));
// Second frame is greeting signature
rc = zmq_recv (stream, buffer, 255, 0);
assert (rc == 10);
@ -179,7 +186,7 @@ test_stream_to_stream (void)
// Set-up our context and sockets
void *ctx = zmq_ctx_new ();
assert (ctx);
void *server = zmq_socket (ctx, ZMQ_STREAM);
assert (server);
rc = zmq_bind (server, "tcp://127.0.0.1:9070");
@ -192,7 +199,7 @@ test_stream_to_stream (void)
uint8_t id [256];
size_t id_size = 256;
uint8_t buffer [256];
// Connecting sends a zero message
// Server: First frame is identity, second frame is zero
id_size = zmq_recv (server, id, 256, 0);
@ -215,19 +222,19 @@ test_stream_to_stream (void)
// Second frame is HTTP GET request
rc = zmq_send (client, "GET /\n\n", 7, 0);
assert (rc == 7);
// Get HTTP request; ID frame and then request
id_size = zmq_recv (server, id, 256, 0);
assert (id_size > 0);
rc = zmq_recv (server, buffer, 256, 0);
assert (rc != -1);
assert (memcmp (buffer, "GET /\n\n", 7) == 0);
// Send reply back to client
char http_response [] =
"HTTP/1.0 200 OK\r\n"
"Content-Type: text/plain\r\n"
"\r\n"
"HTTP/1.0 200 OK\r\n"
"Content-Type: text/plain\r\n"
"\r\n"
"Hello, World!";
rc = zmq_send (server, id, id_size, ZMQ_SNDMORE);
assert (rc != -1);