mirror of
https://github.com/zeromq/libzmq.git
synced 2025-01-15 18:38:00 +08:00
commit
025e218629
22
NEWS
22
NEWS
@ -1,3 +1,25 @@
|
||||
0MQ version 3.2.2 stable, released on 2012/11/23
|
||||
================================================
|
||||
|
||||
Issues addressed in this release
|
||||
--------------------------------
|
||||
|
||||
* LIBZMQ-384 No meta data for ZMQ_EVENT_DISCONNECTED monitor event
|
||||
* LIBZMQ-414 Error in ARM/Thumb2 assembly (atomic_ptr.hpp)
|
||||
* LIBZMQ-417 zmq_assert (!incomplete_in) in session_base.cpp 228
|
||||
* LIBZMQ-447 socket_base_t::recv() packet loss and memory leak at high receiving rate
|
||||
* LIBZMQ-448 Builds fail on older versions of GCC
|
||||
* LIBZMQ-449 Builds fail on AIX
|
||||
* LIBZMQ-450 lt-test_monitor: fails with assertion at test_monitor.cpp:81
|
||||
* LIBZMQ-451 ZMQ_ROUTER_MANDATORY blocks forever
|
||||
* LIBZMQ-452 test_connect_delay.cpp:175:12: error: 'sleep' was not declared in this scope
|
||||
* LIBZMQ-456 ZMQ_XPUB_VERBOSE does not propagate in a tree of XPUB/XSUB devices
|
||||
* LIBZMQ-458 lt-test_router_mandatory fails with assertion at test_router_mandatory.cpp:53
|
||||
* LIBZMQ-459 Assertion failed: encoder (stream_engine.cpp:266
|
||||
* LIBZMQ-464 PUB socket with HWM set leaks memory
|
||||
* LIBZMQ-468 ZMQ_XPUB_VERBOSE & unsubscribe
|
||||
* LIBZMQ-472 Segfault in zmq_poll in REQ to ROUTER dialog
|
||||
|
||||
0MQ version 3.2.1 (RC2), released on 2012/10/15
|
||||
===============================================
|
||||
|
||||
|
@ -122,7 +122,7 @@ void zmq::ipc_connecter_t::out_event ()
|
||||
// Shut the connecter down.
|
||||
terminate ();
|
||||
|
||||
socket->event_connected (endpoint.c_str(), fd);
|
||||
socket->event_connected (endpoint, fd);
|
||||
}
|
||||
|
||||
void zmq::ipc_connecter_t::timer_event (int id_)
|
||||
@ -145,11 +145,12 @@ void zmq::ipc_connecter_t::start_connecting ()
|
||||
}
|
||||
|
||||
// Connection establishment may be delayed. Poll for its completion.
|
||||
else if (rc == -1 && errno == EINPROGRESS) {
|
||||
else
|
||||
if (rc == -1 && errno == EINPROGRESS) {
|
||||
handle = add_fd (s);
|
||||
handle_valid = true;
|
||||
set_pollout (handle);
|
||||
socket->event_connect_delayed (endpoint.c_str(), zmq_errno());
|
||||
socket->event_connect_delayed (endpoint, zmq_errno());
|
||||
}
|
||||
|
||||
// Handle any other error condition by eventual reconnect.
|
||||
@ -164,7 +165,7 @@ void zmq::ipc_connecter_t::add_reconnect_timer()
|
||||
{
|
||||
int rc_ivl = get_new_reconnect_ivl();
|
||||
add_timer (rc_ivl, reconnect_timer_id);
|
||||
socket->event_connect_retried (endpoint.c_str(), rc_ivl);
|
||||
socket->event_connect_retried (endpoint, rc_ivl);
|
||||
timer_started = true;
|
||||
}
|
||||
|
||||
@ -225,7 +226,7 @@ int zmq::ipc_connecter_t::close ()
|
||||
zmq_assert (s != retired_fd);
|
||||
int rc = ::close (s);
|
||||
errno_assert (rc == 0);
|
||||
socket->event_closed (endpoint.c_str(), s);
|
||||
socket->event_closed (endpoint, s);
|
||||
s = retired_fd;
|
||||
return 0;
|
||||
}
|
||||
|
@ -76,7 +76,7 @@ void zmq::ipc_listener_t::in_event ()
|
||||
// If connection was reset by the peer in the meantime, just ignore it.
|
||||
// TODO: Handle specific errors like ENFILE/EMFILE etc.
|
||||
if (fd == retired_fd) {
|
||||
socket->event_accept_failed (endpoint.c_str(), zmq_errno());
|
||||
socket->event_accept_failed (endpoint, zmq_errno());
|
||||
return;
|
||||
}
|
||||
|
||||
@ -96,7 +96,7 @@ void zmq::ipc_listener_t::in_event ()
|
||||
session->inc_seqnum ();
|
||||
launch_child (session);
|
||||
send_attach (session, engine, false);
|
||||
socket->event_accepted (endpoint.c_str(), fd);
|
||||
socket->event_accepted (endpoint, fd);
|
||||
}
|
||||
|
||||
int zmq::ipc_listener_t::get_address (std::string &addr_)
|
||||
@ -155,7 +155,7 @@ int zmq::ipc_listener_t::set_address (const char *addr_)
|
||||
if (rc != 0)
|
||||
goto error;
|
||||
|
||||
socket->event_listening (endpoint.c_str(), s);
|
||||
socket->event_listening (endpoint, s);
|
||||
return 0;
|
||||
|
||||
error:
|
||||
@ -178,12 +178,12 @@ int zmq::ipc_listener_t::close ()
|
||||
if (has_file && !filename.empty ()) {
|
||||
rc = ::unlink(filename.c_str ());
|
||||
if (rc != 0) {
|
||||
socket->event_close_failed (endpoint.c_str(), zmq_errno());
|
||||
socket->event_close_failed (endpoint, zmq_errno());
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
socket->event_closed (endpoint.c_str(), s);
|
||||
socket->event_closed (endpoint, s);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -357,7 +357,7 @@ int zmq::socket_base_t::bind (const char *addr_)
|
||||
int rc = listener->set_address (address.c_str ());
|
||||
if (rc != 0) {
|
||||
delete listener;
|
||||
event_bind_failed (addr_, zmq_errno());
|
||||
event_bind_failed (address, zmq_errno());
|
||||
return -1;
|
||||
}
|
||||
|
||||
@ -376,7 +376,7 @@ int zmq::socket_base_t::bind (const char *addr_)
|
||||
int rc = listener->set_address (address.c_str ());
|
||||
if (rc != 0) {
|
||||
delete listener;
|
||||
event_bind_failed (addr_, zmq_errno());
|
||||
event_bind_failed (address, zmq_errno());
|
||||
return -1;
|
||||
}
|
||||
|
||||
@ -428,15 +428,11 @@ int zmq::socket_base_t::connect (const char *addr_)
|
||||
|
||||
// The total HWM for an inproc connection should be the sum of
|
||||
// the binder's HWM and the connector's HWM.
|
||||
int sndhwm;
|
||||
int rcvhwm;
|
||||
if (options.sndhwm == 0 || peer.options.rcvhwm == 0)
|
||||
sndhwm = 0;
|
||||
else
|
||||
int sndhwm = 0;
|
||||
if (options.sndhwm != 0 && peer.options.rcvhwm != 0)
|
||||
sndhwm = options.sndhwm + peer.options.rcvhwm;
|
||||
if (options.rcvhwm == 0 || peer.options.sndhwm == 0)
|
||||
rcvhwm = 0;
|
||||
else
|
||||
int rcvhwm = 0;
|
||||
if (options.rcvhwm != 0 && peer.options.sndhwm != 0)
|
||||
rcvhwm = options.rcvhwm + peer.options.sndhwm;
|
||||
|
||||
// Create a bi-directional pipe to connect the peers.
|
||||
@ -507,7 +503,8 @@ int zmq::socket_base_t::connect (const char *addr_)
|
||||
}
|
||||
}
|
||||
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
|
||||
else if(protocol == "ipc") {
|
||||
else
|
||||
if (protocol == "ipc") {
|
||||
paddr->resolved.ipc_addr = new (std::nothrow) ipc_address_t ();
|
||||
alloc_assert (paddr->resolved.ipc_addr);
|
||||
int rc = paddr->resolved.ipc_addr->resolve (address.c_str ());
|
||||
@ -1033,8 +1030,7 @@ int zmq::socket_base_t::monitor (const char *addr_, int events_)
|
||||
|
||||
// Register events to monitor
|
||||
monitor_events = events_;
|
||||
|
||||
monitor_socket = zmq_socket( get_ctx (), ZMQ_PAIR);
|
||||
monitor_socket = zmq_socket (get_ctx (), ZMQ_PAIR);
|
||||
if (monitor_socket == NULL)
|
||||
return -1;
|
||||
|
||||
@ -1051,124 +1047,144 @@ int zmq::socket_base_t::monitor (const char *addr_, int events_)
|
||||
return rc;
|
||||
}
|
||||
|
||||
void zmq::socket_base_t::event_connected (const char *addr_, int fd_)
|
||||
void zmq::socket_base_t::event_connected (std::string &addr_, int fd_)
|
||||
{
|
||||
zmq_event_t event;
|
||||
if (!(monitor_events & ZMQ_EVENT_CONNECTED)) return;
|
||||
event.event = ZMQ_EVENT_CONNECTED;
|
||||
event.data.connected.addr = (char *)addr_;
|
||||
event.data.connected.fd = fd_;
|
||||
monitor_event (event);
|
||||
if (monitor_events & ZMQ_EVENT_CONNECTED) {
|
||||
zmq_event_t event;
|
||||
event.event = ZMQ_EVENT_CONNECTED;
|
||||
event.data.connected.addr = (char *) malloc (addr_.size () + 1);
|
||||
copy_monitor_address (event.data.connected.addr, addr_);
|
||||
event.data.connected.fd = fd_;
|
||||
monitor_event (event);
|
||||
}
|
||||
}
|
||||
|
||||
void zmq::socket_base_t::event_connect_delayed (const char *addr_, int err_)
|
||||
void zmq::socket_base_t::event_connect_delayed (std::string &addr_, int err_)
|
||||
{
|
||||
if (monitor_events & ZMQ_EVENT_CONNECT_DELAYED) {
|
||||
zmq_event_t event;
|
||||
event.event = ZMQ_EVENT_CONNECT_DELAYED;
|
||||
event.data.connect_delayed.addr = const_cast <char *> (addr_);
|
||||
event.data.connect_delayed.addr = (char *) malloc (addr_.size () + 1);
|
||||
copy_monitor_address (event.data.connect_delayed.addr, addr_);
|
||||
event.data.connect_delayed.err = err_;
|
||||
monitor_event (event);
|
||||
}
|
||||
}
|
||||
|
||||
void zmq::socket_base_t::event_connect_retried (const char *addr_, int interval_)
|
||||
void zmq::socket_base_t::event_connect_retried (std::string &addr_, int interval_)
|
||||
{
|
||||
if (monitor_events & ZMQ_EVENT_CONNECT_RETRIED) {
|
||||
zmq_event_t event;
|
||||
event.event = ZMQ_EVENT_CONNECT_RETRIED;
|
||||
event.data.connect_retried.addr = const_cast <char *> (addr_);
|
||||
event.data.connect_retried.addr = (char *) malloc (addr_.size () + 1);
|
||||
copy_monitor_address (event.data.connect_retried.addr, addr_);
|
||||
event.data.connect_retried.interval = interval_;
|
||||
monitor_event (event);
|
||||
}
|
||||
}
|
||||
|
||||
void zmq::socket_base_t::event_listening (const char *addr_, int fd_)
|
||||
void zmq::socket_base_t::event_listening (std::string &addr_, int fd_)
|
||||
{
|
||||
if (monitor_events & ZMQ_EVENT_LISTENING) {
|
||||
zmq_event_t event;
|
||||
event.event = ZMQ_EVENT_LISTENING;
|
||||
event.data.listening.addr = const_cast <char *> (addr_);
|
||||
event.data.listening.addr = (char *) malloc (addr_.size () + 1);
|
||||
copy_monitor_address (event.data.listening.addr, addr_);
|
||||
event.data.listening.fd = fd_;
|
||||
monitor_event (event);
|
||||
}
|
||||
}
|
||||
|
||||
void zmq::socket_base_t::event_bind_failed (const char *addr_, int err_)
|
||||
void zmq::socket_base_t::event_bind_failed (std::string &addr_, int err_)
|
||||
{
|
||||
if (monitor_events & ZMQ_EVENT_BIND_FAILED) {
|
||||
zmq_event_t event;
|
||||
event.event = ZMQ_EVENT_BIND_FAILED;
|
||||
event.data.bind_failed.addr = const_cast <char *> (addr_);
|
||||
event.data.bind_failed.addr = (char *) malloc (addr_.size () + 1);
|
||||
copy_monitor_address (event.data.bind_failed.addr, addr_);
|
||||
event.data.bind_failed.err = err_;
|
||||
monitor_event (event);
|
||||
}
|
||||
}
|
||||
|
||||
void zmq::socket_base_t::event_accepted (const char *addr_, int fd_)
|
||||
void zmq::socket_base_t::event_accepted (std::string &addr_, int fd_)
|
||||
{
|
||||
if (monitor_events & ZMQ_EVENT_ACCEPTED) {
|
||||
zmq_event_t event;
|
||||
event.event = ZMQ_EVENT_ACCEPTED;
|
||||
event.data.accepted.addr = const_cast <char *> (addr_);
|
||||
event.data.accepted.addr = (char *) malloc (addr_.size () + 1);
|
||||
copy_monitor_address (event.data.accepted.addr, addr_);
|
||||
event.data.accepted.fd = fd_;
|
||||
monitor_event (event);
|
||||
}
|
||||
}
|
||||
|
||||
void zmq::socket_base_t::event_accept_failed (const char *addr_, int err_)
|
||||
void zmq::socket_base_t::event_accept_failed (std::string &addr_, int err_)
|
||||
{
|
||||
if (monitor_events & ZMQ_EVENT_ACCEPT_FAILED) {
|
||||
zmq_event_t event;
|
||||
event.event = ZMQ_EVENT_ACCEPT_FAILED;
|
||||
event.data.accept_failed.addr = const_cast <char *> (addr_);
|
||||
event.data.accept_failed.addr = (char *) malloc (addr_.size () + 1);
|
||||
copy_monitor_address (event.data.accept_failed.addr, addr_);
|
||||
event.data.accept_failed.err= err_;
|
||||
monitor_event (event);
|
||||
}
|
||||
}
|
||||
|
||||
void zmq::socket_base_t::event_closed (const char *addr_, int fd_)
|
||||
void zmq::socket_base_t::event_closed (std::string &addr_, int fd_)
|
||||
{
|
||||
if (monitor_events & ZMQ_EVENT_CLOSED) {
|
||||
zmq_event_t event;
|
||||
event.event = ZMQ_EVENT_CLOSED;
|
||||
event.data.closed.addr = const_cast <char *> (addr_);
|
||||
event.data.closed.addr = (char *) malloc (addr_.size () + 1);
|
||||
copy_monitor_address (event.data.closed.addr, addr_);
|
||||
event.data.closed.fd = fd_;
|
||||
monitor_event (event);
|
||||
}
|
||||
}
|
||||
|
||||
void zmq::socket_base_t::event_close_failed (const char *addr_, int err_)
|
||||
|
||||
void zmq::socket_base_t::event_close_failed (std::string &addr_, int err_)
|
||||
{
|
||||
if (monitor_events & ZMQ_EVENT_CLOSE_FAILED) {
|
||||
zmq_event_t event;
|
||||
event.event = ZMQ_EVENT_CLOSE_FAILED;
|
||||
event.data.close_failed.addr = const_cast <char *> (addr_);
|
||||
event.data.close_failed.addr = (char *) malloc (addr_.size () + 1);
|
||||
copy_monitor_address (event.data.close_failed.addr, addr_);
|
||||
event.data.close_failed.err = err_;
|
||||
monitor_event (event);
|
||||
}
|
||||
}
|
||||
|
||||
void zmq::socket_base_t::event_disconnected (const char *addr_, int fd_)
|
||||
void zmq::socket_base_t::event_disconnected (std::string &addr_, int fd_)
|
||||
{
|
||||
if (monitor_events & ZMQ_EVENT_DISCONNECTED) {
|
||||
zmq_event_t event;
|
||||
event.event = ZMQ_EVENT_DISCONNECTED;
|
||||
event.data.disconnected.addr = const_cast <char *> (addr_);
|
||||
event.data.disconnected.addr = (char *) malloc (addr_.size () + 1);
|
||||
copy_monitor_address (event.data.disconnected.addr, addr_);
|
||||
event.data.disconnected.fd = fd_;
|
||||
monitor_event (event);
|
||||
}
|
||||
}
|
||||
|
||||
void zmq::socket_base_t::copy_monitor_address (char *dest_, std::string &src_)
|
||||
{
|
||||
alloc_assert (dest_);
|
||||
dest_[src_.size ()] = 0;
|
||||
memcpy (dest_, src_.c_str (), src_.size ());
|
||||
}
|
||||
|
||||
void zmq::socket_base_t::monitor_event (zmq_event_t event_)
|
||||
{
|
||||
zmq_msg_t msg;
|
||||
if (!monitor_socket)
|
||||
return;
|
||||
zmq_msg_init_size (&msg, sizeof (event_));
|
||||
memcpy (zmq_msg_data (&msg), &event_, sizeof (event_));
|
||||
zmq_sendmsg (monitor_socket, &msg, 0);
|
||||
zmq_msg_close (&msg);
|
||||
if (monitor_socket) {
|
||||
zmq_msg_t msg;
|
||||
void *event_data = malloc (sizeof (event_));
|
||||
alloc_assert (event_data);
|
||||
memcpy (event_data, &event_, sizeof (event_));
|
||||
zmq_msg_init_data (&msg, event_data, sizeof (event_), zmq_free_event, NULL);
|
||||
zmq_sendmsg (monitor_socket, &msg, 0);
|
||||
zmq_msg_close (&msg);
|
||||
}
|
||||
}
|
||||
|
||||
void zmq::socket_base_t::stop_monitor()
|
||||
@ -1179,4 +1195,3 @@ void zmq::socket_base_t::stop_monitor()
|
||||
monitor_events = 0;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -38,6 +38,11 @@
|
||||
#include "clock.hpp"
|
||||
#include "pipe.hpp"
|
||||
|
||||
extern "C"
|
||||
{
|
||||
void zmq_free_event (void *data, void *hint);
|
||||
}
|
||||
|
||||
namespace zmq
|
||||
{
|
||||
|
||||
@ -104,16 +109,16 @@ namespace zmq
|
||||
|
||||
int monitor(const char *endpoint_, int events_);
|
||||
|
||||
void event_connected(const char *addr_, int fd_);
|
||||
void event_connect_delayed(const char *addr_, int err_);
|
||||
void event_connect_retried(const char *addr_, int interval_);
|
||||
void event_listening(const char *addr_, int fd_);
|
||||
void event_bind_failed(const char *addr_, int err_);
|
||||
void event_accepted(const char *addr_, int fd_);
|
||||
void event_accept_failed(const char *addr_, int err_);
|
||||
void event_closed(const char *addr_, int fd_);
|
||||
void event_close_failed(const char *addr_, int fd_);
|
||||
void event_disconnected(const char *addr_, int fd_);
|
||||
void event_connected (std::string &addr_, int fd_);
|
||||
void event_connect_delayed (std::string &addr_, int err_);
|
||||
void event_connect_retried (std::string &addr_, int interval_);
|
||||
void event_listening (std::string &addr_, int fd_);
|
||||
void event_bind_failed (std::string &addr_, int err_);
|
||||
void event_accepted (std::string &addr_, int fd_);
|
||||
void event_accept_failed (std::string &addr_, int err_);
|
||||
void event_closed (std::string &addr_, int fd_);
|
||||
void event_close_failed (std::string &addr_, int fd_);
|
||||
void event_disconnected (std::string &addr_, int fd_);
|
||||
|
||||
protected:
|
||||
|
||||
@ -151,6 +156,9 @@ namespace zmq
|
||||
// Socket event data dispath
|
||||
void monitor_event (zmq_event_t data_);
|
||||
|
||||
// Copy monitor specific event endpoints to event messages
|
||||
void copy_monitor_address (char *dest_, std::string &src_);
|
||||
|
||||
// Monitor socket cleanup
|
||||
void stop_monitor ();
|
||||
|
||||
|
@ -459,7 +459,7 @@ int zmq::stream_engine_t::push_msg (msg_t *msg_)
|
||||
void zmq::stream_engine_t::error ()
|
||||
{
|
||||
zmq_assert (session);
|
||||
socket->event_disconnected (endpoint.c_str(), s);
|
||||
socket->event_disconnected (endpoint, s);
|
||||
session->detach ();
|
||||
unplug ();
|
||||
delete this;
|
||||
|
@ -136,7 +136,7 @@ void zmq::tcp_connecter_t::out_event ()
|
||||
// Shut the connecter down.
|
||||
terminate ();
|
||||
|
||||
socket->event_connected (endpoint.c_str(), fd);
|
||||
socket->event_connected (endpoint, fd);
|
||||
}
|
||||
|
||||
void zmq::tcp_connecter_t::timer_event (int id_)
|
||||
@ -159,11 +159,12 @@ void zmq::tcp_connecter_t::start_connecting ()
|
||||
}
|
||||
|
||||
// Connection establishment may be delayed. Poll for its completion.
|
||||
else if (rc == -1 && errno == EINPROGRESS) {
|
||||
else
|
||||
if (rc == -1 && errno == EINPROGRESS) {
|
||||
handle = add_fd (s);
|
||||
handle_valid = true;
|
||||
set_pollout (handle);
|
||||
socket->event_connect_delayed (endpoint.c_str(), zmq_errno());
|
||||
socket->event_connect_delayed (endpoint, zmq_errno());
|
||||
}
|
||||
|
||||
// Handle any other error condition by eventual reconnect.
|
||||
@ -178,7 +179,7 @@ void zmq::tcp_connecter_t::add_reconnect_timer()
|
||||
{
|
||||
int rc_ivl = get_new_reconnect_ivl();
|
||||
add_timer (rc_ivl, reconnect_timer_id);
|
||||
socket->event_connect_retried (endpoint.c_str(), rc_ivl);
|
||||
socket->event_connect_retried (endpoint, rc_ivl);
|
||||
timer_started = true;
|
||||
}
|
||||
|
||||
@ -304,6 +305,6 @@ void zmq::tcp_connecter_t::close ()
|
||||
int rc = ::close (s);
|
||||
errno_assert (rc == 0);
|
||||
#endif
|
||||
socket->event_closed (endpoint.c_str(), s);
|
||||
socket->event_closed (endpoint, s);
|
||||
s = retired_fd;
|
||||
}
|
||||
|
@ -85,7 +85,7 @@ void zmq::tcp_listener_t::in_event ()
|
||||
// If connection was reset by the peer in the meantime, just ignore it.
|
||||
// TODO: Handle specific errors like ENFILE/EMFILE etc.
|
||||
if (fd == retired_fd) {
|
||||
socket->event_accept_failed (endpoint.c_str(), zmq_errno());
|
||||
socket->event_accept_failed (endpoint, zmq_errno());
|
||||
return;
|
||||
}
|
||||
|
||||
@ -108,7 +108,7 @@ void zmq::tcp_listener_t::in_event ()
|
||||
session->inc_seqnum ();
|
||||
launch_child (session);
|
||||
send_attach (session, engine, false);
|
||||
socket->event_accepted (endpoint.c_str(), fd);
|
||||
socket->event_accepted (endpoint, fd);
|
||||
}
|
||||
|
||||
void zmq::tcp_listener_t::close ()
|
||||
@ -121,7 +121,7 @@ void zmq::tcp_listener_t::close ()
|
||||
int rc = ::close (s);
|
||||
errno_assert (rc == 0);
|
||||
#endif
|
||||
socket->event_closed (endpoint.c_str(), s);
|
||||
socket->event_closed (endpoint, s);
|
||||
s = retired_fd;
|
||||
}
|
||||
|
||||
@ -223,7 +223,7 @@ int zmq::tcp_listener_t::set_address (const char *addr_)
|
||||
goto error;
|
||||
#endif
|
||||
|
||||
socket->event_listening (endpoint.c_str(), s);
|
||||
socket->event_listening (endpoint, s);
|
||||
return 0;
|
||||
|
||||
error:
|
||||
|
63
src/zmq.cpp
63
src/zmq.cpp
@ -674,15 +674,15 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
|
||||
int nevents = 0;
|
||||
|
||||
while (true) {
|
||||
|
||||
// Compute the timeout for the subsequent poll.
|
||||
int timeout;
|
||||
if (first_pass)
|
||||
timeout = 0;
|
||||
else if (timeout_ < 0)
|
||||
timeout = -1;
|
||||
else
|
||||
timeout = end - now;
|
||||
// Compute the timeout for the subsequent poll.
|
||||
int timeout;
|
||||
if (first_pass)
|
||||
timeout = 0;
|
||||
else
|
||||
if (timeout_ < 0)
|
||||
timeout = -1;
|
||||
else
|
||||
timeout = end - now;
|
||||
|
||||
// Wait for events.
|
||||
while (true) {
|
||||
@ -694,7 +694,6 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
|
||||
errno_assert (rc >= 0);
|
||||
break;
|
||||
}
|
||||
|
||||
// Check for the events.
|
||||
for (int i = 0; i != nitems_; i++) {
|
||||
|
||||
@ -848,7 +847,8 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
|
||||
timeout.tv_usec = 0;
|
||||
ptimeout = &timeout;
|
||||
}
|
||||
else if (timeout_ < 0)
|
||||
else
|
||||
if (timeout_ < 0)
|
||||
ptimeout = NULL;
|
||||
else {
|
||||
timeout.tv_sec = (long) ((end - now) / 1000);
|
||||
@ -987,6 +987,47 @@ int zmq_device (int type, void *frontend_, void *backend_)
|
||||
(zmq::socket_base_t*) backend_, NULL);
|
||||
}
|
||||
|
||||
// Callback to free socket event data
|
||||
|
||||
void zmq_free_event (void *event_data, void *hint)
|
||||
{
|
||||
zmq_event_t *event = (zmq_event_t *) event_data;
|
||||
|
||||
switch (event->event) {
|
||||
case ZMQ_EVENT_CONNECTED:
|
||||
free (event->data.connected.addr);
|
||||
break;
|
||||
case ZMQ_EVENT_CONNECT_DELAYED:
|
||||
free (event->data.connect_delayed.addr);
|
||||
break;
|
||||
case ZMQ_EVENT_CONNECT_RETRIED:
|
||||
free (event->data.connect_retried.addr);
|
||||
break;
|
||||
case ZMQ_EVENT_LISTENING:
|
||||
free (event->data.listening.addr);
|
||||
break;
|
||||
case ZMQ_EVENT_BIND_FAILED:
|
||||
free (event->data.bind_failed.addr);
|
||||
break;
|
||||
case ZMQ_EVENT_ACCEPTED:
|
||||
free (event->data.accepted.addr);
|
||||
break;
|
||||
case ZMQ_EVENT_ACCEPT_FAILED:
|
||||
free (event->data.accept_failed.addr);
|
||||
break;
|
||||
case ZMQ_EVENT_CLOSED:
|
||||
free (event->data.closed.addr);
|
||||
break;
|
||||
case ZMQ_EVENT_CLOSE_FAILED:
|
||||
free (event->data.close_failed.addr);
|
||||
break;
|
||||
case ZMQ_EVENT_DISCONNECTED:
|
||||
free (event->data.disconnected.addr);
|
||||
break;
|
||||
}
|
||||
free (event_data);
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
// 0MQ utils - to be used by perf tests
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -149,7 +149,7 @@ int main (void)
|
||||
val = 0;
|
||||
rc = zmq_setsockopt(to, ZMQ_LINGER, &val, sizeof(val));
|
||||
assert (rc == 0);
|
||||
rc = zmq_bind(to, "tcp://*:5555");
|
||||
rc = zmq_bind(to, "tcp://*:6555");
|
||||
assert (rc == 0);
|
||||
|
||||
// Create a socket pushing to two endpoints - only 1 message should arrive.
|
||||
@ -158,9 +158,9 @@ int main (void)
|
||||
|
||||
val = 0;
|
||||
zmq_setsockopt (from, ZMQ_LINGER, &val, sizeof(val));
|
||||
rc = zmq_connect (from, "tcp://localhost:5556");
|
||||
rc = zmq_connect (from, "tcp://localhost:6556");
|
||||
assert (rc == 0);
|
||||
rc = zmq_connect (from, "tcp://localhost:5555");
|
||||
rc = zmq_connect (from, "tcp://localhost:6555");
|
||||
assert (rc == 0);
|
||||
|
||||
for (int i = 0; i < 10; ++i)
|
||||
|
Loading…
x
Reference in New Issue
Block a user