From 9ac40c47d7fc3f47859a914147a0a840505e868c Mon Sep 17 00:00:00 2001 From: Pieter Hintjens Date: Fri, 16 Mar 2012 16:39:11 -0500 Subject: [PATCH] Fixed issue LIBZMQ-333 - reverted commit 941be8d2175332cb720f390f93d07a0870db8824. - fixed zmq_device implementation for latest socket_base class - added back zmq_device.3 man page --- builds/redhat/zeromq.spec.in | 7 + doc/Makefile.am | 2 +- doc/zmq.txt | 10 ++ doc/zmq_device.txt | 125 +++++++++++++++ doc/zmq_socket.txt | 3 - include/zmq.h | 10 ++ include/zmq.hpp | 296 +++++++++++++++++++++++++++++++++++ src/Makefile.am | 2 + src/device.cpp | 120 ++++++++++++++ src/device.hpp | 32 ++++ src/zmq.cpp | 47 ++++++ 11 files changed, 650 insertions(+), 4 deletions(-) create mode 100644 doc/zmq_device.txt create mode 100644 include/zmq.hpp create mode 100644 src/device.cpp create mode 100644 src/device.hpp diff --git a/builds/redhat/zeromq.spec.in b/builds/redhat/zeromq.spec.in index 1fa96e1f..be1e5638 100644 --- a/builds/redhat/zeromq.spec.in +++ b/builds/redhat/zeromq.spec.in @@ -90,7 +90,14 @@ This package contains ZeroMQ related development libraries and header files. %{_libdir}/libzmq.so.1 %{_libdir}/libzmq.so.1.0.0 +%attr(0755,root,root) %{_bindir}/zmq_forwarder +%attr(0755,root,root) %{_bindir}/zmq_queue +%attr(0755,root,root) %{_bindir}/zmq_streamer + %{_mandir}/man7/zmq.7.gz +%{_mandir}/man1/zmq_forwarder.1.gz +%{_mandir}/man1/zmq_queue.1.gz +%{_mandir}/man1/zmq_streamer.1.gz %files devel %defattr(-,root,root,-) diff --git a/doc/Makefile.am b/doc/Makefile.am index 042f9c0b..2f68edf5 100644 --- a/doc/Makefile.am +++ b/doc/Makefile.am @@ -1,4 +1,4 @@ -MAN3 = zmq_bind.3 zmq_close.3 zmq_connect.3 zmq_init.3 \ +MAN3 = zmq_bind.3 zmq_close.3 zmq_connect.3 zmq_device.3 zmq_init.3 \ zmq_msg_close.3 zmq_msg_copy.3 zmq_msg_data.3 zmq_msg_init.3 \ zmq_msg_init_data.3 zmq_msg_init_size.3 zmq_msg_move.3 zmq_msg_size.3 \ zmq_msg_send.3 zmq_msg_recv.3 \ diff --git a/doc/zmq.txt b/doc/zmq.txt index 8753f6d3..f5eda96d 100644 --- a/doc/zmq.txt +++ b/doc/zmq.txt @@ -154,6 +154,16 @@ Local in-process (inter-thread) communication transport:: linkzmq:zmq_inproc[7] +Devices +~~~~~~~ +0MQ provides 'devices', which are building blocks that act as intermediate +nodes in complex messaging topologies. Devices can act as brokers that other +nodes connect to, proxies that connect through to other nodes, or any mix of +these two models. + +You can start a device in an application thread, see linkzmq:zmq_device[3]. + + ERROR HANDLING -------------- The 0MQ library functions handle errors using the standard conventions found on diff --git a/doc/zmq_device.txt b/doc/zmq_device.txt new file mode 100644 index 00000000..c33aa07d --- /dev/null +++ b/doc/zmq_device.txt @@ -0,0 +1,125 @@ +zmq_device(3) +============= + +NAME +---- +zmq_device - start built-in 0MQ device + + +SYNOPSIS +-------- +*int zmq_device (int 'device', const void '*frontend', const void '*backend');* + + +DESCRIPTION +----------- +The _zmq_device()_ function starts a built-in 0MQ device. The 'device' argument +is one of: + +'ZMQ_QUEUE':: + starts a queue device +'ZMQ_FORWARDER':: + starts a forwarder device +'ZMQ_STREAMER':: + starts a streamer device + +The device connects a frontend socket to a backend socket. Conceptually, data +flows from frontend to backend. Depending on the socket types, replies may flow +in the opposite direction. + +Before calling _zmq_device()_ you must set any socket options, and connect or +bind both frontend and backend sockets. The two conventional device models are: + +*proxy*:: + bind frontend socket to an endpoint, and connect backend socket to + downstream components. A proxy device model does not require changes to + the downstream topology but that topology is static (any changes require + reconfiguring the device). +*broker*:: + bind frontend socket to one endpoint and bind backend socket to a second + endpoint. Downstream components must now connect into the device. A broker + device model allows a dynamic downstream topology (components can come and + go at any time). + +_zmq_device()_ runs in the current thread and returns only if/when the current +context is closed. + + +QUEUE DEVICE +------------ +'ZMQ_QUEUE' creates a shared queue that collects requests from a set of clients, +and distributes these fairly among a set of services. Requests are fair-queued +from frontend connections and load-balanced between backend connections. +Replies automatically return to the client that made the original request. + +This device is part of the 'request-reply' pattern. The frontend speaks to +clients and the backend speaks to services. You should use 'ZMQ_QUEUE' with a +'ZMQ_ROUTER' socket for the frontend and a 'ZMQ_DEALER' socket for the backend. +Other combinations are not documented. + +Refer to linkzmq:zmq_socket[3] for a description of these socket types. + + +FORWARDER DEVICE +---------------- +'ZMQ_FORWARDER' collects messages from a set of publishers and forwards these to +a set of subscribers. You will generally use this to bridge networks, e.g. read +on TCP unicast and forward on multicast. + +This device is part of the 'publish-subscribe' pattern. The frontend speaks to +publishers and the backend speaks to subscribers. You should use +'ZMQ_FORWARDER' with a 'ZMQ_SUB' socket for the frontend and a 'ZMQ_PUB' socket +for the backend. Other combinations are not documented. + +Refer to linkzmq:zmq_socket[3] for a description of these socket types. + + +STREAMER DEVICE +--------------- +'ZMQ_STREAMER' collects tasks from a set of pushers and forwards these to a set +of pullers. You will generally use this to bridge networks. Messages are +fair-queued from pushers and load-balanced to pullers. + +This device is part of the 'pipeline' pattern. The frontend speaks to pushers +and the backend speaks to pullers. You should use 'ZMQ_STREAMER' with a +'ZMQ_PULL' socket for the frontend and a 'ZMQ_PUSH' socket for the backend. +Other combinations are not documented. + +Refer to linkzmq:zmq_socket[3] for a description of these socket types. + + +RETURN VALUE +------------ +The _zmq_device()_ function always returns `-1` and 'errno' set to *ETERM* (the +0MQ 'context' associated with either of the specified sockets was terminated). + + +EXAMPLE +------- +.Creating a queue broker +---- +// Create frontend and backend sockets +void *frontend = zmq_socket (context, ZMQ_ROUTER); +assert (backend); +void *backend = zmq_socket (context, ZMQ_DEALER); +assert (frontend); +// Bind both sockets to TCP ports +assert (zmq_bind (frontend, "tcp://*:5555") == 0); +assert (zmq_bind (backend, "tcp://*:5556") == 0); +// Start a queue device +zmq_device (ZMQ_QUEUE, frontend, backend); +---- + + +SEE ALSO +-------- +linkzmq:zmq_bind[3] +linkzmq:zmq_connect[3] +linkzmq:zmq_socket[3] +linkzmq:zmq[7] + + +AUTHORS +------- +This 0MQ manual page was written by Pieter Hintjens + diff --git a/doc/zmq_socket.txt b/doc/zmq_socket.txt index 47f05b6a..61835711 100644 --- a/doc/zmq_socket.txt +++ b/doc/zmq_socket.txt @@ -149,9 +149,6 @@ remove the first part of the message and use it to determine the _identity_ of the peer the message shall be routed to. If the peer does not exist anymore the message shall be silently discarded. -Previously this socket was called 'ZMQ_XREP' and that name remains available -for backwards compatibility. - When a 'ZMQ_ROUTER' socket enters an exceptional state due to having reached the high water mark for all peers, or if there are no peers at all, then any messages sent to the socket shall be dropped until the exceptional state ends. diff --git a/include/zmq.h b/include/zmq.h index 8bc3d901..fc6e4f3e 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -279,6 +279,16 @@ typedef struct ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout); +/******************************************************************************/ +/* Devices - Experimental. */ +/******************************************************************************/ + +#define ZMQ_STREAMER 1 +#define ZMQ_FORWARDER 2 +#define ZMQ_QUEUE 3 + +ZMQ_EXPORT int zmq_device (int device, void * insocket, void* outsocket); + #undef ZMQ_EXPORT #ifdef __cplusplus diff --git a/include/zmq.hpp b/include/zmq.hpp new file mode 100644 index 00000000..813535fe --- /dev/null +++ b/include/zmq.hpp @@ -0,0 +1,296 @@ +/* + Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_HPP_INCLUDED__ +#define __ZMQ_HPP_INCLUDED__ + +#include "zmq.h" + +#include +#include +#include + +namespace zmq +{ + + typedef zmq_free_fn free_fn; + typedef zmq_pollitem_t pollitem_t; + + class error_t : public std::exception + { + public: + + error_t () : errnum (zmq_errno ()) {} + + virtual const char *what () const throw () + { + return zmq_strerror (errnum); + } + + int num () const + { + return errnum; + } + + private: + + int errnum; + }; + + inline int poll (zmq_pollitem_t *items_, int nitems_, long timeout_ = -1) + { + int rc = zmq_poll (items_, nitems_, timeout_); + if (rc < 0) + throw error_t (); + return rc; + } + + inline void device (int device_, void * insocket_, void* outsocket_) + { + int rc = zmq_device (device_, insocket_, outsocket_); + if (rc != 0) + throw error_t (); + } + + class message_t : private zmq_msg_t + { + friend class socket_t; + + public: + + inline message_t () + { + int rc = zmq_msg_init (this); + if (rc != 0) + throw error_t (); + } + + inline message_t (size_t size_) + { + int rc = zmq_msg_init_size (this, size_); + if (rc != 0) + throw error_t (); + } + + inline message_t (void *data_, size_t size_, free_fn *ffn_, + void *hint_ = NULL) + { + int rc = zmq_msg_init_data (this, data_, size_, ffn_, hint_); + if (rc != 0) + throw error_t (); + } + + inline ~message_t () + { + int rc = zmq_msg_close (this); + assert (rc == 0); + } + + inline void rebuild () + { + int rc = zmq_msg_close (this); + if (rc != 0) + throw error_t (); + rc = zmq_msg_init (this); + if (rc != 0) + throw error_t (); + } + + inline void rebuild (size_t size_) + { + int rc = zmq_msg_close (this); + if (rc != 0) + throw error_t (); + rc = zmq_msg_init_size (this, size_); + if (rc != 0) + throw error_t (); + } + + inline void rebuild (void *data_, size_t size_, free_fn *ffn_, + void *hint_ = NULL) + { + int rc = zmq_msg_close (this); + if (rc != 0) + throw error_t (); + rc = zmq_msg_init_data (this, data_, size_, ffn_, hint_); + if (rc != 0) + throw error_t (); + } + + inline void move (message_t *msg_) + { + int rc = zmq_msg_move (this, (zmq_msg_t*) msg_); + if (rc != 0) + throw error_t (); + } + + inline void copy (message_t *msg_) + { + int rc = zmq_msg_copy (this, (zmq_msg_t*) msg_); + if (rc != 0) + throw error_t (); + } + + inline void *data () + { + return zmq_msg_data (this); + } + + inline size_t size () + { + return zmq_msg_size (this); + } + + private: + + // Disable implicit message copying, so that users won't use shared + // messages (less efficient) without being aware of the fact. + message_t (const message_t&); + void operator = (const message_t&); + }; + + class context_t + { + friend class socket_t; + + public: + + inline context_t (int io_threads_) + { + ptr = zmq_init (io_threads_); + if (ptr == NULL) + throw error_t (); + } + + inline ~context_t () + { + int rc = zmq_term (ptr); + assert (rc == 0); + } + + // Be careful with this, it's probably only useful for + // using the C api together with an existing C++ api. + // Normally you should never need to use this. + inline operator void* () + { + return ptr; + } + + private: + + void *ptr; + + context_t (const context_t&); + void operator = (const context_t&); + }; + + class socket_t + { + public: + + inline socket_t (context_t &context_, int type_) + { + ptr = zmq_socket (context_.ptr, type_); + if (ptr == NULL) + throw error_t (); + } + + inline ~socket_t () + { + close(); + } + + inline operator void* () + { + return ptr; + } + + inline void close() + { + if(ptr == NULL) + // already closed + return ; + int rc = zmq_close (ptr); + if (rc != 0) + throw error_t (); + ptr = 0 ; + } + + inline void setsockopt (int option_, const void *optval_, + size_t optvallen_) + { + int rc = zmq_setsockopt (ptr, option_, optval_, optvallen_); + if (rc != 0) + throw error_t (); + } + + inline void getsockopt (int option_, void *optval_, + size_t *optvallen_) + { + int rc = zmq_getsockopt (ptr, option_, optval_, optvallen_); + if (rc != 0) + throw error_t (); + } + + inline void bind (const char *addr_) + { + int rc = zmq_bind (ptr, addr_); + if (rc != 0) + throw error_t (); + } + + inline void connect (const char *addr_) + { + int rc = zmq_connect (ptr, addr_); + if (rc != 0) + throw error_t (); + } + + inline bool send (message_t &msg_, int flags_ = 0) + { + int rc = zmq_send (ptr, &msg_, flags_); + if (rc == 0) + return true; + if (rc == -1 && zmq_errno () == EAGAIN) + return false; + throw error_t (); + } + + inline bool recv (message_t *msg_, int flags_ = 0) + { + int rc = zmq_recv (ptr, msg_, flags_); + if (rc == 0) + return true; + if (rc == -1 && zmq_errno () == EAGAIN) + return false; + throw error_t (); + } + + private: + + void *ptr; + + socket_t (const socket_t&); + void operator = (const socket_t&); + }; + +} + +#endif diff --git a/src/Makefile.am b/src/Makefile.am index 59357470..c61a4dd0 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -16,6 +16,7 @@ libzmq_la_SOURCES = \ config.hpp \ ctx.hpp \ decoder.hpp \ + device.hpp \ devpoll.hpp \ dist.hpp \ encoder.hpp \ @@ -81,6 +82,7 @@ libzmq_la_SOURCES = \ clock.cpp \ ctx.cpp \ decoder.cpp \ + device.cpp \ devpoll.cpp \ dist.cpp \ encoder.cpp \ diff --git a/src/device.cpp b/src/device.cpp new file mode 100644 index 00000000..d3e73e40 --- /dev/null +++ b/src/device.cpp @@ -0,0 +1,120 @@ +/* + Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include + +#include "../include/zmq.h" + +#include "device.hpp" +#include "socket_base.hpp" +#include "likely.hpp" +#include "err.hpp" + +int zmq::device (class socket_base_t *insocket_, + class socket_base_t *outsocket_) +{ + msg_t msg; + int rc = msg.init (); + + if (rc != 0) { + return -1; + } + + int64_t more; + size_t moresz; + + zmq_pollitem_t items [2]; + items [0].socket = insocket_; + items [0].fd = 0; + items [0].events = ZMQ_POLLIN; + items [0].revents = 0; + items [1].socket = outsocket_; + items [1].fd = 0; + items [1].events = ZMQ_POLLIN; + items [1].revents = 0; + + while (true) { + + // Wait while there are either requests or replies to process. + rc = zmq_poll (&items [0], 2, -1); + if (unlikely (rc < 0)) { + return -1; + } + + // The algorithm below asumes ratio of request and replies processed + // under full load to be 1:1. Although processing requests replies + // first is tempting it is suspectible to DoS attacks (overloading + // the system with unsolicited replies). + + // Process a request. + if (items [0].revents & ZMQ_POLLIN) { + while (true) { + + rc = insocket_->recv (&msg, 0); + if (unlikely (rc < 0)) { + return -1; + } + + moresz = sizeof (more); + rc = insocket_->getsockopt (ZMQ_RCVMORE, &more, &moresz); + if (unlikely (rc < 0)) { + return -1; + } + + rc = outsocket_->send (&msg, more ? ZMQ_SNDMORE : 0); + if (unlikely (rc < 0)) { + return -1; + } + + if (!more) + break; + } + } + + // Process a reply. + if (items [1].revents & ZMQ_POLLIN) { + while (true) { + + rc = outsocket_->recv (&msg, 0); + if (unlikely (rc < 0)) { + return -1; + } + + moresz = sizeof (more); + rc = outsocket_->getsockopt (ZMQ_RCVMORE, &more, &moresz); + if (unlikely (rc < 0)) { + return -1; + } + + rc = insocket_->send (&msg, more ? ZMQ_SNDMORE : 0); + if (unlikely (rc < 0)) { + return -1; + } + + if (!more) + break; + } + } + + } + + return 0; +} + diff --git a/src/device.hpp b/src/device.hpp new file mode 100644 index 00000000..c5b71187 --- /dev/null +++ b/src/device.hpp @@ -0,0 +1,32 @@ +/* + Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_DEVICE_HPP_INCLUDED__ +#define __ZMQ_DEVICE_HPP_INCLUDED__ + +namespace zmq +{ + + int device (class socket_base_t *insocket_, + class socket_base_t *outsocket_); + +} + +#endif diff --git a/src/zmq.cpp b/src/zmq.cpp index ed241819..a962bcb5 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -68,6 +68,7 @@ struct iovec { #include #include +#include "device.hpp" #include "socket_base.hpp" #include "stdint.hpp" #include "config.hpp" @@ -910,3 +911,49 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) #if defined ZMQ_POLL_BASED_ON_POLL #undef ZMQ_POLL_BASED_ON_POLL #endif + +int zmq_device (int device_, void *insocket_, void *outsocket_) +{ + if (!insocket_ || !outsocket_) { + errno = EFAULT; + return -1; + } + + if (device_ != ZMQ_FORWARDER && device_ != ZMQ_QUEUE && + device_ != ZMQ_STREAMER) { + errno = EINVAL; + return -1; + } + + return zmq::device ((zmq::socket_base_t*) insocket_, + (zmq::socket_base_t*) outsocket_); +} + +//////////////////////////////////////////////////////////////////////////////// +// 0MQ utils - to be used by perf tests +//////////////////////////////////////////////////////////////////////////////// + +void zmq_sleep (int seconds_) +{ +#if defined ZMQ_HAVE_WINDOWS + Sleep (seconds_ * 1000); +#else + sleep (seconds_); +#endif +} + +void *zmq_stopwatch_start () +{ + uint64_t *watch = (uint64_t*) malloc (sizeof (uint64_t)); + alloc_assert (watch); + *watch = zmq::clock_t::now_us (); + return (void*) watch; +} + +unsigned long zmq_stopwatch_stop (void *watch_) +{ + uint64_t end = zmq::clock_t::now_us (); + uint64_t start = *(uint64_t*) watch_; + free (watch_); + return (unsigned long) (end - start); +}