mirror of
https://github.com/zeromq/libzmq.git
synced 2025-01-02 19:35:29 +08:00
* Add assertions to check for OpenPGM calls with invalid parameters.
* Assertion to check that pgm_getaddrinfo is actually returning something. * Missing pgm_connect call. * Typo on TOS causing immediate abort. * Placeholder calls for timeouts whilst continuing spin loop functionality. * OpenPGM v5 now supports reference counting so remove init checks. * Duplicate UDP unicast port setting, requires one unicast and one multicast. * Incorrectly set socket rcvbuf size with sndbuf. * Replace std::lexicographical_compare of TSI's with long word integer comparisons. * pgm_socket_t::receive returns -1 on no data.
This commit is contained in:
parent
00cd7d49c7
commit
96d85b2098
@ -144,7 +144,7 @@ void zmq::pgm_receiver_t::in_event ()
|
|||||||
|
|
||||||
// No data to process. This may happen if the packet received is
|
// No data to process. This may happen if the packet received is
|
||||||
// neither ODATA nor ODATA.
|
// neither ODATA nor ODATA.
|
||||||
if (received == 0)
|
if (received < 0)
|
||||||
break;
|
break;
|
||||||
|
|
||||||
// Find the peer based on its TSI.
|
// Find the peer based on its TSI.
|
||||||
|
@ -73,15 +73,13 @@ namespace zmq
|
|||||||
|
|
||||||
struct tsi_comp
|
struct tsi_comp
|
||||||
{
|
{
|
||||||
inline bool operator () (const pgm_tsi_t <si,
|
bool operator () (const pgm_tsi_t <si,
|
||||||
const pgm_tsi_t &rtsi) const
|
const pgm_tsi_t &rtsi) const
|
||||||
{
|
{
|
||||||
if (ltsi.sport < rtsi.sport)
|
uint32_t ll[2], rl[2];
|
||||||
return true;
|
memcpy (ll, <si, sizeof (ll));
|
||||||
|
memcpy (rl, &rtsi, sizeof (rl));
|
||||||
return (std::lexicographical_compare (ltsi.gsi.identifier,
|
return (ll[0] < rl[0]) || (ll[0] == rl[0] && ll[1] < rl[1]);
|
||||||
ltsi.gsi.identifier + 6,
|
|
||||||
rtsi.gsi.identifier, rtsi.gsi.identifier + 6));
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -52,6 +52,11 @@ zmq::pgm_socket_t::pgm_socket_t (bool receiver_, const options_t &options_) :
|
|||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Create, bind and connect PGM socket.
|
||||||
|
// network_ of the form <interface & multicast group decls>:<IP port>
|
||||||
|
// e.g. eth0;239.192.0.1:7500
|
||||||
|
// link-local;224.250.0.1,224.250.0.2;224.250.0.3:8000
|
||||||
|
// ;[fe80::1%en0]:7500
|
||||||
int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
|
int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
|
||||||
{
|
{
|
||||||
// Can not open transport before destroying old one.
|
// Can not open transport before destroying old one.
|
||||||
@ -99,51 +104,59 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
|
|||||||
memset (&hints, 0, sizeof (hints));
|
memset (&hints, 0, sizeof (hints));
|
||||||
hints.ai_family = AF_UNSPEC;
|
hints.ai_family = AF_UNSPEC;
|
||||||
if (!pgm_getaddrinfo (network, NULL, &res, &pgm_error)) {
|
if (!pgm_getaddrinfo (network, NULL, &res, &pgm_error)) {
|
||||||
|
// Invalid parameters don't set pgm_error_t
|
||||||
|
zmq_assert (pgm_error != NULL);
|
||||||
if (pgm_error->domain == PGM_ERROR_DOMAIN_IF && (
|
if (pgm_error->domain == PGM_ERROR_DOMAIN_IF && (
|
||||||
pgm_error->code == PGM_ERROR_INVAL ||
|
// NB: cannot catch EAI_BADFLAGS
|
||||||
pgm_error->code == PGM_ERROR_XDEV ||
|
pgm_error->code != PGM_ERROR_SERVICE &&
|
||||||
pgm_error->code == PGM_ERROR_NODEV ||
|
pgm_error->code != PGM_ERROR_SOCKTNOSUPPORT))
|
||||||
pgm_error->code == PGM_ERROR_NOTUNIQ ||
|
// User, host, or network configuration or transient error
|
||||||
pgm_error->code == PGM_ERROR_ADDRFAMILY ||
|
|
||||||
pgm_error->code == PGM_ERROR_AFNOSUPPORT ||
|
|
||||||
pgm_error->code == PGM_ERROR_NODATA ||
|
|
||||||
pgm_error->code == PGM_ERROR_NONAME ||
|
|
||||||
pgm_error->code == PGM_ERROR_SERVICE))
|
|
||||||
goto err_abort;
|
goto err_abort;
|
||||||
|
|
||||||
/* fatal OpenPGM API error */
|
// Fatal OpenPGM internal error
|
||||||
zmq_assert (false);
|
zmq_assert (false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
zmq_assert (res != NULL);
|
||||||
|
|
||||||
// Pick up detected IP family
|
// Pick up detected IP family
|
||||||
sa_family = res->ai_send_addrs[0].gsr_group.ss_family;
|
sa_family = res->ai_send_addrs[0].gsr_group.ss_family;
|
||||||
|
|
||||||
// Create IP/PGM or UDP/PGM socket
|
// Create IP/PGM or UDP/PGM socket
|
||||||
if (udp_encapsulation_) {
|
if (udp_encapsulation_) {
|
||||||
if (!pgm_socket (&sock, sa_family, SOCK_SEQPACKET, IPPROTO_UDP, &pgm_error)) {
|
if (!pgm_socket (&sock, sa_family, SOCK_SEQPACKET, IPPROTO_UDP, &pgm_error)) {
|
||||||
|
// Invalid parameters don't set pgm_error_t
|
||||||
|
zmq_assert (pgm_error != NULL);
|
||||||
if (pgm_error->domain == PGM_ERROR_DOMAIN_SOCKET && (
|
if (pgm_error->domain == PGM_ERROR_DOMAIN_SOCKET && (
|
||||||
pgm_error->code == PGM_ERROR_INVAL ||
|
pgm_error->code != PGM_ERROR_BADF &&
|
||||||
pgm_error->code == PGM_ERROR_NODEV))
|
pgm_error->code != PGM_ERROR_FAULT &&
|
||||||
|
pgm_error->code != PGM_ERROR_NOPROTOOPT &&
|
||||||
|
pgm_error->code != PGM_ERROR_FAILED))
|
||||||
|
// User, host, or network configuration or transient error
|
||||||
goto err_abort;
|
goto err_abort;
|
||||||
|
|
||||||
/* fatal OpenPGM API error */
|
// Fatal OpenPGM internal error
|
||||||
zmq_assert (false);
|
zmq_assert (false);
|
||||||
}
|
}
|
||||||
|
|
||||||
// All options are of data type int
|
// All options are of data type int
|
||||||
const int encapsulation_port = port_number;
|
const int encapsulation_port = port_number;
|
||||||
if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_UDP_ENCAP_UCAST_PORT, &encapsulation_port, sizeof (encapsulation_port)) ||
|
if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_UDP_ENCAP_UCAST_PORT, &encapsulation_port, sizeof (encapsulation_port)) ||
|
||||||
!pgm_setsockopt (sock, IPPROTO_PGM, PGM_UDP_ENCAP_UCAST_PORT, &encapsulation_port, sizeof (encapsulation_port)))
|
!pgm_setsockopt (sock, IPPROTO_PGM, PGM_UDP_ENCAP_MCAST_PORT, &encapsulation_port, sizeof (encapsulation_port)))
|
||||||
goto err_abort;
|
goto err_abort;
|
||||||
} else {
|
} else {
|
||||||
if (!pgm_socket (&sock, sa_family, SOCK_SEQPACKET, IPPROTO_PGM, &pgm_error)) {
|
if (!pgm_socket (&sock, sa_family, SOCK_SEQPACKET, IPPROTO_PGM, &pgm_error)) {
|
||||||
|
// Invalid parameters don't set pgm_error_t
|
||||||
|
zmq_assert (pgm_error != NULL);
|
||||||
if (pgm_error->domain == PGM_ERROR_DOMAIN_SOCKET && (
|
if (pgm_error->domain == PGM_ERROR_DOMAIN_SOCKET && (
|
||||||
pgm_error->code == PGM_ERROR_INVAL ||
|
pgm_error->code != PGM_ERROR_BADF &&
|
||||||
pgm_error->code == PGM_ERROR_PERM ||
|
pgm_error->code != PGM_ERROR_FAULT &&
|
||||||
pgm_error->code == PGM_ERROR_NODEV))
|
pgm_error->code != PGM_ERROR_NOPROTOOPT &&
|
||||||
|
pgm_error->code != PGM_ERROR_FAILED))
|
||||||
|
// User, host, or network configuration or transient error
|
||||||
goto err_abort;
|
goto err_abort;
|
||||||
|
|
||||||
/* fatal OpenPGM API error */
|
// Fatal OpenPGM internal error
|
||||||
zmq_assert (false);
|
zmq_assert (false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -157,7 +170,7 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
|
|||||||
goto err_abort;
|
goto err_abort;
|
||||||
}
|
}
|
||||||
if (sndbuf) {
|
if (sndbuf) {
|
||||||
if (!pgm_setsockopt (sock, SOL_SOCKET, SO_RCVBUF, &sndbuf, sizeof (sndbuf)))
|
if (!pgm_setsockopt (sock, SOL_SOCKET, SO_SNDBUF, &sndbuf, sizeof (sndbuf)))
|
||||||
goto err_abort;
|
goto err_abort;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -247,22 +260,17 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
|
|||||||
if_req.ir_scope_id = sa6.sin6_scope_id;
|
if_req.ir_scope_id = sa6.sin6_scope_id;
|
||||||
}
|
}
|
||||||
if (!pgm_bind3 (sock, &addr, sizeof (addr), &if_req, sizeof (if_req), &if_req, sizeof (if_req), &pgm_error)) {
|
if (!pgm_bind3 (sock, &addr, sizeof (addr), &if_req, sizeof (if_req), &if_req, sizeof (if_req), &pgm_error)) {
|
||||||
if (pgm_error->domain == PGM_ERROR_DOMAIN_IF && (
|
// Invalid parameters don't set pgm_error_t
|
||||||
pgm_error->code == PGM_ERROR_INVAL ||
|
zmq_assert (pgm_error != NULL);
|
||||||
pgm_error->code == PGM_ERROR_XDEV ||
|
if ((pgm_error->domain == PGM_ERROR_DOMAIN_SOCKET ||
|
||||||
pgm_error->code == PGM_ERROR_NODEV ||
|
pgm_error->domain == PGM_ERROR_DOMAIN_IF) && (
|
||||||
pgm_error->code == PGM_ERROR_NOTUNIQ ||
|
pgm_error->code != PGM_ERROR_INVAL &&
|
||||||
pgm_error->code == PGM_ERROR_ADDRFAMILY ||
|
pgm_error->code != PGM_ERROR_BADF &&
|
||||||
pgm_error->code == PGM_ERROR_AFNOSUPPORT ||
|
pgm_error->code != PGM_ERROR_FAULT))
|
||||||
pgm_error->code == PGM_ERROR_NODATA ||
|
// User, host, or network configuration or transient error
|
||||||
pgm_error->code == PGM_ERROR_NONAME ||
|
|
||||||
pgm_error->code == PGM_ERROR_SERVICE))
|
|
||||||
goto err_abort;
|
|
||||||
if (pgm_error->domain == PGM_ERROR_DOMAIN_SOCKET && (
|
|
||||||
pgm_error->code == PGM_ERROR_FAILED))
|
|
||||||
goto err_abort;
|
goto err_abort;
|
||||||
|
|
||||||
/* fatal OpenPGM API error */
|
// Fatal OpenPGM internal error
|
||||||
zmq_assert (false);
|
zmq_assert (false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -274,7 +282,9 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
|
|||||||
}
|
}
|
||||||
if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_SEND_GROUP, &res->ai_send_addrs[0], sizeof (struct group_req)))
|
if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_SEND_GROUP, &res->ai_send_addrs[0], sizeof (struct group_req)))
|
||||||
goto err_abort;
|
goto err_abort;
|
||||||
|
|
||||||
pgm_freeaddrinfo (res);
|
pgm_freeaddrinfo (res);
|
||||||
|
res = NULL;
|
||||||
|
|
||||||
// Set IP level parameters
|
// Set IP level parameters
|
||||||
{
|
{
|
||||||
@ -287,12 +297,19 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
|
|||||||
!pgm_setsockopt (sock, IPPROTO_PGM, PGM_MULTICAST_HOPS, &multicast_hops, sizeof (multicast_hops)))
|
!pgm_setsockopt (sock, IPPROTO_PGM, PGM_MULTICAST_HOPS, &multicast_hops, sizeof (multicast_hops)))
|
||||||
goto err_abort;
|
goto err_abort;
|
||||||
if (AF_INET6 != sa_family &&
|
if (AF_INET6 != sa_family &&
|
||||||
!pgm_setsockopt (sock, IPPROTO_PGM, PGM_TOS, &dscp, sizeof (dscp)));
|
!pgm_setsockopt (sock, IPPROTO_PGM, PGM_TOS, &dscp, sizeof (dscp)))
|
||||||
goto err_abort;
|
goto err_abort;
|
||||||
if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_NOBLOCK, &nonblocking, sizeof (nonblocking)))
|
if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_NOBLOCK, &nonblocking, sizeof (nonblocking)))
|
||||||
goto err_abort;
|
goto err_abort;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Connect PGM transport to start state machine.
|
||||||
|
if (!pgm_connect (sock, &pgm_error)) {
|
||||||
|
// Invalid parameters don't set pgm_error_t
|
||||||
|
zmq_assert (pgm_error != NULL);
|
||||||
|
goto err_abort;
|
||||||
|
}
|
||||||
|
|
||||||
// For receiver transport preallocate pgm_msgv array.
|
// For receiver transport preallocate pgm_msgv array.
|
||||||
if (receiver) {
|
if (receiver) {
|
||||||
zmq_assert (in_batch_size > 0);
|
zmq_assert (in_batch_size > 0);
|
||||||
@ -439,7 +456,8 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
|
|||||||
nbytes_rec = 0;
|
nbytes_rec = 0;
|
||||||
nbytes_processed = 0;
|
nbytes_processed = 0;
|
||||||
pgm_msgv_processed = 0;
|
pgm_msgv_processed = 0;
|
||||||
return 0;
|
errno = EAGAIN;
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// If we have are going first time or if we have processed all pgm_msgv_t
|
// If we have are going first time or if we have processed all pgm_msgv_t
|
||||||
@ -458,6 +476,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
|
|||||||
const int status = pgm_recvmsgv (sock, pgm_msgv,
|
const int status = pgm_recvmsgv (sock, pgm_msgv,
|
||||||
pgm_msgv_len, MSG_ERRQUEUE, &nbytes_rec, &pgm_error);
|
pgm_msgv_len, MSG_ERRQUEUE, &nbytes_rec, &pgm_error);
|
||||||
|
|
||||||
|
// Invalid parameters
|
||||||
zmq_assert (status != PGM_IO_STATUS_ERROR);
|
zmq_assert (status != PGM_IO_STATUS_ERROR);
|
||||||
|
|
||||||
// In a case when no ODATA/RDATA fired POLLIN event (SPM...)
|
// In a case when no ODATA/RDATA fired POLLIN event (SPM...)
|
||||||
@ -466,10 +485,17 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
|
|||||||
|
|
||||||
zmq_assert (nbytes_rec == 0);
|
zmq_assert (nbytes_rec == 0);
|
||||||
|
|
||||||
|
struct timeval tv;
|
||||||
|
socklen_t optlen = sizeof (tv);
|
||||||
|
const bool rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_TIME_REMAIN, &tv, &optlen);
|
||||||
|
|
||||||
|
zmq_assert (rc);
|
||||||
|
|
||||||
// In case if no RDATA/ODATA caused POLLIN 0 is
|
// In case if no RDATA/ODATA caused POLLIN 0 is
|
||||||
// returned.
|
// returned.
|
||||||
nbytes_rec = 0;
|
nbytes_rec = 0;
|
||||||
return 0;
|
errno = EBUSY;
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send SPMR, NAK, ACK is rate limited.
|
// Send SPMR, NAK, ACK is rate limited.
|
||||||
@ -477,10 +503,15 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
|
|||||||
|
|
||||||
zmq_assert (nbytes_rec == 0);
|
zmq_assert (nbytes_rec == 0);
|
||||||
|
|
||||||
|
struct timeval tv;
|
||||||
|
socklen_t optlen = sizeof (tv);
|
||||||
|
const bool rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_RATE_REMAIN, &tv, &optlen);
|
||||||
|
|
||||||
// In case if no RDATA/ODATA caused POLLIN 0 is
|
// In case if no RDATA/ODATA caused POLLIN 0 is
|
||||||
// returned.
|
// returned.
|
||||||
nbytes_rec = 0;
|
nbytes_rec = 0;
|
||||||
return 0;
|
errno = EBUSY;
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// No peers and hence no incoming packets.
|
// No peers and hence no incoming packets.
|
||||||
@ -491,7 +522,8 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
|
|||||||
// In case if no RDATA/ODATA caused POLLIN 0 is
|
// In case if no RDATA/ODATA caused POLLIN 0 is
|
||||||
// returned.
|
// returned.
|
||||||
nbytes_rec = 0;
|
nbytes_rec = 0;
|
||||||
return 0;
|
errno = EAGAIN;
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Data loss.
|
// Data loss.
|
||||||
@ -549,6 +581,7 @@ void zmq::pgm_socket_t::process_upstream ()
|
|||||||
const int status = pgm_recvmsgv (sock, &dummy_msg,
|
const int status = pgm_recvmsgv (sock, &dummy_msg,
|
||||||
1, MSG_ERRQUEUE, &dummy_bytes, &pgm_error);
|
1, MSG_ERRQUEUE, &dummy_bytes, &pgm_error);
|
||||||
|
|
||||||
|
// Invalid parameters
|
||||||
zmq_assert (status != PGM_IO_STATUS_ERROR);
|
zmq_assert (status != PGM_IO_STATUS_ERROR);
|
||||||
|
|
||||||
// No data should be returned.
|
// No data should be returned.
|
||||||
|
24
src/zmq.cpp
24
src/zmq.cpp
@ -234,31 +234,27 @@ void *zmq_init (int io_threads_)
|
|||||||
}
|
}
|
||||||
|
|
||||||
#if defined ZMQ_HAVE_OPENPGM
|
#if defined ZMQ_HAVE_OPENPGM
|
||||||
// Unfortunately, OpenPGM doesn't support refcounted init/shutdown, thus,
|
|
||||||
// let's fail if it was initialised beforehand.
|
|
||||||
zmq_assert (!pgm_supported ());
|
|
||||||
|
|
||||||
// Init PGM transport. Ensure threading and timer are enabled. Find PGM
|
// Init PGM transport. Ensure threading and timer are enabled. Find PGM
|
||||||
// protocol ID. Note that if you want to use gettimeofday and sleep for
|
// protocol ID. Note that if you want to use gettimeofday and sleep for
|
||||||
// openPGM timing, set environment variables PGM_TIMER to "GTOD" and
|
// openPGM timing, set environment variables PGM_TIMER to "GTOD" and
|
||||||
// PGM_SLEEP to "USLEEP".
|
// PGM_SLEEP to "USLEEP".
|
||||||
pgm_error_t *pgm_error = NULL;
|
pgm_error_t *pgm_error = NULL;
|
||||||
bool rc = pgm_init (&pgm_error);
|
const bool rc = pgm_init (&pgm_error);
|
||||||
if (rc != TRUE) {
|
if (rc != TRUE) {
|
||||||
if (pgm_error->domain == PGM_ERROR_DOMAIN_IF && (
|
|
||||||
pgm_error->code == PGM_ERROR_INVAL ||
|
// Invalid parameters don't set pgm_error_t
|
||||||
pgm_error->code == PGM_ERROR_XDEV ||
|
zmq_assert (pgm_error != NULL);
|
||||||
pgm_error->code == PGM_ERROR_NODEV ||
|
if (pgm_error->domain == PGM_ERROR_DOMAIN_TIME && (
|
||||||
pgm_error->code == PGM_ERROR_NOTUNIQ ||
|
pgm_error->code == PGM_ERROR_FAILED)) {
|
||||||
pgm_error->code == PGM_ERROR_ADDRFAMILY ||
|
|
||||||
pgm_error->code == PGM_ERROR_AFNOSUPPORT ||
|
// Failed to access RTC or HPET device.
|
||||||
pgm_error->code == PGM_ERROR_NODATA ||
|
|
||||||
pgm_error->code == PGM_ERROR_NONAME ||
|
|
||||||
pgm_error->code == PGM_ERROR_SERVICE)) {
|
|
||||||
pgm_error_free (pgm_error);
|
pgm_error_free (pgm_error);
|
||||||
errno = EINVAL;
|
errno = EINVAL;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PGM_ERROR_DOMAIN_ENGINE: WSAStartup errors or missing WSARecvMsg.
|
||||||
zmq_assert (false);
|
zmq_assert (false);
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
Loading…
x
Reference in New Issue
Block a user