0
0
mirror of https://github.com/zeromq/libzmq.git synced 2025-01-14 01:37:56 +08:00

Problem: zmq_socket_monitor code is dirty

Specifically:

* zmq_event_t should not be used internally in libzmq, it was
  meant to be an outward facing structure.

* In 4.x, zmq_event_t does not correspond to monitor events, so
  I removed the structure entirely.

* man page for zmq_socket_monitor is incomplete and the example
  code was particularly nasty.

* test_monitor.cpp needed rewriting, it was not clean.
This commit is contained in:
Pieter Hintjens 2014-04-28 11:30:04 +02:00
parent 97935c582e
commit 9753de8566
6 changed files with 282 additions and 510 deletions

View File

@ -1,115 +1,100 @@
zmq_ctx_socket_monitor(3)
=========================
zmq_socket_monitor(3)
=====================
NAME
----
zmq_socket_monitor - register a monitoring callback
zmq_socket_monitor - monitor socket events
SYNOPSIS
--------
*int zmq_socket_monitor (void '*socket', char * '*addr', int 'events');*
*int zmq_socket_monitor (void '*socket', char '*endpoint', int 'events');*
DESCRIPTION
-----------
The _zmq_socket_monitor()_ function shall spawn a 'PAIR' socket that publishes
socket state changes (events) over the inproc:// transport to a given endpoint.
The _zmq_socket_monitor()_ method lets an application thread track
socket events (like connects) on a ZeroMQ socket. Each call to this
method creates a 'ZMQ_PAIR' socket and binds that to the specified
inproc:// 'endpoint'. To collect the socket events, you must create
your own 'ZMQ_PAIR' socket, and connect that to the endpoint.
Messages consist of 2 Frames, the first containing the event-id and the
associated value. The second frame holds the affected endpoint as string.
The 'events' argument is a bitmask of the socket events you wish to
monitor, see 'Supported events' below. To monitor all events, use the
event value ZMQ_EVENT_ALL.
The layout of the first Frame is:
16 bit event id
32 bit event value
event id and value are in the native byte order (for the machine the
application is running on). There is no padding between the fields.
The event value has to be interpreted in the context of the event id.
See 'Supported events' below for details.
Each event is sent as two frames. The first frame contains an event
number (16 bits), and an event value (32 bits) that provides additional
data according to the event number. The second frame contains a string
that specifies the affected TCP or IPC endpoint.
----
Only connection oriented (tcp and ipc) transports are supported in this initial
implementation.
The _zmq_socket_monitor()_ method supports only connection-oriented
transports, that is, TCP, IPC, and TIPC.
----
Supported events
----------------
ZMQ_EVENT_CONNECTED: connection established
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_EVENT_CONNECTED' event triggers when a connection has been established
to a remote peer. This can happen either synchronous or asynchronous.
Value is the FD of the newly connected socket.
ZMQ_EVENT_CONNECTED
~~~~~~~~~~~~~~~~~~~
The socket has successfully connected to a remote peer. The event value
is the file descriptor (FD) of the underlying network socket. Warning:
there is no guarantee that the FD is still valid by the time your code
receives this event.
ZMQ_EVENT_CONNECT_DELAYED
~~~~~~~~~~~~~~~~~~~~~~~~~
A connect request on the socket is pending. The event value is unspecified.
ZMQ_EVENT_CONNECT_DELAYED: synchronous connect failed, it's being polled
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_EVENT_CONNECT_DELAYED' event triggers when an immediate connection
attempt is delayed and its completion is being polled for.
Value has no meaning.
ZMQ_EVENT_CONNECT_RETRIED
~~~~~~~~~~~~~~~~~~~~~~~~~
A connect request failed, and is now being retried. The event value is the
reconnect interval in milliseconds. Note that the reconnect interval is
recalculated at each retry.
ZMQ_EVENT_LISTENING
~~~~~~~~~~~~~~~~~~~
The socket was successfully bound to a network interface. The event value
is the FD of the underlying network socket. Warning: there is no guarantee
that the FD is still valid by the time your code receives this event.
ZMQ_EVENT_CONNECT_RETRIED: asynchronous connect / reconnection attempt
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_EVENT_CONNECT_RETRIED' event triggers when a connection attempt
is being handled by reconnect timer. The reconnect interval's recomputed
for each attempt.
Value is the reconnect interval.
ZMQ_EVENT_BIND_FAILED
~~~~~~~~~~~~~~~~~~~~~
The socket could not bind to a given interface. The event value is the
errno generated by the system bind call.
ZMQ_EVENT_ACCEPTED
~~~~~~~~~~~~~~~~~~
The socket has accepted a connection from a remote peer. The event value is
the FD of the underlying network socket. Warning: there is no guarantee that
the FD is still valid by the time your code receives this event.
ZMQ_EVENT_LISTENING: socket bound to an address, ready to accept connections
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_EVENT_LISTENING' event triggers when a socket's successfully bound
to a an interface.
Value is the FD of the newly bound socket.
ZMQ_EVENT_ACCEPT_FAILED
~~~~~~~~~~~~~~~~~~~~~~~
The socket has rejected a connection from a remote peer. The event value is
the errno generated by the accept call.
ZMQ_EVENT_CLOSED
~~~~~~~~~~~~~~~~
The socket was closed. The event value is the FD of the (now closed) network
socket.
ZMQ_EVENT_BIND_FAILED: socket could not bind to an address
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_EVENT_BIND_FAILED' event triggers when a socket could not bind to
a given interface.
Value is the errno generated by the bind call.
ZMQ_EVENT_CLOSE_FAILED
~~~~~~~~~~~~~~~~~~~~~~
The socket close failed. The event value is the errno returned by the system
call. Note that this event occurs only on IPC transports.
ZMQ_EVENT_DISCONNECTED
~~~~~~~~~~~~~~~~~~~~~~
The socket was disconnected unexpectedly. The event value is the FD of the
underlying network socket. Warning: this socket will be closed.
ZMQ_EVENT_ACCEPTED: connection accepted to bound interface
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_EVENT_ACCEPTED' event triggers when a connection from a remote peer
has been established with a socket's listen address.
Value is the FD of the accepted socket.
ZMQ_EVENT_ACCEPT_FAILED: could not accept client connection
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_EVENT_ACCEPT_FAILED' event triggers when a connection attempt to
a socket's bound address fails.
Value is the errno generated by accept.
ZMQ_EVENT_CLOSED: connection closed
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_EVENT_CLOSED' event triggers when a connection's underlying descriptor
has been closed.
Value is the former FD of the for the closed socket. FD has been closed already!
ZMQ_EVENT_CLOSE_FAILED: connection couldn't be closed
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_EVENT_CLOSE_FAILED' event triggers when a descriptor could not be
released back to the OS. Implementation note: ONLY FOR IPC SOCKETS.
Value is the errno generated by unlink.
ZMQ_EVENT_DISCONNECTED: broken session
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_EVENT_DISCONNECTED' event triggers when the stream engine (tcp and ipc
specific) detects a corrupted / broken session.
Value is the FD of the socket.
ZMQ_EVENT_MONITOR_STOPPED
~~~~~~~~~~~~~~~~~~~~~~~~~
Monitoring on this socket ended.
RETURN VALUE
@ -133,115 +118,110 @@ The endpoint supplied is invalid.
EXAMPLE
-------
.Observing a 'REP' socket's connection state
.Monitoring client and server sockets
----
#include <stdio.h>
#include <zmq.h>
#include <pthread.h>
#include <string.h>
#include <assert.h>
// Read one event off the monitor socket; return value and address
// by reference, if not null, and event number by value. Returns -1
// in case of error.
static int read_msg(void* s, zmq_event_t* event, char* ep)
static int
get_monitor_event (void *monitor, int *value, char **address)
{
int rc ;
zmq_msg_t msg1; // binary part
zmq_msg_init (&msg1);
zmq_msg_t msg2; // address part
zmq_msg_init (&msg2);
rc = zmq_msg_recv (&msg1, s, 0);
if (rc == -1 && zmq_errno() == ETERM)
return 1 ;
assert (rc != -1);
assert (zmq_msg_more(&msg1) != 0);
rc = zmq_msg_recv (&msg2, s, 0);
if (rc == -1 && zmq_errno() == ETERM)
return 1;
assert (rc != -1);
assert (zmq_msg_more(&msg2) == 0);
// copy binary data to event struct
const char* data = (char*)zmq_msg_data(&msg1);
memcpy(&(event->event), data, sizeof(event->event));
memcpy(&(event->value), data+sizeof(event->event), sizeof(event->value));
// copy address part
const size_t len = zmq_msg_size(&msg2) ;
ep = memcpy(ep, zmq_msg_data(&msg2), len);
*(ep + len) = 0 ;
return 0 ;
}
// First frame in message contains event number and value
zmq_msg_t msg;
zmq_msg_init (&msg);
if (zmq_msg_recv (&msg, monitor, 0) == -1)
return -1; // Interruped, presumably
assert (zmq_msg_more (&msg));
// REP socket monitor thread
static void *rep_socket_monitor (void *ctx)
{
zmq_event_t event;
static char addr[1025] ;
int rc;
uint8_t *data = (uint8_t *) zmq_msg_data (&msg);
uint16_t event = *(uint16_t *) (data);
if (value)
*value = *(uint32_t *) (data + 2);
printf("starting monitor...\n");
// Second frame in message contains event address
zmq_msg_init (&msg);
if (zmq_msg_recv (&msg, monitor, 0) == -1)
return -1; // Interruped, presumably
assert (!zmq_msg_more (&msg));
void *s = zmq_socket (ctx, ZMQ_PAIR);
assert (s);
rc = zmq_connect (s, "inproc://monitor.rep");
assert (rc == 0);
while (!read_msg(s, &event, addr)) {
switch (event.event) {
case ZMQ_EVENT_LISTENING:
printf ("listening socket descriptor %d\n", event.value);
printf ("listening socket address %s\n", addr);
break;
case ZMQ_EVENT_ACCEPTED:
printf ("accepted socket descriptor %d\n", event.value);
printf ("accepted socket address %s\n", addr);
break;
case ZMQ_EVENT_CLOSE_FAILED:
printf ("socket close failure error code %d\n", event.value);
printf ("socket address %s\n", addr);
break;
case ZMQ_EVENT_CLOSED:
printf ("closed socket descriptor %d\n", event.value);
printf ("closed socket address %s\n", addr);
break;
case ZMQ_EVENT_DISCONNECTED:
printf ("disconnected socket descriptor %d\n", event.value);
printf ("disconnected socket address %s\n", addr);
break;
}
if (address) {
uint8_t *data = (uint8_t *) zmq_msg_data (&msg);
size_t size = zmq_msg_size (&msg);
*address = (char *) malloc (size + 1);
memcpy (*address, data, size);
*address [size] = 0;
}
zmq_close (s);
return NULL;
return event;
}
int main()
int main (void)
{
const char* addr = "tcp://127.0.0.1:6666" ;
pthread_t thread ;
// Create the infrastructure
void *ctx = zmq_init (1);
void *ctx = zmq_ctx_new ();
assert (ctx);
// REP socket
void* rep = zmq_socket (ctx, ZMQ_REP);
assert (rep);
// We'll monitor these two sockets
void *client = zmq_socket (ctx, ZMQ_DEALER);
assert (client);
void *server = zmq_socket (ctx, ZMQ_DEALER);
assert (server);
// REP socket monitor, all events
int rc = zmq_socket_monitor (rep, "inproc://monitor.rep", ZMQ_EVENT_ALL);
// Socket monitoring only works over inproc://
int rc = zmq_socket_monitor (client, "tcp://127.0.0.1:9999", 0);
assert (rc == -1);
assert (zmq_errno () == EPROTONOSUPPORT);
// Monitor all events on client and server sockets
rc = zmq_socket_monitor (client, "inproc://monitor-client", ZMQ_EVENT_ALL);
assert (rc == 0);
rc = pthread_create (&thread, NULL, rep_socket_monitor, ctx);
rc = zmq_socket_monitor (server, "inproc://monitor-server", ZMQ_EVENT_ALL);
assert (rc == 0);
rc = zmq_bind (rep, addr);
// Create two sockets for collecting monitor events
void *client_mon = zmq_socket (ctx, ZMQ_PAIR);
assert (client_mon);
void *server_mon = zmq_socket (ctx, ZMQ_PAIR);
assert (server_mon);
// Connect these to the inproc endpoints so they'll get events
rc = zmq_connect (client_mon, "inproc://monitor-client");
assert (rc == 0);
rc = zmq_connect (server_mon, "inproc://monitor-server");
assert (rc == 0);
// Allow some time for event detection
zmq_sleep (1);
// Close the REP socket
rc = zmq_close (rep);
// Now do a basic ping test
rc = zmq_bind (server, "tcp://127.0.0.1:9998");
assert (rc == 0);
rc = zmq_connect (client, "tcp://127.0.0.1:9998");
assert (rc == 0);
bounce (client, server);
zmq_term (ctx);
// Close client and server
close_zero_linger (client);
close_zero_linger (server);
// Now collect and check events from both sockets
int event = get_monitor_event (client_mon, NULL, NULL);
if (event == ZMQ_EVENT_CONNECT_DELAYED)
event = get_monitor_event (client_mon, NULL, NULL);
assert (event == ZMQ_EVENT_CONNECTED);
event = get_monitor_event (client_mon, NULL, NULL);
assert (event == ZMQ_EVENT_MONITOR_STOPPED);
// This is the flow of server events
event = get_monitor_event (server_mon, NULL, NULL);
assert (event == ZMQ_EVENT_LISTENING);
event = get_monitor_event (server_mon, NULL, NULL);
assert (event == ZMQ_EVENT_ACCEPTED);
event = get_monitor_event (server_mon, NULL, NULL);
assert (event == ZMQ_EVENT_CLOSED);
event = get_monitor_event (server_mon, NULL, NULL);
assert (event == ZMQ_EVENT_MONITOR_STOPPED);
// Close down the sockets
close_zero_linger (client_mon);
close_zero_linger (server_mon);
zmq_ctx_term (ctx);
return 0 ;
}

View File

@ -326,34 +326,20 @@ ZMQ_EXPORT char *zmq_msg_gets (zmq_msg_t *msg, char *property);
/* 0MQ socket events and monitoring */
/******************************************************************************/
/* Socket transport events (tcp and ipc only) */
#define ZMQ_EVENT_CONNECTED 1
#define ZMQ_EVENT_CONNECT_DELAYED 2
#define ZMQ_EVENT_CONNECT_RETRIED 4
/* Socket transport events (TCP and IPC only) */
#define ZMQ_EVENT_LISTENING 8
#define ZMQ_EVENT_BIND_FAILED 16
#define ZMQ_EVENT_ACCEPTED 32
#define ZMQ_EVENT_ACCEPT_FAILED 64
#define ZMQ_EVENT_CLOSED 128
#define ZMQ_EVENT_CLOSE_FAILED 256
#define ZMQ_EVENT_DISCONNECTED 512
#define ZMQ_EVENT_MONITOR_STOPPED 1024
#define ZMQ_EVENT_ALL ( ZMQ_EVENT_CONNECTED | ZMQ_EVENT_CONNECT_DELAYED | \
ZMQ_EVENT_CONNECT_RETRIED | ZMQ_EVENT_LISTENING | \
ZMQ_EVENT_BIND_FAILED | ZMQ_EVENT_ACCEPTED | \
ZMQ_EVENT_ACCEPT_FAILED | ZMQ_EVENT_CLOSED | \
ZMQ_EVENT_CLOSE_FAILED | ZMQ_EVENT_DISCONNECTED | \
ZMQ_EVENT_MONITOR_STOPPED)
/* Socket event data */
typedef struct {
uint16_t event; // id of the event as bitfield
int32_t value ; // value is either error code, fd or reconnect interval
} zmq_event_t;
#define ZMQ_EVENT_CONNECTED 0x0001
#define ZMQ_EVENT_CONNECT_DELAYED 0x0002
#define ZMQ_EVENT_CONNECT_RETRIED 0x0004
#define ZMQ_EVENT_LISTENING 0x0008
#define ZMQ_EVENT_BIND_FAILED 0x0010
#define ZMQ_EVENT_ACCEPTED 0x0020
#define ZMQ_EVENT_ACCEPT_FAILED 0x0040
#define ZMQ_EVENT_CLOSED 0x0080
#define ZMQ_EVENT_CLOSE_FAILED 0x0100
#define ZMQ_EVENT_DISCONNECTED 0x0200
#define ZMQ_EVENT_MONITOR_STOPPED 0x0400
#define ZMQ_EVENT_ALL 0xFFFF
ZMQ_EXPORT void *zmq_socket (void *, int type);
ZMQ_EXPORT int zmq_close (void *s);

