mirror of
https://github.com/zeromq/libzmq.git
synced 2025-01-14 17:58:01 +08:00
Merge pull request #2746 from zeromq/revert-2743-rename-identity
Revert "Problem: term "identity" is confusing"
This commit is contained in:
commit
18498f620f
@ -289,7 +289,7 @@ ZMQ_HANDSHAKE_IVL: Retrieve maximum handshake interval
|
|||||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||||
The 'ZMQ_HANDSHAKE_IVL' option shall retrieve the maximum handshake interval
|
The 'ZMQ_HANDSHAKE_IVL' option shall retrieve the maximum handshake interval
|
||||||
for the specified 'socket'. Handshaking is the exchange of socket configuration
|
for the specified 'socket'. Handshaking is the exchange of socket configuration
|
||||||
information (socket type, routing id, security) that occurs when a connection
|
information (socket type, identity, security) that occurs when a connection
|
||||||
is first opened, only for connection-oriented transports. If handshaking does
|
is first opened, only for connection-oriented transports. If handshaking does
|
||||||
not complete within the configured time, the connection shall be closed.
|
not complete within the configured time, the connection shall be closed.
|
||||||
The value 0 means no handshake time limit.
|
The value 0 means no handshake time limit.
|
||||||
@ -303,8 +303,19 @@ Applicable socket types:: all but ZMQ_STREAM, only for connection-oriented trans
|
|||||||
|
|
||||||
ZMQ_IDENTITY: Retrieve socket identity
|
ZMQ_IDENTITY: Retrieve socket identity
|
||||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||||
This option name is now deprecated. Use ZMQ_ROUTING_ID instead.
|
The 'ZMQ_IDENTITY' option shall retrieve the identity of the specified 'socket'.
|
||||||
ZMQ_IDENTITY remains as an alias for now.
|
Socket identity is used only by request/reply pattern. Namely, it can be used
|
||||||
|
in tandem with ROUTER socket to route messages to the peer with specific
|
||||||
|
identity.
|
||||||
|
|
||||||
|
Identity should be at least one byte and at most 255 bytes long. Identities
|
||||||
|
starting with binary zero are reserved for use by 0MQ infrastructure.
|
||||||
|
|
||||||
|
[horizontal]
|
||||||
|
Option value type:: binary data
|
||||||
|
Option value unit:: N/A
|
||||||
|
Default value:: NULL
|
||||||
|
Applicable socket types:: ZMQ_REP, ZMQ_REQ, ZMQ_ROUTER, ZMQ_DEALER.
|
||||||
|
|
||||||
|
|
||||||
ZMQ_IMMEDIATE: Retrieve attach-on-connect value
|
ZMQ_IMMEDIATE: Retrieve attach-on-connect value
|
||||||
@ -645,23 +656,6 @@ Default value:: 10000
|
|||||||
Applicable socket types:: all, when using multicast transports
|
Applicable socket types:: all, when using multicast transports
|
||||||
|
|
||||||
|
|
||||||
ZMQ_ROUTING_ID: Retrieve socket routing id
|
|
||||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
|
||||||
The 'ZMQ_ROUTING_ID' option shall retrieve the routing id of the specified 'socket'.
|
|
||||||
Routing ids are used only by the request/reply pattern. Specifically, it can be used
|
|
||||||
in tandem with ROUTER socket to route messages to the peer with a specific
|
|
||||||
routing id.
|
|
||||||
|
|
||||||
A routing id must be at least one byte and at most 255 bytes long. Identities
|
|
||||||
starting with a zero byte are reserved for use by the 0MQ infrastructure.
|
|
||||||
|
|
||||||
[horizontal]
|
|
||||||
Option value type:: binary data
|
|
||||||
Option value unit:: N/A
|
|
||||||
Default value:: NULL
|
|
||||||
Applicable socket types:: ZMQ_REP, ZMQ_REQ, ZMQ_ROUTER, ZMQ_DEALER.
|
|
||||||
|
|
||||||
|
|
||||||
ZMQ_SNDBUF: Retrieve kernel transmit buffer size
|
ZMQ_SNDBUF: Retrieve kernel transmit buffer size
|
||||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||||
The 'ZMQ_SNDBUF' option shall retrieve the underlying kernel transmit buffer
|
The 'ZMQ_SNDBUF' option shall retrieve the underlying kernel transmit buffer
|
||||||
|
@ -26,16 +26,14 @@ The following ZMTP properties can be retrieved with the _zmq_msg_gets()_
|
|||||||
function:
|
function:
|
||||||
|
|
||||||
Socket-Type
|
Socket-Type
|
||||||
Routing-Id
|
Identity
|
||||||
|
|
||||||
Note: 'Identity' is a deprecated alias for 'Routing-Id'.
|
|
||||||
|
|
||||||
Additionally, when available for the underlying transport, the *Peer-Address*
|
Additionally, when available for the underlying transport, the *Peer-Address*
|
||||||
property will return the IP address of the remote endpoint as returned by
|
property will return the IP address of the remote endpoint as returned by
|
||||||
getnameinfo(2).
|
getnameinfo(2).
|
||||||
|
|
||||||
The names of these properties are also defined in _zmq.h_ as
|
The names of these properties are also defined in _zmq.h_ as
|
||||||
_ZMQ_MSG_PROPERTY_SOCKET_TYPE_ _ZMQ_MSG_PROPERTY_ROUTING_ID_, and
|
_ZMQ_MSG_PROPERTY_SOCKET_TYPE_ _ZMQ_MSG_PROPERTY_IDENTITY_, and
|
||||||
_ZMQ_MSG_PROPERTY_PEER_ADDRESS_.
|
_ZMQ_MSG_PROPERTY_PEER_ADDRESS_.
|
||||||
Currently, these definitions are only available as a DRAFT API.
|
Currently, these definitions are only available as a DRAFT API.
|
||||||
|
|
||||||
|
@ -90,29 +90,22 @@ Applicable socket types:: all, when using TCP or UDP transports.
|
|||||||
|
|
||||||
ZMQ_CONNECT_RID: Assign the next outbound connection id
|
ZMQ_CONNECT_RID: Assign the next outbound connection id
|
||||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||||
This option name is now deprecated. Use ZMQ_CONNECT_ROUTING_ID instead.
|
The 'ZMQ_CONNECT_RID' option sets the peer id of the next host connected
|
||||||
ZMQ_CONNECT_RID remains as an alias for now.
|
via the zmq_connect() call, and immediately readies that connection for
|
||||||
|
data transfer with the named id. This option applies only to the first
|
||||||
|
subsequent call to zmq_connect(), calls thereafter use default connection
|
||||||
|
behaviour.
|
||||||
|
|
||||||
|
Typical use is to set this socket option ahead of each zmq_connect() attempt
|
||||||
ZMQ_CONNECT_ROUTING_ID: Assign the next outbound routing id
|
to a new host. Each connection MUST be assigned a unique name. Assigning a
|
||||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
name that is already in use is not allowed.
|
||||||
The 'ZMQ_CONNECT_ROUTING_ID' option sets the peer id of the peer connected
|
|
||||||
via the next zmq_connect() call, such that that connection is immediately ready for
|
|
||||||
data transfer with the given routing id. This option applies only to the first
|
|
||||||
subsequent call to zmq_connect(), zmq_connect() calls thereafter use the default
|
|
||||||
connection behaviour.
|
|
||||||
|
|
||||||
Typical use is to set this socket option ahead of each zmq_connect() call.
|
|
||||||
Each connection MUST be assigned a unique routing id. Assigning a
|
|
||||||
routing id that is already in use is not allowed.
|
|
||||||
|
|
||||||
Useful when connecting ROUTER to ROUTER, or STREAM to STREAM, as it
|
Useful when connecting ROUTER to ROUTER, or STREAM to STREAM, as it
|
||||||
allows for immediate sending to peers. Outbound routing id framing requirements
|
allows for immediate sending to peers. Outbound id framing requirements
|
||||||
for ROUTER and STREAM sockets apply.
|
for ROUTER and STREAM sockets apply.
|
||||||
|
|
||||||
The routing id must be from 1 to 255 bytes long and MAY NOT start with
|
The peer id should be from 1 to 255 bytes long and MAY NOT start with
|
||||||
a zero byte (such routing ids are reserved for internal use by the 0MQ
|
binary zero.
|
||||||
infrastructure).
|
|
||||||
|
|
||||||
[horizontal]
|
[horizontal]
|
||||||
Option value type:: binary data
|
Option value type:: binary data
|
||||||
@ -312,7 +305,7 @@ ZMQ_HANDSHAKE_IVL: Set maximum handshake interval
|
|||||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||||
The 'ZMQ_HANDSHAKE_IVL' option shall set the maximum handshake interval for
|
The 'ZMQ_HANDSHAKE_IVL' option shall set the maximum handshake interval for
|
||||||
the specified 'socket'. Handshaking is the exchange of socket configuration
|
the specified 'socket'. Handshaking is the exchange of socket configuration
|
||||||
information (socket type, routing id, security) that occurs when a connection
|
information (socket type, identity, security) that occurs when a connection
|
||||||
is first opened, only for connection-oriented transports. If handshaking does
|
is first opened, only for connection-oriented transports. If handshaking does
|
||||||
not complete within the configured time, the connection shall be closed.
|
not complete within the configured time, the connection shall be closed.
|
||||||
The value 0 means no handshake time limit.
|
The value 0 means no handshake time limit.
|
||||||
@ -371,8 +364,22 @@ Applicable socket types:: all, when using connection-oriented transports
|
|||||||
|
|
||||||
ZMQ_IDENTITY: Set socket identity
|
ZMQ_IDENTITY: Set socket identity
|
||||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||||
This option name is now deprecated. Use ZMQ_ROUTING_ID instead.
|
The 'ZMQ_IDENTITY' option shall set the identity of the specified 'socket'
|
||||||
ZMQ_IDENTITY remains as an alias for now.
|
when connecting to a ROUTER socket. The identity should be from 1 to 255
|
||||||
|
bytes long and may contain any values.
|
||||||
|
|
||||||
|
If two clients use the same identity when connecting to a ROUTER, the
|
||||||
|
results shall depend on the ZMQ_ROUTER_HANDOVER option setting. If that
|
||||||
|
is not set (or set to the default of zero), the ROUTER socket shall reject
|
||||||
|
clients trying to connect with an already-used identity. If that option
|
||||||
|
is set to 1, the ROUTER socket shall hand-over the connection to the new
|
||||||
|
client and disconnect the existing one.
|
||||||
|
|
||||||
|
[horizontal]
|
||||||
|
Option value type:: binary data
|
||||||
|
Option value unit:: N/A
|
||||||
|
Default value:: NULL
|
||||||
|
Applicable socket types:: ZMQ_REQ, ZMQ_REP, ZMQ_ROUTER, ZMQ_DEALER.
|
||||||
|
|
||||||
|
|
||||||
ZMQ_IMMEDIATE: Queue messages only to completed connections
|
ZMQ_IMMEDIATE: Queue messages only to completed connections
|
||||||
@ -730,12 +737,12 @@ Default value:: 0
|
|||||||
Applicable socket types:: ZMQ_REQ
|
Applicable socket types:: ZMQ_REQ
|
||||||
|
|
||||||
|
|
||||||
ZMQ_ROUTER_HANDOVER: handle duplicate client routing ids on ROUTER sockets
|
ZMQ_ROUTER_HANDOVER: handle duplicate client identities on ROUTER sockets
|
||||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||||
If two clients use the same routing id when connecting to a ROUTER, the
|
If two clients use the same identity when connecting to a ROUTER, the
|
||||||
results shall depend on the ZMQ_ROUTER_HANDOVER option setting. If that
|
results shall depend on the ZMQ_ROUTER_HANDOVER option setting. If that
|
||||||
is not set (or set to the default of zero), the ROUTER socket shall reject
|
is not set (or set to the default of zero), the ROUTER socket shall reject
|
||||||
clients trying to connect with an already-used routing id. If that option
|
clients trying to connect with an already-used identity. If that option
|
||||||
is set to 1, the ROUTER socket shall hand-over the connection to the new
|
is set to 1, the ROUTER socket shall hand-over the connection to the new
|
||||||
client and disconnect the existing one.
|
client and disconnect the existing one.
|
||||||
|
|
||||||
@ -775,7 +782,7 @@ raw mode, and when using the tcp:// transport, it will read and write TCP data
|
|||||||
without 0MQ framing. This lets 0MQ applications talk to non-0MQ applications.
|
without 0MQ framing. This lets 0MQ applications talk to non-0MQ applications.
|
||||||
When using raw mode, you cannot set explicit identities, and the ZMQ_SNDMORE
|
When using raw mode, you cannot set explicit identities, and the ZMQ_SNDMORE
|
||||||
flag is ignored when sending data messages. In raw mode you can close a specific
|
flag is ignored when sending data messages. In raw mode you can close a specific
|
||||||
connection by sending it a zero-length message (following the routing id frame).
|
connection by sending it a zero-length message (following the identity frame).
|
||||||
|
|
||||||
NOTE: This option is deprecated, please use ZMQ_STREAM sockets instead.
|
NOTE: This option is deprecated, please use ZMQ_STREAM sockets instead.
|
||||||
|
|
||||||
@ -786,28 +793,6 @@ Default value:: 0
|
|||||||
Applicable socket types:: ZMQ_ROUTER
|
Applicable socket types:: ZMQ_ROUTER
|
||||||
|
|
||||||
|
|
||||||
ZMQ_ROUTING_ID: Set socket routing id
|
|
||||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
|
||||||
The 'ZMQ_ROUTING_ID' option shall set the routing id of the specified 'socket'
|
|
||||||
when connecting to a ROUTER socket.
|
|
||||||
|
|
||||||
A routing id must be at least one byte and at most 255 bytes long. Identities
|
|
||||||
starting with a zero byte are reserved for use by the 0MQ infrastructure.
|
|
||||||
|
|
||||||
If two clients use the same routing id when connecting to a ROUTER, the
|
|
||||||
results shall depend on the ZMQ_ROUTER_HANDOVER option setting. If that
|
|
||||||
is not set (or set to the default of zero), the ROUTER socket shall reject
|
|
||||||
clients trying to connect with an already-used routing id. If that option
|
|
||||||
is set to 1, the ROUTER socket shall hand-over the connection to the new
|
|
||||||
client and disconnect the existing one.
|
|
||||||
|
|
||||||
[horizontal]
|
|
||||||
Option value type:: binary data
|
|
||||||
Option value unit:: N/A
|
|
||||||
Default value:: NULL
|
|
||||||
Applicable socket types:: ZMQ_REQ, ZMQ_REP, ZMQ_ROUTER, ZMQ_DEALER.
|
|
||||||
|
|
||||||
|
|
||||||
ZMQ_SNDBUF: Set kernel transmit buffer size
|
ZMQ_SNDBUF: Set kernel transmit buffer size
|
||||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||||
The 'ZMQ_SNDBUF' option shall set the underlying kernel transmit buffer size
|
The 'ZMQ_SNDBUF' option shall set the underlying kernel transmit buffer size
|
||||||
|
@ -383,26 +383,26 @@ non-0MQ peer, when using the tcp:// transport. A 'ZMQ_STREAM' socket can
|
|||||||
act as client and/or server, sending and/or receiving TCP data asynchronously.
|
act as client and/or server, sending and/or receiving TCP data asynchronously.
|
||||||
|
|
||||||
When receiving TCP data, a 'ZMQ_STREAM' socket shall prepend a message part
|
When receiving TCP data, a 'ZMQ_STREAM' socket shall prepend a message part
|
||||||
containing the _routing id_ of the originating peer to the message before passing
|
containing the _identity_ of the originating peer to the message before passing
|
||||||
it to the application. Messages received are fair-queued from among all
|
it to the application. Messages received are fair-queued from among all
|
||||||
connected peers.
|
connected peers.
|
||||||
|
|
||||||
When sending TCP data, a 'ZMQ_STREAM' socket shall remove the first part of the
|
When sending TCP data, a 'ZMQ_STREAM' socket shall remove the first part of the
|
||||||
message and use it to determine the _routing id_ of the peer the message shall be
|
message and use it to determine the _identity_ of the peer the message shall be
|
||||||
routed to, and unroutable messages shall cause an EHOSTUNREACH or EAGAIN error.
|
routed to, and unroutable messages shall cause an EHOSTUNREACH or EAGAIN error.
|
||||||
|
|
||||||
To open a connection to a server, use the zmq_connect call, and then fetch the
|
To open a connection to a server, use the zmq_connect call, and then fetch the
|
||||||
socket routing id using the zmq_getsockopt call with the ZMQ_ROUTING_ID option.
|
socket identity using the ZMQ_IDENTITY zmq_getsockopt call.
|
||||||
|
|
||||||
To close a specific connection, send the routing id frame followed by a
|
To close a specific connection, send the identity frame followed by a
|
||||||
zero-length message (see EXAMPLE section).
|
zero-length message (see EXAMPLE section).
|
||||||
|
|
||||||
When a connection is made, a zero-length message will be received by the
|
When a connection is made, a zero-length message will be received by the
|
||||||
application. Similarly, when the peer disconnects (or the connection is lost),
|
application. Similarly, when the peer disconnects (or the connection is lost),
|
||||||
a zero-length message will be received by the application.
|
a zero-length message will be received by the application.
|
||||||
|
|
||||||
You must send one routing id frame followed by one data frame. The ZMQ_SNDMORE
|
You must send one identity frame followed by one data frame. The ZMQ_SNDMORE
|
||||||
flag is required for routing id frames but is ignored on data frames.
|
flag is required for identity frames but is ignored on data frames.
|
||||||
|
|
||||||
[horizontal]
|
[horizontal]
|
||||||
.Summary of ZMQ_STREAM characteristics
|
.Summary of ZMQ_STREAM characteristics
|
||||||
@ -492,10 +492,10 @@ ZMQ_ROUTER
|
|||||||
^^^^^^^^^^
|
^^^^^^^^^^
|
||||||
A socket of type 'ZMQ_ROUTER' is an advanced socket type used for extending
|
A socket of type 'ZMQ_ROUTER' is an advanced socket type used for extending
|
||||||
request/reply sockets. When receiving messages a 'ZMQ_ROUTER' socket shall
|
request/reply sockets. When receiving messages a 'ZMQ_ROUTER' socket shall
|
||||||
prepend a message part containing the _routing id_ of the originating peer to the
|
prepend a message part containing the _identity_ of the originating peer to the
|
||||||
message before passing it to the application. Messages received are fair-queued
|
message before passing it to the application. Messages received are fair-queued
|
||||||
from among all connected peers. When sending messages a 'ZMQ_ROUTER' socket shall
|
from among all connected peers. When sending messages a 'ZMQ_ROUTER' socket shall
|
||||||
remove the first part of the message and use it to determine the _routing id _ of
|
remove the first part of the message and use it to determine the _identity_ of
|
||||||
the peer the message shall be routed to. If the peer does not exist anymore, or
|
the peer the message shall be routed to. If the peer does not exist anymore, or
|
||||||
has never existed, the message shall be silently discarded. However, if
|
has never existed, the message shall be silently discarded. However, if
|
||||||
'ZMQ_ROUTER_MANDATORY' socket option is set to '1', the socket shall fail
|
'ZMQ_ROUTER_MANDATORY' socket option is set to '1', the socket shall fail
|
||||||
@ -514,9 +514,9 @@ or more peers. Likewise, the socket shall generate 'ZMQ_POLLOUT' events when
|
|||||||
at least one message can be sent to one or more peers.
|
at least one message can be sent to one or more peers.
|
||||||
|
|
||||||
When a 'ZMQ_REQ' socket is connected to a 'ZMQ_ROUTER' socket, in addition to the
|
When a 'ZMQ_REQ' socket is connected to a 'ZMQ_ROUTER' socket, in addition to the
|
||||||
_routing id_ of the originating peer each message received shall contain an empty
|
_identity_ of the originating peer each message received shall contain an empty
|
||||||
_delimiter_ message part. Hence, the entire structure of each received message
|
_delimiter_ message part. Hence, the entire structure of each received message
|
||||||
as seen by the application becomes: one or more _routing id_ parts, _delimiter_
|
as seen by the application becomes: one or more _identity_ parts, _delimiter_
|
||||||
part, one or more _body parts_. When sending replies to a 'ZMQ_REQ' socket the
|
part, one or more _body parts_. When sending replies to a 'ZMQ_REQ' socket the
|
||||||
application must include the _delimiter_ part.
|
application must include the _delimiter_ part.
|
||||||
|
|
||||||
@ -559,16 +559,16 @@ void *socket = zmq_socket (ctx, ZMQ_STREAM);
|
|||||||
assert (socket);
|
assert (socket);
|
||||||
int rc = zmq_bind (socket, "tcp://*:8080");
|
int rc = zmq_bind (socket, "tcp://*:8080");
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
/* Data structure to hold the ZMQ_STREAM routing id */
|
/* Data structure to hold the ZMQ_STREAM ID */
|
||||||
uint8_t routing_id [256];
|
uint8_t id [256];
|
||||||
size_t routing_id_size = 256;
|
size_t id_size = 256;
|
||||||
/* Data structure to hold the ZMQ_STREAM received data */
|
/* Data structure to hold the ZMQ_STREAM received data */
|
||||||
uint8_t raw [256];
|
uint8_t raw [256];
|
||||||
size_t raw_size = 256;
|
size_t raw_size = 256;
|
||||||
while (1) {
|
while (1) {
|
||||||
/* Get HTTP request; routing id frame and then request */
|
/* Get HTTP request; ID frame and then request */
|
||||||
routing_id_size = zmq_recv (socket, routing_id, 256, 0);
|
id_size = zmq_recv (socket, id, 256, 0);
|
||||||
assert (routing_id_size > 0);
|
assert (id_size > 0);
|
||||||
do {
|
do {
|
||||||
raw_size = zmq_recv (socket, raw, 256, 0);
|
raw_size = zmq_recv (socket, raw, 256, 0);
|
||||||
assert (raw_size >= 0);
|
assert (raw_size >= 0);
|
||||||
@ -579,11 +579,11 @@ while (1) {
|
|||||||
"Content-Type: text/plain\r\n"
|
"Content-Type: text/plain\r\n"
|
||||||
"\r\n"
|
"\r\n"
|
||||||
"Hello, World!";
|
"Hello, World!";
|
||||||
/* Sends the routing id frame followed by the response */
|
/* Sends the ID frame followed by the response */
|
||||||
zmq_send (socket, routing_id, routing_id_size, ZMQ_SNDMORE);
|
zmq_send (socket, id, id_size, ZMQ_SNDMORE);
|
||||||
zmq_send (socket, http_response, strlen (http_response), 0);
|
zmq_send (socket, http_response, strlen (http_response), 0);
|
||||||
/* Closes the connection by sending the routing id frame followed by a zero response */
|
/* Closes the connection by sending the ID frame followed by a zero response */
|
||||||
zmq_send (socket, routing_id, routing_id_size, ZMQ_SNDMORE);
|
zmq_send (socket, id, id_size, ZMQ_SNDMORE);
|
||||||
zmq_send (socket, 0, 0, 0);
|
zmq_send (socket, 0, 0, 0);
|
||||||
}
|
}
|
||||||
zmq_close (socket);
|
zmq_close (socket);
|
||||||
|
@ -299,7 +299,7 @@ ZMQ_EXPORT const char *zmq_msg_gets (const zmq_msg_t *msg, const char *property)
|
|||||||
|
|
||||||
/* Socket options. */
|
/* Socket options. */
|
||||||
#define ZMQ_AFFINITY 4
|
#define ZMQ_AFFINITY 4
|
||||||
#define ZMQ_ROUTING_ID 5
|
#define ZMQ_IDENTITY 5
|
||||||
#define ZMQ_SUBSCRIBE 6
|
#define ZMQ_SUBSCRIBE 6
|
||||||
#define ZMQ_UNSUBSCRIBE 7
|
#define ZMQ_UNSUBSCRIBE 7
|
||||||
#define ZMQ_RATE 8
|
#define ZMQ_RATE 8
|
||||||
@ -345,7 +345,7 @@ ZMQ_EXPORT const char *zmq_msg_gets (const zmq_msg_t *msg, const char *property)
|
|||||||
#define ZMQ_ZAP_DOMAIN 55
|
#define ZMQ_ZAP_DOMAIN 55
|
||||||
#define ZMQ_ROUTER_HANDOVER 56
|
#define ZMQ_ROUTER_HANDOVER 56
|
||||||
#define ZMQ_TOS 57
|
#define ZMQ_TOS 57
|
||||||
#define ZMQ_CONNECT_ROUTING_ID 61
|
#define ZMQ_CONNECT_RID 61
|
||||||
#define ZMQ_GSSAPI_SERVER 62
|
#define ZMQ_GSSAPI_SERVER 62
|
||||||
#define ZMQ_GSSAPI_PRINCIPAL 63
|
#define ZMQ_GSSAPI_PRINCIPAL 63
|
||||||
#define ZMQ_GSSAPI_SERVICE_PRINCIPAL 64
|
#define ZMQ_GSSAPI_SERVICE_PRINCIPAL 64
|
||||||
@ -390,8 +390,6 @@ ZMQ_EXPORT const char *zmq_msg_gets (const zmq_msg_t *msg, const char *property)
|
|||||||
#define ZMQ_GROUP_MAX_LENGTH 15
|
#define ZMQ_GROUP_MAX_LENGTH 15
|
||||||
|
|
||||||
/* Deprecated options and aliases */
|
/* Deprecated options and aliases */
|
||||||
#define ZMQ_IDENTITY ZMQ_ROUTING_ID
|
|
||||||
#define ZMQ_CONNECT_RID ZMQ_CONNECT_ROUTING_ID
|
|
||||||
#define ZMQ_TCP_ACCEPT_FILTER 38
|
#define ZMQ_TCP_ACCEPT_FILTER 38
|
||||||
#define ZMQ_IPC_FILTER_PID 58
|
#define ZMQ_IPC_FILTER_PID 58
|
||||||
#define ZMQ_IPC_FILTER_UID 59
|
#define ZMQ_IPC_FILTER_UID 59
|
||||||
@ -622,7 +620,7 @@ ZMQ_EXPORT int zmq_msg_set_group(zmq_msg_t *msg, const char *group);
|
|||||||
ZMQ_EXPORT const char *zmq_msg_group(zmq_msg_t *msg);
|
ZMQ_EXPORT const char *zmq_msg_group(zmq_msg_t *msg);
|
||||||
|
|
||||||
/* DRAFT Msg property names. */
|
/* DRAFT Msg property names. */
|
||||||
#define ZMQ_MSG_PROPERTY_ROUTING_ID "Routing-Id"
|
#define ZMQ_MSG_PROPERTY_IDENTITY "Identity"
|
||||||
#define ZMQ_MSG_PROPERTY_SOCKET_TYPE "Socket-Type"
|
#define ZMQ_MSG_PROPERTY_SOCKET_TYPE "Socket-Type"
|
||||||
#define ZMQ_MSG_PROPERTY_USER_ID "User-Id"
|
#define ZMQ_MSG_PROPERTY_USER_ID "User-Id"
|
||||||
#define ZMQ_MSG_PROPERTY_PEER_ADDRESS "Peer-Address"
|
#define ZMQ_MSG_PROPERTY_PEER_ADDRESS "Peer-Address"
|
||||||
@ -664,8 +662,8 @@ ZMQ_EXPORT int zmq_poller_remove_fd (void *poller, int fd);
|
|||||||
#endif
|
#endif
|
||||||
|
|
||||||
ZMQ_EXPORT int zmq_socket_get_peer_state (void *socket,
|
ZMQ_EXPORT int zmq_socket_get_peer_state (void *socket,
|
||||||
const void *routing_id,
|
const void *identity,
|
||||||
size_t routing_id_size);
|
size_t identity_size);
|
||||||
|
|
||||||
/******************************************************************************/
|
/******************************************************************************/
|
||||||
/* Scheduling timers */
|
/* Scheduling timers */
|
||||||
|
16
src/ctx.cpp
16
src/ctx.cpp
@ -529,7 +529,7 @@ void zmq::ctx_t::connect_inproc_sockets (zmq::socket_base_t *bind_socket_,
|
|||||||
bind_socket_->inc_seqnum();
|
bind_socket_->inc_seqnum();
|
||||||
pending_connection_.bind_pipe->set_tid (bind_socket_->get_tid ());
|
pending_connection_.bind_pipe->set_tid (bind_socket_->get_tid ());
|
||||||
|
|
||||||
if (!bind_options.recv_routing_id) {
|
if (!bind_options.recv_identity) {
|
||||||
msg_t msg;
|
msg_t msg;
|
||||||
const bool ok = pending_connection_.bind_pipe->read (&msg);
|
const bool ok = pending_connection_.bind_pipe->read (&msg);
|
||||||
zmq_assert (ok);
|
zmq_assert (ok);
|
||||||
@ -569,16 +569,16 @@ void zmq::ctx_t::connect_inproc_sockets (zmq::socket_base_t *bind_socket_,
|
|||||||
// When a ctx is terminated all pending inproc connection will be
|
// When a ctx is terminated all pending inproc connection will be
|
||||||
// connected, but the socket will already be closed and the pipe will be
|
// connected, but the socket will already be closed and the pipe will be
|
||||||
// in waiting_for_delimiter state, which means no more writes can be done
|
// in waiting_for_delimiter state, which means no more writes can be done
|
||||||
// and the routing id write fails and causes an assert. Check if the socket
|
// and the identity write fails and causes an assert. Check if the socket
|
||||||
// is open before sending.
|
// is open before sending.
|
||||||
if (pending_connection_.endpoint.options.recv_routing_id &&
|
if (pending_connection_.endpoint.options.recv_identity &&
|
||||||
pending_connection_.endpoint.socket->check_tag ()) {
|
pending_connection_.endpoint.socket->check_tag ()) {
|
||||||
msg_t routing_id;
|
msg_t id;
|
||||||
const int rc = routing_id.init_size (bind_options.routing_id_size);
|
const int rc = id.init_size (bind_options.identity_size);
|
||||||
errno_assert (rc == 0);
|
errno_assert (rc == 0);
|
||||||
memcpy (routing_id.data (), bind_options.routing_id, bind_options.routing_id_size);
|
memcpy (id.data (), bind_options.identity, bind_options.identity_size);
|
||||||
routing_id.set_flags (msg_t::routing_id);
|
id.set_flags (msg_t::identity);
|
||||||
const bool written = pending_connection_.bind_pipe->write (&routing_id);
|
const bool written = pending_connection_.bind_pipe->write (&id);
|
||||||
zmq_assert (written);
|
zmq_assert (written);
|
||||||
pending_connection_.bind_pipe->flush ();
|
pending_connection_.bind_pipe->flush ();
|
||||||
}
|
}
|
||||||
|
@ -46,17 +46,17 @@ zmq::mechanism_t::~mechanism_t ()
|
|||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::mechanism_t::set_peer_routing_id (const void *id_ptr, size_t id_size)
|
void zmq::mechanism_t::set_peer_identity (const void *id_ptr, size_t id_size)
|
||||||
{
|
{
|
||||||
routing_id = blob_t (static_cast <const unsigned char*> (id_ptr), id_size);
|
identity = blob_t (static_cast <const unsigned char*> (id_ptr), id_size);
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::mechanism_t::peer_routing_id (msg_t *msg_)
|
void zmq::mechanism_t::peer_identity (msg_t *msg_)
|
||||||
{
|
{
|
||||||
const int rc = msg_->init_size (routing_id.size ());
|
const int rc = msg_->init_size (identity.size ());
|
||||||
errno_assert (rc == 0);
|
errno_assert (rc == 0);
|
||||||
memcpy (msg_->data (), routing_id.data (), routing_id.size ());
|
memcpy (msg_->data (), identity.data (), identity.size ());
|
||||||
msg_->set_flags (msg_t::routing_id);
|
msg_->set_flags (msg_t::identity);
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::mechanism_t::set_user_id (const void *data_, size_t size_)
|
void zmq::mechanism_t::set_user_id (const void *data_, size_t size_)
|
||||||
@ -132,12 +132,12 @@ size_t zmq::mechanism_t::add_basic_properties (unsigned char *buf,
|
|||||||
ZMQ_MSG_PROPERTY_SOCKET_TYPE, socket_type,
|
ZMQ_MSG_PROPERTY_SOCKET_TYPE, socket_type,
|
||||||
strlen (socket_type));
|
strlen (socket_type));
|
||||||
|
|
||||||
// Add routing id property
|
// Add identity property
|
||||||
if (options.type == ZMQ_REQ || options.type == ZMQ_DEALER
|
if (options.type == ZMQ_REQ || options.type == ZMQ_DEALER
|
||||||
|| options.type == ZMQ_ROUTER)
|
|| options.type == ZMQ_ROUTER)
|
||||||
ptr += add_property (ptr, buf_capacity - (ptr - buf),
|
ptr += add_property (ptr, buf_capacity - (ptr - buf),
|
||||||
ZMQ_MSG_PROPERTY_ROUTING_ID, options.routing_id,
|
ZMQ_MSG_PROPERTY_IDENTITY, options.identity,
|
||||||
options.routing_id_size);
|
options.identity_size);
|
||||||
|
|
||||||
return ptr - buf;
|
return ptr - buf;
|
||||||
}
|
}
|
||||||
@ -148,8 +148,8 @@ size_t zmq::mechanism_t::basic_properties_len() const
|
|||||||
return property_len (ZMQ_MSG_PROPERTY_SOCKET_TYPE, strlen (socket_type))
|
return property_len (ZMQ_MSG_PROPERTY_SOCKET_TYPE, strlen (socket_type))
|
||||||
+ ((options.type == ZMQ_REQ || options.type == ZMQ_DEALER
|
+ ((options.type == ZMQ_REQ || options.type == ZMQ_DEALER
|
||||||
|| options.type == ZMQ_ROUTER)
|
|| options.type == ZMQ_ROUTER)
|
||||||
? property_len (ZMQ_MSG_PROPERTY_ROUTING_ID,
|
? property_len (ZMQ_MSG_PROPERTY_IDENTITY,
|
||||||
options.routing_id_size)
|
options.identity_size)
|
||||||
: 0);
|
: 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -199,8 +199,8 @@ int zmq::mechanism_t::parse_metadata (const unsigned char *ptr_,
|
|||||||
ptr_ += value_length;
|
ptr_ += value_length;
|
||||||
bytes_left -= value_length;
|
bytes_left -= value_length;
|
||||||
|
|
||||||
if (name == ZMQ_MSG_PROPERTY_ROUTING_ID && options.recv_routing_id)
|
if (name == ZMQ_MSG_PROPERTY_IDENTITY && options.recv_identity)
|
||||||
set_peer_routing_id (value, value_length);
|
set_peer_identity (value, value_length);
|
||||||
else
|
else
|
||||||
if (name == ZMQ_MSG_PROPERTY_SOCKET_TYPE) {
|
if (name == ZMQ_MSG_PROPERTY_SOCKET_TYPE) {
|
||||||
const std::string socket_type ((char *) value, value_length);
|
const std::string socket_type ((char *) value, value_length);
|
||||||
|
@ -74,9 +74,9 @@ namespace zmq
|
|||||||
// Returns the status of this mechanism.
|
// Returns the status of this mechanism.
|
||||||
virtual status_t status () const = 0;
|
virtual status_t status () const = 0;
|
||||||
|
|
||||||
void set_peer_routing_id (const void *id_ptr, size_t id_size);
|
void set_peer_identity (const void *id_ptr, size_t id_size);
|
||||||
|
|
||||||
void peer_routing_id (msg_t *msg_);
|
void peer_identity (msg_t *msg_);
|
||||||
|
|
||||||
void set_user_id (const void *user_id, size_t size);
|
void set_user_id (const void *user_id, size_t size);
|
||||||
|
|
||||||
@ -138,7 +138,7 @@ namespace zmq
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
|
|
||||||
blob_t routing_id;
|
blob_t identity;
|
||||||
|
|
||||||
blob_t user_id;
|
blob_t user_id;
|
||||||
|
|
||||||
|
@ -39,14 +39,8 @@ zmq::metadata_t::metadata_t (const dict_t &dict) :
|
|||||||
const char *zmq::metadata_t::get (const std::string &property) const
|
const char *zmq::metadata_t::get (const std::string &property) const
|
||||||
{
|
{
|
||||||
dict_t::const_iterator it = dict.find (property);
|
dict_t::const_iterator it = dict.find (property);
|
||||||
if (it == dict.end())
|
if (it == dict.end ())
|
||||||
{
|
|
||||||
/** \todo remove this when support for the deprecated name "Identity" is dropped */
|
|
||||||
if (property == "Identity")
|
|
||||||
return get (ZMQ_MSG_PROPERTY_ROUTING_ID);
|
|
||||||
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
|
||||||
else
|
else
|
||||||
return it->second.c_str ();
|
return it->second.c_str ();
|
||||||
}
|
}
|
||||||
|
@ -413,9 +413,9 @@ void zmq::msg_t::reset_metadata ()
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bool zmq::msg_t::is_routing_id () const
|
bool zmq::msg_t::is_identity () const
|
||||||
{
|
{
|
||||||
return (u.base.flags & routing_id) == routing_id;
|
return (u.base.flags & identity) == identity;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool zmq::msg_t::is_credential () const
|
bool zmq::msg_t::is_credential () const
|
||||||
|
@ -79,7 +79,7 @@ namespace zmq
|
|||||||
more = 1, // Followed by more parts
|
more = 1, // Followed by more parts
|
||||||
command = 2, // Command frame (see ZMTP spec)
|
command = 2, // Command frame (see ZMTP spec)
|
||||||
credential = 32,
|
credential = 32,
|
||||||
routing_id = 64,
|
identity = 64,
|
||||||
shared = 128
|
shared = 128
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -109,7 +109,7 @@ namespace zmq
|
|||||||
metadata_t *metadata () const;
|
metadata_t *metadata () const;
|
||||||
void set_metadata (metadata_t *metadata_);
|
void set_metadata (metadata_t *metadata_);
|
||||||
void reset_metadata ();
|
void reset_metadata ();
|
||||||
bool is_routing_id () const;
|
bool is_identity () const;
|
||||||
bool is_credential () const;
|
bool is_credential () const;
|
||||||
bool is_delimiter () const;
|
bool is_delimiter () const;
|
||||||
bool is_join () const;
|
bool is_join () const;
|
||||||
|
@ -48,7 +48,7 @@ zmq::options_t::options_t () :
|
|||||||
sndhwm (1000),
|
sndhwm (1000),
|
||||||
rcvhwm (1000),
|
rcvhwm (1000),
|
||||||
affinity (0),
|
affinity (0),
|
||||||
routing_id_size (0),
|
identity_size (0),
|
||||||
rate (100),
|
rate (100),
|
||||||
recovery_ivl (10000),
|
recovery_ivl (10000),
|
||||||
multicast_hops (1),
|
multicast_hops (1),
|
||||||
@ -70,7 +70,7 @@ zmq::options_t::options_t () :
|
|||||||
immediate (0),
|
immediate (0),
|
||||||
filter (false),
|
filter (false),
|
||||||
invert_matching(false),
|
invert_matching(false),
|
||||||
recv_routing_id (false),
|
recv_identity (false),
|
||||||
raw_socket (false),
|
raw_socket (false),
|
||||||
raw_notify (true),
|
raw_notify (true),
|
||||||
tcp_keepalive (-1),
|
tcp_keepalive (-1),
|
||||||
@ -166,11 +166,11 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
|
|||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case ZMQ_ROUTING_ID:
|
case ZMQ_IDENTITY:
|
||||||
// Routing id is any binary string from 1 to 255 octets
|
// Identity is any binary string from 1 to 255 octets
|
||||||
if (optvallen_ > 0 && optvallen_ < 256) {
|
if (optvallen_ > 0 && optvallen_ < 256) {
|
||||||
routing_id_size = (unsigned char) optvallen_;
|
identity_size = (unsigned char) optvallen_;
|
||||||
memcpy (routing_id, optval_, routing_id_size);
|
memcpy (identity, optval_, identity_size);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
@ -679,10 +679,10 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
|
|||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case ZMQ_ROUTING_ID:
|
case ZMQ_IDENTITY:
|
||||||
if (*optvallen_ >= routing_id_size) {
|
if (*optvallen_ >= identity_size) {
|
||||||
memcpy (optval_, routing_id, routing_id_size);
|
memcpy (optval_, identity, identity_size);
|
||||||
*optvallen_ = routing_id_size;
|
*optvallen_ = identity_size;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
@ -70,9 +70,9 @@ namespace zmq
|
|||||||
// I/O thread affinity.
|
// I/O thread affinity.
|
||||||
uint64_t affinity;
|
uint64_t affinity;
|
||||||
|
|
||||||
// Socket routing id.
|
// Socket identity
|
||||||
unsigned char routing_id_size;
|
unsigned char identity_size;
|
||||||
unsigned char routing_id [256];
|
unsigned char identity [256];
|
||||||
|
|
||||||
// Maximum transfer rate [kb/s]. Default 100kb/s.
|
// Maximum transfer rate [kb/s]. Default 100kb/s.
|
||||||
int rate;
|
int rate;
|
||||||
@ -144,7 +144,7 @@ namespace zmq
|
|||||||
bool invert_matching;
|
bool invert_matching;
|
||||||
|
|
||||||
// If true, the identity message is forwarded to the socket.
|
// If true, the identity message is forwarded to the socket.
|
||||||
bool recv_routing_id;
|
bool recv_identity;
|
||||||
|
|
||||||
// if true, router socket accepts non-zmq tcp connections
|
// if true, router socket accepts non-zmq tcp connections
|
||||||
bool raw_socket;
|
bool raw_socket;
|
||||||
|
24
src/pipe.cpp
24
src/pipe.cpp
@ -92,7 +92,7 @@ zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
|
|||||||
sink (NULL),
|
sink (NULL),
|
||||||
state (active),
|
state (active),
|
||||||
delay (true),
|
delay (true),
|
||||||
server_socket_routing_id (0),
|
routing_id(0),
|
||||||
conflate (conflate_)
|
conflate (conflate_)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
@ -115,24 +115,24 @@ void zmq::pipe_t::set_event_sink (i_pipe_events *sink_)
|
|||||||
sink = sink_;
|
sink = sink_;
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::pipe_t::set_server_socket_routing_id (uint32_t server_socket_routing_id_)
|
void zmq::pipe_t::set_routing_id (uint32_t routing_id_)
|
||||||
{
|
{
|
||||||
server_socket_routing_id = server_socket_routing_id_;
|
routing_id = routing_id_;
|
||||||
}
|
}
|
||||||
|
|
||||||
uint32_t zmq::pipe_t::get_server_socket_routing_id ()
|
uint32_t zmq::pipe_t::get_routing_id ()
|
||||||
{
|
{
|
||||||
return server_socket_routing_id;
|
return routing_id;
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::pipe_t::set_router_socket_routing_id (const blob_t &router_socket_routing_id_)
|
void zmq::pipe_t::set_identity (const blob_t &identity_)
|
||||||
{
|
{
|
||||||
router_socket_routing_id = router_socket_routing_id_;
|
identity = identity_;
|
||||||
}
|
}
|
||||||
|
|
||||||
zmq::blob_t zmq::pipe_t::get_routing_id ()
|
zmq::blob_t zmq::pipe_t::get_identity ()
|
||||||
{
|
{
|
||||||
return router_socket_routing_id;
|
return identity;
|
||||||
}
|
}
|
||||||
|
|
||||||
zmq::blob_t zmq::pipe_t::get_credential () const
|
zmq::blob_t zmq::pipe_t::get_credential () const
|
||||||
@ -194,7 +194,7 @@ read_message:
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!(msg_->flags () & msg_t::more) && !msg_->is_routing_id ())
|
if (!(msg_->flags () & msg_t::more) && !msg_->is_identity ())
|
||||||
msgs_read++;
|
msgs_read++;
|
||||||
|
|
||||||
if (lwm > 0 && msgs_read % lwm == 0)
|
if (lwm > 0 && msgs_read % lwm == 0)
|
||||||
@ -224,9 +224,9 @@ bool zmq::pipe_t::write (msg_t *msg_)
|
|||||||
return false;
|
return false;
|
||||||
|
|
||||||
bool more = msg_->flags () & msg_t::more ? true : false;
|
bool more = msg_->flags () & msg_t::more ? true : false;
|
||||||
const bool is_routing_id = msg_->is_routing_id ();
|
const bool is_identity = msg_->is_identity ();
|
||||||
outpipe->write (*msg_, more);
|
outpipe->write (*msg_, more);
|
||||||
if (!more && !is_routing_id)
|
if (!more && !is_identity)
|
||||||
msgs_written++;
|
msgs_written++;
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
|
12
src/pipe.hpp
12
src/pipe.hpp
@ -85,12 +85,12 @@ namespace zmq
|
|||||||
void set_event_sink (i_pipe_events *sink_);
|
void set_event_sink (i_pipe_events *sink_);
|
||||||
|
|
||||||
// Pipe endpoint can store an routing ID to be used by its clients.
|
// Pipe endpoint can store an routing ID to be used by its clients.
|
||||||
void set_server_socket_routing_id (uint32_t routing_id_);
|
void set_routing_id (uint32_t routing_id_);
|
||||||
uint32_t get_server_socket_routing_id ();
|
uint32_t get_routing_id ();
|
||||||
|
|
||||||
// Pipe endpoint can store an opaque ID to be used by its clients.
|
// Pipe endpoint can store an opaque ID to be used by its clients.
|
||||||
void set_router_socket_routing_id (const blob_t &identity_);
|
void set_identity (const blob_t &identity_);
|
||||||
blob_t get_routing_id ();
|
blob_t get_identity ();
|
||||||
|
|
||||||
blob_t get_credential () const;
|
blob_t get_credential () const;
|
||||||
|
|
||||||
@ -227,10 +227,10 @@ namespace zmq
|
|||||||
bool delay;
|
bool delay;
|
||||||
|
|
||||||
// Identity of the writer. Used uniquely by the reader side.
|
// Identity of the writer. Used uniquely by the reader side.
|
||||||
blob_t router_socket_routing_id;
|
blob_t identity;
|
||||||
|
|
||||||
// Identity of the writer. Used uniquely by the reader side.
|
// Identity of the writer. Used uniquely by the reader side.
|
||||||
int server_socket_routing_id;
|
int routing_id;
|
||||||
|
|
||||||
// Pipe's credential.
|
// Pipe's credential.
|
||||||
blob_t credential;
|
blob_t credential;
|
||||||
|
@ -75,7 +75,7 @@ int zmq::req_t::xsend (msg_t *msg_)
|
|||||||
message_begins = true;
|
message_begins = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
// First part of the request is the request routing id.
|
// First part of the request is the request identity.
|
||||||
if (message_begins) {
|
if (message_begins) {
|
||||||
reply_pipe = NULL;
|
reply_pipe = NULL;
|
||||||
|
|
||||||
|
116
src/router.cpp
116
src/router.cpp
@ -39,13 +39,13 @@
|
|||||||
zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
|
zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
|
||||||
socket_base_t (parent_, tid_, sid_),
|
socket_base_t (parent_, tid_, sid_),
|
||||||
prefetched (false),
|
prefetched (false),
|
||||||
routing_id_sent (false),
|
identity_sent (false),
|
||||||
current_in (NULL),
|
current_in (NULL),
|
||||||
terminate_current_in (false),
|
terminate_current_in (false),
|
||||||
more_in (false),
|
more_in (false),
|
||||||
current_out (NULL),
|
current_out (NULL),
|
||||||
more_out (false),
|
more_out (false),
|
||||||
next_integral_routing_id (generate_random ()),
|
next_rid (generate_random ()),
|
||||||
mandatory (false),
|
mandatory (false),
|
||||||
// raw_socket functionality in ROUTER is deprecated
|
// raw_socket functionality in ROUTER is deprecated
|
||||||
raw_socket (false),
|
raw_socket (false),
|
||||||
@ -53,7 +53,7 @@ zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
|
|||||||
handover (false)
|
handover (false)
|
||||||
{
|
{
|
||||||
options.type = ZMQ_ROUTER;
|
options.type = ZMQ_ROUTER;
|
||||||
options.recv_routing_id = true;
|
options.recv_identity = true;
|
||||||
options.raw_socket = false;
|
options.raw_socket = false;
|
||||||
|
|
||||||
prefetched_id.init ();
|
prefetched_id.init ();
|
||||||
@ -87,8 +87,8 @@ void zmq::router_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
|
|||||||
errno_assert (rc == 0);
|
errno_assert (rc == 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool routing_id_ok = identify_peer (pipe_);
|
bool identity_ok = identify_peer (pipe_);
|
||||||
if (routing_id_ok)
|
if (identity_ok)
|
||||||
fq.attach (pipe_);
|
fq.attach (pipe_);
|
||||||
else
|
else
|
||||||
anonymous_pipes.insert (pipe_);
|
anonymous_pipes.insert (pipe_);
|
||||||
@ -102,9 +102,9 @@ int zmq::router_t::xsetsockopt (int option_, const void *optval_,
|
|||||||
if (is_int) memcpy(&value, optval_, sizeof (int));
|
if (is_int) memcpy(&value, optval_, sizeof (int));
|
||||||
|
|
||||||
switch (option_) {
|
switch (option_) {
|
||||||
case ZMQ_CONNECT_ROUTING_ID:
|
case ZMQ_CONNECT_RID:
|
||||||
if (optval_ && optvallen_) {
|
if (optval_ && optvallen_) {
|
||||||
connect_routing_id.assign ((char *) optval_, optvallen_);
|
connect_rid.assign ((char *) optval_, optvallen_);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
@ -113,7 +113,7 @@ int zmq::router_t::xsetsockopt (int option_, const void *optval_,
|
|||||||
if (is_int && value >= 0) {
|
if (is_int && value >= 0) {
|
||||||
raw_socket = (value != 0);
|
raw_socket = (value != 0);
|
||||||
if (raw_socket) {
|
if (raw_socket) {
|
||||||
options.recv_routing_id = false;
|
options.recv_identity = false;
|
||||||
options.raw_socket = true;
|
options.raw_socket = true;
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
@ -155,7 +155,7 @@ void zmq::router_t::xpipe_terminated (pipe_t *pipe_)
|
|||||||
if (it != anonymous_pipes.end ())
|
if (it != anonymous_pipes.end ())
|
||||||
anonymous_pipes.erase (it);
|
anonymous_pipes.erase (it);
|
||||||
else {
|
else {
|
||||||
outpipes_t::iterator iter = outpipes.find (pipe_->get_routing_id ());
|
outpipes_t::iterator iter = outpipes.find (pipe_->get_identity ());
|
||||||
zmq_assert (iter != outpipes.end ());
|
zmq_assert (iter != outpipes.end ());
|
||||||
outpipes.erase (iter);
|
outpipes.erase (iter);
|
||||||
fq.pipe_terminated (pipe_);
|
fq.pipe_terminated (pipe_);
|
||||||
@ -171,8 +171,8 @@ void zmq::router_t::xread_activated (pipe_t *pipe_)
|
|||||||
if (it == anonymous_pipes.end ())
|
if (it == anonymous_pipes.end ())
|
||||||
fq.activated (pipe_);
|
fq.activated (pipe_);
|
||||||
else {
|
else {
|
||||||
bool routing_id_ok = identify_peer (pipe_);
|
bool identity_ok = identify_peer (pipe_);
|
||||||
if (routing_id_ok) {
|
if (identity_ok) {
|
||||||
anonymous_pipes.erase (it);
|
anonymous_pipes.erase (it);
|
||||||
fq.attach (pipe_);
|
fq.attach (pipe_);
|
||||||
}
|
}
|
||||||
@ -205,11 +205,11 @@ int zmq::router_t::xsend (msg_t *msg_)
|
|||||||
|
|
||||||
more_out = true;
|
more_out = true;
|
||||||
|
|
||||||
// Find the pipe associated with the routing id stored in the prefix.
|
// Find the pipe associated with the identity stored in the prefix.
|
||||||
// If there's no such pipe just silently ignore the message, unless
|
// If there's no such pipe just silently ignore the message, unless
|
||||||
// router_mandatory is set.
|
// router_mandatory is set.
|
||||||
blob_t routing_id ((unsigned char*) msg_->data (), msg_->size ());
|
blob_t identity ((unsigned char*) msg_->data (), msg_->size ());
|
||||||
outpipes_t::iterator it = outpipes.find (routing_id);
|
outpipes_t::iterator it = outpipes.find (identity);
|
||||||
|
|
||||||
if (it != outpipes.end ()) {
|
if (it != outpipes.end ()) {
|
||||||
current_out = it->second.pipe;
|
current_out = it->second.pipe;
|
||||||
@ -300,10 +300,10 @@ int zmq::router_t::xsend (msg_t *msg_)
|
|||||||
int zmq::router_t::xrecv (msg_t *msg_)
|
int zmq::router_t::xrecv (msg_t *msg_)
|
||||||
{
|
{
|
||||||
if (prefetched) {
|
if (prefetched) {
|
||||||
if (!routing_id_sent) {
|
if (!identity_sent) {
|
||||||
int rc = msg_->move (prefetched_id);
|
int rc = msg_->move (prefetched_id);
|
||||||
errno_assert (rc == 0);
|
errno_assert (rc == 0);
|
||||||
routing_id_sent = true;
|
identity_sent = true;
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
int rc = msg_->move (prefetched_msg);
|
int rc = msg_->move (prefetched_msg);
|
||||||
@ -325,10 +325,10 @@ int zmq::router_t::xrecv (msg_t *msg_)
|
|||||||
pipe_t *pipe = NULL;
|
pipe_t *pipe = NULL;
|
||||||
int rc = fq.recvpipe (msg_, &pipe);
|
int rc = fq.recvpipe (msg_, &pipe);
|
||||||
|
|
||||||
// It's possible that we receive peer's routing id. That happens
|
// It's possible that we receive peer's identity. That happens
|
||||||
// after reconnection. The current implementation assumes that
|
// after reconnection. The current implementation assumes that
|
||||||
// the peer always uses the same routing id.
|
// the peer always uses the same identity.
|
||||||
while (rc == 0 && msg_->is_routing_id ())
|
while (rc == 0 && msg_->is_identity ())
|
||||||
rc = fq.recvpipe (msg_, &pipe);
|
rc = fq.recvpipe (msg_, &pipe);
|
||||||
|
|
||||||
if (rc != 0)
|
if (rc != 0)
|
||||||
@ -357,14 +357,14 @@ int zmq::router_t::xrecv (msg_t *msg_)
|
|||||||
prefetched = true;
|
prefetched = true;
|
||||||
current_in = pipe;
|
current_in = pipe;
|
||||||
|
|
||||||
blob_t routing_id = pipe->get_routing_id ();
|
blob_t identity = pipe->get_identity ();
|
||||||
rc = msg_->init_size (routing_id.size ());
|
rc = msg_->init_size (identity.size ());
|
||||||
errno_assert (rc == 0);
|
errno_assert (rc == 0);
|
||||||
memcpy (msg_->data (), routing_id.data (), routing_id.size ());
|
memcpy (msg_->data (), identity.data (), identity.size ());
|
||||||
msg_->set_flags (msg_t::more);
|
msg_->set_flags (msg_t::more);
|
||||||
if (prefetched_msg.metadata())
|
if (prefetched_msg.metadata())
|
||||||
msg_->set_metadata(prefetched_msg.metadata());
|
msg_->set_metadata(prefetched_msg.metadata());
|
||||||
routing_id_sent = true;
|
identity_sent = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
@ -396,11 +396,11 @@ bool zmq::router_t::xhas_in ()
|
|||||||
pipe_t *pipe = NULL;
|
pipe_t *pipe = NULL;
|
||||||
int rc = fq.recvpipe (&prefetched_msg, &pipe);
|
int rc = fq.recvpipe (&prefetched_msg, &pipe);
|
||||||
|
|
||||||
// It's possible that we receive peer's routing id. That happens
|
// It's possible that we receive peer's identity. That happens
|
||||||
// after reconnection. The current implementation assumes that
|
// after reconnection. The current implementation assumes that
|
||||||
// the peer always uses the same routing id.
|
// the peer always uses the same identity.
|
||||||
// TODO: handle the situation when the peer changes its routing id.
|
// TODO: handle the situation when the peer changes its identity.
|
||||||
while (rc == 0 && prefetched_msg.is_routing_id ())
|
while (rc == 0 && prefetched_msg.is_identity ())
|
||||||
rc = fq.recvpipe (&prefetched_msg, &pipe);
|
rc = fq.recvpipe (&prefetched_msg, &pipe);
|
||||||
|
|
||||||
if (rc != 0)
|
if (rc != 0)
|
||||||
@ -408,14 +408,14 @@ bool zmq::router_t::xhas_in ()
|
|||||||
|
|
||||||
zmq_assert (pipe != NULL);
|
zmq_assert (pipe != NULL);
|
||||||
|
|
||||||
blob_t routing_id = pipe->get_routing_id ();
|
blob_t identity = pipe->get_identity ();
|
||||||
rc = prefetched_id.init_size (routing_id.size ());
|
rc = prefetched_id.init_size (identity.size ());
|
||||||
errno_assert (rc == 0);
|
errno_assert (rc == 0);
|
||||||
memcpy (prefetched_id.data (), routing_id.data (), routing_id.size ());
|
memcpy (prefetched_id.data (), identity.data (), identity.size ());
|
||||||
prefetched_id.set_flags (msg_t::more);
|
prefetched_id.set_flags (msg_t::more);
|
||||||
|
|
||||||
prefetched = true;
|
prefetched = true;
|
||||||
routing_id_sent = false;
|
identity_sent = false;
|
||||||
current_in = pipe;
|
current_in = pipe;
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
@ -443,13 +443,13 @@ zmq::blob_t zmq::router_t::get_credential () const
|
|||||||
return fq.get_credential ();
|
return fq.get_credential ();
|
||||||
}
|
}
|
||||||
|
|
||||||
int zmq::router_t::get_peer_state (const void *routing_id_,
|
int zmq::router_t::get_peer_state (const void *identity,
|
||||||
size_t routing_id_size_) const
|
size_t identity_size) const
|
||||||
{
|
{
|
||||||
int res = 0;
|
int res = 0;
|
||||||
|
|
||||||
blob_t routing_id_blob ((unsigned char *) routing_id_, routing_id_size_);
|
blob_t identity_blob ((unsigned char *) identity, identity_size);
|
||||||
outpipes_t::const_iterator it = outpipes.find (routing_id_blob);
|
outpipes_t::const_iterator it = outpipes.find (identity_blob);
|
||||||
if (it == outpipes.end ()) {
|
if (it == outpipes.end ()) {
|
||||||
errno = EHOSTUNREACH;
|
errno = EHOSTUNREACH;
|
||||||
return -1;
|
return -1;
|
||||||
@ -467,27 +467,27 @@ int zmq::router_t::get_peer_state (const void *routing_id_,
|
|||||||
bool zmq::router_t::identify_peer (pipe_t *pipe_)
|
bool zmq::router_t::identify_peer (pipe_t *pipe_)
|
||||||
{
|
{
|
||||||
msg_t msg;
|
msg_t msg;
|
||||||
blob_t routing_id;
|
blob_t identity;
|
||||||
bool ok;
|
bool ok;
|
||||||
|
|
||||||
if (connect_routing_id.length()) {
|
if (connect_rid.length()) {
|
||||||
routing_id = blob_t ((unsigned char*) connect_routing_id.c_str (),
|
identity = blob_t ((unsigned char*) connect_rid.c_str (),
|
||||||
connect_routing_id.length());
|
connect_rid.length());
|
||||||
connect_routing_id.clear ();
|
connect_rid.clear ();
|
||||||
outpipes_t::iterator it = outpipes.find (routing_id);
|
outpipes_t::iterator it = outpipes.find (identity);
|
||||||
if (it != outpipes.end ())
|
if (it != outpipes.end ())
|
||||||
zmq_assert(false); // Not allowed to duplicate an existing rid
|
zmq_assert(false); // Not allowed to duplicate an existing rid
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
if (options.raw_socket) { // Always assign an integral routing id for raw-socket
|
if (options.raw_socket) { // Always assign identity for raw-socket
|
||||||
unsigned char buf [5];
|
unsigned char buf [5];
|
||||||
buf [0] = 0;
|
buf [0] = 0;
|
||||||
put_uint32 (buf + 1, next_integral_routing_id++);
|
put_uint32 (buf + 1, next_rid++);
|
||||||
routing_id = blob_t (buf, sizeof buf);
|
identity = blob_t (buf, sizeof buf);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
if (!options.raw_socket) {
|
if (!options.raw_socket) {
|
||||||
// Pick up handshake cases and also case where next integral routing id is set
|
// Pick up handshake cases and also case where next identity is set
|
||||||
msg.init ();
|
msg.init ();
|
||||||
ok = pipe_->read (&msg);
|
ok = pipe_->read (&msg);
|
||||||
if (!ok)
|
if (!ok)
|
||||||
@ -497,13 +497,13 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_)
|
|||||||
// Fall back on the auto-generation
|
// Fall back on the auto-generation
|
||||||
unsigned char buf [5];
|
unsigned char buf [5];
|
||||||
buf [0] = 0;
|
buf [0] = 0;
|
||||||
put_uint32 (buf + 1, next_integral_routing_id++);
|
put_uint32 (buf + 1, next_rid++);
|
||||||
routing_id = blob_t (buf, sizeof buf);
|
identity = blob_t (buf, sizeof buf);
|
||||||
msg.close ();
|
msg.close ();
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
routing_id = blob_t ((unsigned char*) msg.data (), msg.size ());
|
identity = blob_t ((unsigned char*) msg.data (), msg.size ());
|
||||||
outpipes_t::iterator it = outpipes.find (routing_id);
|
outpipes_t::iterator it = outpipes.find (identity);
|
||||||
msg.close ();
|
msg.close ();
|
||||||
|
|
||||||
if (it != outpipes.end ()) {
|
if (it != outpipes.end ()) {
|
||||||
@ -512,23 +512,23 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_)
|
|||||||
return false;
|
return false;
|
||||||
else {
|
else {
|
||||||
// We will allow the new connection to take over this
|
// We will allow the new connection to take over this
|
||||||
// routing id. Temporarily assign a new routing id to the
|
// identity. Temporarily assign a new identity to the
|
||||||
// existing pipe so we can terminate it asynchronously.
|
// existing pipe so we can terminate it asynchronously.
|
||||||
unsigned char buf [5];
|
unsigned char buf [5];
|
||||||
buf [0] = 0;
|
buf [0] = 0;
|
||||||
put_uint32 (buf + 1, next_integral_routing_id++);
|
put_uint32 (buf + 1, next_rid++);
|
||||||
blob_t new_routing_id = blob_t (buf, sizeof buf);
|
blob_t new_identity = blob_t (buf, sizeof buf);
|
||||||
|
|
||||||
it->second.pipe->set_router_socket_routing_id (new_routing_id);
|
it->second.pipe->set_identity (new_identity);
|
||||||
outpipe_t existing_outpipe =
|
outpipe_t existing_outpipe =
|
||||||
{it->second.pipe, it->second.active};
|
{it->second.pipe, it->second.active};
|
||||||
|
|
||||||
ok = outpipes.insert (outpipes_t::value_type (
|
ok = outpipes.insert (outpipes_t::value_type (
|
||||||
new_routing_id, existing_outpipe)).second;
|
new_identity, existing_outpipe)).second;
|
||||||
zmq_assert (ok);
|
zmq_assert (ok);
|
||||||
|
|
||||||
// Remove the existing routing id entry to allow the new
|
// Remove the existing identity entry to allow the new
|
||||||
// connection to take the routing id.
|
// connection to take the identity.
|
||||||
outpipes.erase (it);
|
outpipes.erase (it);
|
||||||
|
|
||||||
if (existing_outpipe.pipe == current_in)
|
if (existing_outpipe.pipe == current_in)
|
||||||
@ -540,10 +540,10 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pipe_->set_router_socket_routing_id (routing_id);
|
pipe_->set_identity (identity);
|
||||||
// Add the record into output pipes lookup table
|
// Add the record into output pipes lookup table
|
||||||
outpipe_t outpipe = {pipe_, true};
|
outpipe_t outpipe = {pipe_, true};
|
||||||
ok = outpipes.insert (outpipes_t::value_type (routing_id, outpipe)).second;
|
ok = outpipes.insert (outpipes_t::value_type (identity, outpipe)).second;
|
||||||
zmq_assert (ok);
|
zmq_assert (ok);
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
|
@ -85,7 +85,7 @@ namespace zmq
|
|||||||
|
|
||||||
// If true, the receiver got the message part with
|
// If true, the receiver got the message part with
|
||||||
// the peer's identity.
|
// the peer's identity.
|
||||||
bool routing_id_sent;
|
bool identity_sent;
|
||||||
|
|
||||||
// Holds the prefetched identity.
|
// Holds the prefetched identity.
|
||||||
msg_t prefetched_id;
|
msg_t prefetched_id;
|
||||||
@ -123,7 +123,7 @@ namespace zmq
|
|||||||
|
|
||||||
// Routing IDs are generated. It's a simple increment and wrap-over
|
// Routing IDs are generated. It's a simple increment and wrap-over
|
||||||
// algorithm. This value is the next ID to use (if not used already).
|
// algorithm. This value is the next ID to use (if not used already).
|
||||||
uint32_t next_integral_routing_id;
|
uint32_t next_rid;
|
||||||
|
|
||||||
// If true, report EAGAIN to the caller instead of silently dropping
|
// If true, report EAGAIN to the caller instead of silently dropping
|
||||||
// the message targeting an unknown peer.
|
// the message targeting an unknown peer.
|
||||||
|
@ -38,7 +38,7 @@
|
|||||||
|
|
||||||
zmq::server_t::server_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
|
zmq::server_t::server_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
|
||||||
socket_base_t (parent_, tid_, sid_, true),
|
socket_base_t (parent_, tid_, sid_, true),
|
||||||
next_routing_id (generate_random ())
|
next_rid (generate_random ())
|
||||||
{
|
{
|
||||||
options.type = ZMQ_SERVER;
|
options.type = ZMQ_SERVER;
|
||||||
}
|
}
|
||||||
@ -54,11 +54,11 @@ void zmq::server_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
|
|||||||
|
|
||||||
zmq_assert (pipe_);
|
zmq_assert (pipe_);
|
||||||
|
|
||||||
uint32_t routing_id = next_routing_id++;
|
uint32_t routing_id = next_rid++;
|
||||||
if (!routing_id)
|
if (!routing_id)
|
||||||
routing_id = next_routing_id++; // Never use Routing ID zero
|
routing_id = next_rid++; // Never use RID zero
|
||||||
|
|
||||||
pipe_->set_server_socket_routing_id (routing_id);
|
pipe_->set_routing_id (routing_id);
|
||||||
// Add the record into output pipes lookup table
|
// Add the record into output pipes lookup table
|
||||||
outpipe_t outpipe = {pipe_, true};
|
outpipe_t outpipe = {pipe_, true};
|
||||||
bool ok = outpipes.insert (outpipes_t::value_type (routing_id, outpipe)).second;
|
bool ok = outpipes.insert (outpipes_t::value_type (routing_id, outpipe)).second;
|
||||||
@ -69,7 +69,7 @@ void zmq::server_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
|
|||||||
|
|
||||||
void zmq::server_t::xpipe_terminated (pipe_t *pipe_)
|
void zmq::server_t::xpipe_terminated (pipe_t *pipe_)
|
||||||
{
|
{
|
||||||
outpipes_t::iterator it = outpipes.find (pipe_->get_server_socket_routing_id ());
|
outpipes_t::iterator it = outpipes.find (pipe_->get_routing_id ());
|
||||||
zmq_assert (it != outpipes.end ());
|
zmq_assert (it != outpipes.end ());
|
||||||
outpipes.erase (it);
|
outpipes.erase (it);
|
||||||
fq.pipe_terminated (pipe_);
|
fq.pipe_terminated (pipe_);
|
||||||
@ -159,7 +159,7 @@ int zmq::server_t::xrecv (msg_t *msg_)
|
|||||||
|
|
||||||
zmq_assert (pipe != NULL);
|
zmq_assert (pipe != NULL);
|
||||||
|
|
||||||
uint32_t routing_id = pipe->get_server_socket_routing_id ();
|
uint32_t routing_id = pipe->get_routing_id ();
|
||||||
msg_->set_routing_id (routing_id);
|
msg_->set_routing_id (routing_id);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -85,7 +85,7 @@ namespace zmq
|
|||||||
|
|
||||||
// Routing IDs are generated. It's a simple increment and wrap-over
|
// Routing IDs are generated. It's a simple increment and wrap-over
|
||||||
// algorithm. This value is the next ID to use (if not used already).
|
// algorithm. This value is the next ID to use (if not used already).
|
||||||
uint32_t next_routing_id;
|
uint32_t next_rid;
|
||||||
|
|
||||||
server_t (const server_t&);
|
server_t (const server_t&);
|
||||||
const server_t &operator = (const server_t&);
|
const server_t &operator = (const server_t&);
|
||||||
|
@ -356,12 +356,12 @@ int zmq::session_base_t::zap_connect ()
|
|||||||
|
|
||||||
send_bind (peer.socket, new_pipes [1], false);
|
send_bind (peer.socket, new_pipes [1], false);
|
||||||
|
|
||||||
// Send empty routing id if required by the peer.
|
// Send empty identity if required by the peer.
|
||||||
if (peer.options.recv_routing_id) {
|
if (peer.options.recv_identity) {
|
||||||
msg_t id;
|
msg_t id;
|
||||||
rc = id.init ();
|
rc = id.init ();
|
||||||
errno_assert (rc == 0);
|
errno_assert (rc == 0);
|
||||||
id.set_flags (msg_t::routing_id);
|
id.set_flags (msg_t::identity);
|
||||||
bool ok = zap_pipe->write (&id);
|
bool ok = zap_pipe->write (&id);
|
||||||
zmq_assert (ok);
|
zmq_assert (ok);
|
||||||
zap_pipe->flush ();
|
zap_pipe->flush ();
|
||||||
|
@ -221,11 +221,11 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_, bool
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int zmq::socket_base_t::get_peer_state (const void *routing_id_,
|
int zmq::socket_base_t::get_peer_state (const void *identity,
|
||||||
size_t routing_id_size_) const
|
size_t identity_size) const
|
||||||
{
|
{
|
||||||
LIBZMQ_UNUSED (routing_id_);
|
LIBZMQ_UNUSED (identity);
|
||||||
LIBZMQ_UNUSED (routing_id_size_);
|
LIBZMQ_UNUSED (identity_size);
|
||||||
|
|
||||||
// Only ROUTER sockets support this
|
// Only ROUTER sockets support this
|
||||||
errno = ENOTSUP;
|
errno = ENOTSUP;
|
||||||
@ -764,14 +764,14 @@ int zmq::socket_base_t::connect (const char *addr_)
|
|||||||
|
|
||||||
if (!peer.socket) {
|
if (!peer.socket) {
|
||||||
// The peer doesn't exist yet so we don't know whether
|
// The peer doesn't exist yet so we don't know whether
|
||||||
// to send the routing id message or not. To resolve this,
|
// to send the identity message or not. To resolve this,
|
||||||
// we always send our routing id and drop it later if
|
// we always send our identity and drop it later if
|
||||||
// the peer doesn't expect it.
|
// the peer doesn't expect it.
|
||||||
msg_t id;
|
msg_t id;
|
||||||
rc = id.init_size (options.routing_id_size);
|
rc = id.init_size (options.identity_size);
|
||||||
errno_assert (rc == 0);
|
errno_assert (rc == 0);
|
||||||
memcpy (id.data (), options.routing_id, options.routing_id_size);
|
memcpy (id.data (), options.identity, options.identity_size);
|
||||||
id.set_flags (msg_t::routing_id);
|
id.set_flags (msg_t::identity);
|
||||||
bool written = new_pipes [0]->write (&id);
|
bool written = new_pipes [0]->write (&id);
|
||||||
zmq_assert (written);
|
zmq_assert (written);
|
||||||
new_pipes [0]->flush ();
|
new_pipes [0]->flush ();
|
||||||
@ -780,25 +780,25 @@ int zmq::socket_base_t::connect (const char *addr_)
|
|||||||
pend_connection (std::string (addr_), endpoint, new_pipes);
|
pend_connection (std::string (addr_), endpoint, new_pipes);
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
// If required, send the routing id of the local socket to the peer.
|
// If required, send the identity of the local socket to the peer.
|
||||||
if (peer.options.recv_routing_id) {
|
if (peer.options.recv_identity) {
|
||||||
msg_t id;
|
msg_t id;
|
||||||
rc = id.init_size (options.routing_id_size);
|
rc = id.init_size (options.identity_size);
|
||||||
errno_assert (rc == 0);
|
errno_assert (rc == 0);
|
||||||
memcpy (id.data (), options.routing_id, options.routing_id_size);
|
memcpy (id.data (), options.identity, options.identity_size);
|
||||||
id.set_flags (msg_t::routing_id);
|
id.set_flags (msg_t::identity);
|
||||||
bool written = new_pipes [0]->write (&id);
|
bool written = new_pipes [0]->write (&id);
|
||||||
zmq_assert (written);
|
zmq_assert (written);
|
||||||
new_pipes [0]->flush ();
|
new_pipes [0]->flush ();
|
||||||
}
|
}
|
||||||
|
|
||||||
// If required, send the routing id of the peer to the local socket.
|
// If required, send the identity of the peer to the local socket.
|
||||||
if (options.recv_routing_id) {
|
if (options.recv_identity) {
|
||||||
msg_t id;
|
msg_t id;
|
||||||
rc = id.init_size (peer.options.routing_id_size);
|
rc = id.init_size (peer.options.identity_size);
|
||||||
errno_assert (rc == 0);
|
errno_assert (rc == 0);
|
||||||
memcpy (id.data (), peer.options.routing_id, peer.options.routing_id_size);
|
memcpy (id.data (), peer.options.identity, peer.options.identity_size);
|
||||||
id.set_flags (msg_t::routing_id);
|
id.set_flags (msg_t::identity);
|
||||||
bool written = new_pipes [1]->write (&id);
|
bool written = new_pipes [1]->write (&id);
|
||||||
zmq_assert (written);
|
zmq_assert (written);
|
||||||
new_pipes [1]->flush ();
|
new_pipes [1]->flush ();
|
||||||
@ -1591,9 +1591,9 @@ void zmq::socket_base_t::pipe_terminated (pipe_t *pipe_)
|
|||||||
|
|
||||||
void zmq::socket_base_t::extract_flags (msg_t *msg_)
|
void zmq::socket_base_t::extract_flags (msg_t *msg_)
|
||||||
{
|
{
|
||||||
// Test whether routing_id flag is valid for this socket type.
|
// Test whether IDENTITY flag is valid for this socket type.
|
||||||
if (unlikely (msg_->flags () & msg_t::routing_id))
|
if (unlikely (msg_->flags () & msg_t::identity))
|
||||||
zmq_assert (options.recv_routing_id);
|
zmq_assert (options.recv_identity);
|
||||||
|
|
||||||
// Remove MORE flag.
|
// Remove MORE flag.
|
||||||
rcvmore = msg_->flags () & msg_t::more ? true : false;
|
rcvmore = msg_->flags () & msg_t::more ? true : false;
|
||||||
|
@ -186,7 +186,7 @@ namespace zmq
|
|||||||
void process_destroy ();
|
void process_destroy ();
|
||||||
|
|
||||||
// Next assigned name on a zmq_connect() call used by ROUTER and STREAM socket types
|
// Next assigned name on a zmq_connect() call used by ROUTER and STREAM socket types
|
||||||
std::string connect_routing_id;
|
std::string connect_rid;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
// test if event should be sent and then dispatch it
|
// test if event should be sent and then dispatch it
|
||||||
|
@ -39,22 +39,22 @@
|
|||||||
zmq::stream_t::stream_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
|
zmq::stream_t::stream_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
|
||||||
socket_base_t (parent_, tid_, sid_),
|
socket_base_t (parent_, tid_, sid_),
|
||||||
prefetched (false),
|
prefetched (false),
|
||||||
routing_id_sent (false),
|
identity_sent (false),
|
||||||
current_out (NULL),
|
current_out (NULL),
|
||||||
more_out (false),
|
more_out (false),
|
||||||
next_integral_routing_id (generate_random ())
|
next_rid (generate_random ())
|
||||||
{
|
{
|
||||||
options.type = ZMQ_STREAM;
|
options.type = ZMQ_STREAM;
|
||||||
options.raw_socket = true;
|
options.raw_socket = true;
|
||||||
|
|
||||||
prefetched_routing_id.init ();
|
prefetched_id.init ();
|
||||||
prefetched_msg.init ();
|
prefetched_msg.init ();
|
||||||
}
|
}
|
||||||
|
|
||||||
zmq::stream_t::~stream_t ()
|
zmq::stream_t::~stream_t ()
|
||||||
{
|
{
|
||||||
zmq_assert (outpipes.empty ());
|
zmq_assert (outpipes.empty ());
|
||||||
prefetched_routing_id.close ();
|
prefetched_id.close ();
|
||||||
prefetched_msg.close ();
|
prefetched_msg.close ();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -70,7 +70,7 @@ void zmq::stream_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
|
|||||||
|
|
||||||
void zmq::stream_t::xpipe_terminated (pipe_t *pipe_)
|
void zmq::stream_t::xpipe_terminated (pipe_t *pipe_)
|
||||||
{
|
{
|
||||||
outpipes_t::iterator it = outpipes.find (pipe_->get_routing_id ());
|
outpipes_t::iterator it = outpipes.find (pipe_->get_identity ());
|
||||||
zmq_assert (it != outpipes.end ());
|
zmq_assert (it != outpipes.end ());
|
||||||
outpipes.erase (it);
|
outpipes.erase (it);
|
||||||
fq.pipe_terminated (pipe_);
|
fq.pipe_terminated (pipe_);
|
||||||
@ -107,10 +107,10 @@ int zmq::stream_t::xsend (msg_t *msg_)
|
|||||||
// TODO: The connections should be killed instead.
|
// TODO: The connections should be killed instead.
|
||||||
if (msg_->flags () & msg_t::more) {
|
if (msg_->flags () & msg_t::more) {
|
||||||
|
|
||||||
// Find the pipe associated with the routing id stored in the prefix.
|
// Find the pipe associated with the identity stored in the prefix.
|
||||||
// If there's no such pipe return an error
|
// If there's no such pipe return an error
|
||||||
blob_t routing_id ((unsigned char*) msg_->data (), msg_->size ());
|
blob_t identity ((unsigned char*) msg_->data (), msg_->size ());
|
||||||
outpipes_t::iterator it = outpipes.find (routing_id);
|
outpipes_t::iterator it = outpipes.find (identity);
|
||||||
|
|
||||||
if (it != outpipes.end ()) {
|
if (it != outpipes.end ()) {
|
||||||
current_out = it->second.pipe;
|
current_out = it->second.pipe;
|
||||||
@ -183,9 +183,9 @@ int zmq::stream_t::xsetsockopt (int option_, const void *optval_,
|
|||||||
if (is_int) memcpy(&value, optval_, sizeof (int));
|
if (is_int) memcpy(&value, optval_, sizeof (int));
|
||||||
|
|
||||||
switch (option_) {
|
switch (option_) {
|
||||||
case ZMQ_CONNECT_ROUTING_ID:
|
case ZMQ_CONNECT_RID:
|
||||||
if (optval_ && optvallen_) {
|
if (optval_ && optvallen_) {
|
||||||
connect_routing_id.assign ((char*) optval_, optvallen_);
|
connect_rid.assign ((char*) optval_, optvallen_);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
@ -207,10 +207,10 @@ int zmq::stream_t::xsetsockopt (int option_, const void *optval_,
|
|||||||
int zmq::stream_t::xrecv (msg_t *msg_)
|
int zmq::stream_t::xrecv (msg_t *msg_)
|
||||||
{
|
{
|
||||||
if (prefetched) {
|
if (prefetched) {
|
||||||
if (!routing_id_sent) {
|
if (!identity_sent) {
|
||||||
int rc = msg_->move (prefetched_routing_id);
|
int rc = msg_->move (prefetched_id);
|
||||||
errno_assert (rc == 0);
|
errno_assert (rc == 0);
|
||||||
routing_id_sent = true;
|
identity_sent = true;
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
int rc = msg_->move (prefetched_msg);
|
int rc = msg_->move (prefetched_msg);
|
||||||
@ -231,10 +231,10 @@ int zmq::stream_t::xrecv (msg_t *msg_)
|
|||||||
// We have received a frame with TCP data.
|
// We have received a frame with TCP data.
|
||||||
// Rather than sending this frame, we keep it in prefetched
|
// Rather than sending this frame, we keep it in prefetched
|
||||||
// buffer and send a frame with peer's ID.
|
// buffer and send a frame with peer's ID.
|
||||||
blob_t routing_id = pipe->get_routing_id ();
|
blob_t identity = pipe->get_identity ();
|
||||||
rc = msg_->close();
|
rc = msg_->close();
|
||||||
errno_assert (rc == 0);
|
errno_assert (rc == 0);
|
||||||
rc = msg_->init_size (routing_id.size ());
|
rc = msg_->init_size (identity.size ());
|
||||||
errno_assert (rc == 0);
|
errno_assert (rc == 0);
|
||||||
|
|
||||||
// forward metadata (if any)
|
// forward metadata (if any)
|
||||||
@ -242,11 +242,11 @@ int zmq::stream_t::xrecv (msg_t *msg_)
|
|||||||
if (metadata)
|
if (metadata)
|
||||||
msg_->set_metadata(metadata);
|
msg_->set_metadata(metadata);
|
||||||
|
|
||||||
memcpy (msg_->data (), routing_id.data (), routing_id.size ());
|
memcpy (msg_->data (), identity.data (), identity.size ());
|
||||||
msg_->set_flags (msg_t::more);
|
msg_->set_flags (msg_t::more);
|
||||||
|
|
||||||
prefetched = true;
|
prefetched = true;
|
||||||
routing_id_sent = true;
|
identity_sent = true;
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
@ -267,20 +267,20 @@ bool zmq::stream_t::xhas_in ()
|
|||||||
zmq_assert (pipe != NULL);
|
zmq_assert (pipe != NULL);
|
||||||
zmq_assert ((prefetched_msg.flags () & msg_t::more) == 0);
|
zmq_assert ((prefetched_msg.flags () & msg_t::more) == 0);
|
||||||
|
|
||||||
blob_t routing_id = pipe->get_routing_id ();
|
blob_t identity = pipe->get_identity ();
|
||||||
rc = prefetched_routing_id.init_size (routing_id.size ());
|
rc = prefetched_id.init_size (identity.size ());
|
||||||
errno_assert (rc == 0);
|
errno_assert (rc == 0);
|
||||||
|
|
||||||
// forward metadata (if any)
|
// forward metadata (if any)
|
||||||
metadata_t *metadata = prefetched_msg.metadata();
|
metadata_t *metadata = prefetched_msg.metadata();
|
||||||
if (metadata)
|
if (metadata)
|
||||||
prefetched_routing_id.set_metadata(metadata);
|
prefetched_id.set_metadata(metadata);
|
||||||
|
|
||||||
memcpy (prefetched_routing_id.data (), routing_id.data (), routing_id.size ());
|
memcpy (prefetched_id.data (), identity.data (), identity.size ());
|
||||||
prefetched_routing_id.set_flags (msg_t::more);
|
prefetched_id.set_flags (msg_t::more);
|
||||||
|
|
||||||
prefetched = true;
|
prefetched = true;
|
||||||
routing_id_sent = false;
|
identity_sent = false;
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
@ -295,27 +295,27 @@ bool zmq::stream_t::xhas_out ()
|
|||||||
|
|
||||||
void zmq::stream_t::identify_peer (pipe_t *pipe_)
|
void zmq::stream_t::identify_peer (pipe_t *pipe_)
|
||||||
{
|
{
|
||||||
// Always assign routing id for raw-socket
|
// Always assign identity for raw-socket
|
||||||
unsigned char buffer [5];
|
unsigned char buffer [5];
|
||||||
buffer [0] = 0;
|
buffer [0] = 0;
|
||||||
blob_t routing_id;
|
blob_t identity;
|
||||||
if (connect_routing_id.length ()) {
|
if (connect_rid.length ()) {
|
||||||
routing_id = blob_t ((unsigned char*) connect_routing_id.c_str(),
|
identity = blob_t ((unsigned char*) connect_rid.c_str(),
|
||||||
connect_routing_id.length ());
|
connect_rid.length ());
|
||||||
connect_routing_id.clear ();
|
connect_rid.clear ();
|
||||||
outpipes_t::iterator it = outpipes.find (routing_id);
|
outpipes_t::iterator it = outpipes.find (identity);
|
||||||
zmq_assert (it == outpipes.end ());
|
zmq_assert (it == outpipes.end ());
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
put_uint32 (buffer + 1, next_integral_routing_id++);
|
put_uint32 (buffer + 1, next_rid++);
|
||||||
routing_id = blob_t (buffer, sizeof buffer);
|
identity = blob_t (buffer, sizeof buffer);
|
||||||
memcpy (options.routing_id, routing_id.data (), routing_id.size ());
|
memcpy (options.identity, identity.data (), identity.size ());
|
||||||
options.routing_id_size = (unsigned char) routing_id.size ();
|
options.identity_size = (unsigned char) identity.size ();
|
||||||
}
|
}
|
||||||
pipe_->set_router_socket_routing_id (routing_id);
|
pipe_->set_identity (identity);
|
||||||
// Add the record into output pipes lookup table
|
// Add the record into output pipes lookup table
|
||||||
outpipe_t outpipe = {pipe_, true};
|
outpipe_t outpipe = {pipe_, true};
|
||||||
const bool ok = outpipes.insert (
|
const bool ok = outpipes.insert (
|
||||||
outpipes_t::value_type (routing_id, outpipe)).second;
|
outpipes_t::value_type (identity, outpipe)).second;
|
||||||
zmq_assert (ok);
|
zmq_assert (ok);
|
||||||
}
|
}
|
||||||
|
@ -70,10 +70,10 @@ namespace zmq
|
|||||||
|
|
||||||
// If true, the receiver got the message part with
|
// If true, the receiver got the message part with
|
||||||
// the peer's identity.
|
// the peer's identity.
|
||||||
bool routing_id_sent;
|
bool identity_sent;
|
||||||
|
|
||||||
// Holds the prefetched identity.
|
// Holds the prefetched identity.
|
||||||
msg_t prefetched_routing_id;
|
msg_t prefetched_id;
|
||||||
|
|
||||||
// Holds the prefetched message.
|
// Holds the prefetched message.
|
||||||
msg_t prefetched_msg;
|
msg_t prefetched_msg;
|
||||||
@ -96,7 +96,7 @@ namespace zmq
|
|||||||
|
|
||||||
// Routing IDs are generated. It's a simple increment and wrap-over
|
// Routing IDs are generated. It's a simple increment and wrap-over
|
||||||
// algorithm. This value is the next ID to use (if not used already).
|
// algorithm. This value is the next ID to use (if not used already).
|
||||||
uint32_t next_integral_routing_id;
|
uint32_t next_rid;
|
||||||
|
|
||||||
stream_t (const stream_t&);
|
stream_t (const stream_t&);
|
||||||
const stream_t &operator = (const stream_t&);
|
const stream_t &operator = (const stream_t&);
|
||||||
|
@ -81,8 +81,8 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_,
|
|||||||
options (options_),
|
options (options_),
|
||||||
endpoint (endpoint_),
|
endpoint (endpoint_),
|
||||||
plugged (false),
|
plugged (false),
|
||||||
next_msg (&stream_engine_t::routing_id_msg),
|
next_msg (&stream_engine_t::identity_msg),
|
||||||
process_msg (&stream_engine_t::process_routing_id_msg),
|
process_msg (&stream_engine_t::process_identity_msg),
|
||||||
io_error (false),
|
io_error (false),
|
||||||
subscription_required (false),
|
subscription_required (false),
|
||||||
mechanism (NULL),
|
mechanism (NULL),
|
||||||
@ -229,11 +229,11 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
|
|||||||
// start optional timer, to prevent handshake hanging on no input
|
// start optional timer, to prevent handshake hanging on no input
|
||||||
set_handshake_timer ();
|
set_handshake_timer ();
|
||||||
|
|
||||||
// Send the 'length' and 'flags' fields of the routing id message.
|
// Send the 'length' and 'flags' fields of the identity message.
|
||||||
// The 'length' field is encoded in the long format.
|
// The 'length' field is encoded in the long format.
|
||||||
outpos = greeting_send;
|
outpos = greeting_send;
|
||||||
outpos [outsize++] = 0xff;
|
outpos [outsize++] = 0xff;
|
||||||
put_uint64 (&outpos [outsize], options.routing_id_size + 1);
|
put_uint64 (&outpos [outsize], options.identity_size + 1);
|
||||||
outsize += 8;
|
outsize += 8;
|
||||||
outpos [outsize++] = 0x7f;
|
outpos [outsize++] = 0x7f;
|
||||||
}
|
}
|
||||||
@ -520,7 +520,7 @@ bool zmq::stream_engine_t::handshake ()
|
|||||||
|
|
||||||
// Inspect the right-most bit of the 10th byte (which coincides
|
// Inspect the right-most bit of the 10th byte (which coincides
|
||||||
// with the 'flags' field if a regular message was sent).
|
// with the 'flags' field if a regular message was sent).
|
||||||
// Zero indicates this is a header of a routing id message
|
// Zero indicates this is a header of identity message
|
||||||
// (i.e. the peer is using the unversioned protocol).
|
// (i.e. the peer is using the unversioned protocol).
|
||||||
if (!(greeting_recv [9] & 0x01))
|
if (!(greeting_recv [9] & 0x01))
|
||||||
break;
|
break;
|
||||||
@ -575,7 +575,7 @@ bool zmq::stream_engine_t::handshake ()
|
|||||||
const size_t revision_pos = 10;
|
const size_t revision_pos = 10;
|
||||||
|
|
||||||
// Is the peer using ZMTP/1.0 with no revision number?
|
// Is the peer using ZMTP/1.0 with no revision number?
|
||||||
// If so, we send and receive rest of routing id message
|
// If so, we send and receive rest of identity message
|
||||||
if (greeting_recv [0] != 0xff || !(greeting_recv [9] & 0x01)) {
|
if (greeting_recv [0] != 0xff || !(greeting_recv [9] & 0x01)) {
|
||||||
if (session->zap_enabled ()) {
|
if (session->zap_enabled ()) {
|
||||||
// reject ZMTP 1.0 connections if ZAP is enabled
|
// reject ZMTP 1.0 connections if ZAP is enabled
|
||||||
@ -593,14 +593,14 @@ bool zmq::stream_engine_t::handshake ()
|
|||||||
// Since there is no way to tell the encoder to
|
// Since there is no way to tell the encoder to
|
||||||
// skip the message header, we simply throw that
|
// skip the message header, we simply throw that
|
||||||
// header data away.
|
// header data away.
|
||||||
const size_t header_size = options.routing_id_size + 1 >= 255 ? 10 : 2;
|
const size_t header_size = options.identity_size + 1 >= 255 ? 10 : 2;
|
||||||
unsigned char tmp [10], *bufferp = tmp;
|
unsigned char tmp [10], *bufferp = tmp;
|
||||||
|
|
||||||
// Prepare the routing id message and load it into encoder.
|
// Prepare the identity message and load it into encoder.
|
||||||
// Then consume bytes we have already sent to the peer.
|
// Then consume bytes we have already sent to the peer.
|
||||||
const int rc = tx_msg.init_size (options.routing_id_size);
|
const int rc = tx_msg.init_size (options.identity_size);
|
||||||
zmq_assert (rc == 0);
|
zmq_assert (rc == 0);
|
||||||
memcpy (tx_msg.data (), options.routing_id, options.routing_id_size);
|
memcpy (tx_msg.data (), options.identity, options.identity_size);
|
||||||
encoder->load_msg (&tx_msg);
|
encoder->load_msg (&tx_msg);
|
||||||
size_t buffer_size = encoder->encode (&bufferp, header_size);
|
size_t buffer_size = encoder->encode (&bufferp, header_size);
|
||||||
zmq_assert (buffer_size == header_size);
|
zmq_assert (buffer_size == header_size);
|
||||||
@ -615,12 +615,12 @@ bool zmq::stream_engine_t::handshake ()
|
|||||||
if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB)
|
if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB)
|
||||||
subscription_required = true;
|
subscription_required = true;
|
||||||
|
|
||||||
// We are sending our routing id now and the next message
|
// We are sending our identity now and the next message
|
||||||
// will come from the socket.
|
// will come from the socket.
|
||||||
next_msg = &stream_engine_t::pull_msg_from_session;
|
next_msg = &stream_engine_t::pull_msg_from_session;
|
||||||
|
|
||||||
// We are expecting routing id message.
|
// We are expecting identity message.
|
||||||
process_msg = &stream_engine_t::process_routing_id_msg;
|
process_msg = &stream_engine_t::process_identity_msg;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
if (greeting_recv [revision_pos] == ZMTP_1_0) {
|
if (greeting_recv [revision_pos] == ZMTP_1_0) {
|
||||||
@ -729,20 +729,20 @@ bool zmq::stream_engine_t::handshake ()
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
int zmq::stream_engine_t::routing_id_msg (msg_t *msg_)
|
int zmq::stream_engine_t::identity_msg (msg_t *msg_)
|
||||||
{
|
{
|
||||||
int rc = msg_->init_size (options.routing_id_size);
|
int rc = msg_->init_size (options.identity_size);
|
||||||
errno_assert (rc == 0);
|
errno_assert (rc == 0);
|
||||||
if (options.routing_id_size > 0)
|
if (options.identity_size > 0)
|
||||||
memcpy (msg_->data (), options.routing_id, options.routing_id_size);
|
memcpy (msg_->data (), options.identity, options.identity_size);
|
||||||
next_msg = &stream_engine_t::pull_msg_from_session;
|
next_msg = &stream_engine_t::pull_msg_from_session;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int zmq::stream_engine_t::process_routing_id_msg (msg_t *msg_)
|
int zmq::stream_engine_t::process_identity_msg (msg_t *msg_)
|
||||||
{
|
{
|
||||||
if (options.recv_routing_id) {
|
if (options.recv_identity) {
|
||||||
msg_->set_flags (msg_t::routing_id);
|
msg_->set_flags (msg_t::identity);
|
||||||
int rc = session->push_msg (msg_);
|
int rc = session->push_msg (msg_);
|
||||||
errno_assert (rc == 0);
|
errno_assert (rc == 0);
|
||||||
}
|
}
|
||||||
@ -839,14 +839,14 @@ void zmq::stream_engine_t::mechanism_ready ()
|
|||||||
has_heartbeat_timer = true;
|
has_heartbeat_timer = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (options.recv_routing_id) {
|
if (options.recv_identity) {
|
||||||
msg_t routing_id;
|
msg_t identity;
|
||||||
mechanism->peer_routing_id (&routing_id);
|
mechanism->peer_identity (&identity);
|
||||||
const int rc = session->push_msg (&routing_id);
|
const int rc = session->push_msg (&identity);
|
||||||
if (rc == -1 && errno == EAGAIN) {
|
if (rc == -1 && errno == EAGAIN) {
|
||||||
// If the write is failing at this stage with
|
// If the write is failing at this stage with
|
||||||
// an EAGAIN the pipe must be being shut down,
|
// an EAGAIN the pipe must be being shut down,
|
||||||
// so we can just bail out of the routing id set.
|
// so we can just bail out of the identity set.
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
errno_assert (rc == 0);
|
errno_assert (rc == 0);
|
||||||
|
@ -99,8 +99,8 @@ namespace zmq
|
|||||||
// Detects the protocol used by the peer.
|
// Detects the protocol used by the peer.
|
||||||
bool handshake ();
|
bool handshake ();
|
||||||
|
|
||||||
int routing_id_msg (msg_t *msg_);
|
int identity_msg (msg_t *msg_);
|
||||||
int process_routing_id_msg (msg_t *msg_);
|
int process_identity_msg (msg_t *msg_);
|
||||||
|
|
||||||
int next_handshake_command (msg_t *msg);
|
int next_handshake_command (msg_t *msg);
|
||||||
int process_handshake_command (msg_t *msg);
|
int process_handshake_command (msg_t *msg);
|
||||||
|
@ -104,10 +104,10 @@ void zap_client_t::send_zap_request (const char *mechanism,
|
|||||||
rc = session->write_zap_msg (&msg);
|
rc = session->write_zap_msg (&msg);
|
||||||
errno_assert (rc == 0);
|
errno_assert (rc == 0);
|
||||||
|
|
||||||
// Routing id frame
|
// Identity frame
|
||||||
rc = msg.init_size (options.routing_id_size);
|
rc = msg.init_size (options.identity_size);
|
||||||
errno_assert (rc == 0);
|
errno_assert (rc == 0);
|
||||||
memcpy (msg.data (), options.routing_id, options.routing_id_size);
|
memcpy (msg.data (), options.identity, options.identity_size);
|
||||||
msg.set_flags (msg_t::more);
|
msg.set_flags (msg_t::more);
|
||||||
rc = session->write_zap_msg (&msg);
|
rc = session->write_zap_msg (&msg);
|
||||||
errno_assert (rc == 0);
|
errno_assert (rc == 0);
|
||||||
|
@ -1359,14 +1359,14 @@ int zmq_poller_wait_all (void *poller_, zmq_poller_event_t *events_, int n_event
|
|||||||
// Peer-specific state
|
// Peer-specific state
|
||||||
|
|
||||||
int zmq_socket_get_peer_state (void *s_,
|
int zmq_socket_get_peer_state (void *s_,
|
||||||
const void *routing_id_,
|
const void *identity,
|
||||||
size_t routing_id_size_)
|
size_t identity_size)
|
||||||
{
|
{
|
||||||
zmq::socket_base_t *s = as_socket_base_t (s_);
|
zmq::socket_base_t *s = as_socket_base_t (s_);
|
||||||
if (!s)
|
if (!s)
|
||||||
return -1;
|
return -1;
|
||||||
|
|
||||||
return s->get_peer_state (routing_id_, routing_id_size_);
|
return s->get_peer_state (identity, identity_size);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Timers
|
// Timers
|
||||||
|
@ -142,8 +142,8 @@ int zmq_poller_remove_fd (void *poller, int fd);
|
|||||||
#endif
|
#endif
|
||||||
|
|
||||||
int zmq_socket_get_peer_state (void *socket,
|
int zmq_socket_get_peer_state (void *socket,
|
||||||
const void *routing_id,
|
const void *identity,
|
||||||
size_t routing_id_size);
|
size_t identity_size);
|
||||||
|
|
||||||
/******************************************************************************/
|
/******************************************************************************/
|
||||||
/* Scheduling timers */
|
/* Scheduling timers */
|
||||||
|
@ -61,13 +61,13 @@ void test_stream_2_stream(){
|
|||||||
assert (0 == ret);
|
assert (0 == ret);
|
||||||
|
|
||||||
// Do the connection.
|
// Do the connection.
|
||||||
ret = zmq_setsockopt (rconn1, ZMQ_CONNECT_ROUTING_ID, "conn1", 6);
|
ret = zmq_setsockopt (rconn1, ZMQ_CONNECT_RID, "conn1", 6);
|
||||||
assert (0 == ret);
|
assert (0 == ret);
|
||||||
ret = zmq_connect (rconn1, my_endpoint);
|
ret = zmq_connect (rconn1, my_endpoint);
|
||||||
|
|
||||||
/* Uncomment to test assert on duplicate routing id.
|
/* Uncomment to test assert on duplicate rid.
|
||||||
// Test duplicate connect attempt.
|
// Test duplicate connect attempt.
|
||||||
ret = zmq_setsockopt (rconn1, ZMQ_CONNECT_ROUTING_ID, "conn1", 6);
|
ret = zmq_setsockopt (rconn1, ZMQ_CONNECT_RID, "conn1", 6);
|
||||||
assert (0 == ret);
|
assert (0 == ret);
|
||||||
ret = zmq_connect (rconn1, bindip);
|
ret = zmq_connect (rconn1, bindip);
|
||||||
assert (0 == ret);
|
assert (0 == ret);
|
||||||
@ -126,18 +126,18 @@ void test_router_2_router(bool named){
|
|||||||
|
|
||||||
// If we're in named mode, set some identities.
|
// If we're in named mode, set some identities.
|
||||||
if (named) {
|
if (named) {
|
||||||
ret = zmq_setsockopt (rbind, ZMQ_ROUTING_ID, "X", 1);
|
ret = zmq_setsockopt (rbind, ZMQ_IDENTITY, "X", 1);
|
||||||
ret = zmq_setsockopt (rconn1, ZMQ_ROUTING_ID, "Y", 1);
|
ret = zmq_setsockopt (rconn1, ZMQ_IDENTITY, "Y", 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Make call to connect using a connect_routing_id.
|
// Make call to connect using a connect_rid.
|
||||||
ret = zmq_setsockopt (rconn1, ZMQ_CONNECT_ROUTING_ID, "conn1", 6);
|
ret = zmq_setsockopt (rconn1, ZMQ_CONNECT_RID, "conn1", 6);
|
||||||
assert (0 == ret);
|
assert (0 == ret);
|
||||||
ret = zmq_connect (rconn1, my_endpoint);
|
ret = zmq_connect (rconn1, my_endpoint);
|
||||||
assert (0 == ret);
|
assert (0 == ret);
|
||||||
/* Uncomment to test assert on duplicate routing id
|
/* Uncomment to test assert on duplicate rid
|
||||||
// Test duplicate connect attempt.
|
// Test duplicate connect attempt.
|
||||||
ret = zmq_setsockopt (rconn1, ZMQ_CONNECT_ROUTING_ID, "conn1", 6);
|
ret = zmq_setsockopt (rconn1, ZMQ_CONNECT_RID, "conn1", 6);
|
||||||
assert (0 == ret);
|
assert (0 == ret);
|
||||||
ret = zmq_connect (rconn1, bindip);
|
ret = zmq_connect (rconn1, bindip);
|
||||||
assert (0 == ret);
|
assert (0 == ret);
|
||||||
|
@ -62,7 +62,7 @@ int main (void)
|
|||||||
void *dealer = zmq_socket (ctx2, ZMQ_DEALER);
|
void *dealer = zmq_socket (ctx2, ZMQ_DEALER);
|
||||||
char identity [10];
|
char identity [10];
|
||||||
sprintf (identity, "%09d", cycle);
|
sprintf (identity, "%09d", cycle);
|
||||||
rc = zmq_setsockopt (dealer, ZMQ_ROUTING_ID, identity, 10);
|
rc = zmq_setsockopt (dealer, ZMQ_IDENTITY, identity, 10);
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
int rcvtimeo = 1000;
|
int rcvtimeo = 1000;
|
||||||
rc = zmq_setsockopt (dealer, ZMQ_RCVTIMEO, &rcvtimeo, sizeof (int));
|
rc = zmq_setsockopt (dealer, ZMQ_RCVTIMEO, &rcvtimeo, sizeof (int));
|
||||||
|
@ -48,7 +48,7 @@ int main (void)
|
|||||||
// Create client and connect to server, doing a probe
|
// Create client and connect to server, doing a probe
|
||||||
void *client = zmq_socket (ctx, ZMQ_ROUTER);
|
void *client = zmq_socket (ctx, ZMQ_ROUTER);
|
||||||
assert (client);
|
assert (client);
|
||||||
rc = zmq_setsockopt (client, ZMQ_ROUTING_ID, "X", 1);
|
rc = zmq_setsockopt (client, ZMQ_IDENTITY, "X", 1);
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
int probe = 1;
|
int probe = 1;
|
||||||
rc = zmq_setsockopt (client, ZMQ_PROBE_ROUTER, &probe, sizeof (probe));
|
rc = zmq_setsockopt (client, ZMQ_PROBE_ROUTER, &probe, sizeof (probe));
|
||||||
|
@ -53,7 +53,7 @@ int main (void)
|
|||||||
// Create dealer called "X" and connect it to our router
|
// Create dealer called "X" and connect it to our router
|
||||||
void *dealer_one = zmq_socket (ctx, ZMQ_DEALER);
|
void *dealer_one = zmq_socket (ctx, ZMQ_DEALER);
|
||||||
assert (dealer_one);
|
assert (dealer_one);
|
||||||
rc = zmq_setsockopt (dealer_one, ZMQ_ROUTING_ID, "X", 1);
|
rc = zmq_setsockopt (dealer_one, ZMQ_IDENTITY, "X", 1);
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
rc = zmq_connect (dealer_one, my_endpoint);
|
rc = zmq_connect (dealer_one, my_endpoint);
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
@ -71,7 +71,7 @@ int main (void)
|
|||||||
// Now create a second dealer that uses the same identity
|
// Now create a second dealer that uses the same identity
|
||||||
void *dealer_two = zmq_socket (ctx, ZMQ_DEALER);
|
void *dealer_two = zmq_socket (ctx, ZMQ_DEALER);
|
||||||
assert (dealer_two);
|
assert (dealer_two);
|
||||||
rc = zmq_setsockopt (dealer_two, ZMQ_ROUTING_ID, "X", 1);
|
rc = zmq_setsockopt (dealer_two, ZMQ_IDENTITY, "X", 1);
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
rc = zmq_connect (dealer_two, my_endpoint);
|
rc = zmq_connect (dealer_two, my_endpoint);
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
|
@ -85,13 +85,13 @@ void test_get_peer_state ()
|
|||||||
const char *dealer2_identity = "Y";
|
const char *dealer2_identity = "Y";
|
||||||
|
|
||||||
// Name dealer1 "X" and connect it to our router
|
// Name dealer1 "X" and connect it to our router
|
||||||
rc = zmq_setsockopt (dealer1, ZMQ_ROUTING_ID, dealer1_identity, 1);
|
rc = zmq_setsockopt (dealer1, ZMQ_IDENTITY, dealer1_identity, 1);
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
rc = zmq_connect (dealer1, my_endpoint);
|
rc = zmq_connect (dealer1, my_endpoint);
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
|
|
||||||
// Name dealer2 "Y" and connect it to our router
|
// Name dealer2 "Y" and connect it to our router
|
||||||
rc = zmq_setsockopt (dealer2, ZMQ_ROUTING_ID, dealer2_identity, 1);
|
rc = zmq_setsockopt (dealer2, ZMQ_IDENTITY, dealer2_identity, 1);
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
rc = zmq_connect (dealer2, my_endpoint);
|
rc = zmq_connect (dealer2, my_endpoint);
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
@ -250,7 +250,7 @@ void test_basic ()
|
|||||||
// Create dealer called "X" and connect it to our router
|
// Create dealer called "X" and connect it to our router
|
||||||
void *dealer = zmq_socket (ctx, ZMQ_DEALER);
|
void *dealer = zmq_socket (ctx, ZMQ_DEALER);
|
||||||
assert (dealer);
|
assert (dealer);
|
||||||
rc = zmq_setsockopt (dealer, ZMQ_ROUTING_ID, "X", 1);
|
rc = zmq_setsockopt (dealer, ZMQ_IDENTITY, "X", 1);
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
rc = zmq_connect (dealer, my_endpoint);
|
rc = zmq_connect (dealer, my_endpoint);
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
|
@ -658,7 +658,7 @@ int main (void)
|
|||||||
setup_context_and_server_side (&ctx, &handler, &zap_thread, &server,
|
setup_context_and_server_side (&ctx, &handler, &zap_thread, &server,
|
||||||
&server_mon, my_endpoint);
|
&server_mon, my_endpoint);
|
||||||
test_null_key (ctx, server, server_mon, my_endpoint, null_key,
|
test_null_key (ctx, server, server_mon, my_endpoint, null_key,
|
||||||
valid_client_public, valid_client_secret);
|
valid_client_public, valid_client_secret);
|
||||||
shutdown_context_and_server_side (ctx, zap_thread, server, server_mon,
|
shutdown_context_and_server_side (ctx, zap_thread, server, server_mon,
|
||||||
handler);
|
handler);
|
||||||
|
|
||||||
@ -668,7 +668,7 @@ int main (void)
|
|||||||
setup_context_and_server_side (&ctx, &handler, &zap_thread, &server,
|
setup_context_and_server_side (&ctx, &handler, &zap_thread, &server,
|
||||||
&server_mon, my_endpoint);
|
&server_mon, my_endpoint);
|
||||||
test_null_key (ctx, server, server_mon, my_endpoint, valid_server_public,
|
test_null_key (ctx, server, server_mon, my_endpoint, valid_server_public,
|
||||||
null_key, valid_client_secret);
|
null_key, valid_client_secret);
|
||||||
shutdown_context_and_server_side (ctx, zap_thread, server, server_mon,
|
shutdown_context_and_server_side (ctx, zap_thread, server, server_mon,
|
||||||
handler);
|
handler);
|
||||||
|
|
||||||
@ -678,7 +678,7 @@ int main (void)
|
|||||||
setup_context_and_server_side (&ctx, &handler, &zap_thread, &server,
|
setup_context_and_server_side (&ctx, &handler, &zap_thread, &server,
|
||||||
&server_mon, my_endpoint);
|
&server_mon, my_endpoint);
|
||||||
test_null_key (ctx, server, server_mon, my_endpoint, valid_server_public,
|
test_null_key (ctx, server, server_mon, my_endpoint, valid_server_public,
|
||||||
valid_client_public, null_key);
|
valid_client_public, null_key);
|
||||||
shutdown_context_and_server_side (ctx, zap_thread, server, server_mon,
|
shutdown_context_and_server_side (ctx, zap_thread, server, server_mon,
|
||||||
handler);
|
handler);
|
||||||
|
|
||||||
@ -752,8 +752,7 @@ int main (void)
|
|||||||
shutdown_context_and_server_side (ctx, zap_thread, server, server_mon,
|
shutdown_context_and_server_side (ctx, zap_thread, server, server_mon,
|
||||||
handler);
|
handler);
|
||||||
|
|
||||||
fprintf (stderr,
|
fprintf (stderr, "test_curve_security_invalid_initiate_command_encrypted_cookie\n");
|
||||||
"test_curve_security_invalid_initiate_command_encrypted_cookie\n");
|
|
||||||
setup_context_and_server_side (&ctx, &handler, &zap_thread, &server,
|
setup_context_and_server_side (&ctx, &handler, &zap_thread, &server,
|
||||||
&server_mon, my_endpoint);
|
&server_mon, my_endpoint);
|
||||||
test_curve_security_invalid_initiate_command_encrypted_cookie (
|
test_curve_security_invalid_initiate_command_encrypted_cookie (
|
||||||
@ -761,9 +760,7 @@ int main (void)
|
|||||||
shutdown_context_and_server_side (ctx, zap_thread, server, server_mon,
|
shutdown_context_and_server_side (ctx, zap_thread, server, server_mon,
|
||||||
handler);
|
handler);
|
||||||
|
|
||||||
fprintf (
|
fprintf (stderr, "test_curve_security_invalid_initiate_command_encrypted_content\n");
|
||||||
stderr,
|
|
||||||
"test_curve_security_invalid_initiate_command_encrypted_content\n");
|
|
||||||
setup_context_and_server_side (&ctx, &handler, &zap_thread, &server,
|
setup_context_and_server_side (&ctx, &handler, &zap_thread, &server,
|
||||||
&server_mon, my_endpoint);
|
&server_mon, my_endpoint);
|
||||||
test_curve_security_invalid_initiate_command_encrypted_content (
|
test_curve_security_invalid_initiate_command_encrypted_content (
|
||||||
@ -772,16 +769,15 @@ int main (void)
|
|||||||
handler);
|
handler);
|
||||||
|
|
||||||
// test with a large identity (resulting in large metadata)
|
// test with a large identity (resulting in large metadata)
|
||||||
fprintf (stderr,
|
fprintf (stderr, "test_curve_security_with_valid_credentials (large identity)\n");
|
||||||
"test_curve_security_with_valid_credentials (large identity)\n");
|
|
||||||
setup_context_and_server_side (
|
setup_context_and_server_side (
|
||||||
&ctx, &handler, &zap_thread, &server, &server_mon, my_endpoint,
|
&ctx, &handler, &zap_thread, &server, &server_mon, my_endpoint,
|
||||||
&zap_handler_large_identity, &socket_config_curve_server,
|
&zap_handler_large_identity, &socket_config_curve_server, &valid_server_secret,
|
||||||
&valid_server_secret, large_identity);
|
large_identity);
|
||||||
test_curve_security_with_valid_credentials (ctx, my_endpoint, server,
|
test_curve_security_with_valid_credentials (ctx, my_endpoint, server,
|
||||||
server_mon, timeout);
|
server_mon, timeout);
|
||||||
shutdown_context_and_server_side (ctx, zap_thread, server, server_mon,
|
shutdown_context_and_server_side (ctx, zap_thread, server, server_mon,
|
||||||
handler);
|
handler);
|
||||||
|
|
||||||
ctx = zmq_ctx_new ();
|
ctx = zmq_ctx_new ();
|
||||||
test_curve_security_invalid_keysize (ctx);
|
test_curve_security_invalid_keysize (ctx);
|
||||||
|
@ -108,7 +108,7 @@ int main (void)
|
|||||||
// Server socket will accept connections
|
// Server socket will accept connections
|
||||||
void *server = zmq_socket (ctx, ZMQ_DEALER);
|
void *server = zmq_socket (ctx, ZMQ_DEALER);
|
||||||
assert (server);
|
assert (server);
|
||||||
int rc = zmq_setsockopt (server, ZMQ_ROUTING_ID, "IDENT", 6);
|
int rc = zmq_setsockopt (server, ZMQ_IDENTITY, "IDENT", 6);
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
int as_server = 1;
|
int as_server = 1;
|
||||||
rc = zmq_setsockopt (server, ZMQ_PLAIN_SERVER, &as_server, sizeof (int));
|
rc = zmq_setsockopt (server, ZMQ_PLAIN_SERVER, &as_server, sizeof (int));
|
||||||
|
@ -280,11 +280,13 @@ int main (void)
|
|||||||
test_zap_errors (&socket_config_plain_server, NULL,
|
test_zap_errors (&socket_config_plain_server, NULL,
|
||||||
&socket_config_plain_client, NULL);
|
&socket_config_plain_client, NULL);
|
||||||
|
|
||||||
fprintf (stderr, "CURVE mechanism\n");
|
if (zmq_has ("curve")) {
|
||||||
setup_testutil_security_curve ();
|
fprintf (stderr, "CURVE mechanism\n");
|
||||||
|
setup_testutil_security_curve ();
|
||||||
|
|
||||||
curve_client_data_t curve_client_data = {
|
curve_client_data_t curve_client_data = {
|
||||||
valid_server_public, valid_client_public, valid_client_secret};
|
valid_server_public, valid_client_public, valid_client_secret};
|
||||||
test_zap_errors (&socket_config_curve_server, valid_server_secret,
|
test_zap_errors (&socket_config_curve_server, valid_server_secret,
|
||||||
&socket_config_curve_client, &curve_client_data);
|
&socket_config_curve_client, &curve_client_data);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -82,7 +82,7 @@ void test_req_only_listens_to_current_peer (void *ctx)
|
|||||||
void *req = zmq_socket (ctx, ZMQ_REQ);
|
void *req = zmq_socket (ctx, ZMQ_REQ);
|
||||||
assert (req);
|
assert (req);
|
||||||
|
|
||||||
int rc = zmq_setsockopt(req, ZMQ_ROUTING_ID, "A", 2);
|
int rc = zmq_setsockopt(req, ZMQ_IDENTITY, "A", 2);
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
|
|
||||||
rc = zmq_bind (req, bind_address);
|
rc = zmq_bind (req, bind_address);
|
||||||
|
@ -58,7 +58,7 @@ void test_fair_queue_in (void *ctx)
|
|||||||
|
|
||||||
char *str = strdup("A");
|
char *str = strdup("A");
|
||||||
str [0] += peer;
|
str [0] += peer;
|
||||||
rc = zmq_setsockopt (senders [peer], ZMQ_ROUTING_ID, str, 2);
|
rc = zmq_setsockopt (senders [peer], ZMQ_IDENTITY, str, 2);
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
free (str);
|
free (str);
|
||||||
|
|
||||||
@ -130,7 +130,7 @@ void test_destroy_queue_on_disconnect (void *ctx)
|
|||||||
void *B = zmq_socket (ctx, ZMQ_DEALER);
|
void *B = zmq_socket (ctx, ZMQ_DEALER);
|
||||||
assert (B);
|
assert (B);
|
||||||
|
|
||||||
rc = zmq_setsockopt (B, ZMQ_ROUTING_ID, "B", 2);
|
rc = zmq_setsockopt (B, ZMQ_IDENTITY, "B", 2);
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
|
|
||||||
rc = zmq_connect (B, connect_address);
|
rc = zmq_connect (B, connect_address);
|
||||||
|
@ -277,7 +277,7 @@ test_stream_to_stream (void)
|
|||||||
|
|
||||||
// Sent HTTP request on client socket
|
// Sent HTTP request on client socket
|
||||||
// Get server identity
|
// Get server identity
|
||||||
rc = zmq_getsockopt (client, ZMQ_ROUTING_ID, id, &id_size);
|
rc = zmq_getsockopt (client, ZMQ_IDENTITY, id, &id_size);
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
// First frame is server identity
|
// First frame is server identity
|
||||||
rc = zmq_send (client, id, id_size, ZMQ_SNDMORE);
|
rc = zmq_send (client, id, id_size, ZMQ_SNDMORE);
|
||||||
|
@ -57,7 +57,7 @@ bool has_more (void* socket)
|
|||||||
|
|
||||||
bool get_identity (void* socket, char* data, size_t* size)
|
bool get_identity (void* socket, char* data, size_t* size)
|
||||||
{
|
{
|
||||||
int rc = zmq_getsockopt (socket, ZMQ_ROUTING_ID, data, size);
|
int rc = zmq_getsockopt (socket, ZMQ_IDENTITY, data, size);
|
||||||
return rc == 0;
|
return rc == 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -140,7 +140,7 @@ int main(int, char**)
|
|||||||
// Send initial message.
|
// Send initial message.
|
||||||
char blob_data [256];
|
char blob_data [256];
|
||||||
size_t blob_size = sizeof(blob_data);
|
size_t blob_size = sizeof(blob_data);
|
||||||
rc = zmq_getsockopt (sockets [CLIENT], ZMQ_ROUTING_ID, blob_data, &blob_size);
|
rc = zmq_getsockopt (sockets [CLIENT], ZMQ_IDENTITY, blob_data, &blob_size);
|
||||||
assert (rc != -1);
|
assert (rc != -1);
|
||||||
assert(blob_size > 0);
|
assert(blob_size > 0);
|
||||||
zmq_msg_t msg;
|
zmq_msg_t msg;
|
||||||
|
@ -349,7 +349,7 @@ void setup_context_and_server_side (
|
|||||||
|
|
||||||
socket_config_ (*server, socket_config_data_);
|
socket_config_ (*server, socket_config_data_);
|
||||||
|
|
||||||
rc = zmq_setsockopt (*server, ZMQ_ROUTING_ID, identity, strlen(identity));
|
rc = zmq_setsockopt (*server, ZMQ_IDENTITY, identity, strlen(identity));
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
|
|
||||||
rc = zmq_bind (*server, "tcp://127.0.0.1:*");
|
rc = zmq_bind (*server, "tcp://127.0.0.1:*");
|
||||||
|
Loading…
x
Reference in New Issue
Block a user