0
0
mirror of https://github.com/zeromq/libzmq.git synced 2024-12-26 23:01:04 +08:00

make udp support for radio-dish

This commit is contained in:
somdoron 2016-01-29 21:17:11 +02:00
parent a2b9d826e4
commit 5ebfd1728f
14 changed files with 744 additions and 8 deletions

1
.gitignore vendored
View File

@ -123,6 +123,7 @@ test_stream_exceeds_buffer
test_poller test_poller
test_timers test_timers
test_radio_dish test_radio_dish
test_udp
test_large_msg test_large_msg
tests/test*.log tests/test*.log
tests/test*.trs tests/test*.trs

View File

@ -460,7 +460,9 @@ set(cxx-sources
timers.cpp timers.cpp
config.hpp config.hpp
radio.cpp radio.cpp
dish.cpp) dish.cpp
udp_engine.cpp
udp_address.cpp)
set(rc-sources version.rc) set(rc-sources version.rc)

View File

@ -193,6 +193,10 @@ src_libzmq_la_SOURCES = \
src/tipc_listener.hpp \ src/tipc_listener.hpp \
src/trie.cpp \ src/trie.cpp \
src/trie.hpp \ src/trie.hpp \
src/udp_address.cpp \
src/udp_address.hpp \
src/udp_engine.cpp \
src/udp_engine.hpp \
src/v1_decoder.cpp \ src/v1_decoder.cpp \
src/v1_decoder.hpp \ src/v1_decoder.hpp \
src/v2_decoder.cpp \ src/v2_decoder.cpp \
@ -385,7 +389,8 @@ test_apps = \
tests/test_stream_exceeds_buffer \ tests/test_stream_exceeds_buffer \
tests/test_poller \ tests/test_poller \
tests/test_timers \ tests/test_timers \
tests/test_radio_dish tests/test_radio_dish \
tests/test_udp
tests_test_system_SOURCES = tests/test_system.cpp tests_test_system_SOURCES = tests/test_system.cpp
tests_test_system_LDADD = src/libzmq.la tests_test_system_LDADD = src/libzmq.la
@ -605,6 +610,8 @@ tests_test_timers_LDADD = src/libzmq.la
tests_test_radio_dish_SOURCES = tests/test_radio_dish.cpp tests_test_radio_dish_SOURCES = tests/test_radio_dish.cpp
tests_test_radio_dish_LDADD = src/libzmq.la tests_test_radio_dish_LDADD = src/libzmq.la
tests_test_udp_SOURCES = tests/test_udp.cpp
tests_test_udp_LDADD = src/libzmq.la
if !ON_MINGW if !ON_MINGW
if !ON_CYGWIN if !ON_CYGWIN

View File

