From 9ae6a91fadb96fd48038fde04cc3d4b61b49a8a1 Mon Sep 17 00:00:00 2001 From: Laurent Alebarde Date: Wed, 22 Jan 2014 15:20:13 +0100 Subject: [PATCH] add a proxy hook --- doc/zmq_proxy_hook.txt | 201 +++++++++++++++++++++++++++++++++++++++++ include/zmq.h | 8 ++ src/proxy.cpp | 34 +++++-- src/proxy.hpp | 15 ++- src/zmq.cpp | 21 ++++- tests/test_proxy.cpp | 93 ++++++++++++++++--- 6 files changed, 349 insertions(+), 23 deletions(-) create mode 100644 doc/zmq_proxy_hook.txt diff --git a/doc/zmq_proxy_hook.txt b/doc/zmq_proxy_hook.txt new file mode 100644 index 00000000..a93a0d99 --- /dev/null +++ b/doc/zmq_proxy_hook.txt @@ -0,0 +1,201 @@ +zmq_proxy_hook(3) +================= + +NAME +---- +zmq_proxy_hook - start built-in 0MQ proxy with an hook to modify the messages +between the frontend and the backend + + +SYNOPSIS +-------- +*int zmq_proxy_hook (const void '*frontend', const void '*backend', + const void '*capture', const void '*hook', const void '*control');* + + +DESCRIPTION +----------- +The _zmq_proxy_hook()_ function starts the built-in 0MQ proxy in the +current application thread, as _zmq_proxy()_ or _zmq_proxy_steerable()_ do. +Please, refer to these functions for the general description and usage. +We describe here only the additional hook provided by the structure "hook" +passed as a fith argument. + +If the hook structure pointer is not NULL, the proxy supports a hook defined as +a structure 'zmq_proxy_hook_t' containing a data pointer to any data type and +the address of two functions of type 'zmq_hook_f'. The first function, +'front2back_hook' is to manipulate the message received from the frontend, before +it is sent to the backend. The second one, 'back2front_hook' is for the way back. + +Both functions receive as an argument in addition to a pointer to the message, the +pointer to the data passed in the 'zmq_proxy_hook_t' structure. This data makes it +possible to manage statefull behaviours in the proxy. They receive also the frame +number n_ which is 1 for the first frame, n for the nth one, 0 for the last one. This +enable to manage specifically the identity frame when ROUTER | STREAM sockets are +concerned. Moreover, to give the hook full capabilities, the three sockets passed +as parameters to the proxy are also provided to the hook functions, enabling to +consume some frames or to add others: + +---- +typedef int (*zmq_hook_f)(void *frontend, void *backend, void *capture, +zmq_msg_t* msg_, size_t n_, void *data_); +typedef struct zmq_proxy_hook_t { + void *data; + zmq_hook_f front2back_hook; + zmq_hook_f back2front_hook; +} zmq_proxy_hook_t; +---- + +If the hook pointer is NULL, zmq_proxy_hook behaves exactly as if zmq_proxy +or zmq_proxy_steerable had been called. + +Refer to linkzmq:zmq_socket[3] for a description of the available socket types. +Refer to linkzmq:zmq_proxy[3] for a description of the zmq_proxy. +Refer to linkzmq:zmq_proxy_steerable[3] for a description of the zmq_proxy_steerable. + +EXAMPLE USAGE +------------- + +Filter +------ + +The most simple use is to simply filter the messages for example against vulgarity. +Messages are simply scanned against a dictionnary and target words are replaced. + +ROUTER | STREAM / ROUTER | STREAM proxy +--------------------------------------- + +The data field enables to multiplex as desired identities in a ROUTER/ROUTER or in a +STREAM/STREAM proxy or what ever. Such architecture enables also custom load balancers. + +Sticky ROUTER / ROUTER proxy +---------------------------- + +The data field enables to manage sticky identity pairing in a ROUTER/ROUTER proxy. + +Security mechanism proxying +--------------------------- + +We expect to be able to proxy CURVE with the use of this feature. + +Tests +----- + +In an existing application, just change zmq_proxy or zmq_proxy_steerable for +zmq_proxy_hook to test anythink, even "Man in the middle" attacks ws security +mechanisms with a STREAM/STREAM proxy. + + + +RETURN VALUE +------------ +The _zmq_proxy_hook()_ function returns the same values than zmq_proxy +or zmq_proxy_steerable in the same conditions of use. + + +EXAMPLE +------- +This simple example aims at uppercasing the traffic between the frontend and the +backend, and lowercasing it on the way back. + +.Setup the hook +---- +struct stats_t { + int qt_upper_case; + int qt_lower_case; +} stats = {NULL, 0, 0}; + +int +upper_case(void*, void*, void*, zmq_msg_t* msg_, size_t n_, void *stats_) +{ + size_t size = zmq_msg_size(msg_); + if (!size || n_ == 1) return 0; // skip identity and 0 frames + char* message = (char*) zmq_msg_data(msg_); + for (size_t i = 0; i < size; i++) + if ('a' <= message[i] && message[i] <= 'z') + message[i] += 'A' - 'a'; + struct stats_t* stats = (struct stats_t*) stats_; + stats->qt_upper_case++; + return 0; +} + +int +lower_case(void*, void*, void*, zmq_msg_t* msg_, size_t n_, void *stats_) +{ + size_t size = zmq_msg_size(msg_); + if (!size || n_ == 1) return 0; // skip identity and 0 frames + char* message = (char*) zmq_msg_data(msg_); + for (size_t i = 0; i < size; i++) + if ('A' <= message[i] && message[i] <= 'Z') + message[i] += 'a' - 'A'; + struct stats_t* stats = (struct stats_t*) stats_; + stats->qt_lower_case++; + return 0; +} + +zmq_proxy_hook_t hook = { + &stats, // data used by the hook functions, passed as void* data_ + upper_case, // hook for messages going from frontend to backend + lower_case // hook for messages going from backend to frontend +}; +---- +.in main: +---- +int +main (void) +{ + setup_test_environment (); + void *context = zmq_ctx_new (); + assert (context); + // Create frontend, backend and control sockets + void *frontend = zmq_socket (context, ZMQ_ROUTER); + assert (backend); + void *backend = zmq_socket (context, ZMQ_DEALER); + assert (frontend); + void *control = zmq_socket (context, ZMQ_PUB); + assert (control); + + // Bind sockets to TCP ports + assert (zmq_bind (frontend, "tcp://*:5555") == 0); + assert (zmq_bind (backend, "tcp://*:5556") == 0); + assert (zmq_connect (control, "tcp://*:5557") == 0); + + // Start the queue proxy, which runs until ETERM or "TERMINATE" + // received on the control socket + zmq_proxy_hook (frontend, backend, NULL, &hook, control); + + printf("frontend to backend hook hits = %d\nbackend to frontend hook hits = %d\n", stats.qt_upper_case, stats.qt_lower_case); + + // close sockets and context + rc = zmq_close (control); + assert (rc == 0); + rc = zmq_close (backend); + assert (rc == 0); + rc = zmq_close (frontend); + assert (rc == 0); + rc = zmq_ctx_term (ctx); + assert (rc == 0); + return 0; +} +---- +.somewhere, the proxy is stopped with: +---- +rc = zmq_send (control, "TERMINATE", 9, 0); // stops the hooked proxy +assert (rc == 9); +---- +.cf test_proxy.cpp for a full implementation of this test, with clients and workers. + +SEE ALSO +-------- +linkzmq:zmq_proxy[3] +linkzmq:zmq_proxy_steerable[3] +linkzmq:zmq_bind[3] +linkzmq:zmq_connect[3] +linkzmq:zmq_socket[3] +linkzmq:zmq[7] + + +AUTHORS +------- +This page was written by the 0MQ community. To make a change please +read the 0MQ Contribution Policy at . diff --git a/include/zmq.h b/include/zmq.h index 524d6aa2..3bfc6bd0 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -400,6 +400,14 @@ ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout); ZMQ_EXPORT int zmq_proxy (void *frontend, void *backend, void *capture); ZMQ_EXPORT int zmq_proxy_steerable (void *frontend, void *backend, void *capture, void *control); +ZMQ_EXPORT int zmq_proxy_hook (void *frontend, void *backend, void *capture, void *hook, void *control); + +typedef int (*zmq_hook_f)(void *frontend, void *backend, void *capture, zmq_msg_t* msg_, size_t n_, void *data_); +typedef struct zmq_proxy_hook_t { + void *data; + zmq_hook_f front2back_hook; + zmq_hook_f back2front_hook; +} zmq_proxy_hook_t; /* Encode a binary key as printable text using ZMQ RFC 32 */ ZMQ_EXPORT char *zmq_z85_encode (char *dest, uint8_t *data, size_t size); diff --git a/src/proxy.cpp b/src/proxy.cpp index ab57d509..09788e57 100644 --- a/src/proxy.cpp +++ b/src/proxy.cpp @@ -53,7 +53,8 @@ // zmq.h must be included *after* poll.h for AIX to build properly #include "../include/zmq.h" -int capture( +int +capture( class zmq::socket_base_t *capture_, zmq::msg_t& msg_, int more_ = 0) @@ -74,15 +75,18 @@ int capture( return 0; } -int forward( +int +forward( class zmq::socket_base_t *from_, class zmq::socket_base_t *to_, class zmq::socket_base_t *capture_, - zmq::msg_t& msg_) + zmq::msg_t& msg_, + zmq::hook_f do_hook_, + void *data_) { int more; size_t moresz; - while (true) { + for (size_t n = 1;; n++) { int rc = from_->recv (&msg_, 0); if (unlikely (rc < 0)) return -1; @@ -97,6 +101,13 @@ int forward( if (unlikely (rc < 0)) return -1; + // Hook + if (do_hook_) { + rc = (*do_hook_)(from_, to_, capture_, &msg_, more ? n : 0, data_); // first message: n == 1, mth message: n == m, last message: n == 0 + if (unlikely (rc < 0)) + return -1; + } + rc = to_->send (&msg_, more? ZMQ_SNDMORE: 0); if (unlikely (rc < 0)) return -1; @@ -106,12 +117,16 @@ int forward( return 0; } -int zmq::proxy ( +int +zmq::proxy ( class socket_base_t *frontend_, class socket_base_t *backend_, class socket_base_t *capture_, - class socket_base_t *control_) + class socket_base_t *control_, + zmq::proxy_hook_t *hook_) { + static zmq::proxy_hook_t dummy_hook = {NULL, NULL, NULL}; + msg_t msg; int rc = msg.init (); if (rc != 0) @@ -172,17 +187,20 @@ int zmq::proxy ( zmq_assert (false); } } + // Check if a hook is used + if (!hook_) + hook_ = &dummy_hook; // Process a request if (state == active && items [0].revents & ZMQ_POLLIN) { - rc = forward(frontend_, backend_, capture_,msg); + rc = forward(frontend_, backend_, capture_, msg, hook_->front2back_hook, hook_->data); if (unlikely (rc < 0)) return -1; } // Process a reply if (state == active && items [1].revents & ZMQ_POLLIN) { - rc = forward(backend_, frontend_, capture_,msg); + rc = forward(backend_, frontend_, capture_, msg, hook_->back2front_hook, hook_->data); if (unlikely (rc < 0)) return -1; } diff --git a/src/proxy.hpp b/src/proxy.hpp index c055290b..0b9eef84 100644 --- a/src/proxy.hpp +++ b/src/proxy.hpp @@ -22,11 +22,22 @@ namespace zmq { + typedef int (*hook_f)(void *frontend, void *backend, void *capture, void* msg_, size_t n_, void *data_); + + struct proxy_hook_t + { + void *data; + hook_f front2back_hook; + hook_f back2front_hook; + }; + int proxy ( class socket_base_t *frontend_, class socket_base_t *backend_, - class socket_base_t *capture_, - class socket_base_t *control_ = NULL); // backward compatibility without this argument + class socket_base_t *capture_ = NULL, + class socket_base_t *control_ = NULL, // backward compatibility without this argument + proxy_hook_t *hook_ = NULL // backward compatibility without this argument + ); } #endif diff --git a/src/zmq.cpp b/src/zmq.cpp index 25da581a..4508eee1 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -1018,6 +1018,11 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) // The proxy functionality +// Compile time check whether proxy_hook_t fits into zmq_proxy_hook_t. +typedef char check_proxy_hook_t_size + [sizeof (zmq::proxy_hook_t) == sizeof (zmq_proxy_hook_t) ? 1 : -1]; + + int zmq_proxy (void *frontend_, void *backend_, void *capture_) { if (!frontend_ || !backend_) { @@ -1043,11 +1048,25 @@ int zmq_proxy_steerable (void *frontend_, void *backend_, void *capture_, void * (zmq::socket_base_t*) control_); } +int zmq_proxy_hook (void *frontend_, void *backend_, void *capture_, void *hook_, void *control_) +{ + if (!frontend_ || !backend_) { + errno = EFAULT; + return -1; + } + return zmq::proxy ( + (zmq::socket_base_t*) frontend_, + (zmq::socket_base_t*) backend_, + (zmq::socket_base_t*) capture_, + (zmq::socket_base_t*) control_, + (zmq::proxy_hook_t*) hook_); +} + // The deprecated device functionality int zmq_device (int /* type */, void *frontend_, void *backend_) { return zmq::proxy ( (zmq::socket_base_t*) frontend_, - (zmq::socket_base_t*) backend_, NULL); + (zmq::socket_base_t*) backend_); } diff --git a/tests/test_proxy.cpp b/tests/test_proxy.cpp index 83d95741..5731c0c1 100644 --- a/tests/test_proxy.cpp +++ b/tests/test_proxy.cpp @@ -41,6 +41,48 @@ #define QT_CLIENTS 3 #define is_verbose 0 +// Our test Hook that uppercase the message from the frontend to the backend and vice versa +struct stats_t { + void *ctx; // not usefull for the kook itself, but convenient to provide the thread with it without building an additional struct for arguments + int qt_upper_case; + int qt_lower_case; +} stats = {NULL, 0, 0}; + +int +upper_case(void*, void*, void*, zmq_msg_t* msg_, size_t n_, void *stats_) +{ + size_t size = zmq_msg_size(msg_); + if (!size || n_ == 1) return 0; // skip identity and 0 frames + char* message = (char*) zmq_msg_data(msg_); + for (size_t i = 0; i < size; i++) + if ('a' <= message[i] && message[i] <= 'z') + message[i] += 'A' - 'a'; + struct stats_t* stats = (struct stats_t*) stats_; + stats->qt_upper_case++; + return 0; +} + +int +lower_case(void*, void*, void*, zmq_msg_t* msg_, size_t n_, void *stats_) +{ + size_t size = zmq_msg_size(msg_); + if (!size || n_ == 1) return 0; // skip identity and 0 frames + char* message = (char*) zmq_msg_data(msg_); + for (size_t i = 0; i < size; i++) + if ('A' <= message[i] && message[i] <= 'Z') + message[i] += 'a' - 'A'; + struct stats_t* stats = (struct stats_t*) stats_; + stats->qt_lower_case++; + return 0; +} + +zmq_proxy_hook_t hook = { + &stats, // data used by the hook functions if needed, NULL otherwise + upper_case, // hook for messages going from frontend to backend + lower_case // hook for messages going from backend to frontend +}; + + static void client_task (void *ctx) { @@ -86,10 +128,16 @@ client_task (void *ctx) } if (items [1].revents & ZMQ_POLLIN) { rc = zmq_recv (control, content, CONTENT_SIZE_MAX, 0); - if (is_verbose) printf("client receive - identity = %s command = %s\n", identity, content); - if (memcmp (content, "TERMINATE", 9) == 0) { - run = false; - break; + if (rc > 0) { + if (is_verbose) { + if (rc == 9 && memcmp(content, "TERMINATE", 9) == 0) + content[9] = '\0'; // required to have a clean output since '\0' is not included in the command + printf("client receive - identity = %s command = %s\n", identity, content); + } + if (memcmp (content, "STOP", 4) == 0) { + run = false; + break; + } } } } @@ -113,8 +161,11 @@ client_task (void *ctx) static void server_worker (void *ctx); void -server_task (void *ctx) +server_task (void *arg) { + zmq_proxy_hook_t* hook = (zmq_proxy_hook_t*) arg; + struct stats_t* stats = (struct stats_t*) hook->data; + void* ctx = stats->ctx; // Frontend socket talks to clients over TCP void *frontend = zmq_socket (ctx, ZMQ_ROUTER); assert (frontend); @@ -142,7 +193,13 @@ server_task (void *ctx) threads[thread_nbr] = zmq_threadstart (&server_worker, ctx); // Connect backend to frontend via a proxy - zmq_proxy_steerable (frontend, backend, NULL, control); + if (is_verbose) + printf("---------- standard proxy ----------\n"); + zmq_proxy_steerable (frontend, backend, NULL, control); // until TERMINATE is sent on control + // Connect backend to frontend via a hooked proxy + if (is_verbose) + printf("---------- hooked proxy ----------\n"); + zmq_proxy_hook (frontend, backend, NULL, hook, control); // until TERMINATE is sent on control for (thread_nbr = 0; thread_nbr < QT_WORKERS; thread_nbr++) zmq_threadclose (threads[thread_nbr]); @@ -182,9 +239,12 @@ server_worker (void *ctx) while (run) { rc = zmq_recv (control, content, CONTENT_SIZE_MAX, ZMQ_DONTWAIT); // usually, rc == -1 (no message) if (rc > 0) { - if (is_verbose) + if (is_verbose) { + if (rc == 9 && memcmp(content, "TERMINATE", 9) == 0) + content[9] = '\0'; // required to have a clean output since '\0' is not included in the command printf("server_worker receives command = %s\n", content); - if (memcmp (content, "TERMINATE", 9) == 0) + } + if (memcmp (content, "STOP", 4) == 0) run = false; } // The DEALER socket gives us the reply envelope and message @@ -218,7 +278,8 @@ server_worker (void *ctx) // The main thread simply starts several clients and a server, and then // waits for the server to finish. -int main (void) +int +main (void) { setup_test_environment (); @@ -233,11 +294,19 @@ int main (void) void *threads [QT_CLIENTS + 1]; for (int i = 0; i < QT_CLIENTS; i++) threads[i] = zmq_threadstart (&client_task, ctx); - threads[QT_CLIENTS] = zmq_threadstart (&server_task, ctx); - msleep (500); // Run for 500 ms then quit + stats.ctx = ctx; + threads[QT_CLIENTS] = zmq_threadstart (&server_task, &hook); - rc = zmq_send (control, "TERMINATE", 9, 0); + msleep (500); // Run for 500 ms the standard proxy + rc = zmq_send (control, "TERMINATE", 9, 0); // stops the standard proxy assert (rc == 9); + msleep (200); // Run for 200 ms the standard proxy + rc = zmq_send (control, "TERMINATE", 9, 0); // stops the hooked proxy + assert (rc == 9); + rc = zmq_send (control, "STOP", 5, 0); // stops clients and workers (\0 is sent to ease the printf of the verbose mode) + assert (rc == 5); + + if (is_verbose) printf("frontend to backend hook hits = %d\nbackend to frontend hook hits = %d\n", stats.qt_upper_case, stats.qt_lower_case); rc = zmq_close (control); assert (rc == 0);