mirror of
https://github.com/zeromq/libzmq.git
synced 2025-03-09 23:36:04 +00:00
Merge pull request #422 from hintjens/master
Replaced device concept with proxy concept
This commit is contained in:
commit
7a40df6d3a
@ -1,4 +1,4 @@
|
|||||||
MAN3 = zmq_bind.3 zmq_close.3 zmq_connect.3 zmq_device.3 \
|
MAN3 = zmq_bind.3 zmq_close.3 zmq_connect.3 zmq_proxy.3 \
|
||||||
zmq_ctx_new.3 zmq_ctx_destroy.3 zmq_ctx_get.3 zmq_ctx_set.3 \
|
zmq_ctx_new.3 zmq_ctx_destroy.3 zmq_ctx_get.3 zmq_ctx_set.3 \
|
||||||
zmq_init.3 zmq_term.3 zmq_ctx_set_monitor.3\
|
zmq_init.3 zmq_term.3 zmq_ctx_set_monitor.3\
|
||||||
zmq_msg_close.3 zmq_msg_copy.3 zmq_msg_data.3 zmq_msg_init.3 \
|
zmq_msg_close.3 zmq_msg_copy.3 zmq_msg_data.3 zmq_msg_init.3 \
|
||||||
|
16
doc/zmq.txt
16
doc/zmq.txt
@ -136,10 +136,10 @@ Establishing a message flow::
|
|||||||
linkzmq:zmq_connect[3]
|
linkzmq:zmq_connect[3]
|
||||||
|
|
||||||
Sending and receiving messages::
|
Sending and receiving messages::
|
||||||
|
linkzmq:zmq_msg_send[3]
|
||||||
|
linkzmq:zmq_msg_recv[3]
|
||||||
linkzmq:zmq_send[3]
|
linkzmq:zmq_send[3]
|
||||||
linkzmq:zmq_sendmsg[3]
|
|
||||||
linkzmq:zmq_recv[3]
|
linkzmq:zmq_recv[3]
|
||||||
linkzmq:zmq_recvmsg[3]
|
|
||||||
|
|
||||||
.Input/output multiplexing
|
.Input/output multiplexing
|
||||||
0MQ provides a mechanism for applications to multiplex input/output events over
|
0MQ provides a mechanism for applications to multiplex input/output events over
|
||||||
@ -169,14 +169,12 @@ Local in-process (inter-thread) communication transport::
|
|||||||
linkzmq:zmq_inproc[7]
|
linkzmq:zmq_inproc[7]
|
||||||
|
|
||||||
|
|
||||||
Devices
|
Proxies
|
||||||
~~~~~~~
|
~~~~~~~
|
||||||
0MQ provides 'devices', which are building blocks that act as intermediate
|
0MQ provides 'proxies' to create fanout and fan-in topologies. A proxy connects
|
||||||
nodes in complex messaging topologies. Devices can act as brokers that other
|
a 'frontend' socket to a 'backend' socket and switches all messages between the
|
||||||
nodes connect to, proxies that connect through to other nodes, or any mix of
|
two sockets, opaquely. A proxy may optionally capture all traffic to a third
|
||||||
these two models.
|
socket. To start a proxy in an application thread, use linkzmq:zmq_proxy[3].
|
||||||
|
|
||||||
You can start a device in an application thread, see linkzmq:zmq_device[3].
|
|
||||||
|
|
||||||
|
|
||||||
ERROR HANDLING
|
ERROR HANDLING
|
||||||
|
@ -1,125 +0,0 @@
|
|||||||
zmq_device(3)
|
|
||||||
=============
|
|
||||||
|
|
||||||
NAME
|
|
||||||
----
|
|
||||||
zmq_device - start built-in 0MQ device
|
|
||||||
|
|
||||||
|
|
||||||
SYNOPSIS
|
|
||||||
--------
|
|
||||||
*int zmq_device (int 'device', const void '*frontend', const void '*backend');*
|
|
||||||
|
|
||||||
|
|
||||||
DESCRIPTION
|
|
||||||
-----------
|
|
||||||
The _zmq_device()_ function starts a built-in 0MQ device. The 'device' argument
|
|
||||||
is one of:
|
|
||||||
|
|
||||||
'ZMQ_QUEUE'::
|
|
||||||
starts a queue device
|
|
||||||
'ZMQ_FORWARDER'::
|
|
||||||
starts a forwarder device
|
|
||||||
'ZMQ_STREAMER'::
|
|
||||||
starts a streamer device
|
|
||||||
|
|
||||||
The device connects a frontend socket to a backend socket. Conceptually, data
|
|
||||||
flows from frontend to backend. Depending on the socket types, replies may flow
|
|
||||||
in the opposite direction.
|
|
||||||
|
|
||||||
Before calling _zmq_device()_ you must set any socket options, and connect or
|
|
||||||
bind both frontend and backend sockets. The two conventional device models are:
|
|
||||||
|
|
||||||
*proxy*::
|
|
||||||
bind frontend socket to an endpoint, and connect backend socket to
|
|
||||||
downstream components. A proxy device model does not require changes to
|
|
||||||
the downstream topology but that topology is static (any changes require
|
|
||||||
reconfiguring the device).
|
|
||||||
*broker*::
|
|
||||||
bind frontend socket to one endpoint and bind backend socket to a second
|
|
||||||
endpoint. Downstream components must now connect into the device. A broker
|
|
||||||
device model allows a dynamic downstream topology (components can come and
|
|
||||||
go at any time).
|
|
||||||
|
|
||||||
_zmq_device()_ runs in the current thread and returns only if/when the current
|
|
||||||
context is closed.
|
|
||||||
|
|
||||||
|
|
||||||
QUEUE DEVICE
|
|
||||||
------------
|
|
||||||
'ZMQ_QUEUE' creates a shared queue that collects requests from a set of clients,
|
|
||||||
and distributes these fairly among a set of services. Requests are fair-queued
|
|
||||||
from frontend connections and load-balanced between backend connections.
|
|
||||||
Replies automatically return to the client that made the original request.
|
|
||||||
|
|
||||||
This device is part of the 'request-reply' pattern. The frontend speaks to
|
|
||||||
clients and the backend speaks to services. You should use 'ZMQ_QUEUE' with a
|
|
||||||
'ZMQ_ROUTER' socket for the frontend and a 'ZMQ_DEALER' socket for the backend.
|
|
||||||
Other combinations are not documented.
|
|
||||||
|
|
||||||
Refer to linkzmq:zmq_socket[3] for a description of these socket types.
|
|
||||||
|
|
||||||
|
|
||||||
FORWARDER DEVICE
|
|
||||||
----------------
|
|
||||||
'ZMQ_FORWARDER' collects messages from a set of publishers and forwards these to
|
|
||||||
a set of subscribers. You will generally use this to bridge networks, e.g. read
|
|
||||||
on TCP unicast and forward on multicast.
|
|
||||||
|
|
||||||
This device is part of the 'publish-subscribe' pattern. The frontend speaks to
|
|
||||||
publishers and the backend speaks to subscribers. You should use
|
|
||||||
'ZMQ_FORWARDER' with a 'ZMQ_SUB' socket for the frontend and a 'ZMQ_PUB' socket
|
|
||||||
for the backend. Other combinations are not documented.
|
|
||||||
|
|
||||||
Refer to linkzmq:zmq_socket[3] for a description of these socket types.
|
|
||||||
|
|
||||||
|
|
||||||
STREAMER DEVICE
|
|
||||||
---------------
|
|
||||||
'ZMQ_STREAMER' collects tasks from a set of pushers and forwards these to a set
|
|
||||||
of pullers. You will generally use this to bridge networks. Messages are
|
|
||||||
fair-queued from pushers and load-balanced to pullers.
|
|
||||||
|
|
||||||
This device is part of the 'pipeline' pattern. The frontend speaks to pushers
|
|
||||||
and the backend speaks to pullers. You should use 'ZMQ_STREAMER' with a
|
|
||||||
'ZMQ_PULL' socket for the frontend and a 'ZMQ_PUSH' socket for the backend.
|
|
||||||
Other combinations are not documented.
|
|
||||||
|
|
||||||
Refer to linkzmq:zmq_socket[3] for a description of these socket types.
|
|
||||||
|
|
||||||
|
|
||||||
RETURN VALUE
|
|
||||||
------------
|
|
||||||
The _zmq_device()_ function always returns `-1` and 'errno' set to *ETERM* (the
|
|
||||||
0MQ 'context' associated with either of the specified sockets was terminated).
|
|
||||||
|
|
||||||
|
|
||||||
EXAMPLE
|
|
||||||
-------
|
|
||||||
.Creating a queue broker
|
|
||||||
----
|
|
||||||
// Create frontend and backend sockets
|
|
||||||
void *frontend = zmq_socket (context, ZMQ_ROUTER);
|
|
||||||
assert (backend);
|
|
||||||
void *backend = zmq_socket (context, ZMQ_DEALER);
|
|
||||||
assert (frontend);
|
|
||||||
// Bind both sockets to TCP ports
|
|
||||||
assert (zmq_bind (frontend, "tcp://*:5555") == 0);
|
|
||||||
assert (zmq_bind (backend, "tcp://*:5556") == 0);
|
|
||||||
// Start a queue device
|
|
||||||
zmq_device (ZMQ_QUEUE, frontend, backend);
|
|
||||||
----
|
|
||||||
|
|
||||||
|
|
||||||
SEE ALSO
|
|
||||||
--------
|
|
||||||
linkzmq:zmq_bind[3]
|
|
||||||
linkzmq:zmq_connect[3]
|
|
||||||
linkzmq:zmq_socket[3]
|
|
||||||
linkzmq:zmq[7]
|
|
||||||
|
|
||||||
|
|
||||||
AUTHORS
|
|
||||||
-------
|
|
||||||
This 0MQ manual page was written by Pieter Hintjens <ph@imatix.com>
|
|
||||||
|
|
97
doc/zmq_proxy.txt
Normal file
97
doc/zmq_proxy.txt
Normal file
@ -0,0 +1,97 @@
|
|||||||
|
zmq_proxy(3)
|
||||||
|
============
|
||||||
|
|
||||||
|
NAME
|
||||||
|
----
|
||||||
|
zmq_proxy - start built-in 0MQ proxy
|
||||||
|
|
||||||
|
|
||||||
|
SYNOPSIS
|
||||||
|
--------
|
||||||
|
*int zmq_proxy (const void '*frontend', const void '*backend', const void '*capture');*
|
||||||
|
|
||||||
|
|
||||||
|
DESCRIPTION
|
||||||
|
-----------
|
||||||
|
The _zmq_proxy()_ function starts the built-in 0MQ proxy in the current
|
||||||
|
application thread.
|
||||||
|
|
||||||
|
The proxy connects a frontend socket to a backend socket. Conceptually, data
|
||||||
|
flows from frontend to backend. Depending on the socket types, replies may flow
|
||||||
|
in the opposite direction. The direction is conceptual only; the proxy is fully
|
||||||
|
symmetric and there is no technical difference between frontend and backend.
|
||||||
|
|
||||||
|
Before calling _zmq_proxy()_ you must set any socket options, and connect or
|
||||||
|
bind both frontend and backend sockets. The two conventional proxy models are:
|
||||||
|
|
||||||
|
_zmq_proxy()_ runs in the current thread and returns only if/when the current
|
||||||
|
context is closed.
|
||||||
|
|
||||||
|
If the capture socket is not NULL, the proxy shall send all messages, received
|
||||||
|
on both frontend and backend, to the capture socket. The capture socket should
|
||||||
|
be a 'ZMQ_PUB', 'ZMQ_DEALER', 'ZMQ_PUSH', or 'ZMQ_PAIR' socket.
|
||||||
|
|
||||||
|
Refer to linkzmq:zmq_socket[3] for a description of the available socket types.
|
||||||
|
|
||||||
|
EXAMPLE USAGE
|
||||||
|
-------------
|
||||||
|
|
||||||
|
Shared Queue
|
||||||
|
~~~~~~~~~~~~
|
||||||
|
|
||||||
|
When the frontend is a ZMQ_ROUTER socket, and the backend is a ZMQ_DEALER
|
||||||
|
socket, the proxy shall act as a shared queue that collects requests from a
|
||||||
|
set of clients, and distributes these fairly among a set of services.
|
||||||
|
Requests shall be fair-queued from frontend connections and distributed evenly
|
||||||
|
across backend connections. Replies shall automatically return to the client
|
||||||
|
that made the original request.
|
||||||
|
|
||||||
|
Forwarder
|
||||||
|
~~~~~~~~~
|
||||||
|
|
||||||
|
When the frontend is a ZMQ_XSUB socket, and the backend is a ZMQ_XPUB socket,
|
||||||
|
the proxy shall act as a message forwarder that collects messages from a set
|
||||||
|
of publishers and forwards these to a set of subscribers. This may be used to
|
||||||
|
bridge networks transports, e.g. read on tcp:// and forward on pgm://.
|
||||||
|
|
||||||
|
Streamer
|
||||||
|
~~~~~~~~
|
||||||
|
|
||||||
|
When the frontend is a ZMQ_PULL socket, and the backend is a ZMQ_PUSH socket,
|
||||||
|
the proxy shall collect tasks from a set of clients and forwards these to a set
|
||||||
|
of workers using the pipeline pattern.
|
||||||
|
|
||||||
|
RETURN VALUE
|
||||||
|
------------
|
||||||
|
The _zmq_proxy()_ function always returns `-1` and 'errno' set to *ETERM* (the
|
||||||
|
0MQ 'context' associated with either of the specified sockets was terminated).
|
||||||
|
|
||||||
|
|
||||||
|
EXAMPLE
|
||||||
|
-------
|
||||||
|
.Creating a shared queue proxy
|
||||||
|
----
|
||||||
|
// Create frontend and backend sockets
|
||||||
|
void *frontend = zmq_socket (context, ZMQ_ROUTER);
|
||||||
|
assert (backend);
|
||||||
|
void *backend = zmq_socket (context, ZMQ_DEALER);
|
||||||
|
assert (frontend);
|
||||||
|
// Bind both sockets to TCP ports
|
||||||
|
assert (zmq_bind (frontend, "tcp://*:5555") == 0);
|
||||||
|
assert (zmq_bind (backend, "tcp://*:5556") == 0);
|
||||||
|
// Start the queue proxy, which runs until ETERM
|
||||||
|
zmq_proxy (frontend, backend, NULL);
|
||||||
|
----
|
||||||
|
|
||||||
|
|
||||||
|
SEE ALSO
|
||||||
|
--------
|
||||||
|
linkzmq:zmq_bind[3]
|
||||||
|
linkzmq:zmq_connect[3]
|
||||||
|
linkzmq:zmq_socket[3]
|
||||||
|
linkzmq:zmq[7]
|
||||||
|
|
||||||
|
|
||||||
|
AUTHORS
|
||||||
|
-------
|
||||||
|
This 0MQ manual page was written by Pieter Hintjens <ph@imatix.com>
|
@ -256,6 +256,8 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval);
|
|||||||
/* Send/recv options. */
|
/* Send/recv options. */
|
||||||
#define ZMQ_DONTWAIT 1
|
#define ZMQ_DONTWAIT 1
|
||||||
#define ZMQ_SNDMORE 2
|
#define ZMQ_SNDMORE 2
|
||||||
|
/* Deprecated aliases */
|
||||||
|
#define ZMQ_NOBLOCK ZMQ_DONTWAIT
|
||||||
|
|
||||||
/******************************************************************************/
|
/******************************************************************************/
|
||||||
/* 0MQ socket events and monitoring */
|
/* 0MQ socket events and monitoring */
|
||||||
@ -369,15 +371,16 @@ typedef struct
|
|||||||
|
|
||||||
ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout);
|
ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout);
|
||||||
|
|
||||||
/******************************************************************************/
|
// Built-in message proxy (3-way)
|
||||||
/* Devices - Experimental. */
|
|
||||||
/******************************************************************************/
|
|
||||||
|
|
||||||
|
ZMQ_EXPORT int zmq_proxy (void *frontend, void *backend, void *capture);
|
||||||
|
|
||||||
|
// Deprecated aliases
|
||||||
#define ZMQ_STREAMER 1
|
#define ZMQ_STREAMER 1
|
||||||
#define ZMQ_FORWARDER 2
|
#define ZMQ_FORWARDER 2
|
||||||
#define ZMQ_QUEUE 3
|
#define ZMQ_QUEUE 3
|
||||||
|
// Deprecated method
|
||||||
ZMQ_EXPORT int zmq_device (int device, void *insocket, void* outsocket);
|
ZMQ_EXPORT int zmq_device (int type, void *frontend, void *backend);
|
||||||
|
|
||||||
#undef ZMQ_EXPORT
|
#undef ZMQ_EXPORT
|
||||||
|
|
||||||
|
@ -16,7 +16,6 @@ libzmq_la_SOURCES = \
|
|||||||
config.hpp \
|
config.hpp \
|
||||||
ctx.hpp \
|
ctx.hpp \
|
||||||
decoder.hpp \
|
decoder.hpp \
|
||||||
device.hpp \
|
|
||||||
devpoll.hpp \
|
devpoll.hpp \
|
||||||
dist.hpp \
|
dist.hpp \
|
||||||
encoder.hpp \
|
encoder.hpp \
|
||||||
@ -51,6 +50,7 @@ libzmq_la_SOURCES = \
|
|||||||
poller.hpp \
|
poller.hpp \
|
||||||
poller_base.hpp \
|
poller_base.hpp \
|
||||||
pair.hpp \
|
pair.hpp \
|
||||||
|
proxy.hpp \
|
||||||
pub.hpp \
|
pub.hpp \
|
||||||
pull.hpp \
|
pull.hpp \
|
||||||
push.hpp \
|
push.hpp \
|
||||||
@ -83,7 +83,6 @@ libzmq_la_SOURCES = \
|
|||||||
clock.cpp \
|
clock.cpp \
|
||||||
ctx.cpp \
|
ctx.cpp \
|
||||||
decoder.cpp \
|
decoder.cpp \
|
||||||
device.cpp \
|
|
||||||
devpoll.cpp \
|
devpoll.cpp \
|
||||||
dist.cpp \
|
dist.cpp \
|
||||||
encoder.cpp \
|
encoder.cpp \
|
||||||
@ -113,6 +112,7 @@ libzmq_la_SOURCES = \
|
|||||||
poller_base.cpp \
|
poller_base.cpp \
|
||||||
pull.cpp \
|
pull.cpp \
|
||||||
push.cpp \
|
push.cpp \
|
||||||
|
proxy.cpp \
|
||||||
reaper.cpp \
|
reaper.cpp \
|
||||||
pub.cpp \
|
pub.cpp \
|
||||||
random.cpp \
|
random.cpp \
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
/*
|
/*
|
||||||
Copyright (c) 2007-2011 iMatix Corporation
|
Copyright (c) 2007-2012 iMatix Corporation
|
||||||
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
|
Copyright (c) 2007-2012 Other contributors as noted in the AUTHORS file
|
||||||
|
|
||||||
This file is part of 0MQ.
|
This file is part of 0MQ.
|
||||||
|
|
||||||
@ -19,41 +19,17 @@
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
#include <stddef.h>
|
#include <stddef.h>
|
||||||
|
|
||||||
#include "platform.hpp"
|
|
||||||
|
|
||||||
#if defined ZMQ_FORCE_SELECT
|
|
||||||
#define ZMQ_POLL_BASED_ON_SELECT
|
|
||||||
#elif defined ZMQ_FORCE_POLL
|
|
||||||
#define ZMQ_POLL_BASED_ON_POLL
|
|
||||||
#elif defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD ||\
|
|
||||||
defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_SOLARIS ||\
|
|
||||||
defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_QNXNTO ||\
|
|
||||||
defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX ||\
|
|
||||||
defined ZMQ_HAVE_NETBSD
|
|
||||||
#define ZMQ_POLL_BASED_ON_POLL
|
|
||||||
#elif defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS ||\
|
|
||||||
defined ZMQ_HAVE_CYGWIN
|
|
||||||
#define ZMQ_POLL_BASED_ON_SELECT
|
|
||||||
#endif
|
|
||||||
|
|
||||||
// On AIX platform, poll.h has to be included first to get consistent
|
|
||||||
// definition of pollfd structure (AIX uses 'reqevents' and 'retnevents'
|
|
||||||
// instead of 'events' and 'revents' and defines macros to map from POSIX-y
|
|
||||||
// names to AIX-specific names).
|
|
||||||
#if defined ZMQ_POLL_BASED_ON_POLL
|
|
||||||
#include <poll.h>
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#include "../include/zmq.h"
|
#include "../include/zmq.h"
|
||||||
|
#include "platform.hpp"
|
||||||
#include "device.hpp"
|
#include "proxy.hpp"
|
||||||
#include "socket_base.hpp"
|
#include "socket_base.hpp"
|
||||||
#include "likely.hpp"
|
#include "likely.hpp"
|
||||||
#include "err.hpp"
|
#include "err.hpp"
|
||||||
|
|
||||||
int zmq::device (class socket_base_t *insocket_,
|
int zmq::proxy (
|
||||||
class socket_base_t *outsocket_)
|
class socket_base_t *frontend_,
|
||||||
|
class socket_base_t *backend_,
|
||||||
|
class socket_base_t *capture_)
|
||||||
{
|
{
|
||||||
msg_t msg;
|
msg_t msg;
|
||||||
int rc = msg.init ();
|
int rc = msg.init ();
|
||||||
@ -63,14 +39,11 @@ int zmq::device (class socket_base_t *insocket_,
|
|||||||
// The algorithm below assumes ratio of requests and replies processed
|
// The algorithm below assumes ratio of requests and replies processed
|
||||||
// under full load to be 1:1.
|
// under full load to be 1:1.
|
||||||
|
|
||||||
// TODO: The current implementation drops messages when
|
|
||||||
// any of the pipes becomes full.
|
|
||||||
|
|
||||||
int more;
|
int more;
|
||||||
size_t moresz;
|
size_t moresz;
|
||||||
zmq_pollitem_t items [] = {
|
zmq_pollitem_t items [] = {
|
||||||
{ insocket_, 0, ZMQ_POLLIN, 0 },
|
{ frontend_, 0, ZMQ_POLLIN, 0 },
|
||||||
{ outsocket_, 0, ZMQ_POLLIN, 0 }
|
{ backend_, 0, ZMQ_POLLIN, 0 }
|
||||||
};
|
};
|
||||||
while (true) {
|
while (true) {
|
||||||
// Wait while there are either requests or replies to process.
|
// Wait while there are either requests or replies to process.
|
||||||
@ -78,38 +51,64 @@ int zmq::device (class socket_base_t *insocket_,
|
|||||||
if (unlikely (rc < 0))
|
if (unlikely (rc < 0))
|
||||||
return -1;
|
return -1;
|
||||||
|
|
||||||
// Process a request.
|
// Process a request
|
||||||
if (items [0].revents & ZMQ_POLLIN) {
|
if (items [0].revents & ZMQ_POLLIN) {
|
||||||
while (true) {
|
while (true) {
|
||||||
rc = insocket_->recv (&msg, 0);
|
rc = frontend_->recv (&msg, 0);
|
||||||
if (unlikely (rc < 0))
|
if (unlikely (rc < 0))
|
||||||
return -1;
|
return -1;
|
||||||
|
|
||||||
moresz = sizeof more;
|
moresz = sizeof more;
|
||||||
rc = insocket_->getsockopt (ZMQ_RCVMORE, &more, &moresz);
|
rc = frontend_->getsockopt (ZMQ_RCVMORE, &more, &moresz);
|
||||||
if (unlikely (rc < 0))
|
if (unlikely (rc < 0))
|
||||||
return -1;
|
return -1;
|
||||||
|
|
||||||
rc = outsocket_->send (&msg, more? ZMQ_SNDMORE: 0);
|
// Copy message to capture socket if any
|
||||||
|
if (capture_) {
|
||||||
|
msg_t ctrl;
|
||||||
|
rc = ctrl.init ();
|
||||||
|
if (unlikely (rc < 0))
|
||||||
|
return -1;
|
||||||
|
rc = ctrl.copy (msg);
|
||||||
|
if (unlikely (rc < 0))
|
||||||
|
return -1;
|
||||||
|
rc = capture_->send (&ctrl, more? ZMQ_SNDMORE: 0);
|
||||||
|
if (unlikely (rc < 0))
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
rc = backend_->send (&msg, more? ZMQ_SNDMORE: 0);
|
||||||
if (unlikely (rc < 0))
|
if (unlikely (rc < 0))
|
||||||
return -1;
|
return -1;
|
||||||
if (more == 0)
|
if (more == 0)
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Process a reply.
|
// Process a reply
|
||||||
if (items [1].revents & ZMQ_POLLIN) {
|
if (items [1].revents & ZMQ_POLLIN) {
|
||||||
while (true) {
|
while (true) {
|
||||||
rc = outsocket_->recv (&msg, 0);
|
rc = backend_->recv (&msg, 0);
|
||||||
if (unlikely (rc < 0))
|
if (unlikely (rc < 0))
|
||||||
return -1;
|
return -1;
|
||||||
|
|
||||||
moresz = sizeof more;
|
moresz = sizeof more;
|
||||||
rc = outsocket_->getsockopt (ZMQ_RCVMORE, &more, &moresz);
|
rc = backend_->getsockopt (ZMQ_RCVMORE, &more, &moresz);
|
||||||
if (unlikely (rc < 0))
|
if (unlikely (rc < 0))
|
||||||
return -1;
|
return -1;
|
||||||
|
|
||||||
rc = insocket_->send (&msg, more? ZMQ_SNDMORE: 0);
|
// Copy message to capture socket if any
|
||||||
|
if (capture_) {
|
||||||
|
msg_t ctrl;
|
||||||
|
rc = ctrl.init ();
|
||||||
|
if (unlikely (rc < 0))
|
||||||
|
return -1;
|
||||||
|
rc = ctrl.copy (msg);
|
||||||
|
if (unlikely (rc < 0))
|
||||||
|
return -1;
|
||||||
|
rc = capture_->send (&ctrl, more? ZMQ_SNDMORE: 0);
|
||||||
|
if (unlikely (rc < 0))
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
rc = frontend_->send (&msg, more? ZMQ_SNDMORE: 0);
|
||||||
if (unlikely (rc < 0))
|
if (unlikely (rc < 0))
|
||||||
return -1;
|
return -1;
|
||||||
if (more == 0)
|
if (more == 0)
|
@ -1,6 +1,6 @@
|
|||||||
/*
|
/*
|
||||||
Copyright (c) 2007-2011 iMatix Corporation
|
Copyright (c) 2007-2012 iMatix Corporation
|
||||||
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
|
Copyright (c) 2007-2012 Other contributors as noted in the AUTHORS file
|
||||||
|
|
||||||
This file is part of 0MQ.
|
This file is part of 0MQ.
|
||||||
|
|
||||||
@ -18,15 +18,15 @@
|
|||||||
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#ifndef __ZMQ_DEVICE_HPP_INCLUDED__
|
#ifndef __ZMQ_PROXY_HPP_INCLUDED__
|
||||||
#define __ZMQ_DEVICE_HPP_INCLUDED__
|
#define __ZMQ_PROXY_HPP_INCLUDED__
|
||||||
|
|
||||||
namespace zmq
|
namespace zmq
|
||||||
{
|
{
|
||||||
|
int proxy (
|
||||||
int device (class socket_base_t *insocket_,
|
class socket_base_t *frontend_,
|
||||||
class socket_base_t *outsocket_);
|
class socket_base_t *backend_,
|
||||||
|
class socket_base_t *control_);
|
||||||
}
|
}
|
||||||
|
|
||||||
#endif
|
#endif
|
26
src/zmq.cpp
26
src/zmq.cpp
@ -70,7 +70,7 @@ struct iovec {
|
|||||||
#include <stdlib.h>
|
#include <stdlib.h>
|
||||||
#include <new>
|
#include <new>
|
||||||
|
|
||||||
#include "device.hpp"
|
#include "proxy.hpp"
|
||||||
#include "socket_base.hpp"
|
#include "socket_base.hpp"
|
||||||
#include "stdint.hpp"
|
#include "stdint.hpp"
|
||||||
#include "config.hpp"
|
#include "config.hpp"
|
||||||
@ -962,21 +962,27 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
|
|||||||
#undef ZMQ_POLL_BASED_ON_POLL
|
#undef ZMQ_POLL_BASED_ON_POLL
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
int zmq_device (int device_, void *insocket_, void *outsocket_)
|
// The proxy functionality
|
||||||
|
|
||||||
|
int zmq_proxy (void *frontend_, void *backend_, void *control_)
|
||||||
{
|
{
|
||||||
if (!insocket_ || !outsocket_) {
|
if (!frontend_ || !backend_) {
|
||||||
errno = EFAULT;
|
errno = EFAULT;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
return zmq::proxy (
|
||||||
|
(zmq::socket_base_t*) frontend_,
|
||||||
|
(zmq::socket_base_t*) backend_,
|
||||||
|
(zmq::socket_base_t*) control_);
|
||||||
|
}
|
||||||
|
|
||||||
if (device_ != ZMQ_FORWARDER && device_ != ZMQ_QUEUE &&
|
// The deprecated device functionality
|
||||||
device_ != ZMQ_STREAMER) {
|
|
||||||
errno = EINVAL;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return zmq::device ((zmq::socket_base_t*) insocket_,
|
int zmq_device (int type, void *frontend_, void *backend_)
|
||||||
(zmq::socket_base_t*) outsocket_);
|
{
|
||||||
|
return zmq::proxy (
|
||||||
|
(zmq::socket_base_t*) frontend_,
|
||||||
|
(zmq::socket_base_t*) backend_, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
Loading…
x
Reference in New Issue
Block a user