@ -33,6 +33,7 @@
#include "ctx.hpp" #include "ctx.hpp"
#include "err.hpp" #include "err.hpp"
#include "tcp_address.hpp" #include "tcp_address.hpp"
#include "udp_address.hpp"
#include "ipc_address.hpp" #include "ipc_address.hpp"
#include "tipc_address.hpp" #include "tipc_address.hpp"
@ -59,6 +60,11 @@ zmq::address_t::~address_t ()
LIBZMQ_DELETE(resolved.tcp_addr); LIBZMQ_DELETE(resolved.tcp_addr);
} }
} }
if (protocol == "udp") {
if (resolved.udp_addr) {
LIBZMQ_DELETE(resolved.udp_addr);
}
}
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS #if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
else else
if (protocol == "ipc") { if (protocol == "ipc") {
@ -91,6 +97,10 @@ int zmq::address_t::to_string (std::string &addr_) const
if (resolved.tcp_addr) if (resolved.tcp_addr)
return resolved.tcp_addr->to_string (addr_); return resolved.tcp_addr->to_string (addr_);
} }
if (protocol == "udp") {
if (resolved.udp_addr)
return resolved.udp_addr->to_string (addr_);
}
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS #if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
else else
if (protocol == "ipc") { if (protocol == "ipc") {

View File

@ -36,6 +36,7 @@ namespace zmq
{ {
class ctx_t; class ctx_t;
class tcp_address_t; class tcp_address_t;
class udp_address_t;
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS #if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
class ipc_address_t; class ipc_address_t;
#endif #endif
@ -57,6 +58,7 @@ namespace zmq
// Protocol specific resolved address // Protocol specific resolved address
union { union {
tcp_address_t *tcp_addr; tcp_address_t *tcp_addr;
udp_address_t *udp_addr;
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS #if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
ipc_address_t *ipc_addr; ipc_address_t *ipc_addr;
#endif #endif

View File

@ -57,9 +57,12 @@ void zmq::radio_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
dist.attach (pipe_); dist.attach (pipe_);
if (subscribe_to_all_)
udp_pipes.push_back (pipe_);
// The pipe is active when attached. Let's read the subscriptions from // The pipe is active when attached. Let's read the subscriptions from
// it, if any. // it, if any.
xread_activated (pipe_); else
xread_activated (pipe_);
} }
void zmq::radio_t::xread_activated (pipe_t *pipe_) void zmq::radio_t::xread_activated (pipe_t *pipe_)
@ -102,6 +105,11 @@ void zmq::radio_t::xpipe_terminated (pipe_t *pipe_)
} }
} }
udp_pipes_t::iterator it = std::find(udp_pipes.begin(),
udp_pipes.end (), pipe_);
if (it != udp_pipes.end ())
udp_pipes.erase (it);
dist.pipe_terminated (pipe_); dist.pipe_terminated (pipe_);
} }
@ -121,6 +129,9 @@ int zmq::radio_t::xsend (msg_t *msg_)
for (subscriptions_t::iterator it = range.first; it != range.second; ++it) for (subscriptions_t::iterator it = range.first; it != range.second; ++it)
dist.match (it-> second); dist.match (it-> second);
for (udp_pipes_t::iterator it = udp_pipes.begin (); it != udp_pipes.end (); ++it)
dist.match (*it);
int rc = dist.send_to_matching (msg_); int rc = dist.send_to_matching (msg_);
return rc; return rc;

View File

@ -32,6 +32,7 @@
#include <map> #include <map>
#include <string> #include <string>
#include <vector>
#include "socket_base.hpp" #include "socket_base.hpp"
#include "session_base.hpp" #include "session_base.hpp"
@ -70,6 +71,10 @@ namespace zmq
typedef std::multimap<std::string, pipe_t*> subscriptions_t; typedef std::multimap<std::string, pipe_t*> subscriptions_t;
subscriptions_t subscriptions; subscriptions_t subscriptions;
// List of udp pipes
typedef std::vector<pipe_t*> udp_pipes_t;
udp_pipes_t udp_pipes;
// Distributor of messages holding the list of outbound pipes. // Distributor of messages holding the list of outbound pipes.
dist_t dist; dist_t dist;

View File

@ -42,6 +42,7 @@
#include "pgm_receiver.hpp" #include "pgm_receiver.hpp"
#include "address.hpp" #include "address.hpp"
#include "norm_engine.hpp" #include "norm_engine.hpp"
#include "udp_engine.hpp"
#include "ctx.hpp" #include "ctx.hpp"
#include "req.hpp" #include "req.hpp"
@ -498,7 +499,7 @@ void zmq::session_base_t::reconnect ()
// and reestablish later on // and reestablish later on
if (pipe && options.immediate == 1 if (pipe && options.immediate == 1
&& addr->protocol != "pgm" && addr->protocol != "epgm" && addr->protocol != "pgm" && addr->protocol != "epgm"
&& addr->protocol != "norm") { && addr->protocol != "norm" && addr->protocol != "udp") {
pipe->hiccup (); pipe->hiccup ();
pipe->terminate (false); pipe->terminate (false);
terminating_pipes.insert (pipe); terminating_pipes.insert (pipe);
@ -567,6 +568,32 @@ void zmq::session_base_t::start_connecting (bool wait_)
} }
#endif #endif
if (addr->protocol == "udp") {
zmq_assert (options.type == ZMQ_DISH || options.type == ZMQ_RADIO);
udp_engine_t* engine = new (std::nothrow) udp_engine_t ();
alloc_assert (engine);
bool recv = false;
bool send = false;
if (options.type == ZMQ_RADIO) {
send = true;
recv = false;
}
else if (options.type == ZMQ_DISH) {
send = false;
recv = true;
}
int rc = engine->init (addr, send, recv);
errno_assert (rc == 0);
send_attach (this, engine);
return;
}
#ifdef ZMQ_HAVE_OPENPGM #ifdef ZMQ_HAVE_OPENPGM
// Both PGM and EPGM transports are using the same infrastructure. // Both PGM and EPGM transports are using the same infrastructure.

