mirror of
https://github.com/zeromq/libzmq.git
synced 2025-01-14 09:47:56 +08:00
Problem: inconsistent naming related to routing ids
Solution: renamed routing_id fields in pipe_t, renamed ZMQ_CONNECT_RID to ZMQ_CONNECT_ROUTING_ID
This commit is contained in:
parent
1daf83079a
commit
e00131dd43
@ -345,7 +345,7 @@ ZMQ_EXPORT const char *zmq_msg_gets (const zmq_msg_t *msg, const char *property)
|
||||
#define ZMQ_ZAP_DOMAIN 55
|
||||
#define ZMQ_ROUTER_HANDOVER 56
|
||||
#define ZMQ_TOS 57
|
||||
#define ZMQ_CONNECT_RID 61
|
||||
#define ZMQ_CONNECT_ROUTING_ID 61
|
||||
#define ZMQ_GSSAPI_SERVER 62
|
||||
#define ZMQ_GSSAPI_PRINCIPAL 63
|
||||
#define ZMQ_GSSAPI_SERVICE_PRINCIPAL 64
|
||||
@ -391,6 +391,7 @@ ZMQ_EXPORT const char *zmq_msg_gets (const zmq_msg_t *msg, const char *property)
|
||||
|
||||
/* Deprecated options and aliases */
|
||||
#define ZMQ_IDENTITY ZMQ_ROUTING_ID
|
||||
#define ZMQ_CONNECT_RID ZMQ_CONNECT_ROUTING_ID
|
||||
#define ZMQ_TCP_ACCEPT_FILTER 38
|
||||
#define ZMQ_IPC_FILTER_PID 58
|
||||
#define ZMQ_IPC_FILTER_UID 59
|
||||
|
16
src/pipe.cpp
16
src/pipe.cpp
@ -92,7 +92,7 @@ zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
|
||||
sink (NULL),
|
||||
state (active),
|
||||
delay (true),
|
||||
integral_routing_id(0),
|
||||
server_socket_routing_id (0),
|
||||
conflate (conflate_)
|
||||
{
|
||||
}
|
||||
@ -115,24 +115,24 @@ void zmq::pipe_t::set_event_sink (i_pipe_events *sink_)
|
||||
sink = sink_;
|
||||
}
|
||||
|
||||
void zmq::pipe_t::set_integral_routing_id (uint32_t integral_routing_id_)
|
||||
void zmq::pipe_t::set_server_socket_routing_id (uint32_t server_socket_routing_id_)
|
||||
{
|
||||
integral_routing_id = integral_routing_id_;
|
||||
server_socket_routing_id = server_socket_routing_id_;
|
||||
}
|
||||
|
||||
uint32_t zmq::pipe_t::get_integral_routing_id ()
|
||||
uint32_t zmq::pipe_t::get_server_socket_routing_id ()
|
||||
{
|
||||
return integral_routing_id;
|
||||
return server_socket_routing_id;
|
||||
}
|
||||
|
||||
void zmq::pipe_t::set_routing_id (const blob_t &routing_id_)
|
||||
void zmq::pipe_t::set_router_socket_routing_id (const blob_t &router_socket_routing_id_)
|
||||
{
|
||||
routing_id = routing_id_;
|
||||
router_socket_routing_id = router_socket_routing_id_;
|
||||
}
|
||||
|
||||
zmq::blob_t zmq::pipe_t::get_routing_id ()
|
||||
{
|
||||
return routing_id;
|
||||
return router_socket_routing_id;
|
||||
}
|
||||
|
||||
zmq::blob_t zmq::pipe_t::get_credential () const
|
||||
|
10
src/pipe.hpp
10
src/pipe.hpp
@ -85,11 +85,11 @@ namespace zmq
|
||||
void set_event_sink (i_pipe_events *sink_);
|
||||
|
||||
// Pipe endpoint can store an routing ID to be used by its clients.
|
||||
void set_integral_routing_id (uint32_t routing_id_);
|
||||
uint32_t get_integral_routing_id ();
|
||||
void set_server_socket_routing_id (uint32_t routing_id_);
|
||||
uint32_t get_server_socket_routing_id ();
|
||||
|
||||
// Pipe endpoint can store an opaque ID to be used by its clients.
|
||||
void set_routing_id (const blob_t &identity_);
|
||||
void set_router_socket_routing_id (const blob_t &identity_);
|
||||
blob_t get_routing_id ();
|
||||
|
||||
blob_t get_credential () const;
|
||||
@ -227,10 +227,10 @@ namespace zmq
|
||||
bool delay;
|
||||
|
||||
// Identity of the writer. Used uniquely by the reader side.
|
||||
blob_t routing_id;
|
||||
blob_t router_socket_routing_id;
|
||||
|
||||
// Identity of the writer. Used uniquely by the reader side.
|
||||
int integral_routing_id;
|
||||
int server_socket_routing_id;
|
||||
|
||||
// Pipe's credential.
|
||||
blob_t credential;
|
||||
|
@ -102,9 +102,9 @@ int zmq::router_t::xsetsockopt (int option_, const void *optval_,
|
||||
if (is_int) memcpy(&value, optval_, sizeof (int));
|
||||
|
||||
switch (option_) {
|
||||
case ZMQ_CONNECT_RID:
|
||||
case ZMQ_CONNECT_ROUTING_ID:
|
||||
if (optval_ && optvallen_) {
|
||||
connect_rid.assign ((char *) optval_, optvallen_);
|
||||
connect_routing_id.assign ((char *) optval_, optvallen_);
|
||||
return 0;
|
||||
}
|
||||
break;
|
||||
@ -470,10 +470,10 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_)
|
||||
blob_t routing_id;
|
||||
bool ok;
|
||||
|
||||
if (connect_rid.length()) {
|
||||
routing_id = blob_t ((unsigned char*) connect_rid.c_str (),
|
||||
connect_rid.length());
|
||||
connect_rid.clear ();
|
||||
if (connect_routing_id.length()) {
|
||||
routing_id = blob_t ((unsigned char*) connect_routing_id.c_str (),
|
||||
connect_routing_id.length());
|
||||
connect_routing_id.clear ();
|
||||
outpipes_t::iterator it = outpipes.find (routing_id);
|
||||
if (it != outpipes.end ())
|
||||
zmq_assert(false); // Not allowed to duplicate an existing rid
|
||||
@ -519,7 +519,7 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_)
|
||||
put_uint32 (buf + 1, next_integral_routing_id++);
|
||||
blob_t new_routing_id = blob_t (buf, sizeof buf);
|
||||
|
||||
it->second.pipe->set_routing_id (new_routing_id);
|
||||
it->second.pipe->set_router_socket_routing_id (new_routing_id);
|
||||
outpipe_t existing_outpipe =
|
||||
{it->second.pipe, it->second.active};
|
||||
|
||||
@ -540,7 +540,7 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_)
|
||||
}
|
||||
}
|
||||
|
||||
pipe_->set_routing_id (routing_id);
|
||||
pipe_->set_router_socket_routing_id (routing_id);
|
||||
// Add the record into output pipes lookup table
|
||||
outpipe_t outpipe = {pipe_, true};
|
||||
ok = outpipes.insert (outpipes_t::value_type (routing_id, outpipe)).second;
|
||||
|
@ -58,7 +58,7 @@ void zmq::server_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
|
||||
if (!routing_id)
|
||||
routing_id = next_rid++; // Never use RID zero
|
||||
|
||||
pipe_->set_integral_routing_id (routing_id);
|
||||
pipe_->set_server_socket_routing_id (routing_id);
|
||||
// Add the record into output pipes lookup table
|
||||
outpipe_t outpipe = {pipe_, true};
|
||||
bool ok = outpipes.insert (outpipes_t::value_type (routing_id, outpipe)).second;
|
||||
@ -69,7 +69,7 @@ void zmq::server_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
|
||||
|
||||
void zmq::server_t::xpipe_terminated (pipe_t *pipe_)
|
||||
{
|
||||
outpipes_t::iterator it = outpipes.find (pipe_->get_integral_routing_id ());
|
||||
outpipes_t::iterator it = outpipes.find (pipe_->get_server_socket_routing_id ());
|
||||
zmq_assert (it != outpipes.end ());
|
||||
outpipes.erase (it);
|
||||
fq.pipe_terminated (pipe_);
|
||||
@ -159,7 +159,7 @@ int zmq::server_t::xrecv (msg_t *msg_)
|
||||
|
||||
zmq_assert (pipe != NULL);
|
||||
|
||||
uint32_t routing_id = pipe->get_integral_routing_id ();
|
||||
uint32_t routing_id = pipe->get_server_socket_routing_id ();
|
||||
msg_->set_routing_id (routing_id);
|
||||
|
||||
return 0;
|
||||
|
@ -186,7 +186,7 @@ namespace zmq
|
||||
void process_destroy ();
|
||||
|
||||
// Next assigned name on a zmq_connect() call used by ROUTER and STREAM socket types
|
||||
std::string connect_rid;
|
||||
std::string connect_routing_id;
|
||||
|
||||
private:
|
||||
// test if event should be sent and then dispatch it
|
||||
|
@ -183,9 +183,9 @@ int zmq::stream_t::xsetsockopt (int option_, const void *optval_,
|
||||
if (is_int) memcpy(&value, optval_, sizeof (int));
|
||||
|
||||
switch (option_) {
|
||||
case ZMQ_CONNECT_RID:
|
||||
case ZMQ_CONNECT_ROUTING_ID:
|
||||
if (optval_ && optvallen_) {
|
||||
connect_rid.assign ((char*) optval_, optvallen_);
|
||||
connect_routing_id.assign ((char*) optval_, optvallen_);
|
||||
return 0;
|
||||
}
|
||||
break;
|
||||
@ -299,10 +299,10 @@ void zmq::stream_t::identify_peer (pipe_t *pipe_)
|
||||
unsigned char buffer [5];
|
||||
buffer [0] = 0;
|
||||
blob_t routing_id;
|
||||
if (connect_rid.length ()) {
|
||||
routing_id = blob_t ((unsigned char*) connect_rid.c_str(),
|
||||
connect_rid.length ());
|
||||
connect_rid.clear ();
|
||||
if (connect_routing_id.length ()) {
|
||||
routing_id = blob_t ((unsigned char*) connect_routing_id.c_str(),
|
||||
connect_routing_id.length ());
|
||||
connect_routing_id.clear ();
|
||||
outpipes_t::iterator it = outpipes.find (routing_id);
|
||||
zmq_assert (it == outpipes.end ());
|
||||
}
|
||||
@ -312,7 +312,7 @@ void zmq::stream_t::identify_peer (pipe_t *pipe_)
|
||||
memcpy (options.routing_id, routing_id.data (), routing_id.size ());
|
||||
options.routing_id_size = (unsigned char) routing_id.size ();
|
||||
}
|
||||
pipe_->set_routing_id (routing_id);
|
||||
pipe_->set_router_socket_routing_id (routing_id);
|
||||
// Add the record into output pipes lookup table
|
||||
outpipe_t outpipe = {pipe_, true};
|
||||
const bool ok = outpipes.insert (
|
||||
|
@ -520,7 +520,7 @@ bool zmq::stream_engine_t::handshake ()
|
||||
|
||||
// Inspect the right-most bit of the 10th byte (which coincides
|
||||
// with the 'flags' field if a regular message was sent).
|
||||
// Zero indicates this is a header of routing id message
|
||||
// Zero indicates this is a header of a routing id message
|
||||
// (i.e. the peer is using the unversioned protocol).
|
||||
if (!(greeting_recv [9] & 0x01))
|
||||
break;
|
||||
|
@ -61,13 +61,13 @@ void test_stream_2_stream(){
|
||||
assert (0 == ret);
|
||||
|
||||
// Do the connection.
|
||||
ret = zmq_setsockopt (rconn1, ZMQ_CONNECT_RID, "conn1", 6);
|
||||
ret = zmq_setsockopt (rconn1, ZMQ_CONNECT_ROUTING_ID, "conn1", 6);
|
||||
assert (0 == ret);
|
||||
ret = zmq_connect (rconn1, my_endpoint);
|
||||
|
||||
/* Uncomment to test assert on duplicate rid.
|
||||
/* Uncomment to test assert on duplicate routing id.
|
||||
// Test duplicate connect attempt.
|
||||
ret = zmq_setsockopt (rconn1, ZMQ_CONNECT_RID, "conn1", 6);
|
||||
ret = zmq_setsockopt (rconn1, ZMQ_CONNECT_ROUTING_ID, "conn1", 6);
|
||||
assert (0 == ret);
|
||||
ret = zmq_connect (rconn1, bindip);
|
||||
assert (0 == ret);
|
||||
@ -130,14 +130,14 @@ void test_router_2_router(bool named){
|
||||
ret = zmq_setsockopt (rconn1, ZMQ_ROUTING_ID, "Y", 1);
|
||||
}
|
||||
|
||||
// Make call to connect using a connect_rid.
|
||||
ret = zmq_setsockopt (rconn1, ZMQ_CONNECT_RID, "conn1", 6);
|
||||
// Make call to connect using a connect_routing_id.
|
||||
ret = zmq_setsockopt (rconn1, ZMQ_CONNECT_ROUTING_ID, "conn1", 6);
|
||||
assert (0 == ret);
|
||||
ret = zmq_connect (rconn1, my_endpoint);
|
||||
assert (0 == ret);
|
||||
/* Uncomment to test assert on duplicate rid
|
||||
/* Uncomment to test assert on duplicate routing id
|
||||
// Test duplicate connect attempt.
|
||||
ret = zmq_setsockopt (rconn1, ZMQ_CONNECT_RID, "conn1", 6);
|
||||
ret = zmq_setsockopt (rconn1, ZMQ_CONNECT_ROUTING_ID, "conn1", 6);
|
||||
assert (0 == ret);
|
||||
ret = zmq_connect (rconn1, bindip);
|
||||
assert (0 == ret);
|
||||
|
@ -752,7 +752,8 @@ int main (void)
|
||||
shutdown_context_and_server_side (ctx, zap_thread, server, server_mon,
|
||||
handler);
|
||||
|
||||
fprintf (stderr, "test_curve_security_invalid_initiate_command_encrypted_cookie\n");
|
||||
fprintf (stderr,
|
||||
"test_curve_security_invalid_initiate_command_encrypted_cookie\n");
|
||||
setup_context_and_server_side (&ctx, &handler, &zap_thread, &server,
|
||||
&server_mon, my_endpoint);
|
||||
test_curve_security_invalid_initiate_command_encrypted_cookie (
|
||||
@ -760,7 +761,9 @@ int main (void)
|
||||
shutdown_context_and_server_side (ctx, zap_thread, server, server_mon,
|
||||
handler);
|
||||
|
||||
fprintf (stderr, "test_curve_security_invalid_initiate_command_encrypted_content\n");
|
||||
fprintf (
|
||||
stderr,
|
||||
"test_curve_security_invalid_initiate_command_encrypted_content\n");
|
||||
setup_context_and_server_side (&ctx, &handler, &zap_thread, &server,
|
||||
&server_mon, my_endpoint);
|
||||
test_curve_security_invalid_initiate_command_encrypted_content (
|
||||
@ -769,11 +772,12 @@ int main (void)
|
||||
handler);
|
||||
|
||||
// test with a large identity (resulting in large metadata)
|
||||
fprintf (stderr, "test_curve_security_with_valid_credentials (large identity)\n");
|
||||
fprintf (stderr,
|
||||
"test_curve_security_with_valid_credentials (large identity)\n");
|
||||
setup_context_and_server_side (
|
||||
&ctx, &handler, &zap_thread, &server, &server_mon, my_endpoint,
|
||||
&zap_handler_large_identity, &socket_config_curve_server, &valid_server_secret,
|
||||
large_identity);
|
||||
&zap_handler_large_identity, &socket_config_curve_server,
|
||||
&valid_server_secret, large_identity);
|
||||
test_curve_security_with_valid_credentials (ctx, my_endpoint, server,
|
||||
server_mon, timeout);
|
||||
shutdown_context_and_server_side (ctx, zap_thread, server, server_mon,
|
||||
|
@ -280,7 +280,6 @@ int main (void)
|
||||
test_zap_errors (&socket_config_plain_server, NULL,
|
||||
&socket_config_plain_client, NULL);
|
||||
|
||||
if (zmq_has ("curve")) {
|
||||
fprintf (stderr, "CURVE mechanism\n");
|
||||
setup_testutil_security_curve ();
|
||||
|
||||
@ -289,4 +288,3 @@ int main (void)
|
||||
test_zap_errors (&socket_config_curve_server, valid_server_secret,
|
||||
&socket_config_curve_client, &curve_client_data);
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user