mirror of
https://github.com/zeromq/libzmq.git
synced 2025-03-09 15:26:04 +00:00
detecting data loss for PGM2 receiver
This commit is contained in:
parent
39d915ded8
commit
64e68e7486
@ -160,8 +160,13 @@ void zmq::pgm_receiver_t::in_event ()
|
||||
peer_info_t peer_info = {false, NULL};
|
||||
it = peers.insert (std::make_pair (*tsi, peer_info)).first;
|
||||
|
||||
#ifdef ZMQ_HAVE_OPENPGM1
|
||||
zmq_log (1, "New peer TSI: %s, %s(%i).\n", pgm_print_tsi (tsi),
|
||||
__FILE__, __LINE__);
|
||||
#elif ZMQ_HAVE_OPENPGM2
|
||||
zmq_log (1, "New peer TSI: %s, %s(%i).\n", pgm_tsi_print (tsi),
|
||||
__FILE__, __LINE__);
|
||||
#endif
|
||||
}
|
||||
|
||||
// There is not beginning of the message in current APDU and we
|
||||
@ -187,8 +192,13 @@ void zmq::pgm_receiver_t::in_event ()
|
||||
it->second.decoder = new zmq_decoder_t;
|
||||
it->second.decoder->set_inout (inout);
|
||||
|
||||
#ifdef ZMQ_HAVE_OPENPGM1
|
||||
zmq_log (1, "Peer %s joined into the stream, %s(%i)\n",
|
||||
pgm_print_tsi (tsi), __FILE__, __LINE__);
|
||||
#elif ZMQ_HAVE_OPENPGM2
|
||||
zmq_log (1, "Peer %s joined into the stream, %s(%i)\n",
|
||||
pgm_tsi_print (tsi), __FILE__, __LINE__);
|
||||
#endif
|
||||
}
|
||||
|
||||
if (nbytes > 0) {
|
||||
|
@ -275,7 +275,11 @@ int zmq::pgm_socket_t::open_transport (void)
|
||||
|
||||
// Set transport->can_send_data = FALSE.
|
||||
// Note that NAKs are still generated by the transport.
|
||||
#ifdef ZMQ_HAVE_OPENPGM1
|
||||
rc = pgm_transport_set_recv_only (g_transport, false);
|
||||
#elif ZMQ_HAVE_OPENPGM2
|
||||
rc = pgm_transport_set_recv_only (g_transport, true, false);
|
||||
#endif
|
||||
if (rc != pgm_ok) {
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
@ -710,12 +714,14 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
|
||||
if (nbytes_rec == -1 && errno == ECONNRESET) {
|
||||
|
||||
// Save lost data TSI.
|
||||
*tsi_ = &(g_transport->lost_data_tsi);
|
||||
*tsi_ = &g_transport->lost_data_tsi;
|
||||
|
||||
zmq_log (1, "Data loss detected %s, %s(%i)\n",
|
||||
pgm_print_tsi (&g_transport->lost_data_tsi), __FILE__, __LINE__);
|
||||
|
||||
nbytes_rec = 0;
|
||||
|
||||
// In case of dala loss -1 is returned.
|
||||
zmq_log (1, "Data loss detected %s, %s(%i)\n",
|
||||
pgm_print_tsi (&(g_transport->lost_data_tsi)), __FILE__, __LINE__);
|
||||
nbytes_rec = 0;
|
||||
return -1;
|
||||
}
|
||||
|
||||
@ -723,7 +729,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
|
||||
if (nbytes_rec <= 0) {
|
||||
zmq_log (2, "received %i B, errno %i, %s(%i).\n", (int)nbytes_rec,
|
||||
errno, __FILE__, __LINE__);
|
||||
errno_assert (nbytes_rec > 0);
|
||||
errno_assert (false);
|
||||
}
|
||||
#elif defined ZMQ_HAVE_OPENPGM2
|
||||
GError *pgm_error = NULL;
|
||||
@ -740,14 +746,36 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
|
||||
// pgm_recvmsg returns ?.
|
||||
if (status == PGM_IO_STATUS_AGAIN ||
|
||||
status == PGM_IO_STATUS_AGAIN2) {
|
||||
|
||||
|
||||
zmq_assert (nbytes_rec == 0);
|
||||
|
||||
// In case if no RDATA/ODATA caused POLLIN 0 is
|
||||
// returned.
|
||||
nbytes_rec = 0;
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Data loss?
|
||||
// Data loss.
|
||||
if (status == PGM_IO_STATUS_RESET) {
|
||||
|
||||
zmq_log (1, "PGMIOStatus %i, nbytes_rec %i, %s(%i).\n",
|
||||
status, (int) nbytes_rec, __FILE__, __LINE__);
|
||||
|
||||
pgm_peer_t* peer = (pgm_peer_t*) g_transport->peers_pending->data;
|
||||
|
||||
// Save lost data TSI.
|
||||
*tsi_ = &peer->tsi;
|
||||
|
||||
zmq_log (1, "Data loss detected %s, %s(%i)\n", pgm_tsi_print (&peer->tsi),
|
||||
__FILE__, __LINE__);
|
||||
|
||||
nbytes_rec = 0;
|
||||
|
||||
// In case of dala loss -1 is returned.
|
||||
return -1;
|
||||
}
|
||||
|
||||
// Catch the rest of the errors.
|
||||
if (status != PGM_IO_STATUS_NORMAL) {
|
||||
zmq_log (1, "PGMIOStatus %i, nbytes_rec %i, %s(%i).\n",
|
||||
status, (int) nbytes_rec, __FILE__, __LINE__);
|
||||
|
Loading…
x
Reference in New Issue
Block a user