View File

@ -1169,22 +1169,19 @@ void zmq::socket_base_t::extract_flags (msg_t *msg_)
int zmq::socket_base_t::monitor (const char *addr_, int events_)
{
int rc;
if (unlikely (ctx_terminated)) {
errno = ETERM;
return -1;
}
// Support deregistering monitoring endpoints as well
// Support deregistering monitoring endpoints as well
if (addr_ == NULL) {
stop_monitor ();
return 0;
}
// Parse addr_ string.
std::string protocol;
std::string address;
rc = parse_uri (addr_, protocol, address);
int rc = parse_uri (addr_, protocol, address);
if (rc != 0)
return -1;
@ -1192,25 +1189,24 @@ int zmq::socket_base_t::monitor (const char *addr_, int events_)
if (rc != 0)
return -1;
// Event notification only supported over inproc://
// Event notification only supported over inproc://
if (protocol != "inproc") {
errno = EPROTONOSUPPORT;
return -1;
}
// Register events to monitor
// Register events to monitor
monitor_events = events_;
monitor_socket = zmq_socket (get_ctx (), ZMQ_PAIR);
if (monitor_socket == NULL)
return -1;
// Never block context termination on pending event messages
// Never block context termination on pending event messages
int linger = 0;
rc = zmq_setsockopt (monitor_socket, ZMQ_LINGER, &linger, sizeof (linger));
if (rc == -1)
stop_monitor ();
// Spawn the monitor socket endpoint
// Spawn the monitor socket endpoint
rc = zmq_bind (monitor_socket, addr_);
if (rc == -1)
stop_monitor ();
@ -1229,134 +1225,88 @@ zmq::fd_t zmq::socket_base_t::fd()
void zmq::socket_base_t::event_connected (std::string &addr_, int fd_)
{
if (monitor_events & ZMQ_EVENT_CONNECTED) {
zmq_event_t event;
event.event = ZMQ_EVENT_CONNECTED;
event.value = fd_;
monitor_event (event, addr_);
}
if (monitor_events & ZMQ_EVENT_CONNECTED)
monitor_event (ZMQ_EVENT_CONNECTED, fd_, addr_);
}
void zmq::socket_base_t::event_connect_delayed (std::string &addr_, int err_)
{
if (monitor_events & ZMQ_EVENT_CONNECT_DELAYED) {
zmq_event_t event;
event.event = ZMQ_EVENT_CONNECT_DELAYED;
event.value = err_;
monitor_event (event, addr_);
}
if (monitor_events & ZMQ_EVENT_CONNECT_DELAYED)
monitor_event (ZMQ_EVENT_CONNECT_DELAYED, err_, addr_);
}
void zmq::socket_base_t::event_connect_retried (std::string &addr_, int interval_)
{
if (monitor_events & ZMQ_EVENT_CONNECT_RETRIED) {
zmq_event_t event;
event.event = ZMQ_EVENT_CONNECT_RETRIED;
event.value = interval_;
monitor_event (event, addr_);
}
if (monitor_events & ZMQ_EVENT_CONNECT_RETRIED)
monitor_event (ZMQ_EVENT_CONNECT_RETRIED, interval_, addr_);
}
void zmq::socket_base_t::event_listening (std::string &addr_, int fd_)
{
if (monitor_events & ZMQ_EVENT_LISTENING) {
zmq_event_t event;
event.event = ZMQ_EVENT_LISTENING;
event.value = fd_;
monitor_event (event, addr_);
}
if (monitor_events & ZMQ_EVENT_LISTENING)
monitor_event (ZMQ_EVENT_LISTENING, fd_, addr_);
}
void zmq::socket_base_t::event_bind_failed (std::string &addr_, int err_)
{
if (monitor_events & ZMQ_EVENT_BIND_FAILED) {
zmq_event_t event;
event.event = ZMQ_EVENT_BIND_FAILED;
event.value = err_;
monitor_event (event, addr_);
}
if (monitor_events & ZMQ_EVENT_BIND_FAILED)
monitor_event (ZMQ_EVENT_BIND_FAILED, err_, addr_);
}
void zmq::socket_base_t::event_accepted (std::string &addr_, int fd_)
{
if (monitor_events & ZMQ_EVENT_ACCEPTED) {
zmq_event_t event;
event.event = ZMQ_EVENT_ACCEPTED;
event.value = fd_;
monitor_event (event, addr_);
}
if (monitor_events & ZMQ_EVENT_ACCEPTED)
monitor_event (ZMQ_EVENT_ACCEPTED, fd_, addr_);
}
void zmq::socket_base_t::event_accept_failed (std::string &addr_, int err_)
{
if (monitor_events & ZMQ_EVENT_ACCEPT_FAILED) {
zmq_event_t event;
event.event = ZMQ_EVENT_ACCEPT_FAILED;
event.value= err_;
monitor_event (event, addr_);
}
if (monitor_events & ZMQ_EVENT_ACCEPT_FAILED)
monitor_event (ZMQ_EVENT_ACCEPT_FAILED, err_, addr_);
}
void zmq::socket_base_t::event_closed (std::string &addr_, int fd_)
{
if (monitor_events & ZMQ_EVENT_CLOSED) {
zmq_event_t event;
event.event = ZMQ_EVENT_CLOSED;
event.value = fd_;
monitor_event (event, addr_);
}
if (monitor_events & ZMQ_EVENT_CLOSED)
monitor_event (ZMQ_EVENT_CLOSED, fd_, addr_);
}
void zmq::socket_base_t::event_close_failed (std::string &addr_, int err_)
{
if (monitor_events & ZMQ_EVENT_CLOSE_FAILED) {
zmq_event_t event;
event.event = ZMQ_EVENT_CLOSE_FAILED;
event.value = err_;
monitor_event (event, addr_);
}
if (monitor_events & ZMQ_EVENT_CLOSE_FAILED)
monitor_event (ZMQ_EVENT_CLOSE_FAILED, err_, addr_);
}
void zmq::socket_base_t::event_disconnected (std::string &addr_, int fd_)
{
if (monitor_events & ZMQ_EVENT_DISCONNECTED) {
zmq_event_t event;
event.event = ZMQ_EVENT_DISCONNECTED;
event.value = fd_;
monitor_event (event, addr_);
}
if (monitor_events & ZMQ_EVENT_DISCONNECTED)
monitor_event (ZMQ_EVENT_DISCONNECTED, fd_, addr_);
}
void zmq::socket_base_t::monitor_event (zmq_event_t event_, const std::string& addr_)
// Send a monitor event
void zmq::socket_base_t::monitor_event (int event_, int value_, const std::string &addr_)
{
if (monitor_socket) {
const uint16_t eid = (uint16_t)event_.event;
const uint32_t value = (uint32_t)event_.value;
// prepare and send first message frame
// containing event id and value
// Send event in first frame
zmq_msg_t msg;
zmq_msg_init_size (&msg, sizeof(eid) + sizeof(value));
char* data1 = (char*)zmq_msg_data(&msg);
memcpy (data1, &eid, sizeof(eid));
memcpy (data1+sizeof(eid), &value, sizeof(value));
zmq_msg_init_size (&msg, 6);
uint8_t *data = (uint8_t *) zmq_msg_data (&msg);
*(uint16_t *) (data + 0) = (uint16_t) event_;
*(uint32_t *) (data + 2) = (uint32_t) value_;
zmq_sendmsg (monitor_socket, &msg, ZMQ_SNDMORE);
// prepare and send second message frame
// containing the address (endpoint)
// Send address in second frame
zmq_msg_init_size (&msg, addr_.size());
memcpy(zmq_msg_data(&msg), addr_.c_str(), addr_.size());
memcpy (zmq_msg_data (&msg), addr_.c_str (), addr_.size ());
zmq_sendmsg (monitor_socket, &msg, 0);
}
}
void zmq::socket_base_t::stop_monitor()
void zmq::socket_base_t::stop_monitor (void)
{
if (monitor_socket) {
if (monitor_events & ZMQ_EVENT_MONITOR_STOPPED) {
zmq_event_t event;
event.event = ZMQ_EVENT_MONITOR_STOPPED;
event.value = 0;
monitor_event (event, "");
}
if (monitor_events & ZMQ_EVENT_MONITOR_STOPPED)
monitor_event (ZMQ_EVENT_MONITOR_STOPPED, 0, "");
zmq_close (monitor_socket);
monitor_socket = NULL;
monitor_events = 0;

View File

@ -160,7 +160,7 @@ namespace zmq
void process_destroy ();
// Socket event data dispath
void monitor_event (zmq_event_t data_, const std::string& addr_);
void monitor_event (int event_, int value_, const std::string& addr_);
// Monitor socket cleanup
void stop_monitor ();

View File

@ -19,254 +19,110 @@
#include "testutil.hpp"
// REQ socket events handled
static int req_socket_events;
// 2nd REQ socket events handled
static int req2_socket_events;
// REP socket events handled
static int rep_socket_events;
// Read one event off the monitor socket; return value and address
// by reference, if not null, and event number by value. Returns -1
// in case of error.
std::string addr ;
static bool read_msg(void* s, zmq_event_t& event, std::string& ep)
static int
get_monitor_event (void *monitor, int *value, char **address)
{
int rc ;
zmq_msg_t msg1; // binary part
zmq_msg_init (&msg1);
zmq_msg_t msg2; // address part
zmq_msg_init (&msg2);
rc = zmq_msg_recv (&msg1, s, 0);
if (rc == -1 && zmq_errno() == ETERM)
return true ;
// First frame in message contains event number and value
zmq_msg_t msg;
zmq_msg_init (&msg);
if (zmq_msg_recv (&msg, monitor, 0) == -1)
return -1; // Interruped, presumably
assert (zmq_msg_more (&msg));
uint8_t *data = (uint8_t *) zmq_msg_data (&msg);
uint16_t event = *(uint16_t *) (data);
if (value)
*value = *(uint32_t *) (data + 2);
assert (rc != -1);
assert (zmq_msg_more(&msg1) != 0);
rc = zmq_msg_recv (&msg2, s, 0);
if (rc == -1 && zmq_errno() == ETERM)
return true;
assert (rc != -1);
assert (zmq_msg_more(&msg2) == 0);
// copy binary data to event struct
const char* data = (char*)zmq_msg_data(&msg1);
memcpy(&event.event, data, sizeof(event.event));
memcpy(&event.value, data+sizeof(event.event), sizeof(event.value));
// copy address part
ep = std::string((char*)zmq_msg_data(&msg2), zmq_msg_size(&msg2));
if (event.event == ZMQ_EVENT_MONITOR_STOPPED)
return true;
return false;
}
// REQ socket monitor thread
static void req_socket_monitor (void *ctx)
{
zmq_event_t event;
std::string ep ;
int rc;
void *s = zmq_socket (ctx, ZMQ_PAIR);
assert (s);
rc = zmq_connect (s, "inproc://monitor.req");
assert (rc == 0);
while (!read_msg(s, event, ep)) {
assert (ep == addr);
switch (event.event) {
case ZMQ_EVENT_CONNECTED:
assert (event.value > 0);
req_socket_events |= ZMQ_EVENT_CONNECTED;
req2_socket_events |= ZMQ_EVENT_CONNECTED;
break;
case ZMQ_EVENT_CONNECT_DELAYED:
assert (event.value != 0);
req_socket_events |= ZMQ_EVENT_CONNECT_DELAYED;
break;
case ZMQ_EVENT_CLOSE_FAILED:
assert (event.value != 0);
req_socket_events |= ZMQ_EVENT_CLOSE_FAILED;
break;
case ZMQ_EVENT_CLOSED:
assert (event.value != 0);
req_socket_events |= ZMQ_EVENT_CLOSED;
break;
case ZMQ_EVENT_DISCONNECTED:
assert (event.value != 0);
req_socket_events |= ZMQ_EVENT_DISCONNECTED;
break;
}
// Second frame in message contains event address
zmq_msg_init (&msg);
if (zmq_msg_recv (&msg, monitor, 0) == -1)
return -1; // Interruped, presumably
assert (!zmq_msg_more (&msg));
if (address) {
uint8_t *data = (uint8_t *) zmq_msg_data (&msg);
size_t size = zmq_msg_size (&msg);
*address = (char *) malloc (size + 1);
memcpy (*address, data, size);
*address [size] = 0;
}
zmq_close (s);
}
// 2nd REQ socket monitor thread
static void req2_socket_monitor (void *ctx)
{
zmq_event_t event;
std::string ep ;
int rc;
void *s = zmq_socket (ctx, ZMQ_PAIR);
assert (s);
rc = zmq_connect (s, "inproc://monitor.req2");
assert (rc == 0);
while (!read_msg(s, event, ep)) {
assert (ep == addr);
switch (event.event) {
case ZMQ_EVENT_CONNECTED:
assert (event.value > 0);
req2_socket_events |= ZMQ_EVENT_CONNECTED;
break;
case ZMQ_EVENT_CLOSED:
assert (event.value != 0);
req2_socket_events |= ZMQ_EVENT_CLOSED;
break;
}
}
zmq_close (s);
}
// REP socket monitor thread
static void rep_socket_monitor (void *ctx)
{
zmq_event_t event;
std::string ep ;
int rc;
void *s = zmq_socket (ctx, ZMQ_PAIR);
assert (s);
rc = zmq_connect (s, "inproc://monitor.rep");
assert (rc == 0);
while (!read_msg(s, event, ep)) {
assert (ep == addr);
switch (event.event) {
case ZMQ_EVENT_LISTENING:
assert (event.value > 0);
rep_socket_events |= ZMQ_EVENT_LISTENING;
break;
case ZMQ_EVENT_ACCEPTED:
assert (event.value > 0);
rep_socket_events |= ZMQ_EVENT_ACCEPTED;
break;
case ZMQ_EVENT_CLOSE_FAILED:
assert (event.value != 0);
rep_socket_events |= ZMQ_EVENT_CLOSE_FAILED;
break;
case ZMQ_EVENT_CLOSED:
assert (event.value != 0);
rep_socket_events |= ZMQ_EVENT_CLOSED;
break;
case ZMQ_EVENT_DISCONNECTED:
assert (event.value != 0);
rep_socket_events |= ZMQ_EVENT_DISCONNECTED;
break;
}
}
zmq_close (s);
return event;
}
int main (void)
{
setup_test_environment();
int rc;
void *req;
void *req2;
void *rep;
void* threads [3];
addr = "tcp://127.0.0.1:5560";
// Create the infrastructure
void *ctx = zmq_ctx_new ();
assert (ctx);
// We'll monitor these two sockets
void *client = zmq_socket (ctx, ZMQ_DEALER);
assert (client);
void *server = zmq_socket (ctx, ZMQ_DEALER);
assert (server);
// REP socket
rep = zmq_socket (ctx, ZMQ_REP);
assert (rep);
// Assert supported protocols
rc = zmq_socket_monitor (rep, addr.c_str(), 0);
// Socket monitoring only works over inproc://
int rc = zmq_socket_monitor (client, "tcp://127.0.0.1:9999", 0);
assert (rc == -1);
assert (zmq_errno() == EPROTONOSUPPORT);
assert (zmq_errno () == EPROTONOSUPPORT);
// Deregister monitor
rc = zmq_socket_monitor (rep, NULL, 0);
// Monitor all events on client and server sockets
rc = zmq_socket_monitor (client, "inproc://monitor-client", ZMQ_EVENT_ALL);
assert (rc == 0);
rc = zmq_socket_monitor (server, "inproc://monitor-server", ZMQ_EVENT_ALL);
assert (rc == 0);
// REP socket monitor, all events
rc = zmq_socket_monitor (rep, "inproc://monitor.rep", ZMQ_EVENT_ALL);
// Create two sockets for collecting monitor events
void *client_mon = zmq_socket (ctx, ZMQ_PAIR);
assert (client_mon);
void *server_mon = zmq_socket (ctx, ZMQ_PAIR);
assert (server_mon);
// Connect these to the inproc endpoints so they'll get events
rc = zmq_connect (client_mon, "inproc://monitor-client");
assert (rc == 0);
rc = zmq_connect (server_mon, "inproc://monitor-server");
assert (rc == 0);
threads [0] = zmq_threadstart(&rep_socket_monitor, ctx);
// REQ socket
req = zmq_socket (ctx, ZMQ_REQ);
assert (req);
// REQ socket monitor, all events
rc = zmq_socket_monitor (req, "inproc://monitor.req", ZMQ_EVENT_ALL);
// Now do a basic ping test
rc = zmq_bind (server, "tcp://127.0.0.1:9998");
assert (rc == 0);
threads [1] = zmq_threadstart(&req_socket_monitor, ctx);
msleep (SETTLE_TIME);
// Bind REQ and REP
rc = zmq_bind (rep, addr.c_str());
rc = zmq_connect (client, "tcp://127.0.0.1:9998");
assert (rc == 0);
bounce (client, server);
rc = zmq_connect (req, addr.c_str());
assert (rc == 0);
bounce (rep, req);
// Close client and server
close_zero_linger (client);
close_zero_linger (server);
// 2nd REQ socket
req2 = zmq_socket (ctx, ZMQ_REQ);
assert (req2);
// 2nd REQ socket monitor, connected event only
rc = zmq_socket_monitor (req2, "inproc://monitor.req2", ZMQ_EVENT_CONNECTED);
assert (rc == 0);
threads [2] = zmq_threadstart(&req2_socket_monitor, ctx);
rc = zmq_connect (req2, addr.c_str());
assert (rc == 0);
// Close the REP socket
rc = zmq_close (rep);
assert (rc == 0);
// Allow enough time for detecting error states
msleep (250);
// Close the REQ socket
rc = zmq_close (req);
assert (rc == 0);
// Close the 2nd REQ socket
rc = zmq_close (req2);
assert (rc == 0);
// Now collect and check events from both sockets
int event = get_monitor_event (client_mon, NULL, NULL);
if (event == ZMQ_EVENT_CONNECT_DELAYED)
event = get_monitor_event (client_mon, NULL, NULL);
assert (event == ZMQ_EVENT_CONNECTED);
event = get_monitor_event (client_mon, NULL, NULL);
assert (event == ZMQ_EVENT_MONITOR_STOPPED);
// This is the flow of server events
event = get_monitor_event (server_mon, NULL, NULL);
assert (event == ZMQ_EVENT_LISTENING);
event = get_monitor_event (server_mon, NULL, NULL);
assert (event == ZMQ_EVENT_ACCEPTED);
event = get_monitor_event (server_mon, NULL, NULL);
assert (event == ZMQ_EVENT_CLOSED);
event = get_monitor_event (server_mon, NULL, NULL);
assert (event == ZMQ_EVENT_MONITOR_STOPPED);
// Close down the sockets
close_zero_linger (client_mon);
close_zero_linger (server_mon);
zmq_ctx_term (ctx);
// Expected REP socket events
assert (rep_socket_events & ZMQ_EVENT_LISTENING);
assert (rep_socket_events & ZMQ_EVENT_ACCEPTED);
assert (rep_socket_events & ZMQ_EVENT_CLOSED);
// Expected REQ socket events
assert (req_socket_events & ZMQ_EVENT_CONNECTED);
assert (req_socket_events & ZMQ_EVENT_DISCONNECTED);
assert (req_socket_events & ZMQ_EVENT_CLOSED);
// Expected 2nd REQ socket events
assert (req2_socket_events & ZMQ_EVENT_CONNECTED);
assert (!(req2_socket_events & ZMQ_EVENT_CLOSED));
for (unsigned int i = 0; i < 3; ++i)
zmq_threadclose(threads [i]);
return 0 ;
}

View File

@ -244,7 +244,7 @@ void s_recv_seq (void *socket, ...)
}
// Sets a zero linger period on a socket and closes it.
// Sets a zero linger period on a socket and closes it.
void close_zero_linger (void *socket)
{
int linger = 0;