mirror of
https://github.com/zeromq/libzmq.git
synced 2024-12-27 23:51:04 +08:00
add a proxy hook
This commit is contained in:
parent
a7065519df
commit
9ae6a91fad
201
doc/zmq_proxy_hook.txt
Normal file
201
doc/zmq_proxy_hook.txt
Normal file
@ -0,0 +1,201 @@
|
||||
zmq_proxy_hook(3)
|
||||
=================
|
||||
|
||||
NAME
|
||||
----
|
||||
zmq_proxy_hook - start built-in 0MQ proxy with an hook to modify the messages
|
||||
between the frontend and the backend
|
||||
|
||||
|
||||
SYNOPSIS
|
||||
--------
|
||||
*int zmq_proxy_hook (const void '*frontend', const void '*backend',
|
||||
const void '*capture', const void '*hook', const void '*control');*
|
||||
|
||||
|
||||
DESCRIPTION
|
||||
-----------
|
||||
The _zmq_proxy_hook()_ function starts the built-in 0MQ proxy in the
|
||||
current application thread, as _zmq_proxy()_ or _zmq_proxy_steerable()_ do.
|
||||
Please, refer to these functions for the general description and usage.
|
||||
We describe here only the additional hook provided by the structure "hook"
|
||||
passed as a fith argument.
|
||||
|
||||
If the hook structure pointer is not NULL, the proxy supports a hook defined as
|
||||
a structure 'zmq_proxy_hook_t' containing a data pointer to any data type and
|
||||
the address of two functions of type 'zmq_hook_f'. The first function,
|
||||
'front2back_hook' is to manipulate the message received from the frontend, before
|
||||
it is sent to the backend. The second one, 'back2front_hook' is for the way back.
|
||||
|
||||
Both functions receive as an argument in addition to a pointer to the message, the
|
||||
pointer to the data passed in the 'zmq_proxy_hook_t' structure. This data makes it
|
||||
possible to manage statefull behaviours in the proxy. They receive also the frame
|
||||
number n_ which is 1 for the first frame, n for the nth one, 0 for the last one. This
|
||||
enable to manage specifically the identity frame when ROUTER | STREAM sockets are
|
||||
concerned. Moreover, to give the hook full capabilities, the three sockets passed
|
||||
as parameters to the proxy are also provided to the hook functions, enabling to
|
||||
consume some frames or to add others:
|
||||
|
||||
----
|
||||
typedef int (*zmq_hook_f)(void *frontend, void *backend, void *capture,
|
||||
zmq_msg_t* msg_, size_t n_, void *data_);
|
||||
typedef struct zmq_proxy_hook_t {
|
||||
void *data;
|
||||
zmq_hook_f front2back_hook;
|
||||
zmq_hook_f back2front_hook;
|
||||
} zmq_proxy_hook_t;
|
||||
----
|
||||
|
||||
If the hook pointer is NULL, zmq_proxy_hook behaves exactly as if zmq_proxy
|
||||
or zmq_proxy_steerable had been called.
|
||||
|
||||
Refer to linkzmq:zmq_socket[3] for a description of the available socket types.
|
||||
Refer to linkzmq:zmq_proxy[3] for a description of the zmq_proxy.
|
||||
Refer to linkzmq:zmq_proxy_steerable[3] for a description of the zmq_proxy_steerable.
|
||||
|
||||
EXAMPLE USAGE
|
||||
-------------
|
||||
|
||||
Filter
|
||||
------
|
||||
|
||||
The most simple use is to simply filter the messages for example against vulgarity.
|
||||
Messages are simply scanned against a dictionnary and target words are replaced.
|
||||
|
||||
ROUTER | STREAM / ROUTER | STREAM proxy
|
||||
---------------------------------------
|
||||
|
||||
The data field enables to multiplex as desired identities in a ROUTER/ROUTER or in a
|
||||
STREAM/STREAM proxy or what ever. Such architecture enables also custom load balancers.
|
||||
|
||||
Sticky ROUTER / ROUTER proxy
|
||||
----------------------------
|
||||
|
||||
The data field enables to manage sticky identity pairing in a ROUTER/ROUTER proxy.
|
||||
|
||||
Security mechanism proxying
|
||||
---------------------------
|
||||
|
||||
We expect to be able to proxy CURVE with the use of this feature.
|
||||
|
||||
Tests
|
||||
-----
|
||||
|
||||
In an existing application, just change zmq_proxy or zmq_proxy_steerable for
|
||||
zmq_proxy_hook to test anythink, even "Man in the middle" attacks ws security
|
||||
mechanisms with a STREAM/STREAM proxy.
|
||||
|
||||
|
||||
|
||||
RETURN VALUE
|
||||
------------
|
||||
The _zmq_proxy_hook()_ function returns the same values than zmq_proxy
|
||||
or zmq_proxy_steerable in the same conditions of use.
|
||||
|
||||
|
||||
EXAMPLE
|
||||
-------
|
||||
This simple example aims at uppercasing the traffic between the frontend and the
|
||||
backend, and lowercasing it on the way back.
|
||||
|
||||
.Setup the hook
|
||||
----
|
||||
struct stats_t {
|
||||
int qt_upper_case;
|
||||
int qt_lower_case;
|
||||
} stats = {NULL, 0, 0};
|
||||
|
||||
int
|
||||
upper_case(void*, void*, void*, zmq_msg_t* msg_, size_t n_, void *stats_)
|
||||
{
|
||||
size_t size = zmq_msg_size(msg_);
|
||||
if (!size || n_ == 1) return 0; // skip identity and 0 frames
|
||||
char* message = (char*) zmq_msg_data(msg_);
|
||||
for (size_t i = 0; i < size; i++)
|
||||
if ('a' <= message[i] && message[i] <= 'z')
|
||||
message[i] += 'A' - 'a';
|
||||
struct stats_t* stats = (struct stats_t*) stats_;
|
||||
stats->qt_upper_case++;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int
|
||||
lower_case(void*, void*, void*, zmq_msg_t* msg_, size_t n_, void *stats_)
|
||||
{
|
||||
size_t size = zmq_msg_size(msg_);
|
||||
if (!size || n_ == 1) return 0; // skip identity and 0 frames
|
||||
char* message = (char*) zmq_msg_data(msg_);
|
||||
for (size_t i = 0; i < size; i++)
|
||||
if ('A' <= message[i] && message[i] <= 'Z')
|
||||
message[i] += 'a' - 'A';
|
||||
struct stats_t* stats = (struct stats_t*) stats_;
|
||||
stats->qt_lower_case++;
|
||||
return 0;
|
||||
}
|
||||
|
||||
zmq_proxy_hook_t hook = {
|
||||
&stats, // data used by the hook functions, passed as void* data_
|
||||
upper_case, // hook for messages going from frontend to backend
|
||||
lower_case // hook for messages going from backend to frontend
|
||||
};
|
||||
----
|
||||
.in main:
|
||||
----
|
||||
int
|
||||
main (void)
|
||||
{
|
||||
setup_test_environment ();
|
||||
void *context = zmq_ctx_new ();
|
||||
assert (context);
|
||||
// Create frontend, backend and control sockets
|
||||
void *frontend = zmq_socket (context, ZMQ_ROUTER);
|
||||
assert (backend);
|
||||
void *backend = zmq_socket (context, ZMQ_DEALER);
|
||||
assert (frontend);
|
||||
void *control = zmq_socket (context, ZMQ_PUB);
|
||||
assert (control);
|
||||
|
||||
// Bind sockets to TCP ports
|
||||
assert (zmq_bind (frontend, "tcp://*:5555") == 0);
|
||||
assert (zmq_bind (backend, "tcp://*:5556") == 0);
|
||||
assert (zmq_connect (control, "tcp://*:5557") == 0);
|
||||
|
||||
// Start the queue proxy, which runs until ETERM or "TERMINATE"
|
||||
// received on the control socket
|
||||
zmq_proxy_hook (frontend, backend, NULL, &hook, control);
|
||||
|
||||
printf("frontend to backend hook hits = %d\nbackend to frontend hook hits = %d\n", stats.qt_upper_case, stats.qt_lower_case);
|
||||
|
||||
// close sockets and context
|
||||
rc = zmq_close (control);
|
||||
assert (rc == 0);
|
||||
rc = zmq_close (backend);
|
||||
assert (rc == 0);
|
||||
rc = zmq_close (frontend);
|
||||
assert (rc == 0);
|
||||
rc = zmq_ctx_term (ctx);
|
||||
assert (rc == 0);
|
||||
return 0;
|
||||
}
|
||||
----
|
||||
.somewhere, the proxy is stopped with:
|
||||
----
|
||||
rc = zmq_send (control, "TERMINATE", 9, 0); // stops the hooked proxy
|
||||
assert (rc == 9);
|
||||
----
|
||||
.cf test_proxy.cpp for a full implementation of this test, with clients and workers.
|
||||
|
||||
SEE ALSO
|
||||
--------
|
||||
linkzmq:zmq_proxy[3]
|
||||
linkzmq:zmq_proxy_steerable[3]
|
||||
linkzmq:zmq_bind[3]
|
||||
linkzmq:zmq_connect[3]
|
||||
linkzmq:zmq_socket[3]
|
||||
linkzmq:zmq[7]
|
||||
|
||||
|
||||
AUTHORS
|
||||
-------
|
||||
This page was written by the 0MQ community. To make a change please
|
||||
read the 0MQ Contribution Policy at <http://www.zeromq.org/docs:contributing>.
|
@ -400,6 +400,14 @@ ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout);
|
||||
|
||||
ZMQ_EXPORT int zmq_proxy (void *frontend, void *backend, void *capture);
|
||||
ZMQ_EXPORT int zmq_proxy_steerable (void *frontend, void *backend, void *capture, void *control);
|
||||
ZMQ_EXPORT int zmq_proxy_hook (void *frontend, void *backend, void *capture, void *hook, void *control);
|
||||
|
||||
typedef int (*zmq_hook_f)(void *frontend, void *backend, void *capture, zmq_msg_t* msg_, size_t n_, void *data_);
|
||||
typedef struct zmq_proxy_hook_t {
|
||||
void *data;
|
||||
zmq_hook_f front2back_hook;
|
||||
zmq_hook_f back2front_hook;
|
||||
} zmq_proxy_hook_t;
|
||||
|
||||
/* Encode a binary key as printable text using ZMQ RFC 32 */
|
||||
ZMQ_EXPORT char *zmq_z85_encode (char *dest, uint8_t *data, size_t size);
|
||||
|
@ -53,7 +53,8 @@
|
||||
// zmq.h must be included *after* poll.h for AIX to build properly
|
||||
#include "../include/zmq.h"
|
||||
|
||||
int capture(
|
||||
int
|
||||
capture(
|
||||
class zmq::socket_base_t *capture_,
|
||||
zmq::msg_t& msg_,
|
||||
int more_ = 0)
|
||||
@ -74,15 +75,18 @@ int capture(
|
||||
return 0;
|
||||
}
|
||||
|
||||
int forward(
|
||||
int
|
||||
forward(
|
||||
class zmq::socket_base_t *from_,
|
||||
class zmq::socket_base_t *to_,
|
||||
class zmq::socket_base_t *capture_,
|
||||
zmq::msg_t& msg_)
|
||||
zmq::msg_t& msg_,
|
||||
zmq::hook_f do_hook_,
|
||||
void *data_)
|
||||
{
|
||||
int more;
|
||||
size_t moresz;
|
||||
while (true) {
|
||||
for (size_t n = 1;; n++) {
|
||||
int rc = from_->recv (&msg_, 0);
|
||||
if (unlikely (rc < 0))
|
||||
return -1;
|
||||
@ -97,6 +101,13 @@ int forward(
|
||||
if (unlikely (rc < 0))
|
||||
return -1;
|
||||
|
||||
// Hook
|
||||
if (do_hook_) {
|
||||
rc = (*do_hook_)(from_, to_, capture_, &msg_, more ? n : 0, data_); // first message: n == 1, mth message: n == m, last message: n == 0
|
||||
if (unlikely (rc < 0))
|
||||
return -1;
|
||||
}
|
||||
|
||||
rc = to_->send (&msg_, more? ZMQ_SNDMORE: 0);
|
||||
if (unlikely (rc < 0))
|
||||
return -1;
|
||||
@ -106,12 +117,16 @@ int forward(
|
||||
return 0;
|
||||
}
|
||||
|
||||
int zmq::proxy (
|
||||
int
|
||||
zmq::proxy (
|
||||
class socket_base_t *frontend_,
|
||||
class socket_base_t *backend_,
|
||||
class socket_base_t *capture_,
|
||||
class socket_base_t *control_)
|
||||
class socket_base_t *control_,
|
||||
zmq::proxy_hook_t *hook_)
|
||||
{
|
||||
static zmq::proxy_hook_t dummy_hook = {NULL, NULL, NULL};
|
||||
|
||||
msg_t msg;
|
||||
int rc = msg.init ();
|
||||
if (rc != 0)
|
||||
@ -172,17 +187,20 @@ int zmq::proxy (
|
||||
zmq_assert (false);
|
||||
}
|
||||
}
|
||||
// Check if a hook is used
|
||||
if (!hook_)
|
||||
hook_ = &dummy_hook;
|
||||
// Process a request
|
||||
if (state == active
|
||||
&& items [0].revents & ZMQ_POLLIN) {
|
||||
rc = forward(frontend_, backend_, capture_,msg);
|
||||
rc = forward(frontend_, backend_, capture_, msg, hook_->front2back_hook, hook_->data);
|
||||
if (unlikely (rc < 0))
|
||||
return -1;
|
||||
}
|
||||
// Process a reply
|
||||
if (state == active
|
||||
&& items [1].revents & ZMQ_POLLIN) {
|
||||
rc = forward(backend_, frontend_, capture_,msg);
|
||||
rc = forward(backend_, frontend_, capture_, msg, hook_->back2front_hook, hook_->data);
|
||||
if (unlikely (rc < 0))
|
||||
return -1;
|
||||
}
|
||||
|
@ -22,11 +22,22 @@
|
||||
|
||||
namespace zmq
|
||||
{
|
||||
typedef int (*hook_f)(void *frontend, void *backend, void *capture, void* msg_, size_t n_, void *data_);
|
||||
|
||||
struct proxy_hook_t
|
||||
{
|
||||
void *data;
|
||||
hook_f front2back_hook;
|
||||
hook_f back2front_hook;
|
||||
};
|
||||
|
||||
int proxy (
|
||||
class socket_base_t *frontend_,
|
||||
class socket_base_t *backend_,
|
||||
class socket_base_t *capture_,
|
||||
class socket_base_t *control_ = NULL); // backward compatibility without this argument
|
||||
class socket_base_t *capture_ = NULL,
|
||||
class socket_base_t *control_ = NULL, // backward compatibility without this argument
|
||||
proxy_hook_t *hook_ = NULL // backward compatibility without this argument
|
||||
);
|
||||
}
|
||||
|
||||
#endif
|
||||
|
21
src/zmq.cpp
21
src/zmq.cpp
@ -1018,6 +1018,11 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
|
||||
|
||||
// The proxy functionality
|
||||
|
||||
// Compile time check whether proxy_hook_t fits into zmq_proxy_hook_t.
|
||||
typedef char check_proxy_hook_t_size
|
||||
[sizeof (zmq::proxy_hook_t) == sizeof (zmq_proxy_hook_t) ? 1 : -1];
|
||||
|
||||
|
||||
int zmq_proxy (void *frontend_, void *backend_, void *capture_)
|
||||
{
|
||||
if (!frontend_ || !backend_) {
|
||||
@ -1043,11 +1048,25 @@ int zmq_proxy_steerable (void *frontend_, void *backend_, void *capture_, void *
|
||||
(zmq::socket_base_t*) control_);
|
||||
}
|
||||
|
||||
int zmq_proxy_hook (void *frontend_, void *backend_, void *capture_, void *hook_, void *control_)
|
||||
{
|
||||
if (!frontend_ || !backend_) {
|
||||
errno = EFAULT;
|
||||
return -1;
|
||||
}
|
||||
return zmq::proxy (
|
||||
(zmq::socket_base_t*) frontend_,
|
||||
(zmq::socket_base_t*) backend_,
|
||||
(zmq::socket_base_t*) capture_,
|
||||
(zmq::socket_base_t*) control_,
|
||||
(zmq::proxy_hook_t*) hook_);
|
||||
}
|
||||
|
||||
// The deprecated device functionality
|
||||
|
||||
int zmq_device (int /* type */, void *frontend_, void *backend_)
|
||||
{
|
||||
return zmq::proxy (
|
||||
(zmq::socket_base_t*) frontend_,
|
||||
(zmq::socket_base_t*) backend_, NULL);
|
||||
(zmq::socket_base_t*) backend_);
|
||||
}
|
||||
|
@ -41,6 +41,48 @@
|
||||
#define QT_CLIENTS 3
|
||||
#define is_verbose 0
|
||||
|
||||
// Our test Hook that uppercase the message from the frontend to the backend and vice versa
|
||||
struct stats_t {
|
||||
void *ctx; // not usefull for the kook itself, but convenient to provide the thread with it without building an additional struct for arguments
|
||||
int qt_upper_case;
|
||||
int qt_lower_case;
|
||||
} stats = {NULL, 0, 0};
|
||||
|
||||
int
|
||||
upper_case(void*, void*, void*, zmq_msg_t* msg_, size_t n_, void *stats_)
|
||||
{
|
||||
size_t size = zmq_msg_size(msg_);
|
||||
if (!size || n_ == 1) return 0; // skip identity and 0 frames
|
||||
char* message = (char*) zmq_msg_data(msg_);
|
||||
for (size_t i = 0; i < size; i++)
|
||||
if ('a' <= message[i] && message[i] <= 'z')
|
||||
message[i] += 'A' - 'a';
|
||||
struct stats_t* stats = (struct stats_t*) stats_;
|
||||
stats->qt_upper_case++;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int
|
||||
lower_case(void*, void*, void*, zmq_msg_t* msg_, size_t n_, void *stats_)
|
||||
{
|
||||
size_t size = zmq_msg_size(msg_);
|
||||
if (!size || n_ == 1) return 0; // skip identity and 0 frames
|
||||
char* message = (char*) zmq_msg_data(msg_);
|
||||
for (size_t i = 0; i < size; i++)
|
||||
if ('A' <= message[i] && message[i] <= 'Z')
|
||||
message[i] += 'a' - 'A';
|
||||
struct stats_t* stats = (struct stats_t*) stats_;
|
||||
stats->qt_lower_case++;
|
||||
return 0;
|
||||
}
|
||||
|
||||
zmq_proxy_hook_t hook = {
|
||||
&stats, // data used by the hook functions if needed, NULL otherwise
|
||||
upper_case, // hook for messages going from frontend to backend
|
||||
lower_case // hook for messages going from backend to frontend
|
||||
};
|
||||
|
||||
|
||||
static void
|
||||
client_task (void *ctx)
|
||||
{
|
||||
@ -86,10 +128,16 @@ client_task (void *ctx)
|
||||
}
|
||||
if (items [1].revents & ZMQ_POLLIN) {
|
||||
rc = zmq_recv (control, content, CONTENT_SIZE_MAX, 0);
|
||||
if (is_verbose) printf("client receive - identity = %s command = %s\n", identity, content);
|
||||
if (memcmp (content, "TERMINATE", 9) == 0) {
|
||||
run = false;
|
||||
break;
|
||||
if (rc > 0) {
|
||||
if (is_verbose) {
|
||||
if (rc == 9 && memcmp(content, "TERMINATE", 9) == 0)
|
||||
content[9] = '\0'; // required to have a clean output since '\0' is not included in the command
|
||||
printf("client receive - identity = %s command = %s\n", identity, content);
|
||||
}
|
||||
if (memcmp (content, "STOP", 4) == 0) {
|
||||
run = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -113,8 +161,11 @@ client_task (void *ctx)
|
||||
static void server_worker (void *ctx);
|
||||
|
||||
void
|
||||
server_task (void *ctx)
|
||||
server_task (void *arg)
|
||||
{
|
||||
zmq_proxy_hook_t* hook = (zmq_proxy_hook_t*) arg;
|
||||
struct stats_t* stats = (struct stats_t*) hook->data;
|
||||
void* ctx = stats->ctx;
|
||||
// Frontend socket talks to clients over TCP
|
||||
void *frontend = zmq_socket (ctx, ZMQ_ROUTER);
|
||||
assert (frontend);
|
||||
@ -142,7 +193,13 @@ server_task (void *ctx)
|
||||
threads[thread_nbr] = zmq_threadstart (&server_worker, ctx);
|
||||
|
||||
// Connect backend to frontend via a proxy
|
||||
zmq_proxy_steerable (frontend, backend, NULL, control);
|
||||
if (is_verbose)
|
||||
printf("---------- standard proxy ----------\n");
|
||||
zmq_proxy_steerable (frontend, backend, NULL, control); // until TERMINATE is sent on control
|
||||
// Connect backend to frontend via a hooked proxy
|
||||
if (is_verbose)
|
||||
printf("---------- hooked proxy ----------\n");
|
||||
zmq_proxy_hook (frontend, backend, NULL, hook, control); // until TERMINATE is sent on control
|
||||
|
||||
for (thread_nbr = 0; thread_nbr < QT_WORKERS; thread_nbr++)
|
||||
zmq_threadclose (threads[thread_nbr]);
|
||||
@ -182,9 +239,12 @@ server_worker (void *ctx)
|
||||
while (run) {
|
||||
rc = zmq_recv (control, content, CONTENT_SIZE_MAX, ZMQ_DONTWAIT); // usually, rc == -1 (no message)
|
||||
if (rc > 0) {
|
||||
if (is_verbose)
|
||||
if (is_verbose) {
|
||||
if (rc == 9 && memcmp(content, "TERMINATE", 9) == 0)
|
||||
content[9] = '\0'; // required to have a clean output since '\0' is not included in the command
|
||||
printf("server_worker receives command = %s\n", content);
|
||||
if (memcmp (content, "TERMINATE", 9) == 0)
|
||||
}
|
||||
if (memcmp (content, "STOP", 4) == 0)
|
||||
run = false;
|
||||
}
|
||||
// The DEALER socket gives us the reply envelope and message
|
||||
@ -218,7 +278,8 @@ server_worker (void *ctx)
|
||||
// The main thread simply starts several clients and a server, and then
|
||||
// waits for the server to finish.
|
||||
|
||||
int main (void)
|
||||
int
|
||||
main (void)
|
||||
{
|
||||
setup_test_environment ();
|
||||
|
||||
@ -233,11 +294,19 @@ int main (void)
|
||||
void *threads [QT_CLIENTS + 1];
|
||||
for (int i = 0; i < QT_CLIENTS; i++)
|
||||
threads[i] = zmq_threadstart (&client_task, ctx);
|
||||
threads[QT_CLIENTS] = zmq_threadstart (&server_task, ctx);
|
||||
msleep (500); // Run for 500 ms then quit
|
||||
stats.ctx = ctx;
|
||||
threads[QT_CLIENTS] = zmq_threadstart (&server_task, &hook);
|
||||
|
||||
rc = zmq_send (control, "TERMINATE", 9, 0);
|
||||
msleep (500); // Run for 500 ms the standard proxy
|
||||
rc = zmq_send (control, "TERMINATE", 9, 0); // stops the standard proxy
|
||||
assert (rc == 9);
|
||||
msleep (200); // Run for 200 ms the standard proxy
|
||||
rc = zmq_send (control, "TERMINATE", 9, 0); // stops the hooked proxy
|
||||
assert (rc == 9);
|
||||
rc = zmq_send (control, "STOP", 5, 0); // stops clients and workers (\0 is sent to ease the printf of the verbose mode)
|
||||
assert (rc == 5);
|
||||
|
||||
if (is_verbose) printf("frontend to backend hook hits = %d\nbackend to frontend hook hits = %d\n", stats.qt_upper_case, stats.qt_lower_case);
|
||||
|
||||
rc = zmq_close (control);
|
||||
assert (rc == 0);
|
||||
|
Loading…
x
Reference in New Issue
Block a user