diff --git a/src/udp_address.cpp b/src/udp_address.cpp index 5870fc69..4d6a5dd4 100644 --- a/src/udp_address.cpp +++ b/src/udp_address.cpp @@ -49,6 +49,7 @@ zmq::udp_address_t::udp_address_t () { memset (&bind_address, 0, sizeof bind_address); + memset (&dest_address, 0, sizeof dest_address); } zmq::udp_address_t::~udp_address_t () @@ -68,11 +69,6 @@ int zmq::udp_address_t::resolve (const char *name_) std::string addr_str (name_, delimiter - name_); std::string port_str (delimiter + 1); - // Remove square brackets around the address, if any, as used in IPv6 - if (addr_str.size () >= 2 && addr_str [0] == '[' && - addr_str [addr_str.size () - 1] == ']') - addr_str = addr_str.substr (1, addr_str.size () - 2); - // Parse the port number (0 is not a valid port). uint16_t port = (uint16_t) atoi (port_str.c_str ()); if (port == 0) { diff --git a/src/udp_engine.cpp b/src/udp_engine.cpp index e45f8c12..9d1210ac 100644 --- a/src/udp_engine.cpp +++ b/src/udp_engine.cpp @@ -91,7 +91,7 @@ void zmq::udp_engine_t::plug (io_thread_t* io_thread_, session_base_t *session_) mreq.imr_multiaddr = address->resolved.udp_addr->multicast_ip (); mreq.imr_interface = address->resolved.udp_addr->interface_ip (); - int rc = setsockopt (fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof (mreq)); + int rc = setsockopt (fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char*) &mreq, sizeof (mreq)); #ifdef ZMQ_HAVE_WINDOWS wsa_assert (rc != SOCKET_ERROR); @@ -145,10 +145,17 @@ void zmq::udp_engine_t::out_event() body_msg.close (); errno_assert (rc == 0); +#ifdef ZMQ_HAVE_WINDOWS + rc = sendto(fd, (char*) out_buffer, size, 0, + address->resolved.udp_addr->dest_addr(), + address->resolved.udp_addr->dest_addrlen()); + wsa_assert(rc != SOCKET_ERROR); +#else rc = sendto (fd, out_buffer, size, 0, address->resolved.udp_addr->dest_addr (), address->resolved.udp_addr->dest_addrlen ()); errno_assert (rc != -1); +#endif } else reset_pollout (handle); @@ -170,46 +177,63 @@ void zmq::udp_engine_t::restart_output() void zmq::udp_engine_t::in_event() { - size_t read = recv (fd, in_buffer, MAX_UDP_MSG, 0); - - if (read > 0) { - size_t group_size = in_buffer[0]; - - // This doesn't fit, just ingore - if (read - 1 < group_size) - return; - - size_t body_size = read -1 - group_size; - - msg_t msg; - int rc = msg.init_size (group_size); - errno_assert (rc == 0); - msg.set_flags (msg_t::more); - memcpy (msg.data (), in_buffer + 1, group_size); - - rc = session->push_msg (&msg); - errno_assert (rc == 0 || (rc == -1 && errno == EAGAIN)); - - // Pipe is full - if (rc != 0) { - rc = msg.close (); - errno_assert (rc == 0); - - reset_pollin (handle); - return; - } - - rc = msg.close (); - errno_assert (rc == 0); - rc = msg.init_size (body_size); - errno_assert (rc == 0); - memcpy (msg.data (), in_buffer + 1 + group_size, body_size); - rc = session->push_msg (&msg); - errno_assert (rc == 0); - rc = msg.close (); - errno_assert (rc == 0); - session->flush (); +#ifdef ZMQ_HAVE_WINDOWS + int nbytes = recv(fd, (char*) in_buffer, MAX_UDP_MSG, 0); + const int last_error = WSAGetLastError(); + if (nbytes == SOCKET_ERROR) { + wsa_assert( + last_error == WSAENETDOWN || + last_error == WSAENETRESET || + last_error == WSAEWOULDBLOCK); + return; } +#else + int nbytes = recv(fd, in_buffer, MAX_UDP_MSG, 0); + if (nbytes == -1) { + errno_assert(errno != EBADF + && errno != EFAULT + && errno != ENOMEM + && errno != ENOTSOCK); + return; + } +#endif + + int group_size = in_buffer[0]; + + // This doesn't fit, just ingore + if (nbytes - 1 < group_size) + return; + + int body_size = nbytes - 1 - group_size; + + msg_t msg; + int rc = msg.init_size (group_size); + errno_assert (rc == 0); + msg.set_flags (msg_t::more); + memcpy (msg.data (), in_buffer + 1, group_size); + + rc = session->push_msg (&msg); + errno_assert (rc == 0 || (rc == -1 && errno == EAGAIN)); + + // Pipe is full + if (rc != 0) { + rc = msg.close (); + errno_assert (rc == 0); + + reset_pollin (handle); + return; + } + + rc = msg.close (); + errno_assert (rc == 0); + rc = msg.init_size (body_size); + errno_assert (rc == 0); + memcpy (msg.data (), in_buffer + 1 + group_size, body_size); + rc = session->push_msg (&msg); + errno_assert (rc == 0); + rc = msg.close (); + errno_assert (rc == 0); + session->flush (); } void zmq::udp_engine_t::restart_input() diff --git a/tests/test_radio_dish.cpp b/tests/test_radio_dish.cpp index 560d330d..18183ac0 100644 --- a/tests/test_radio_dish.cpp +++ b/tests/test_radio_dish.cpp @@ -77,6 +77,7 @@ int msg_recv_cmp (zmq_msg_t *msg_, void *s_, const char* group_, const char* bod } zmq_msg_close (msg_); + free(body); return recv_rc; } diff --git a/tests/test_udp.cpp b/tests/test_udp.cpp index 05a78d75..a49e2b2e 100644 --- a/tests/test_udp.cpp +++ b/tests/test_udp.cpp @@ -77,6 +77,7 @@ int msg_recv_cmp (zmq_msg_t *msg_, void *s_, const char* group_, const char* bod } zmq_msg_close (msg_); + free (body); return recv_rc; } @@ -91,10 +92,10 @@ int main (void) void *radio = zmq_socket (ctx, ZMQ_RADIO); void *dish = zmq_socket (ctx, ZMQ_DISH); - int rc = zmq_bind (radio, "udp://127.0.0.1:5556"); + int rc = zmq_connect (radio, "udp://127.0.0.1:5556"); assert (rc == 0); - rc = zmq_connect (dish, "udp://127.0.0.1:5556"); + rc = zmq_bind (dish, "udp://127.0.0.1:5556"); assert (rc == 0); zmq_sleep (1);