diff --git a/Makefile.am b/Makefile.am index 0630dcd9..514f539d 100644 --- a/Makefile.am +++ b/Makefile.am @@ -45,6 +45,8 @@ src_libzmq_la_SOURCES = \ src/decoder.hpp \ src/devpoll.cpp \ src/devpoll.hpp \ + src/dgram.cpp \ + src/dgram.hpp \ src/dish.cpp \ src/dish.hpp \ src/dist.cpp \ diff --git a/include/zmq.h b/include/zmq.h index 07d148d4..f6eec923 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -523,6 +523,7 @@ ZMQ_EXPORT void zmq_threadclose (void* thread); #define ZMQ_DISH 15 #define ZMQ_GATHER 16 #define ZMQ_SCATTER 17 +#define ZMQ_DGRAM 18 /* DRAFT Socket methods. */ ZMQ_EXPORT int zmq_join (void *s, const char *group); diff --git a/src/dgram.cpp b/src/dgram.cpp new file mode 100644 index 00000000..d23fe963 --- /dev/null +++ b/src/dgram.cpp @@ -0,0 +1,321 @@ +/* + Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "precompiled.hpp" +#include "macros.hpp" +#include "dgram.hpp" +#include "pipe.hpp" +#include "wire.hpp" +#include "random.hpp" +#include "likely.hpp" +#include "err.hpp" + +zmq::dgram_t::dgram_t (class ctx_t *parent_, uint32_t tid_, int sid_) : + socket_base_t (parent_, tid_, sid_), + prefetched (false), + identity_sent (false), + current_out (NULL), + more_out (false), + next_rid (generate_random ()) +{ + options.type = ZMQ_DGRAM; + options.raw_socket = true; + + prefetched_id.init (); + prefetched_msg.init (); +} + +zmq::dgram_t::~dgram_t () +{ + zmq_assert (outpipes.empty ()); + prefetched_id.close (); + prefetched_msg.close (); +} + +void zmq::dgram_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) +{ + LIBZMQ_UNUSED(subscribe_to_all_); + + zmq_assert (pipe_); + + identify_peer (pipe_); + fq.attach (pipe_); +} + +void zmq::dgram_t::xpipe_terminated (pipe_t *pipe_) +{ + outpipes_t::iterator it = outpipes.find (pipe_->get_identity ()); + zmq_assert (it != outpipes.end ()); + outpipes.erase (it); + fq.pipe_terminated (pipe_); + if (pipe_ == current_out) + current_out = NULL; +} + +void zmq::dgram_t::xread_activated (pipe_t *pipe_) +{ + fq.activated (pipe_); +} + +void zmq::dgram_t::xwrite_activated (pipe_t *pipe_) +{ + outpipes_t::iterator it; + for (it = outpipes.begin (); it != outpipes.end (); ++it) + if (it->second.pipe == pipe_) + break; + + zmq_assert (it != outpipes.end ()); + zmq_assert (!it->second.active); + it->second.active = true; +} + +int zmq::dgram_t::xsend (msg_t *msg_) +{ + // If this is the first part of the message it's the ID of the + // peer to send the message to. + if (!more_out) { + zmq_assert (!current_out); + + // If we have malformed message (prefix with no subsequent message) + // then just silently ignore it. + // TODO: The connections should be killed instead. + if (msg_->flags () & msg_t::more) { + + // Find the pipe associated with the identity stored in the prefix. + // If there's no such pipe return an error + blob_t identity ((unsigned char*) msg_->data (), msg_->size ()); + outpipes_t::iterator it = outpipes.find (identity); + + if (it != outpipes.end ()) { + current_out = it->second.pipe; + if (!current_out->check_write ()) { + it->second.active = false; + current_out = NULL; + errno = EAGAIN; + return -1; + } + } + else { + errno = EHOSTUNREACH; + return -1; + } + } + + // Expect one more message frame. + more_out = true; + + int rc = msg_->close (); + errno_assert (rc == 0); + rc = msg_->init (); + errno_assert (rc == 0); + return 0; + } + + // Ignore the MORE flag + msg_->reset_flags (msg_t::more); + + // This is the last part of the message. + more_out = false; + + // Push the message into the pipe. If there's no out pipe, just drop it. + if (current_out) { + + // Close the remote connection if user has asked to do so + // by sending zero length message. + // Pending messages in the pipe will be dropped (on receiving term- ack) + if (msg_->size () == 0) { + current_out->terminate (false); + int rc = msg_->close (); + errno_assert (rc == 0); + rc = msg_->init (); + errno_assert (rc == 0); + current_out = NULL; + return 0; + } + bool ok = current_out->write (msg_); + if (likely (ok)) + current_out->flush (); + current_out = NULL; + } + else { + int rc = msg_->close (); + errno_assert (rc == 0); + } + + // Detach the message from the data buffer. + int rc = msg_->init (); + errno_assert (rc == 0); + + return 0; +} + +int zmq::dgram_t::xsetsockopt (int option_, const void *optval_, + size_t optvallen_) +{ + bool is_int = (optvallen_ == sizeof (int)); + int value = 0; + if (is_int) memcpy(&value, optval_, sizeof (int)); + + switch (option_) { + case ZMQ_CONNECT_RID: + if (optval_ && optvallen_) { + connect_rid.assign ((char*) optval_, optvallen_); + return 0; + } + break; + + case ZMQ_STREAM_NOTIFY: + if (is_int && (value == 0 || value == 1)) { + options.raw_notify = (value != 0); + return 0; + } + break; + + default: + break; + } + errno = EINVAL; + return -1; +} + +int zmq::dgram_t::xrecv (msg_t *msg_) +{ + if (prefetched) { + if (!identity_sent) { + int rc = msg_->move (prefetched_id); + errno_assert (rc == 0); + identity_sent = true; + } + else { + int rc = msg_->move (prefetched_msg); + errno_assert (rc == 0); + prefetched = false; + } + return 0; + } + + pipe_t *pipe = NULL; + int rc = fq.recvpipe (&prefetched_msg, &pipe); + if (rc != 0) + return -1; + + zmq_assert (pipe != NULL); + zmq_assert ((prefetched_msg.flags () & msg_t::more) == 0); + + // We have received a frame with TCP data. + // Rather than sending this frame, we keep it in prefetched + // buffer and send a frame with peer's ID. + blob_t identity = pipe->get_identity (); + rc = msg_->close(); + errno_assert (rc == 0); + rc = msg_->init_size (identity.size ()); + errno_assert (rc == 0); + + // forward metadata (if any) + metadata_t *metadata = prefetched_msg.metadata(); + if (metadata) + msg_->set_metadata(metadata); + + memcpy (msg_->data (), identity.data (), identity.size ()); + msg_->set_flags (msg_t::more); + + prefetched = true; + identity_sent = true; + + return 0; +} + +bool zmq::dgram_t::xhas_in () +{ + // We may already have a message pre-fetched. + if (prefetched) + return true; + + // Try to read the next message. + // The message, if read, is kept in the pre-fetch buffer. + pipe_t *pipe = NULL; + int rc = fq.recvpipe (&prefetched_msg, &pipe); + if (rc != 0) + return false; + + zmq_assert (pipe != NULL); + zmq_assert ((prefetched_msg.flags () & msg_t::more) == 0); + + blob_t identity = pipe->get_identity (); + rc = prefetched_id.init_size (identity.size ()); + errno_assert (rc == 0); + + // forward metadata (if any) + metadata_t *metadata = prefetched_msg.metadata(); + if (metadata) + prefetched_id.set_metadata(metadata); + + memcpy (prefetched_id.data (), identity.data (), identity.size ()); + prefetched_id.set_flags (msg_t::more); + + prefetched = true; + identity_sent = false; + + return true; +} + +bool zmq::dgram_t::xhas_out () +{ + // In theory, STREAM socket is always ready for writing. Whether actual + // attempt to write succeeds depends on which pipe the message is going + // to be routed to. + return true; +} + +void zmq::dgram_t::identify_peer (pipe_t *pipe_) +{ + // Always assign identity for raw-socket + unsigned char buffer [5]; + buffer [0] = 0; + blob_t identity; + if (connect_rid.length ()) { + identity = blob_t ((unsigned char*) connect_rid.c_str(), + connect_rid.length ()); + connect_rid.clear (); + outpipes_t::iterator it = outpipes.find (identity); + zmq_assert (it == outpipes.end ()); + } + else { + put_uint32 (buffer + 1, next_rid++); + identity = blob_t (buffer, sizeof buffer); + memcpy (options.identity, identity.data (), identity.size ()); + options.identity_size = (unsigned char) identity.size (); + } + pipe_->set_identity (identity); + // Add the record into output pipes lookup table + outpipe_t outpipe = {pipe_, true}; + const bool ok = outpipes.insert ( + outpipes_t::value_type (identity, outpipe)).second; + zmq_assert (ok); +} diff --git a/src/dgram.hpp b/src/dgram.hpp new file mode 100644 index 00000000..85716f06 --- /dev/null +++ b/src/dgram.hpp @@ -0,0 +1,107 @@ +/* + Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_DGRAM_HPP_INCLUDED__ +#define __ZMQ_DGRAM_HPP_INCLUDED__ + +#include + +#include "router.hpp" + +namespace zmq +{ + + class ctx_t; + class pipe_t; + + class dgram_t : + public socket_base_t + { + public: + + dgram_t (zmq::ctx_t *parent_, uint32_t tid_, int sid); + ~dgram_t (); + + // Overrides of functions from socket_base_t. + void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_); + int xsend (zmq::msg_t *msg_); + int xrecv (zmq::msg_t *msg_); + bool xhas_in (); + bool xhas_out (); + void xread_activated (zmq::pipe_t *pipe_); + void xwrite_activated (zmq::pipe_t *pipe_); + void xpipe_terminated (zmq::pipe_t *pipe_); + int xsetsockopt (int option_, const void *optval_, size_t optvallen_); + private: + // Generate peer's id and update lookup map + void identify_peer (pipe_t *pipe_); + + // Fair queueing object for inbound pipes. + fq_t fq; + + // True iff there is a message held in the pre-fetch buffer. + bool prefetched; + + // If true, the receiver got the message part with + // the peer's identity. + bool identity_sent; + + // Holds the prefetched identity. + msg_t prefetched_id; + + // Holds the prefetched message. + msg_t prefetched_msg; + + struct outpipe_t + { + zmq::pipe_t *pipe; + bool active; + }; + + // Outbound pipes indexed by the peer IDs. + typedef std::map outpipes_t; + outpipes_t outpipes; + + // The pipe we are currently writing to. + zmq::pipe_t *current_out; + + // If true, more outgoing message parts are expected. + bool more_out; + + // Routing IDs are generated. It's a simple increment and wrap-over + // algorithm. This value is the next ID to use (if not used already). + uint32_t next_rid; + + dgram_t (const dgram_t&); + const dgram_t &operator = (const dgram_t&); + }; + +} + +#endif diff --git a/src/mechanism.cpp b/src/mechanism.cpp index 2df803ce..92367bea 100644 --- a/src/mechanism.cpp +++ b/src/mechanism.cpp @@ -78,8 +78,8 @@ const char *zmq::mechanism_t::socket_type_string (int socket_type) const "XPUB", "XSUB", "STREAM", "SERVER", "CLIENT", "RADIO", "DISH", - "GATHER", "SCATTER"}; - zmq_assert (socket_type >= 0 && socket_type <= 17); + "GATHER", "SCATTER", "DGRAM"}; + zmq_assert (socket_type >= 0 && socket_type <= 18); return names [socket_type]; } @@ -203,6 +203,8 @@ bool zmq::mechanism_t::check_socket_type (const std::string& type_) const return type_ == "SCATTER"; case ZMQ_SCATTER: return type_ == "GATHER"; + case ZMQ_DGRAM: + return type_ == "DGRAM"; default: break; } diff --git a/src/session_base.cpp b/src/session_base.cpp index 53d979ce..c74ce363 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -83,6 +83,7 @@ zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_, case ZMQ_CLIENT: case ZMQ_GATHER: case ZMQ_SCATTER: + case ZMQ_DGRAM: s = new (std::nothrow) session_base_t (io_thread_, active_, socket_, options_, addr_); break; @@ -572,9 +573,9 @@ void zmq::session_base_t::start_connecting (bool wait_) #endif if (addr->protocol == "udp") { - zmq_assert (options.type == ZMQ_DISH || options.type == ZMQ_RADIO); + zmq_assert (options.type == ZMQ_DISH || options.type == ZMQ_RADIO || options.type == ZMQ_DGRAM); - udp_engine_t* engine = new (std::nothrow) udp_engine_t (); + udp_engine_t* engine = new (std::nothrow) udp_engine_t (options); alloc_assert (engine); bool recv = false; @@ -588,6 +589,10 @@ void zmq::session_base_t::start_connecting (bool wait_) send = false; recv = true; } + else if (options.type == ZMQ_DGRAM) { + send = true; + recv = true; + } int rc = engine->init (addr, send, recv); errno_assert (rc == 0); diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 73a40364..88d6e41c 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -95,6 +95,7 @@ #include "dish.hpp" #include "gather.hpp" #include "scatter.hpp" +#include "dgram.hpp" #define ENTER_MUTEX() \ if (thread_safe) \ @@ -168,6 +169,9 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_, case ZMQ_SCATTER: s = new (std::nothrow) scatter_t (parent_, tid_, sid_); break; + case ZMQ_DGRAM: + s = new (std::nothrow) dgram_t (parent_, tid_, sid_); + break; default: errno = EINVAL; return NULL; @@ -304,7 +308,8 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_) #endif if (protocol_ == "udp" && (options.type != ZMQ_DISH && - options.type != ZMQ_RADIO)) { + options.type != ZMQ_RADIO && + options.type != ZMQ_DGRAM)) { errno = ENOCOMPATPROTO; return -1; } @@ -878,7 +883,7 @@ int zmq::socket_base_t::connect (const char *addr_) if (protocol == "udp") { paddr->resolved.udp_addr = new (std::nothrow) udp_address_t (); alloc_assert (paddr->resolved.udp_addr); - rc = paddr->resolved.udp_addr->resolve (address.c_str(), options.type == ZMQ_DISH); + rc = paddr->resolved.udp_addr->resolve (address.c_str(), (options.type == ZMQ_DISH || options.type == ZMQ_DGRAM)); if (rc != 0) { LIBZMQ_DELETE(paddr); EXIT_MUTEX (); diff --git a/src/udp_engine.cpp b/src/udp_engine.cpp index 169f6a84..8f44a02a 100644 --- a/src/udp_engine.cpp +++ b/src/udp_engine.cpp @@ -43,12 +43,13 @@ along with this program. If not, see . #include "err.hpp" #include "ip.hpp" -zmq::udp_engine_t::udp_engine_t() : +zmq::udp_engine_t::udp_engine_t(const options_t &options_) : plugged (false), fd(-1), session(NULL), handle(NULL), address(NULL), + options(options_), send_enabled(false), recv_enabled(false) { @@ -165,10 +166,23 @@ void zmq::udp_engine_t::out_event() size_t body_size = body_msg.size (); size_t size = group_size + body_size + 1; - // TODO: check if larger than maximum size - out_buffer[0] = (unsigned char) group_size; - memcpy (out_buffer + 1, group_msg.data (), group_size); - memcpy (out_buffer + 1 + group_size, body_msg.data (), body_size); + struct sockaddr* out_address = (struct sockaddr*) address->resolved.udp_addr->dest_addr (); + socklen_t out_addrlen = address->resolved.udp_addr->dest_addrlen (); + + if (options.raw_socket) { + if (group_size > 0) { + out_address = (struct sockaddr*) group_msg.data(); + out_addrlen = group_size; + size = body_size; + } + memcpy (out_buffer, body_msg.data (), body_size); + } + else { + // TODO: check if larger than maximum size + out_buffer[0] = (unsigned char) group_size; + memcpy (out_buffer + 1, group_msg.data (), group_size); + memcpy (out_buffer + 1 + group_size, body_msg.data (), body_size); + } rc = group_msg.close (); errno_assert (rc == 0); @@ -182,9 +196,7 @@ void zmq::udp_engine_t::out_event() (int) address->resolved.udp_addr->dest_addrlen ()); wsa_assert (rc != SOCKET_ERROR); #else - rc = sendto (fd, out_buffer, size, 0, - address->resolved.udp_addr->dest_addr (), - address->resolved.udp_addr->dest_addrlen ()); + rc = sendto (fd, out_buffer, size, 0, out_address, out_addrlen); errno_assert (rc != -1); #endif } @@ -208,8 +220,10 @@ void zmq::udp_engine_t::restart_output() void zmq::udp_engine_t::in_event() { + struct sockaddr_in in_address; + socklen_t in_addrlen; #ifdef ZMQ_HAVE_WINDOWS - int nbytes = recv(fd, (char*) in_buffer, MAX_UDP_MSG, 0); + int nbytes = recvfrom(fd, (char*) in_buffer, MAX_UDP_MSG, 0, (sockaddr*) &address, &addrlen); const int last_error = WSAGetLastError(); if (nbytes == SOCKET_ERROR) { wsa_assert( @@ -219,7 +233,7 @@ void zmq::udp_engine_t::in_event() return; } #else - int nbytes = recv(fd, in_buffer, MAX_UDP_MSG, 0); + int nbytes = recvfrom(fd, in_buffer, MAX_UDP_MSG, 0, (sockaddr*) &in_address, &in_addrlen); if (nbytes == -1) { errno_assert(errno != EBADF && errno != EFAULT @@ -229,20 +243,33 @@ void zmq::udp_engine_t::in_event() } #endif - int group_size = in_buffer[0]; - - // This doesn't fit, just ingore - if (nbytes - 1 < group_size) - return; - - int body_size = nbytes - 1 - group_size; - + void* group_buffer; + int group_size; + int body_size; msg_t msg; + + if (options.raw_socket) { + group_buffer = (void*) &(in_address); + group_size = in_addrlen; + + body_size = nbytes - 1; + } + else { + group_buffer = in_buffer + 1; + group_size = in_buffer[0]; + + // This doesn't fit, just ingore + if (nbytes - 1 < group_size) + return; + + body_size = nbytes - 1 - group_size; + } + int rc = msg.init_size (group_size); errno_assert (rc == 0); msg.set_flags (msg_t::more); - memcpy (msg.data (), in_buffer + 1, group_size); - + memcpy (msg.data (), group_buffer, group_size); + rc = session->push_msg (&msg); errno_assert (rc == 0 || (rc == -1 && errno == EAGAIN)); diff --git a/src/udp_engine.hpp b/src/udp_engine.hpp index f6339ff0..44b63c5d 100644 --- a/src/udp_engine.hpp +++ b/src/udp_engine.hpp @@ -17,7 +17,7 @@ namespace zmq class udp_engine_t : public io_object_t, public i_engine { public: - udp_engine_t (); + udp_engine_t (const options_t &options_); ~udp_engine_t (); int init (address_t *address_, bool send_, bool recv_); @@ -51,6 +51,8 @@ namespace zmq session_base_t* session; handle_t handle; address_t *address; + + options_t options; unsigned char out_buffer[MAX_UDP_MSG]; unsigned char in_buffer[MAX_UDP_MSG];