mirror of
https://github.com/zeromq/libzmq.git
synced 2024-12-27 07:31:03 +08:00
tipc: add support for address domain suffix
The TIPC protocol bindings in ZeroMQ defaults to a lookup domain of 1.0.0 to prevent 'closest first' search, and instead always do round robin if several sockets in the network or node have the same name published. In retrospect, this might have been a bad idea because it won't work on standalone configurations. We solve this by allowing an optional domain suffix to be provided in the address, and 0.0.0 should be used in that case, or if the TIPC address range in the cluster configuration is defined to some other value. Domain suffixes are only relevant for connecting addresses. Signed-off-by: Erik Hugne <erik.hugne@gmail.com>
This commit is contained in:
parent
c7ecbedb0e
commit
f81ef1bc72
@ -59,8 +59,16 @@ int zmq::tipc_address_t::resolve (const char *name)
|
|||||||
unsigned int type = 0;
|
unsigned int type = 0;
|
||||||
unsigned int lower = 0;
|
unsigned int lower = 0;
|
||||||
unsigned int upper = 0;
|
unsigned int upper = 0;
|
||||||
|
unsigned int z = 1, c = 0, n = 0;
|
||||||
|
char eof;
|
||||||
|
const char *domain;
|
||||||
const int res = sscanf (name, "{%u,%u,%u}", &type, &lower, &upper);
|
const int res = sscanf (name, "{%u,%u,%u}", &type, &lower, &upper);
|
||||||
|
|
||||||
|
/* Fetch optional domain suffix. */
|
||||||
|
if ((domain = strchr(name, '@'))) {
|
||||||
|
if (sscanf(domain, "@%u.%u.%u%c", &z, &c, &n, &eof) != 3)
|
||||||
|
return EINVAL;
|
||||||
|
}
|
||||||
if (res == 3)
|
if (res == 3)
|
||||||
goto nameseq;
|
goto nameseq;
|
||||||
else
|
else
|
||||||
@ -69,10 +77,7 @@ int zmq::tipc_address_t::resolve (const char *name)
|
|||||||
address.addrtype = TIPC_ADDR_NAME;
|
address.addrtype = TIPC_ADDR_NAME;
|
||||||
address.addr.name.name.type = type;
|
address.addr.name.name.type = type;
|
||||||
address.addr.name.name.instance = lower;
|
address.addr.name.name.instance = lower;
|
||||||
/* Since we can't specify lookup domain when connecting
|
address.addr.name.domain = tipc_addr (z, c, n);
|
||||||
* (and we're not sure that we want it to be configurable)
|
|
||||||
* Change from 'closest first' approach, to search entire zone */
|
|
||||||
address.addr.name.domain = tipc_addr (1, 0, 0);
|
|
||||||
address.scope = 0;
|
address.scope = 0;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -60,10 +60,10 @@ int main (void)
|
|||||||
val = 0;
|
val = 0;
|
||||||
zmq_setsockopt (from, ZMQ_LINGER, &val, sizeof (val));
|
zmq_setsockopt (from, ZMQ_LINGER, &val, sizeof (val));
|
||||||
// This pipe will not connect
|
// This pipe will not connect
|
||||||
rc = zmq_connect (from, "tipc://{5556,0}");
|
rc = zmq_connect (from, "tipc://{5556,0}@0.0.0");
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
// This pipe will
|
// This pipe will
|
||||||
rc = zmq_connect (from, "tipc://{6555,0}");
|
rc = zmq_connect (from, "tipc://{6555,0}@0.0.0");
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
|
|
||||||
// We send 10 messages, 5 should just get stuck in the queue
|
// We send 10 messages, 5 should just get stuck in the queue
|
||||||
@ -130,10 +130,10 @@ int main (void)
|
|||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
|
|
||||||
// Connect to the invalid socket
|
// Connect to the invalid socket
|
||||||
rc = zmq_connect (from, "tipc://{5561,0}");
|
rc = zmq_connect (from, "tipc://{5561,0}@0.0.0");
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
// Connect to the valid socket
|
// Connect to the valid socket
|
||||||
rc = zmq_connect (from, "tipc://{5560,0}");
|
rc = zmq_connect (from, "tipc://{5560,0}@0.0.0");
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
|
|
||||||
// Send 10 messages, all should be routed to the connected pipe
|
// Send 10 messages, all should be routed to the connected pipe
|
||||||
@ -185,7 +185,7 @@ int main (void)
|
|||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
rc = zmq_bind (backend, "tipc://{5560,0,0}");
|
rc = zmq_bind (backend, "tipc://{5560,0,0}");
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
rc = zmq_connect (frontend, "tipc://{5560,0}");
|
rc = zmq_connect (frontend, "tipc://{5560,0}@0.0.0");
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
|
|
||||||
// Ping backend to frontend so we know when the connection is up
|
// Ping backend to frontend so we know when the connection is up
|
||||||
|
@ -44,7 +44,7 @@ int main (void)
|
|||||||
|
|
||||||
void *sc = zmq_socket (ctx, ZMQ_PAIR);
|
void *sc = zmq_socket (ctx, ZMQ_PAIR);
|
||||||
assert (sc);
|
assert (sc);
|
||||||
rc = zmq_connect (sc, "tipc://{5560,0}");
|
rc = zmq_connect (sc, "tipc://{5560,0}@0.0.0");
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
|
|
||||||
bounce (sb, sc);
|
bounce (sb, sc);
|
||||||
|
@ -49,13 +49,13 @@ int main (void)
|
|||||||
// Create a worker.
|
// Create a worker.
|
||||||
void *rep = zmq_socket (ctx, ZMQ_REP);
|
void *rep = zmq_socket (ctx, ZMQ_REP);
|
||||||
assert (rep);
|
assert (rep);
|
||||||
rc = zmq_connect (rep, "tipc://{5560,0}");
|
rc = zmq_connect (rep, "tipc://{5560,0}@0.0.0");
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
|
|
||||||
// Create a client.
|
// Create a client.
|
||||||
void *req = zmq_socket (ctx, ZMQ_REQ);
|
void *req = zmq_socket (ctx, ZMQ_REQ);
|
||||||
assert (req);
|
assert (req);
|
||||||
rc = zmq_connect (req, "tipc://{5561,0}");
|
rc = zmq_connect (req, "tipc://{5561,0}@0.0.0");
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
|
|
||||||
// Send a request.
|
// Send a request.
|
||||||
|
@ -43,7 +43,7 @@ int main (void)
|
|||||||
|
|
||||||
void *sc = zmq_socket (ctx, ZMQ_REQ);
|
void *sc = zmq_socket (ctx, ZMQ_REQ);
|
||||||
assert (sc);
|
assert (sc);
|
||||||
rc = zmq_connect (sc, "tipc://{5560,0}");
|
rc = zmq_connect (sc, "tipc://{5560,0}@0.0.0");
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
|
|
||||||
bounce (sb, sc);
|
bounce (sb, sc);
|
||||||
|
@ -37,7 +37,7 @@ extern "C"
|
|||||||
{
|
{
|
||||||
int rc;
|
int rc;
|
||||||
|
|
||||||
rc = zmq_connect (s, "tipc://{5560,0}");
|
rc = zmq_connect (s, "tipc://{5560,0}@0.0.0");
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
|
|
||||||
// Start closing the socket while the connecting process is underway.
|
// Start closing the socket while the connecting process is underway.
|
||||||
|
@ -49,13 +49,13 @@ int main (void)
|
|||||||
// Create a publisher.
|
// Create a publisher.
|
||||||
void *pub = zmq_socket (ctx, ZMQ_PUB);
|
void *pub = zmq_socket (ctx, ZMQ_PUB);
|
||||||
assert (pub);
|
assert (pub);
|
||||||
rc = zmq_connect (pub, "tipc://{5561,0}");
|
rc = zmq_connect (pub, "tipc://{5561,0}@0.0.0");
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
|
|
||||||
// Create a subscriber.
|
// Create a subscriber.
|
||||||
void *sub = zmq_socket (ctx, ZMQ_SUB);
|
void *sub = zmq_socket (ctx, ZMQ_SUB);
|
||||||
assert (sub);
|
assert (sub);
|
||||||
rc = zmq_connect (sub, "tipc://{5560,0}");
|
rc = zmq_connect (sub, "tipc://{5560,0}@0.0.0");
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
|
|
||||||
// Subscribe for all messages.
|
// Subscribe for all messages.
|
||||||
|
@ -34,7 +34,7 @@ int main (void)
|
|||||||
int rc;
|
int rc;
|
||||||
char buf[32];
|
char buf[32];
|
||||||
const char *ep = "tipc://{5560,0,0}";
|
const char *ep = "tipc://{5560,0,0}";
|
||||||
const char *name = "tipc://{5560,0}";
|
const char *name = "tipc://{5560,0}@0.0.0";
|
||||||
|
|
||||||
fprintf (stderr, "unbind endpoint test running...\n");
|
fprintf (stderr, "unbind endpoint test running...\n");
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user