From aaac4b84cb899f94570cec83970d09b19f1f9c38 Mon Sep 17 00:00:00 2001 From: Pieter Hintjens Date: Thu, 6 Sep 2012 18:20:33 +0900 Subject: [PATCH 1/2] Code cleanups --- doc/zmq.txt | 4 ++-- include/zmq.h | 2 ++ src/device.cpp | 32 +++----------------------------- 3 files changed, 7 insertions(+), 31 deletions(-) diff --git a/doc/zmq.txt b/doc/zmq.txt index cb70dbb2..84c061dc 100644 --- a/doc/zmq.txt +++ b/doc/zmq.txt @@ -136,10 +136,10 @@ Establishing a message flow:: linkzmq:zmq_connect[3] Sending and receiving messages:: + linkzmq:zmq_msg_send[3] + linkzmq:zmq_msg_recv[3] linkzmq:zmq_send[3] - linkzmq:zmq_sendmsg[3] linkzmq:zmq_recv[3] - linkzmq:zmq_recvmsg[3] .Input/output multiplexing 0MQ provides a mechanism for applications to multiplex input/output events over diff --git a/include/zmq.h b/include/zmq.h index ed0ab131..1829e4fe 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -256,6 +256,8 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval); /* Send/recv options. */ #define ZMQ_DONTWAIT 1 #define ZMQ_SNDMORE 2 +/* Deprecated aliases */ +#define ZMQ_NOBLOCK ZMQ_DONTWAIT /******************************************************************************/ /* 0MQ socket events and monitoring */ diff --git a/src/device.cpp b/src/device.cpp index 3829d4e6..08999e58 100644 --- a/src/device.cpp +++ b/src/device.cpp @@ -1,6 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + Copyright (c) 2007-2012 iMatix Corporation + Copyright (c) 2007-2012 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -19,34 +19,8 @@ */ #include - -#include "platform.hpp" - -#if defined ZMQ_FORCE_SELECT -#define ZMQ_POLL_BASED_ON_SELECT -#elif defined ZMQ_FORCE_POLL -#define ZMQ_POLL_BASED_ON_POLL -#elif defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD ||\ - defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_SOLARIS ||\ - defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_QNXNTO ||\ - defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX ||\ - defined ZMQ_HAVE_NETBSD -#define ZMQ_POLL_BASED_ON_POLL -#elif defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS ||\ - defined ZMQ_HAVE_CYGWIN -#define ZMQ_POLL_BASED_ON_SELECT -#endif - -// On AIX platform, poll.h has to be included first to get consistent -// definition of pollfd structure (AIX uses 'reqevents' and 'retnevents' -// instead of 'events' and 'revents' and defines macros to map from POSIX-y -// names to AIX-specific names). -#if defined ZMQ_POLL_BASED_ON_POLL -#include -#endif - #include "../include/zmq.h" - +#include "platform.hpp" #include "device.hpp" #include "socket_base.hpp" #include "likely.hpp" From 5db28752f3d6956d2e10bf8fcd66a1b4044339e0 Mon Sep 17 00:00:00 2001 From: Pieter Hintjens Date: Thu, 6 Sep 2012 19:20:21 +0900 Subject: [PATCH 2/2] Removed 'device' concept and introduced proxies * zmq_device is now a wrapper that calls zmq_proxy * zmq_proxy adds capture socket --- doc/Makefile.am | 2 +- doc/zmq.txt | 12 ++-- doc/zmq_device.txt | 125 ---------------------------------- doc/zmq_proxy.txt | 97 ++++++++++++++++++++++++++ include/zmq.h | 11 +-- src/Makefile.am | 4 +- src/{device.cpp => proxy.cpp} | 57 +++++++++++----- src/{device.hpp => proxy.hpp} | 16 ++--- src/zmq.cpp | 26 ++++--- 9 files changed, 176 insertions(+), 174 deletions(-) delete mode 100644 doc/zmq_device.txt create mode 100644 doc/zmq_proxy.txt rename src/{device.cpp => proxy.cpp} (57%) rename src/{device.hpp => proxy.hpp} (69%) diff --git a/doc/Makefile.am b/doc/Makefile.am index eb75e62d..849f3b67 100644 --- a/doc/Makefile.am +++ b/doc/Makefile.am @@ -1,4 +1,4 @@ -MAN3 = zmq_bind.3 zmq_close.3 zmq_connect.3 zmq_device.3 \ +MAN3 = zmq_bind.3 zmq_close.3 zmq_connect.3 zmq_proxy.3 \ zmq_ctx_new.3 zmq_ctx_destroy.3 zmq_ctx_get.3 zmq_ctx_set.3 \ zmq_init.3 zmq_term.3 zmq_ctx_set_monitor.3\ zmq_msg_close.3 zmq_msg_copy.3 zmq_msg_data.3 zmq_msg_init.3 \ diff --git a/doc/zmq.txt b/doc/zmq.txt index 84c061dc..4c79e4d6 100644 --- a/doc/zmq.txt +++ b/doc/zmq.txt @@ -169,14 +169,12 @@ Local in-process (inter-thread) communication transport:: linkzmq:zmq_inproc[7] -Devices +Proxies ~~~~~~~ -0MQ provides 'devices', which are building blocks that act as intermediate -nodes in complex messaging topologies. Devices can act as brokers that other -nodes connect to, proxies that connect through to other nodes, or any mix of -these two models. - -You can start a device in an application thread, see linkzmq:zmq_device[3]. +0MQ provides 'proxies' to create fanout and fan-in topologies. A proxy connects +a 'frontend' socket to a 'backend' socket and switches all messages between the +two sockets, opaquely. A proxy may optionally capture all traffic to a third +socket. To start a proxy in an application thread, use linkzmq:zmq_proxy[3]. ERROR HANDLING diff --git a/doc/zmq_device.txt b/doc/zmq_device.txt deleted file mode 100644 index c33aa07d..00000000 --- a/doc/zmq_device.txt +++ /dev/null @@ -1,125 +0,0 @@ -zmq_device(3) -============= - -NAME ----- -zmq_device - start built-in 0MQ device - - -SYNOPSIS --------- -*int zmq_device (int 'device', const void '*frontend', const void '*backend');* - - -DESCRIPTION ------------ -The _zmq_device()_ function starts a built-in 0MQ device. The 'device' argument -is one of: - -'ZMQ_QUEUE':: - starts a queue device -'ZMQ_FORWARDER':: - starts a forwarder device -'ZMQ_STREAMER':: - starts a streamer device - -The device connects a frontend socket to a backend socket. Conceptually, data -flows from frontend to backend. Depending on the socket types, replies may flow -in the opposite direction. - -Before calling _zmq_device()_ you must set any socket options, and connect or -bind both frontend and backend sockets. The two conventional device models are: - -*proxy*:: - bind frontend socket to an endpoint, and connect backend socket to - downstream components. A proxy device model does not require changes to - the downstream topology but that topology is static (any changes require - reconfiguring the device). -*broker*:: - bind frontend socket to one endpoint and bind backend socket to a second - endpoint. Downstream components must now connect into the device. A broker - device model allows a dynamic downstream topology (components can come and - go at any time). - -_zmq_device()_ runs in the current thread and returns only if/when the current -context is closed. - - -QUEUE DEVICE ------------- -'ZMQ_QUEUE' creates a shared queue that collects requests from a set of clients, -and distributes these fairly among a set of services. Requests are fair-queued -from frontend connections and load-balanced between backend connections. -Replies automatically return to the client that made the original request. - -This device is part of the 'request-reply' pattern. The frontend speaks to -clients and the backend speaks to services. You should use 'ZMQ_QUEUE' with a -'ZMQ_ROUTER' socket for the frontend and a 'ZMQ_DEALER' socket for the backend. -Other combinations are not documented. - -Refer to linkzmq:zmq_socket[3] for a description of these socket types. - - -FORWARDER DEVICE ----------------- -'ZMQ_FORWARDER' collects messages from a set of publishers and forwards these to -a set of subscribers. You will generally use this to bridge networks, e.g. read -on TCP unicast and forward on multicast. - -This device is part of the 'publish-subscribe' pattern. The frontend speaks to -publishers and the backend speaks to subscribers. You should use -'ZMQ_FORWARDER' with a 'ZMQ_SUB' socket for the frontend and a 'ZMQ_PUB' socket -for the backend. Other combinations are not documented. - -Refer to linkzmq:zmq_socket[3] for a description of these socket types. - - -STREAMER DEVICE ---------------- -'ZMQ_STREAMER' collects tasks from a set of pushers and forwards these to a set -of pullers. You will generally use this to bridge networks. Messages are -fair-queued from pushers and load-balanced to pullers. - -This device is part of the 'pipeline' pattern. The frontend speaks to pushers -and the backend speaks to pullers. You should use 'ZMQ_STREAMER' with a -'ZMQ_PULL' socket for the frontend and a 'ZMQ_PUSH' socket for the backend. -Other combinations are not documented. - -Refer to linkzmq:zmq_socket[3] for a description of these socket types. - - -RETURN VALUE ------------- -The _zmq_device()_ function always returns `-1` and 'errno' set to *ETERM* (the -0MQ 'context' associated with either of the specified sockets was terminated). - - -EXAMPLE -------- -.Creating a queue broker ----- -// Create frontend and backend sockets -void *frontend = zmq_socket (context, ZMQ_ROUTER); -assert (backend); -void *backend = zmq_socket (context, ZMQ_DEALER); -assert (frontend); -// Bind both sockets to TCP ports -assert (zmq_bind (frontend, "tcp://*:5555") == 0); -assert (zmq_bind (backend, "tcp://*:5556") == 0); -// Start a queue device -zmq_device (ZMQ_QUEUE, frontend, backend); ----- - - -SEE ALSO --------- -linkzmq:zmq_bind[3] -linkzmq:zmq_connect[3] -linkzmq:zmq_socket[3] -linkzmq:zmq[7] - - -AUTHORS -------- -This 0MQ manual page was written by Pieter Hintjens - diff --git a/doc/zmq_proxy.txt b/doc/zmq_proxy.txt new file mode 100644 index 00000000..9368429f --- /dev/null +++ b/doc/zmq_proxy.txt @@ -0,0 +1,97 @@ +zmq_proxy(3) +============ + +NAME +---- +zmq_proxy - start built-in 0MQ proxy + + +SYNOPSIS +-------- +*int zmq_proxy (const void '*frontend', const void '*backend', const void '*capture');* + + +DESCRIPTION +----------- +The _zmq_proxy()_ function starts the built-in 0MQ proxy in the current +application thread. + +The proxy connects a frontend socket to a backend socket. Conceptually, data +flows from frontend to backend. Depending on the socket types, replies may flow +in the opposite direction. The direction is conceptual only; the proxy is fully +symmetric and there is no technical difference between frontend and backend. + +Before calling _zmq_proxy()_ you must set any socket options, and connect or +bind both frontend and backend sockets. The two conventional proxy models are: + +_zmq_proxy()_ runs in the current thread and returns only if/when the current +context is closed. + +If the capture socket is not NULL, the proxy shall send all messages, received +on both frontend and backend, to the capture socket. The capture socket should +be a 'ZMQ_PUB', 'ZMQ_DEALER', 'ZMQ_PUSH', or 'ZMQ_PAIR' socket. + +Refer to linkzmq:zmq_socket[3] for a description of the available socket types. + +EXAMPLE USAGE +------------- + +Shared Queue +~~~~~~~~~~~~ + +When the frontend is a ZMQ_ROUTER socket, and the backend is a ZMQ_DEALER +socket, the proxy shall act as a shared queue that collects requests from a +set of clients, and distributes these fairly among a set of services. +Requests shall be fair-queued from frontend connections and distributed evenly +across backend connections. Replies shall automatically return to the client +that made the original request. + +Forwarder +~~~~~~~~~ + +When the frontend is a ZMQ_XSUB socket, and the backend is a ZMQ_XPUB socket, +the proxy shall act as a message forwarder that collects messages from a set +of publishers and forwards these to a set of subscribers. This may be used to +bridge networks transports, e.g. read on tcp:// and forward on pgm://. + +Streamer +~~~~~~~~ + +When the frontend is a ZMQ_PULL socket, and the backend is a ZMQ_PUSH socket, +the proxy shall collect tasks from a set of clients and forwards these to a set +of workers using the pipeline pattern. + +RETURN VALUE +------------ +The _zmq_proxy()_ function always returns `-1` and 'errno' set to *ETERM* (the +0MQ 'context' associated with either of the specified sockets was terminated). + + +EXAMPLE +------- +.Creating a shared queue proxy +---- +// Create frontend and backend sockets +void *frontend = zmq_socket (context, ZMQ_ROUTER); +assert (backend); +void *backend = zmq_socket (context, ZMQ_DEALER); +assert (frontend); +// Bind both sockets to TCP ports +assert (zmq_bind (frontend, "tcp://*:5555") == 0); +assert (zmq_bind (backend, "tcp://*:5556") == 0); +// Start the queue proxy, which runs until ETERM +zmq_proxy (frontend, backend, NULL); +---- + + +SEE ALSO +-------- +linkzmq:zmq_bind[3] +linkzmq:zmq_connect[3] +linkzmq:zmq_socket[3] +linkzmq:zmq[7] + + +AUTHORS +------- +This 0MQ manual page was written by Pieter Hintjens diff --git a/include/zmq.h b/include/zmq.h index 1829e4fe..3327b6bf 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -371,15 +371,16 @@ typedef struct ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout); -/******************************************************************************/ -/* Devices - Experimental. */ -/******************************************************************************/ +// Built-in message proxy (3-way) +ZMQ_EXPORT int zmq_proxy (void *frontend, void *backend, void *capture); + +// Deprecated aliases #define ZMQ_STREAMER 1 #define ZMQ_FORWARDER 2 #define ZMQ_QUEUE 3 - -ZMQ_EXPORT int zmq_device (int device, void *insocket, void* outsocket); +// Deprecated method +ZMQ_EXPORT int zmq_device (int type, void *frontend, void *backend); #undef ZMQ_EXPORT diff --git a/src/Makefile.am b/src/Makefile.am index 5adac699..00a587eb 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -16,7 +16,6 @@ libzmq_la_SOURCES = \ config.hpp \ ctx.hpp \ decoder.hpp \ - device.hpp \ devpoll.hpp \ dist.hpp \ encoder.hpp \ @@ -51,6 +50,7 @@ libzmq_la_SOURCES = \ poller.hpp \ poller_base.hpp \ pair.hpp \ + proxy.hpp \ pub.hpp \ pull.hpp \ push.hpp \ @@ -83,7 +83,6 @@ libzmq_la_SOURCES = \ clock.cpp \ ctx.cpp \ decoder.cpp \ - device.cpp \ devpoll.cpp \ dist.cpp \ encoder.cpp \ @@ -113,6 +112,7 @@ libzmq_la_SOURCES = \ poller_base.cpp \ pull.cpp \ push.cpp \ + proxy.cpp \ reaper.cpp \ pub.cpp \ random.cpp \ diff --git a/src/device.cpp b/src/proxy.cpp similarity index 57% rename from src/device.cpp rename to src/proxy.cpp index 08999e58..5cd8ae52 100644 --- a/src/device.cpp +++ b/src/proxy.cpp @@ -21,13 +21,15 @@ #include #include "../include/zmq.h" #include "platform.hpp" -#include "device.hpp" +#include "proxy.hpp" #include "socket_base.hpp" #include "likely.hpp" #include "err.hpp" -int zmq::device (class socket_base_t *insocket_, - class socket_base_t *outsocket_) +int zmq::proxy ( + class socket_base_t *frontend_, + class socket_base_t *backend_, + class socket_base_t *capture_) { msg_t msg; int rc = msg.init (); @@ -37,14 +39,11 @@ int zmq::device (class socket_base_t *insocket_, // The algorithm below assumes ratio of requests and replies processed // under full load to be 1:1. - // TODO: The current implementation drops messages when - // any of the pipes becomes full. - int more; size_t moresz; zmq_pollitem_t items [] = { - { insocket_, 0, ZMQ_POLLIN, 0 }, - { outsocket_, 0, ZMQ_POLLIN, 0 } + { frontend_, 0, ZMQ_POLLIN, 0 }, + { backend_, 0, ZMQ_POLLIN, 0 } }; while (true) { // Wait while there are either requests or replies to process. @@ -52,38 +51,64 @@ int zmq::device (class socket_base_t *insocket_, if (unlikely (rc < 0)) return -1; - // Process a request. + // Process a request if (items [0].revents & ZMQ_POLLIN) { while (true) { - rc = insocket_->recv (&msg, 0); + rc = frontend_->recv (&msg, 0); if (unlikely (rc < 0)) return -1; moresz = sizeof more; - rc = insocket_->getsockopt (ZMQ_RCVMORE, &more, &moresz); + rc = frontend_->getsockopt (ZMQ_RCVMORE, &more, &moresz); if (unlikely (rc < 0)) return -1; - rc = outsocket_->send (&msg, more? ZMQ_SNDMORE: 0); + // Copy message to capture socket if any + if (capture_) { + msg_t ctrl; + rc = ctrl.init (); + if (unlikely (rc < 0)) + return -1; + rc = ctrl.copy (msg); + if (unlikely (rc < 0)) + return -1; + rc = capture_->send (&ctrl, more? ZMQ_SNDMORE: 0); + if (unlikely (rc < 0)) + return -1; + } + rc = backend_->send (&msg, more? ZMQ_SNDMORE: 0); if (unlikely (rc < 0)) return -1; if (more == 0) break; } } - // Process a reply. + // Process a reply if (items [1].revents & ZMQ_POLLIN) { while (true) { - rc = outsocket_->recv (&msg, 0); + rc = backend_->recv (&msg, 0); if (unlikely (rc < 0)) return -1; moresz = sizeof more; - rc = outsocket_->getsockopt (ZMQ_RCVMORE, &more, &moresz); + rc = backend_->getsockopt (ZMQ_RCVMORE, &more, &moresz); if (unlikely (rc < 0)) return -1; - rc = insocket_->send (&msg, more? ZMQ_SNDMORE: 0); + // Copy message to capture socket if any + if (capture_) { + msg_t ctrl; + rc = ctrl.init (); + if (unlikely (rc < 0)) + return -1; + rc = ctrl.copy (msg); + if (unlikely (rc < 0)) + return -1; + rc = capture_->send (&ctrl, more? ZMQ_SNDMORE: 0); + if (unlikely (rc < 0)) + return -1; + } + rc = frontend_->send (&msg, more? ZMQ_SNDMORE: 0); if (unlikely (rc < 0)) return -1; if (more == 0) diff --git a/src/device.hpp b/src/proxy.hpp similarity index 69% rename from src/device.hpp rename to src/proxy.hpp index c5b71187..acd46e18 100644 --- a/src/device.hpp +++ b/src/proxy.hpp @@ -1,6 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + Copyright (c) 2007-2012 iMatix Corporation + Copyright (c) 2007-2012 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -18,15 +18,15 @@ along with this program. If not, see . */ -#ifndef __ZMQ_DEVICE_HPP_INCLUDED__ -#define __ZMQ_DEVICE_HPP_INCLUDED__ +#ifndef __ZMQ_PROXY_HPP_INCLUDED__ +#define __ZMQ_PROXY_HPP_INCLUDED__ namespace zmq { - - int device (class socket_base_t *insocket_, - class socket_base_t *outsocket_); - + int proxy ( + class socket_base_t *frontend_, + class socket_base_t *backend_, + class socket_base_t *control_); } #endif diff --git a/src/zmq.cpp b/src/zmq.cpp index 55dd4a92..f5b0229c 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -70,7 +70,7 @@ struct iovec { #include #include -#include "device.hpp" +#include "proxy.hpp" #include "socket_base.hpp" #include "stdint.hpp" #include "config.hpp" @@ -962,21 +962,27 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) #undef ZMQ_POLL_BASED_ON_POLL #endif -int zmq_device (int device_, void *insocket_, void *outsocket_) +// The proxy functionality + +int zmq_proxy (void *frontend_, void *backend_, void *control_) { - if (!insocket_ || !outsocket_) { + if (!frontend_ || !backend_) { errno = EFAULT; return -1; } + return zmq::proxy ( + (zmq::socket_base_t*) frontend_, + (zmq::socket_base_t*) backend_, + (zmq::socket_base_t*) control_); +} - if (device_ != ZMQ_FORWARDER && device_ != ZMQ_QUEUE && - device_ != ZMQ_STREAMER) { - errno = EINVAL; - return -1; - } +// The deprecated device functionality - return zmq::device ((zmq::socket_base_t*) insocket_, - (zmq::socket_base_t*) outsocket_); +int zmq_device (int type, void *frontend_, void *backend_) +{ + return zmq::proxy ( + (zmq::socket_base_t*) frontend_, + (zmq::socket_base_t*) backend_, NULL); } ////////////////////////////////////////////////////////////////////////////////