mirror of
https://github.com/zeromq/libzmq.git
synced 2025-03-12 09:06:26 +00:00
Revert "add a proxy hook"
This reverts commit 9ae6a91fadb96fd48038fde04cc3d4b61b49a8a1.
This commit is contained in:
parent
bc25366f7c
commit
abf9d8b74e
@ -1,200 +0,0 @@
|
|||||||
zmq_proxy_hook(3)
|
|
||||||
=================
|
|
||||||
|
|
||||||
NAME
|
|
||||||
----
|
|
||||||
zmq_proxy_hook - start built-in 0MQ proxy with an hook to modify the messages
|
|
||||||
between the frontend and the backend
|
|
||||||
|
|
||||||
|
|
||||||
SYNOPSIS
|
|
||||||
--------
|
|
||||||
*int zmq_proxy_hook (const void '*frontend', const void '*backend',
|
|
||||||
const void '*capture', const void '*hook', const void '*control');*
|
|
||||||
|
|
||||||
|
|
||||||
DESCRIPTION
|
|
||||||
-----------
|
|
||||||
The _zmq_proxy_hook()_ function starts the built-in 0MQ proxy in the
|
|
||||||
current application thread, as _zmq_proxy()_ or _zmq_proxy_steerable()_ do.
|
|
||||||
Please, refer to these functions for the general description and usage.
|
|
||||||
We describe here only the additional hook provided by the structure "hook"
|
|
||||||
passed as a fith argument.
|
|
||||||
|
|
||||||
If the hook structure pointer is not NULL, the proxy supports a hook defined as
|
|
||||||
a structure 'zmq_proxy_hook_t' containing a data pointer to any data type and
|
|
||||||
the address of two functions of type 'zmq_hook_f'. The first function,
|
|
||||||
'front2back_hook' is to manipulate the message received from the frontend, before
|
|
||||||
it is sent to the backend. The second one, 'back2front_hook' is for the way back.
|
|
||||||
|
|
||||||
Both functions receive as an argument in addition to a pointer to the message, the
|
|
||||||
pointer to the data passed in the 'zmq_proxy_hook_t' structure. This data makes it
|
|
||||||
possible to manage stateful behaviours in the proxy. They receive also the frame
|
|
||||||
number n_ which is 1 for the first frame, n for the nth one, 0 for the last one. This
|
|
||||||
enable to manage specifically the identity frame when ROUTER | STREAM sockets are
|
|
||||||
concerned. Moreover, to give the hook full capabilities, the three sockets passed
|
|
||||||
as parameters to the proxy are also provided to the hook functions, enabling to
|
|
||||||
consume some frames or to add others:
|
|
||||||
|
|
||||||
----
|
|
||||||
typedef int (*zmq_hook_f)(void *frontend, void *backend, void *capture,
|
|
||||||
zmq_msg_t* msg_, size_t n_, void *data_);
|
|
||||||
typedef struct zmq_proxy_hook_t {
|
|
||||||
void *data;
|
|
||||||
zmq_hook_f front2back_hook;
|
|
||||||
zmq_hook_f back2front_hook;
|
|
||||||
} zmq_proxy_hook_t;
|
|
||||||
----
|
|
||||||
|
|
||||||
If the hook pointer is NULL, zmq_proxy_hook behaves exactly as if zmq_proxy
|
|
||||||
or zmq_proxy_steerable had been called.
|
|
||||||
|
|
||||||
Refer to linkzmq:zmq_socket[3] for a description of the available socket types.
|
|
||||||
Refer to linkzmq:zmq_proxy[3] for a description of the zmq_proxy.
|
|
||||||
Refer to linkzmq:zmq_proxy_steerable[3] for a description of the zmq_proxy_steerable.
|
|
||||||
|
|
||||||
EXAMPLE USAGE
|
|
||||||
-------------
|
|
||||||
|
|
||||||
Filter
|
|
||||||
~~~~~~
|
|
||||||
|
|
||||||
The most simple use is to simply filter the messages for example against vulgarity.
|
|
||||||
Messages are simply scanned against a dictionnary and target words are replaced.
|
|
||||||
|
|
||||||
ROUTER | STREAM / ROUTER | STREAM proxy
|
|
||||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
|
||||||
|
|
||||||
The data field enables to multiplex as desired identities in a ROUTER/ROUTER or in a
|
|
||||||
STREAM/STREAM proxy or what ever. Such architecture enables also custom load balancers.
|
|
||||||
|
|
||||||
Sticky ROUTER / ROUTER proxy
|
|
||||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
|
||||||
|
|
||||||
The data field enables to manage sticky identity pairing in a ROUTER/ROUTER proxy.
|
|
||||||
|
|
||||||
Security mechanism proxying
|
|
||||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
|
||||||
|
|
||||||
We expect to be able to proxy CURVE with the use of this feature.
|
|
||||||
|
|
||||||
Tests
|
|
||||||
~~~~~
|
|
||||||
|
|
||||||
In an existing application, just change zmq_proxy or zmq_proxy_steerable for
|
|
||||||
zmq_proxy_hook to test anythink, even "Man in the middle" attacks ws security
|
|
||||||
mechanisms with a STREAM/STREAM proxy.
|
|
||||||
|
|
||||||
|
|
||||||
RETURN VALUE
|
|
||||||
------------
|
|
||||||
The _zmq_proxy_hook()_ function returns the same values than zmq_proxy
|
|
||||||
or zmq_proxy_steerable in the same conditions of use.
|
|
||||||
|
|
||||||
|
|
||||||
EXAMPLE
|
|
||||||
-------
|
|
||||||
This simple example aims at uppercasing the traffic between the frontend and the
|
|
||||||
backend, and lowercasing it on the way back.
|
|
||||||
|
|
||||||
.Setup the hook
|
|
||||||
----
|
|
||||||
struct stats_t {
|
|
||||||
int qt_upper_case;
|
|
||||||
int qt_lower_case;
|
|
||||||
} stats = {NULL, 0, 0};
|
|
||||||
|
|
||||||
int
|
|
||||||
upper_case(void*, void*, void*, zmq_msg_t* msg_, size_t n_, void *stats_)
|
|
||||||
{
|
|
||||||
size_t size = zmq_msg_size(msg_);
|
|
||||||
if (!size || n_ == 1) return 0; // skip identity and 0 frames
|
|
||||||
char* message = (char*) zmq_msg_data(msg_);
|
|
||||||
for (size_t i = 0; i < size; i++)
|
|
||||||
if ('a' <= message[i] && message[i] <= 'z')
|
|
||||||
message[i] += 'A' - 'a';
|
|
||||||
struct stats_t* stats = (struct stats_t*) stats_;
|
|
||||||
stats->qt_upper_case++;
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int
|
|
||||||
lower_case(void*, void*, void*, zmq_msg_t* msg_, size_t n_, void *stats_)
|
|
||||||
{
|
|
||||||
size_t size = zmq_msg_size(msg_);
|
|
||||||
if (!size || n_ == 1) return 0; // skip identity and 0 frames
|
|
||||||
char* message = (char*) zmq_msg_data(msg_);
|
|
||||||
for (size_t i = 0; i < size; i++)
|
|
||||||
if ('A' <= message[i] && message[i] <= 'Z')
|
|
||||||
message[i] += 'a' - 'A';
|
|
||||||
struct stats_t* stats = (struct stats_t*) stats_;
|
|
||||||
stats->qt_lower_case++;
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
zmq_proxy_hook_t hook = {
|
|
||||||
&stats, // data used by the hook functions, passed as void* data_
|
|
||||||
upper_case, // hook for messages going from frontend to backend
|
|
||||||
lower_case // hook for messages going from backend to frontend
|
|
||||||
};
|
|
||||||
----
|
|
||||||
.in main:
|
|
||||||
----
|
|
||||||
int
|
|
||||||
main (void)
|
|
||||||
{
|
|
||||||
setup_test_environment ();
|
|
||||||
void *context = zmq_ctx_new ();
|
|
||||||
assert (context);
|
|
||||||
// Create frontend, backend and control sockets
|
|
||||||
void *frontend = zmq_socket (context, ZMQ_ROUTER);
|
|
||||||
assert (backend);
|
|
||||||
void *backend = zmq_socket (context, ZMQ_DEALER);
|
|
||||||
assert (frontend);
|
|
||||||
void *control = zmq_socket (context, ZMQ_PUB);
|
|
||||||
assert (control);
|
|
||||||
|
|
||||||
// Bind sockets to TCP ports
|
|
||||||
assert (zmq_bind (frontend, "tcp://*:5555") == 0);
|
|
||||||
assert (zmq_bind (backend, "tcp://*:5556") == 0);
|
|
||||||
assert (zmq_connect (control, "tcp://*:5557") == 0);
|
|
||||||
|
|
||||||
// Start the queue proxy, which runs until ETERM or "TERMINATE"
|
|
||||||
// received on the control socket
|
|
||||||
zmq_proxy_hook (frontend, backend, NULL, &hook, control);
|
|
||||||
|
|
||||||
printf("frontend to backend hook hits = %d\nbackend to frontend hook hits = %d\n", stats.qt_upper_case, stats.qt_lower_case);
|
|
||||||
|
|
||||||
// close sockets and context
|
|
||||||
rc = zmq_close (control);
|
|
||||||
assert (rc == 0);
|
|
||||||
rc = zmq_close (backend);
|
|
||||||
assert (rc == 0);
|
|
||||||
rc = zmq_close (frontend);
|
|
||||||
assert (rc == 0);
|
|
||||||
rc = zmq_ctx_term (ctx);
|
|
||||||
assert (rc == 0);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
----
|
|
||||||
.somewhere, the proxy is stopped with:
|
|
||||||
----
|
|
||||||
rc = zmq_send (control, "TERMINATE", 9, 0); // stops the hooked proxy
|
|
||||||
assert (rc == 9);
|
|
||||||
----
|
|
||||||
.cf test_proxy.cpp for a full implementation of this test, with clients and workers.
|
|
||||||
|
|
||||||
SEE ALSO
|
|
||||||
--------
|
|
||||||
linkzmq:zmq_proxy[3]
|
|
||||||
linkzmq:zmq_proxy_steerable[3]
|
|
||||||
linkzmq:zmq_bind[3]
|
|
||||||
linkzmq:zmq_connect[3]
|
|
||||||
linkzmq:zmq_socket[3]
|
|
||||||
linkzmq:zmq[7]
|
|
||||||
|
|
||||||
|
|
||||||
AUTHORS
|
|
||||||
-------
|
|
||||||
This page was written by the 0MQ community. To make a change please
|
|
||||||
read the 0MQ Contribution Policy at <http://www.zeromq.org/docs:contributing>.
|
|
@ -400,14 +400,6 @@ ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout);
|
|||||||
|
|
||||||
ZMQ_EXPORT int zmq_proxy (void *frontend, void *backend, void *capture);
|
ZMQ_EXPORT int zmq_proxy (void *frontend, void *backend, void *capture);
|
||||||
ZMQ_EXPORT int zmq_proxy_steerable (void *frontend, void *backend, void *capture, void *control);
|
ZMQ_EXPORT int zmq_proxy_steerable (void *frontend, void *backend, void *capture, void *control);
|
||||||
ZMQ_EXPORT int zmq_proxy_hook (void *frontend, void *backend, void *capture, void *hook, void *control);
|
|
||||||
|
|
||||||
typedef int (*zmq_hook_f)(void *frontend, void *backend, void *capture, zmq_msg_t* msg_, size_t n_, void *data_);
|
|
||||||
typedef struct zmq_proxy_hook_t {
|
|
||||||
void *data;
|
|
||||||
zmq_hook_f front2back_hook;
|
|
||||||
zmq_hook_f back2front_hook;
|
|
||||||
} zmq_proxy_hook_t;
|
|
||||||
|
|
||||||
/* Encode a binary key as printable text using ZMQ RFC 32 */
|
/* Encode a binary key as printable text using ZMQ RFC 32 */
|
||||||
ZMQ_EXPORT char *zmq_z85_encode (char *dest, uint8_t *data, size_t size);
|
ZMQ_EXPORT char *zmq_z85_encode (char *dest, uint8_t *data, size_t size);
|
||||||
|
@ -53,8 +53,7 @@
|
|||||||
// zmq.h must be included *after* poll.h for AIX to build properly
|
// zmq.h must be included *after* poll.h for AIX to build properly
|
||||||
#include "../include/zmq.h"
|
#include "../include/zmq.h"
|
||||||
|
|
||||||
int
|
int capture(
|
||||||
capture(
|
|
||||||
class zmq::socket_base_t *capture_,
|
class zmq::socket_base_t *capture_,
|
||||||
zmq::msg_t& msg_,
|
zmq::msg_t& msg_,
|
||||||
int more_ = 0)
|
int more_ = 0)
|
||||||
@ -75,18 +74,15 @@ capture(
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int
|
int forward(
|
||||||
forward(
|
|
||||||
class zmq::socket_base_t *from_,
|
class zmq::socket_base_t *from_,
|
||||||
class zmq::socket_base_t *to_,
|
class zmq::socket_base_t *to_,
|
||||||
class zmq::socket_base_t *capture_,
|
class zmq::socket_base_t *capture_,
|
||||||
zmq::msg_t& msg_,
|
zmq::msg_t& msg_)
|
||||||
zmq::hook_f do_hook_,
|
|
||||||
void *data_)
|
|
||||||
{
|
{
|
||||||
int more;
|
int more;
|
||||||
size_t moresz;
|
size_t moresz;
|
||||||
for (size_t n = 1;; n++) {
|
while (true) {
|
||||||
int rc = from_->recv (&msg_, 0);
|
int rc = from_->recv (&msg_, 0);
|
||||||
if (unlikely (rc < 0))
|
if (unlikely (rc < 0))
|
||||||
return -1;
|
return -1;
|
||||||
@ -101,13 +97,6 @@ forward(
|
|||||||
if (unlikely (rc < 0))
|
if (unlikely (rc < 0))
|
||||||
return -1;
|
return -1;
|
||||||
|
|
||||||
// Hook
|
|
||||||
if (do_hook_) {
|
|
||||||
rc = (*do_hook_)(from_, to_, capture_, &msg_, more ? n : 0, data_); // first message: n == 1, mth message: n == m, last message: n == 0
|
|
||||||
if (unlikely (rc < 0))
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
rc = to_->send (&msg_, more? ZMQ_SNDMORE: 0);
|
rc = to_->send (&msg_, more? ZMQ_SNDMORE: 0);
|
||||||
if (unlikely (rc < 0))
|
if (unlikely (rc < 0))
|
||||||
return -1;
|
return -1;
|
||||||
@ -117,16 +106,12 @@ forward(
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int
|
int zmq::proxy (
|
||||||
zmq::proxy (
|
|
||||||
class socket_base_t *frontend_,
|
class socket_base_t *frontend_,
|
||||||
class socket_base_t *backend_,
|
class socket_base_t *backend_,
|
||||||
class socket_base_t *capture_,
|
class socket_base_t *capture_,
|
||||||
class socket_base_t *control_,
|
class socket_base_t *control_)
|
||||||
zmq::proxy_hook_t *hook_)
|
|
||||||
{
|
{
|
||||||
static zmq::proxy_hook_t dummy_hook = {NULL, NULL, NULL};
|
|
||||||
|
|
||||||
msg_t msg;
|
msg_t msg;
|
||||||
int rc = msg.init ();
|
int rc = msg.init ();
|
||||||
if (rc != 0)
|
if (rc != 0)
|
||||||
@ -187,20 +172,17 @@ zmq::proxy (
|
|||||||
zmq_assert (false);
|
zmq_assert (false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Check if a hook is used
|
|
||||||
if (!hook_)
|
|
||||||
hook_ = &dummy_hook;
|
|
||||||
// Process a request
|
// Process a request
|
||||||
if (state == active
|
if (state == active
|
||||||
&& items [0].revents & ZMQ_POLLIN) {
|
&& items [0].revents & ZMQ_POLLIN) {
|
||||||
rc = forward(frontend_, backend_, capture_, msg, hook_->front2back_hook, hook_->data);
|
rc = forward(frontend_, backend_, capture_,msg);
|
||||||
if (unlikely (rc < 0))
|
if (unlikely (rc < 0))
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
// Process a reply
|
// Process a reply
|
||||||
if (state == active
|
if (state == active
|
||||||
&& items [1].revents & ZMQ_POLLIN) {
|
&& items [1].revents & ZMQ_POLLIN) {
|
||||||
rc = forward(backend_, frontend_, capture_, msg, hook_->back2front_hook, hook_->data);
|
rc = forward(backend_, frontend_, capture_,msg);
|
||||||
if (unlikely (rc < 0))
|
if (unlikely (rc < 0))
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -22,22 +22,11 @@
|
|||||||
|
|
||||||
namespace zmq
|
namespace zmq
|
||||||
{
|
{
|
||||||
typedef int (*hook_f)(void *frontend, void *backend, void *capture, void* msg_, size_t n_, void *data_);
|
|
||||||
|
|
||||||
struct proxy_hook_t
|
|
||||||
{
|
|
||||||
void *data;
|
|
||||||
hook_f front2back_hook;
|
|
||||||
hook_f back2front_hook;
|
|
||||||
};
|
|
||||||
|
|
||||||
int proxy (
|
int proxy (
|
||||||
class socket_base_t *frontend_,
|
class socket_base_t *frontend_,
|
||||||
class socket_base_t *backend_,
|
class socket_base_t *backend_,
|
||||||
class socket_base_t *capture_ = NULL,
|
class socket_base_t *capture_,
|
||||||
class socket_base_t *control_ = NULL, // backward compatibility without this argument
|
class socket_base_t *control_ = NULL); // backward compatibility without this argument
|
||||||
proxy_hook_t *hook_ = NULL // backward compatibility without this argument
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
21
src/zmq.cpp
21
src/zmq.cpp
@ -1018,11 +1018,6 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
|
|||||||
|
|
||||||
// The proxy functionality
|
// The proxy functionality
|
||||||
|
|
||||||
// Compile time check whether proxy_hook_t fits into zmq_proxy_hook_t.
|
|
||||||
typedef char check_proxy_hook_t_size
|
|
||||||
[sizeof (zmq::proxy_hook_t) == sizeof (zmq_proxy_hook_t) ? 1 : -1];
|
|
||||||
|
|
||||||
|
|
||||||
int zmq_proxy (void *frontend_, void *backend_, void *capture_)
|
int zmq_proxy (void *frontend_, void *backend_, void *capture_)
|
||||||
{
|
{
|
||||||
if (!frontend_ || !backend_) {
|
if (!frontend_ || !backend_) {
|
||||||
@ -1048,25 +1043,11 @@ int zmq_proxy_steerable (void *frontend_, void *backend_, void *capture_, void *
|
|||||||
(zmq::socket_base_t*) control_);
|
(zmq::socket_base_t*) control_);
|
||||||
}
|
}
|
||||||
|
|
||||||
int zmq_proxy_hook (void *frontend_, void *backend_, void *capture_, void *hook_, void *control_)
|
|
||||||
{
|
|
||||||
if (!frontend_ || !backend_) {
|
|
||||||
errno = EFAULT;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
return zmq::proxy (
|
|
||||||
(zmq::socket_base_t*) frontend_,
|
|
||||||
(zmq::socket_base_t*) backend_,
|
|
||||||
(zmq::socket_base_t*) capture_,
|
|
||||||
(zmq::socket_base_t*) control_,
|
|
||||||
(zmq::proxy_hook_t*) hook_);
|
|
||||||
}
|
|
||||||
|
|
||||||
// The deprecated device functionality
|
// The deprecated device functionality
|
||||||
|
|
||||||
int zmq_device (int /* type */, void *frontend_, void *backend_)
|
int zmq_device (int /* type */, void *frontend_, void *backend_)
|
||||||
{
|
{
|
||||||
return zmq::proxy (
|
return zmq::proxy (
|
||||||
(zmq::socket_base_t*) frontend_,
|
(zmq::socket_base_t*) frontend_,
|
||||||
(zmq::socket_base_t*) backend_);
|
(zmq::socket_base_t*) backend_, NULL);
|
||||||
}
|
}
|
||||||
|
@ -41,48 +41,6 @@
|
|||||||
#define QT_CLIENTS 3
|
#define QT_CLIENTS 3
|
||||||
#define is_verbose 0
|
#define is_verbose 0
|
||||||
|
|
||||||
// Our test Hook that uppercase the message from the frontend to the backend and vice versa
|
|
||||||
struct stats_t {
|
|
||||||
void *ctx; // not usefull for the kook itself, but convenient to provide the thread with it without building an additional struct for arguments
|
|
||||||
int qt_upper_case;
|
|
||||||
int qt_lower_case;
|
|
||||||
} stats = {NULL, 0, 0};
|
|
||||||
|
|
||||||
int
|
|
||||||
upper_case(void*, void*, void*, zmq_msg_t* msg_, size_t n_, void *stats_)
|
|
||||||
{
|
|
||||||
size_t size = zmq_msg_size(msg_);
|
|
||||||
if (!size || n_ == 1) return 0; // skip identity and 0 frames
|
|
||||||
char* message = (char*) zmq_msg_data(msg_);
|
|
||||||
for (size_t i = 0; i < size; i++)
|
|
||||||
if ('a' <= message[i] && message[i] <= 'z')
|
|
||||||
message[i] += 'A' - 'a';
|
|
||||||
struct stats_t* stats = (struct stats_t*) stats_;
|
|
||||||
stats->qt_upper_case++;
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int
|
|
||||||
lower_case(void*, void*, void*, zmq_msg_t* msg_, size_t n_, void *stats_)
|
|
||||||
{
|
|
||||||
size_t size = zmq_msg_size(msg_);
|
|
||||||
if (!size || n_ == 1) return 0; // skip identity and 0 frames
|
|
||||||
char* message = (char*) zmq_msg_data(msg_);
|
|
||||||
for (size_t i = 0; i < size; i++)
|
|
||||||
if ('A' <= message[i] && message[i] <= 'Z')
|
|
||||||
message[i] += 'a' - 'A';
|
|
||||||
struct stats_t* stats = (struct stats_t*) stats_;
|
|
||||||
stats->qt_lower_case++;
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
zmq_proxy_hook_t hook = {
|
|
||||||
&stats, // data used by the hook functions if needed, NULL otherwise
|
|
||||||
upper_case, // hook for messages going from frontend to backend
|
|
||||||
lower_case // hook for messages going from backend to frontend
|
|
||||||
};
|
|
||||||
|
|
||||||
|
|
||||||
static void
|
static void
|
||||||
client_task (void *ctx)
|
client_task (void *ctx)
|
||||||
{
|
{
|
||||||
@ -128,16 +86,10 @@ client_task (void *ctx)
|
|||||||
}
|
}
|
||||||
if (items [1].revents & ZMQ_POLLIN) {
|
if (items [1].revents & ZMQ_POLLIN) {
|
||||||
rc = zmq_recv (control, content, CONTENT_SIZE_MAX, 0);
|
rc = zmq_recv (control, content, CONTENT_SIZE_MAX, 0);
|
||||||
if (rc > 0) {
|
if (is_verbose) printf("client receive - identity = %s command = %s\n", identity, content);
|
||||||
if (is_verbose) {
|
if (memcmp (content, "TERMINATE", 9) == 0) {
|
||||||
if (rc == 9 && memcmp(content, "TERMINATE", 9) == 0)
|
run = false;
|
||||||
content[9] = '\0'; // required to have a clean output since '\0' is not included in the command
|
break;
|
||||||
printf("client receive - identity = %s command = %s\n", identity, content);
|
|
||||||
}
|
|
||||||
if (memcmp (content, "STOP", 4) == 0) {
|
|
||||||
run = false;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -161,11 +113,8 @@ client_task (void *ctx)
|
|||||||
static void server_worker (void *ctx);
|
static void server_worker (void *ctx);
|
||||||
|
|
||||||
void
|
void
|
||||||
server_task (void *arg)
|
server_task (void *ctx)
|
||||||
{
|
{
|
||||||
zmq_proxy_hook_t* hook = (zmq_proxy_hook_t*) arg;
|
|
||||||
struct stats_t* stats = (struct stats_t*) hook->data;
|
|
||||||
void* ctx = stats->ctx;
|
|
||||||
// Frontend socket talks to clients over TCP
|
// Frontend socket talks to clients over TCP
|
||||||
void *frontend = zmq_socket (ctx, ZMQ_ROUTER);
|
void *frontend = zmq_socket (ctx, ZMQ_ROUTER);
|
||||||
assert (frontend);
|
assert (frontend);
|
||||||
@ -193,13 +142,7 @@ server_task (void *arg)
|
|||||||
threads[thread_nbr] = zmq_threadstart (&server_worker, ctx);
|
threads[thread_nbr] = zmq_threadstart (&server_worker, ctx);
|
||||||
|
|
||||||
// Connect backend to frontend via a proxy
|
// Connect backend to frontend via a proxy
|
||||||
if (is_verbose)
|
zmq_proxy_steerable (frontend, backend, NULL, control);
|
||||||
printf("---------- standard proxy ----------\n");
|
|
||||||
zmq_proxy_steerable (frontend, backend, NULL, control); // until TERMINATE is sent on control
|
|
||||||
// Connect backend to frontend via a hooked proxy
|
|
||||||
if (is_verbose)
|
|
||||||
printf("---------- hooked proxy ----------\n");
|
|
||||||
zmq_proxy_hook (frontend, backend, NULL, hook, control); // until TERMINATE is sent on control
|
|
||||||
|
|
||||||
for (thread_nbr = 0; thread_nbr < QT_WORKERS; thread_nbr++)
|
for (thread_nbr = 0; thread_nbr < QT_WORKERS; thread_nbr++)
|
||||||
zmq_threadclose (threads[thread_nbr]);
|
zmq_threadclose (threads[thread_nbr]);
|
||||||
@ -239,12 +182,9 @@ server_worker (void *ctx)
|
|||||||
while (run) {
|
while (run) {
|
||||||
rc = zmq_recv (control, content, CONTENT_SIZE_MAX, ZMQ_DONTWAIT); // usually, rc == -1 (no message)
|
rc = zmq_recv (control, content, CONTENT_SIZE_MAX, ZMQ_DONTWAIT); // usually, rc == -1 (no message)
|
||||||
if (rc > 0) {
|
if (rc > 0) {
|
||||||
if (is_verbose) {
|
if (is_verbose)
|
||||||
if (rc == 9 && memcmp(content, "TERMINATE", 9) == 0)
|
|
||||||
content[9] = '\0'; // required to have a clean output since '\0' is not included in the command
|
|
||||||
printf("server_worker receives command = %s\n", content);
|
printf("server_worker receives command = %s\n", content);
|
||||||
}
|
if (memcmp (content, "TERMINATE", 9) == 0)
|
||||||
if (memcmp (content, "STOP", 4) == 0)
|
|
||||||
run = false;
|
run = false;
|
||||||
}
|
}
|
||||||
// The DEALER socket gives us the reply envelope and message
|
// The DEALER socket gives us the reply envelope and message
|
||||||
@ -278,8 +218,7 @@ server_worker (void *ctx)
|
|||||||
// The main thread simply starts several clients and a server, and then
|
// The main thread simply starts several clients and a server, and then
|
||||||
// waits for the server to finish.
|
// waits for the server to finish.
|
||||||
|
|
||||||
int
|
int main (void)
|
||||||
main (void)
|
|
||||||
{
|
{
|
||||||
setup_test_environment ();
|
setup_test_environment ();
|
||||||
|
|
||||||
@ -294,19 +233,11 @@ main (void)
|
|||||||
void *threads [QT_CLIENTS + 1];
|
void *threads [QT_CLIENTS + 1];
|
||||||
for (int i = 0; i < QT_CLIENTS; i++)
|
for (int i = 0; i < QT_CLIENTS; i++)
|
||||||
threads[i] = zmq_threadstart (&client_task, ctx);
|
threads[i] = zmq_threadstart (&client_task, ctx);
|
||||||
stats.ctx = ctx;
|
threads[QT_CLIENTS] = zmq_threadstart (&server_task, ctx);
|
||||||
threads[QT_CLIENTS] = zmq_threadstart (&server_task, &hook);
|
msleep (500); // Run for 500 ms then quit
|
||||||
|
|
||||||
msleep (500); // Run for 500 ms the standard proxy
|
rc = zmq_send (control, "TERMINATE", 9, 0);
|
||||||
rc = zmq_send (control, "TERMINATE", 9, 0); // stops the standard proxy
|
|
||||||
assert (rc == 9);
|
assert (rc == 9);
|
||||||
msleep (200); // Run for 200 ms the standard proxy
|
|
||||||
rc = zmq_send (control, "TERMINATE", 9, 0); // stops the hooked proxy
|
|
||||||
assert (rc == 9);
|
|
||||||
rc = zmq_send (control, "STOP", 5, 0); // stops clients and workers (\0 is sent to ease the printf of the verbose mode)
|
|
||||||
assert (rc == 5);
|
|
||||||
|
|
||||||
if (is_verbose) printf("frontend to backend hook hits = %d\nbackend to frontend hook hits = %d\n", stats.qt_upper_case, stats.qt_lower_case);
|
|
||||||
|
|
||||||
rc = zmq_close (control);
|
rc = zmq_close (control);
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user