mirror of
https://github.com/zeromq/libzmq.git
synced 2025-01-15 10:18:01 +08:00
Backport resolve #1357 Support limited metadata for STREAM sockets
This commit is contained in:
parent
9fe4fb3929
commit
d172875f52
@ -211,6 +211,12 @@ int zmq::stream_t::xrecv (msg_t *msg_)
|
|||||||
blob_t identity = pipe->get_identity ();
|
blob_t identity = pipe->get_identity ();
|
||||||
rc = msg_->init_size (identity.size ());
|
rc = msg_->init_size (identity.size ());
|
||||||
errno_assert (rc == 0);
|
errno_assert (rc == 0);
|
||||||
|
|
||||||
|
// forward metadata (if any)
|
||||||
|
metadata_t *metadata = prefetched_msg.metadata();
|
||||||
|
if (metadata)
|
||||||
|
msg_->set_metadata(metadata);
|
||||||
|
|
||||||
memcpy (msg_->data (), identity.data (), identity.size ());
|
memcpy (msg_->data (), identity.data (), identity.size ());
|
||||||
msg_->set_flags (msg_t::more);
|
msg_->set_flags (msg_t::more);
|
||||||
|
|
||||||
@ -267,7 +273,7 @@ void zmq::stream_t::identify_peer (pipe_t *pipe_)
|
|||||||
connect_rid.length ());
|
connect_rid.length ());
|
||||||
connect_rid.clear ();
|
connect_rid.clear ();
|
||||||
outpipes_t::iterator it = outpipes.find (identity);
|
outpipes_t::iterator it = outpipes.find (identity);
|
||||||
if (it != outpipes.end ())
|
if (it != outpipes.end ())
|
||||||
zmq_assert(false);
|
zmq_assert(false);
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
|
@ -192,13 +192,22 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
|
|||||||
handshaking = false;
|
handshaking = false;
|
||||||
|
|
||||||
next_msg = &stream_engine_t::pull_msg_from_session;
|
next_msg = &stream_engine_t::pull_msg_from_session;
|
||||||
process_msg = &stream_engine_t::push_msg_to_session;
|
process_msg = &stream_engine_t::push_raw_msg_to_session;
|
||||||
|
|
||||||
|
if (!peer_address.empty()) {
|
||||||
|
// Compile metadata.
|
||||||
|
typedef metadata_t::dict_t properties_t;
|
||||||
|
properties_t properties;
|
||||||
|
properties.insert(std::make_pair("Peer-Address", peer_address));
|
||||||
|
zmq_assert (metadata == NULL);
|
||||||
|
metadata = new (std::nothrow) metadata_t (properties);
|
||||||
|
}
|
||||||
|
|
||||||
// For raw sockets, send an initial 0-length message to the
|
// For raw sockets, send an initial 0-length message to the
|
||||||
// application so that it knows a peer has connected.
|
// application so that it knows a peer has connected.
|
||||||
msg_t connector;
|
msg_t connector;
|
||||||
connector.init();
|
connector.init();
|
||||||
push_msg_to_session (&connector);
|
push_raw_msg_to_session (&connector);
|
||||||
connector.close();
|
connector.close();
|
||||||
session->flush ();
|
session->flush ();
|
||||||
}
|
}
|
||||||
@ -822,6 +831,12 @@ int zmq::stream_engine_t::push_msg_to_session (msg_t *msg_)
|
|||||||
return session->push_msg (msg_);
|
return session->push_msg (msg_);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int zmq::stream_engine_t::push_raw_msg_to_session (msg_t *msg_) {
|
||||||
|
if (metadata)
|
||||||
|
msg_->set_metadata(metadata);
|
||||||
|
return push_msg_to_session(msg_);
|
||||||
|
}
|
||||||
|
|
||||||
int zmq::stream_engine_t::write_credential (msg_t *msg_)
|
int zmq::stream_engine_t::write_credential (msg_t *msg_)
|
||||||
{
|
{
|
||||||
zmq_assert (mechanism != NULL);
|
zmq_assert (mechanism != NULL);
|
||||||
|
@ -59,7 +59,7 @@ namespace zmq
|
|||||||
timeout_error
|
timeout_error
|
||||||
};
|
};
|
||||||
|
|
||||||
stream_engine_t (fd_t fd_, const options_t &options_,
|
stream_engine_t (fd_t fd_, const options_t &options_,
|
||||||
const std::string &endpoint);
|
const std::string &endpoint);
|
||||||
~stream_engine_t ();
|
~stream_engine_t ();
|
||||||
|
|
||||||
@ -96,6 +96,8 @@ namespace zmq
|
|||||||
int next_handshake_command (msg_t *msg);
|
int next_handshake_command (msg_t *msg);
|
||||||
int process_handshake_command (msg_t *msg);
|
int process_handshake_command (msg_t *msg);
|
||||||
|
|
||||||
|
int push_raw_msg_to_session (msg_t *msg);
|
||||||
|
|
||||||
int pull_msg_from_session (msg_t *msg_);
|
int pull_msg_from_session (msg_t *msg_);
|
||||||
int push_msg_to_session (msg_t *msg);
|
int push_msg_to_session (msg_t *msg);
|
||||||
|
|
||||||
|
@ -77,17 +77,23 @@ test_stream_to_dealer (void)
|
|||||||
assert (rc > 0);
|
assert (rc > 0);
|
||||||
assert (zmq_msg_more (&identity));
|
assert (zmq_msg_more (&identity));
|
||||||
|
|
||||||
|
// Verify the existence of Peer-Address metadata
|
||||||
|
assert (streq (zmq_msg_gets (&identity, "Peer-Address"), "127.0.0.1"));
|
||||||
|
|
||||||
// Second frame is zero
|
// Second frame is zero
|
||||||
byte buffer [255];
|
byte buffer [255];
|
||||||
rc = zmq_recv (stream, buffer, 255, 0);
|
rc = zmq_recv (stream, buffer, 255, 0);
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
|
|
||||||
// Real data follows
|
// Real data follows
|
||||||
// First frame is identity
|
// First frame is identity
|
||||||
rc = zmq_msg_recv (&identity, stream, 0);
|
rc = zmq_msg_recv (&identity, stream, 0);
|
||||||
assert (rc > 0);
|
assert (rc > 0);
|
||||||
assert (zmq_msg_more (&identity));
|
assert (zmq_msg_more (&identity));
|
||||||
|
|
||||||
|
// Verify the existence of Peer-Address metadata
|
||||||
|
assert (streq (zmq_msg_gets (&identity, "Peer-Address"), "127.0.0.1"));
|
||||||
|
|
||||||
// Second frame is greeting signature
|
// Second frame is greeting signature
|
||||||
rc = zmq_recv (stream, buffer, 255, 0);
|
rc = zmq_recv (stream, buffer, 255, 0);
|
||||||
assert (rc == 10);
|
assert (rc == 10);
|
||||||
@ -179,7 +185,7 @@ test_stream_to_stream (void)
|
|||||||
// Set-up our context and sockets
|
// Set-up our context and sockets
|
||||||
void *ctx = zmq_ctx_new ();
|
void *ctx = zmq_ctx_new ();
|
||||||
assert (ctx);
|
assert (ctx);
|
||||||
|
|
||||||
void *server = zmq_socket (ctx, ZMQ_STREAM);
|
void *server = zmq_socket (ctx, ZMQ_STREAM);
|
||||||
assert (server);
|
assert (server);
|
||||||
rc = zmq_bind (server, "tcp://127.0.0.1:9070");
|
rc = zmq_bind (server, "tcp://127.0.0.1:9070");
|
||||||
@ -192,7 +198,7 @@ test_stream_to_stream (void)
|
|||||||
uint8_t id [256];
|
uint8_t id [256];
|
||||||
size_t id_size = 256;
|
size_t id_size = 256;
|
||||||
uint8_t buffer [256];
|
uint8_t buffer [256];
|
||||||
|
|
||||||
// Connecting sends a zero message
|
// Connecting sends a zero message
|
||||||
// Server: First frame is identity, second frame is zero
|
// Server: First frame is identity, second frame is zero
|
||||||
id_size = zmq_recv (server, id, 256, 0);
|
id_size = zmq_recv (server, id, 256, 0);
|
||||||
@ -215,19 +221,19 @@ test_stream_to_stream (void)
|
|||||||
// Second frame is HTTP GET request
|
// Second frame is HTTP GET request
|
||||||
rc = zmq_send (client, "GET /\n\n", 7, 0);
|
rc = zmq_send (client, "GET /\n\n", 7, 0);
|
||||||
assert (rc == 7);
|
assert (rc == 7);
|
||||||
|
|
||||||
// Get HTTP request; ID frame and then request
|
// Get HTTP request; ID frame and then request
|
||||||
id_size = zmq_recv (server, id, 256, 0);
|
id_size = zmq_recv (server, id, 256, 0);
|
||||||
assert (id_size > 0);
|
assert (id_size > 0);
|
||||||
rc = zmq_recv (server, buffer, 256, 0);
|
rc = zmq_recv (server, buffer, 256, 0);
|
||||||
assert (rc != -1);
|
assert (rc != -1);
|
||||||
assert (memcmp (buffer, "GET /\n\n", 7) == 0);
|
assert (memcmp (buffer, "GET /\n\n", 7) == 0);
|
||||||
|
|
||||||
// Send reply back to client
|
// Send reply back to client
|
||||||
char http_response [] =
|
char http_response [] =
|
||||||
"HTTP/1.0 200 OK\r\n"
|
"HTTP/1.0 200 OK\r\n"
|
||||||
"Content-Type: text/plain\r\n"
|
"Content-Type: text/plain\r\n"
|
||||||
"\r\n"
|
"\r\n"
|
||||||
"Hello, World!";
|
"Hello, World!";
|
||||||
rc = zmq_send (server, id, id_size, ZMQ_SNDMORE);
|
rc = zmq_send (server, id, id_size, ZMQ_SNDMORE);
|
||||||
assert (rc != -1);
|
assert (rc != -1);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user