View File

@ -65,6 +65,7 @@
#include "address.hpp" #include "address.hpp"
#include "ipc_address.hpp" #include "ipc_address.hpp"
#include "tcp_address.hpp" #include "tcp_address.hpp"
#include "udp_address.hpp"
#include "tipc_address.hpp" #include "tipc_address.hpp"
#include "mailbox.hpp" #include "mailbox.hpp"
#include "mailbox_safe.hpp" #include "mailbox_safe.hpp"
@ -260,7 +261,8 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_)
&& protocol_ != "epgm" && protocol_ != "epgm"
&& protocol_ != "tipc" && protocol_ != "tipc"
&& protocol_ != "norm" && protocol_ != "norm"
&& protocol_ != "vmci") { && protocol_ != "vmci"
&& protocol_ != "udp") {
errno = EPROTONOSUPPORT; errno = EPROTONOSUPPORT;
return -1; return -1;
} }
@ -314,6 +316,9 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_)
return -1; return -1;
} }
if (protocol_ == "udp" && (options.type != ZMQ_DISH && options.type != ZMQ_RADIO))
return -1;
// Protocol is available. // Protocol is available.
return 0; return 0;
} }
@ -556,9 +561,9 @@ int zmq::socket_base_t::bind (const char *addr_)
return rc; return rc;
} }
if (protocol == "pgm" || protocol == "epgm" || protocol == "norm") { if (protocol == "pgm" || protocol == "epgm" || protocol == "norm" || protocol == "udp") {
// For convenience's sake, bind can be used interchangeable with // For convenience's sake, bind can be used interchangeable with
// connect for PGM, EPGM and NORM transports. // connect for PGM, EPGM, NORM and UDP transports.
EXIT_MUTEX (); EXIT_MUTEX ();
rc = connect (addr_); rc = connect (addr_);
if (rc != -1) if (rc != -1)
@ -878,6 +883,17 @@ int zmq::socket_base_t::connect (const char *addr_)
} }
#endif #endif
if (protocol == "udp") {
paddr->resolved.udp_addr = new (std::nothrow) udp_address_t ();
alloc_assert (paddr->resolved.udp_addr);
int rc = paddr->resolved.udp_addr->resolve (address.c_str());
if (rc != 0) {
LIBZMQ_DELETE(paddr);
EXIT_MUTEX ();
return -1;
}
}
// TBD - Should we check address for ZMQ_HAVE_NORM??? // TBD - Should we check address for ZMQ_HAVE_NORM???
#ifdef ZMQ_HAVE_OPENPGM #ifdef ZMQ_HAVE_OPENPGM
@ -927,7 +943,7 @@ int zmq::socket_base_t::connect (const char *addr_)
// PGM does not support subscription forwarding; ask for all data to be // PGM does not support subscription forwarding; ask for all data to be
// sent to this pipe. (same for NORM, currently?) // sent to this pipe. (same for NORM, currently?)
bool subscribe_to_all = protocol == "pgm" || protocol == "epgm" || protocol == "norm"; bool subscribe_to_all = protocol == "pgm" || protocol == "epgm" || protocol == "norm" || protocol == "udp";
pipe_t *newpipe = NULL; pipe_t *newpipe = NULL;
if (options.immediate != 1 || subscribe_to_all) { if (options.immediate != 1 || subscribe_to_all) {

166
src/udp_address.cpp Normal file
View File

@ -0,0 +1,166 @@
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <string>
#include <sstream>
#include "macros.hpp"
#include "udp_address.hpp"
#include "platform.hpp"
#include "stdint.hpp"
#include "err.hpp"
#include "ip.hpp"
#ifdef ZMQ_HAVE_WINDOWS
#include "windows.hpp"
#else
#include <sys/types.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <ctype.h>
#endif
zmq::udp_address_t::udp_address_t ()
{
memset (&bind_address, 0, sizeof bind_address);
}
zmq::udp_address_t::~udp_address_t ()
{
}
int zmq::udp_address_t::resolve (const char *name_)
{
// Find the ':' at end that separates address from the port number.
const char *delimiter = strrchr (name_, ':');
if (!delimiter) {
errno = EINVAL;
return -1;
}
// Separate the address/port.
std::string addr_str (name_, delimiter - name_);
std::string port_str (delimiter + 1);
// Remove square brackets around the address, if any, as used in IPv6
if (addr_str.size () >= 2 && addr_str [0] == '[' &&
addr_str [addr_str.size () - 1] == ']')
addr_str = addr_str.substr (1, addr_str.size () - 2);
// Parse the port number (0 is not a valid port).
uint16_t port = (uint16_t) atoi (port_str.c_str ());
if (port == 0) {
errno = EINVAL;
return -1;
}
dest_address.sin_family = AF_INET;
dest_address.sin_port = htons (port);
dest_address.sin_addr.s_addr = inet_addr (addr_str.c_str ());
if (dest_address.sin_addr.s_addr == INADDR_NONE) {
errno = EINVAL;
return -1;
}
// we will check only first byte of IP
// and if it from 224 to 239, then it can
// represent multicast IP.
int i = dest_address.sin_addr.s_addr & 0xFF;
if(i >= 224 && i <= 239) {
multicast = dest_address.sin_addr;
is_mutlicast = true;
}
else
is_mutlicast = false;
interface.s_addr = htons (INADDR_ANY);
if (interface.s_addr == INADDR_NONE) {
errno = EINVAL;
return -1;
}
bind_address.sin_family = AF_INET;
bind_address.sin_port = htons (port);
bind_address.sin_addr.s_addr = htons (INADDR_ANY);
address = name_;
return 0;
}
int zmq::udp_address_t::to_string (std::string &addr_)
{
addr_ = address;
return 0;
}
bool zmq::udp_address_t::is_mcast () const
{
return is_mutlicast;
}
const sockaddr* zmq::udp_address_t::bind_addr () const
{
return (sockaddr *) &bind_address;
}
socklen_t zmq::udp_address_t::bind_addrlen () const
{
return sizeof (sockaddr_in);
}
const sockaddr* zmq::udp_address_t::dest_addr () const
{
return (sockaddr *) &dest_address;
}
socklen_t zmq::udp_address_t::dest_addrlen () const
{
return sizeof (sockaddr_in);
}
const in_addr zmq::udp_address_t::multicast_ip () const
{
return multicast;
}
const in_addr zmq::udp_address_t::interface_ip () const
{
return interface;
}
#if defined ZMQ_HAVE_WINDOWS
unsigned short zmq::udp_address_t::family () const
#else
sa_family_t zmq::udp_address_t::family () const
#endif
{
return AF_INET;
}

84
src/udp_address.hpp Normal file
View File

@ -0,0 +1,84 @@
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_UDP_ADDRESS_HPP_INCLUDED__
#define __ZMQ_UDP_ADDRESS_HPP_INCLUDED__
#include "platform.hpp"
#if defined ZMQ_HAVE_WINDOWS
#include "windows.hpp"
#else
#include <sys/socket.h>
#include <netinet/in.h>
#endif
namespace zmq
{
class udp_address_t
{
public:
udp_address_t ();
virtual ~udp_address_t ();
int resolve (const char *name_);
// The opposite to resolve()
virtual int to_string (std::string &addr_);
#if defined ZMQ_HAVE_WINDOWS
unsigned short family () const;
#else
sa_family_t family () const;
#endif
const sockaddr *bind_addr () const;
socklen_t bind_addrlen () const;
const sockaddr *dest_addr () const;
socklen_t dest_addrlen () const;
bool is_mcast () const;
const in_addr multicast_ip () const;
const in_addr interface_ip () const;
private:
in_addr multicast;
in_addr interface;
sockaddr_in bind_address;
sockaddr_in dest_address;
bool is_mutlicast;
std::string address;
};
}
#endif

222
src/udp_engine.cpp Normal file
View File

@ -0,0 +1,222 @@
#include "platform.hpp"
#if defined ZMQ_HAVE_WINDOWS
#include "windows.hpp"
#else
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#endif
#include "udp_engine.hpp"
#include "session_base.hpp"
#include "v2_protocol.hpp"
#include "err.hpp"
#include "ip.hpp"
zmq::udp_engine_t::udp_engine_t() :
plugged (false),
session(NULL)
{
}
zmq::udp_engine_t::~udp_engine_t()
{
zmq_assert (!plugged);
if (fd != retired_fd) {
#ifdef ZMQ_HAVE_WINDOWS
int rc = closesocket (fd);
wsa_assert (rc != SOCKET_ERROR);
#else
int rc = close (fd);
errno_assert (rc == 0);
#endif
fd = retired_fd;
}
}
int zmq::udp_engine_t::init (address_t* address_, bool send_, bool recv_)
{
zmq_assert (address_);
zmq_assert (send_ || recv_);
send_enabled = send_;
recv_enabled = recv_;
address = address_;
fd = open_socket (address->resolved.udp_addr->family (), SOCK_DGRAM, IPPROTO_UDP);
if (fd == retired_fd)
return -1;
unblock_socket (fd);
return 0;
}
void zmq::udp_engine_t::plug (io_thread_t* io_thread_, session_base_t *session_)
{
zmq_assert (!plugged);
plugged = true;
zmq_assert (!session);
zmq_assert (session_);
session = session_;
// Connect to I/O threads poller object.
io_object_t::plug (io_thread_);
handle = add_fd (fd);
if (send_enabled)
set_pollout (handle);
if (recv_enabled) {
int on = 1;
int rc = setsockopt (fd, SOL_SOCKET, SO_REUSEADDR, (char *) &on, sizeof (on));
#ifdef ZMQ_HAVE_WINDOWS
wsa_assert (rc != SOCKET_ERROR);
#else
errno_assert (rc == 0);
#endif
rc = bind (fd, address->resolved.udp_addr->bind_addr (), address->resolved.udp_addr->bind_addrlen ());
#ifdef ZMQ_HAVE_WINDOWS
wsa_assert (rc != SOCKET_ERROR);
#else
errno_assert (rc == 0);
#endif
if (address->resolved.udp_addr->is_mcast ()) {
struct ip_mreq mreq;
mreq.imr_multiaddr = address->resolved.udp_addr->multicast_ip ();
mreq.imr_interface = address->resolved.udp_addr->interface_ip ();
int rc = setsockopt (fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof (mreq));
#ifdef ZMQ_HAVE_WINDOWS
wsa_assert (rc != SOCKET_ERROR);
#else
errno_assert (rc == 0);
#endif
}
set_pollin (handle);
// Call restart output to drop all join/leave commands
restart_output ();
}
}
void zmq::udp_engine_t::terminate()
{
zmq_assert (plugged);
plugged = false;
rm_fd (handle);
// Disconnect from I/O threads poller object.
io_object_t::unplug ();
delete this;
}
void zmq::udp_engine_t::out_event()
{
msg_t group_msg;
int rc = session->pull_msg (&group_msg);
errno_assert (rc == 0 || (rc == -1 && errno == EAGAIN));
if (rc == 0) {
msg_t body_msg;
rc = session->pull_msg (&body_msg);
size_t group_size = group_msg.size ();
size_t body_size = body_msg.size ();
size_t size = group_size + body_size + 1;
// TODO: check if larger than maximum size
out_buffer[0] = (unsigned char) group_size;
memcpy (out_buffer + 1, group_msg.data (), group_size);
memcpy (out_buffer + 1 + group_size, body_msg.data (), body_size);
rc = group_msg.close ();
errno_assert (rc == 0);
body_msg.close ();
errno_assert (rc == 0);
rc = sendto (fd, out_buffer, size, 0,
address->resolved.udp_addr->dest_addr (),
address->resolved.udp_addr->dest_addrlen ());
errno_assert (rc != -1);
}
else
reset_pollout (handle);
}
void zmq::udp_engine_t::restart_output()
{
// If we don't support send we just drop all messages
if (!send_enabled) {
msg_t msg;
while (session->pull_msg (&msg) == 0)
msg.close ();
}
else {
set_pollout(handle);
out_event ();
}
}
void zmq::udp_engine_t::in_event()
{
size_t read = recv (fd, in_buffer, MAX_UDP_MSG, 0);
if (read > 0) {
size_t group_size = in_buffer[0];
// This doesn't fit, just ingore
if (read - 1 < group_size)
return;
size_t body_size = read -1 - group_size;
msg_t msg;
int rc = msg.init_size (group_size);
errno_assert (rc == 0);
msg.set_flags (msg_t::more);
memcpy (msg.data (), in_buffer + 1, group_size);
rc = session->push_msg (&msg);
errno_assert (rc == 0 || (rc == -1 && errno == EAGAIN));
// Pipe is full
if (rc != 0) {
rc = msg.close ();
errno_assert (rc == 0);
reset_pollin (handle);
return;
}
rc = msg.close ();
errno_assert (rc == 0);
rc = msg.init_size (body_size);
errno_assert (rc == 0);
memcpy (msg.data (), in_buffer + 1 + group_size, body_size);
rc = session->push_msg (&msg);
errno_assert (rc == 0);
rc = msg.close ();
errno_assert (rc == 0);
session->flush ();
}
}
void zmq::udp_engine_t::restart_input()
{
if (!recv_enabled)
return;
set_pollin (handle);
in_event ();
}

62
src/udp_engine.hpp Normal file
View File

@ -0,0 +1,62 @@
#ifndef __ZMQ_UDP_ENGINE_HPP_INCLUDED__
#define __ZMQ_UDP_ENGINE_HPP_INCLUDED__
#include "io_object.hpp"
#include "i_engine.hpp"
#include "address.hpp"
#include "udp_address.hpp"
#define MAX_UDP_MSG 8192
namespace zmq
{
class io_thread_t;
class session_base_t;
class udp_engine_t : public io_object_t, public i_engine
{
public:
udp_engine_t ();
~udp_engine_t ();
int init (address_t *address_, bool send_, bool recv_);
// i_engine interface implementation.
// Plug the engine to the session.
void plug (zmq::io_thread_t *io_thread_, class session_base_t *session_);
// Terminate and deallocate the engine. Note that 'detached'
// events are not fired on termination.
void terminate ();
// This method is called by the session to signalise that more
// messages can be written to the pipe.
void restart_input ();
// This method is called by the session to signalise that there
// are messages to send available.
void restart_output ();
void zap_msg_available () {};
void in_event ();
void out_event ();
private:
bool plugged;
fd_t fd;
session_base_t* session;
handle_t handle;
address_t *address;
unsigned char out_buffer[MAX_UDP_MSG];
unsigned char in_buffer[MAX_UDP_MSG];
bool send_enabled;
bool recv_enabled;
};
}
#endif

121
tests/test_udp.cpp Normal file
View File

@ -0,0 +1,121 @@
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "testutil.hpp"
int msg_send (zmq_msg_t *msg_, void *s_, const char* group_, const char* body_)
{
int rc = zmq_msg_init_size (msg_, strlen (body_));
if (rc != 0)
return rc;
memcpy (zmq_msg_data (msg_), body_, strlen (body_));
rc = zmq_msg_set_group (msg_, group_);
if (rc != 0) {
zmq_msg_close (msg_);
return rc;
}
rc = zmq_msg_send (msg_, s_, 0);
zmq_msg_close (msg_);
return rc;
}
int msg_recv_cmp (zmq_msg_t *msg_, void *s_, const char* group_, const char* body_)
{
int rc = zmq_msg_init (msg_);
if (rc != 0)
return -1;
int recv_rc = zmq_msg_recv (msg_, s_, 0);
if (recv_rc == -1)
return -1;
if (strcmp (zmq_msg_group (msg_), group_) != 0)
{
zmq_msg_close (msg_);
return -1;
}
char * body = (char*) malloc (sizeof(char) * (zmq_msg_size (msg_) + 1));
memcpy (body, zmq_msg_data (msg_), zmq_msg_size (msg_));
body [zmq_msg_size (msg_)] = '\0';
if (strcmp (body, body_) != 0)
{
zmq_msg_close (msg_);
return -1;
}
zmq_msg_close (msg_);
return recv_rc;
}
int main (void)
{
setup_test_environment ();
void *ctx = zmq_ctx_new ();
assert (ctx);
zmq_msg_t msg;
void *radio = zmq_socket (ctx, ZMQ_RADIO);
void *dish = zmq_socket (ctx, ZMQ_DISH);
int rc = zmq_bind (radio, "udp://127.0.0.1:5556");
assert (rc == 0);
rc = zmq_connect (dish, "udp://127.0.0.1:5556");
assert (rc == 0);
zmq_sleep (1);
rc = zmq_join (dish, "TV");
assert (rc == 0);
rc = msg_send (&msg, radio, "TV", "Friends");
assert (rc != -1);
rc = msg_recv_cmp (&msg, dish, "TV", "Friends");
assert (rc != -1);
rc = zmq_close (dish);
assert (rc == 0);
rc = zmq_close (radio);
assert (rc == 0);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
return 0 ;
}