diff --git a/include/zmq.h b/include/zmq.h index 4a492ae9..656fa556 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -383,7 +383,8 @@ ZMQ_EXPORT int zmq_send (void *s, const void *buf, size_t len, int flags); ZMQ_EXPORT int zmq_send_const (void *s, const void *buf, size_t len, int flags); ZMQ_EXPORT int zmq_recv (void *s, void *buf, size_t len, int flags); ZMQ_EXPORT int zmq_socket_monitor (void *s, const char *addr, int events); - +ZMQ_EXPORT int zmq_add_poller (void *s, void *p); +ZMQ_EXPORT int zmq_remove_poller (void *s, void *p); /******************************************************************************/ /* I/O multiplexing. */ @@ -397,6 +398,7 @@ ZMQ_EXPORT int zmq_socket_monitor (void *s, const char *addr, int events); typedef struct zmq_pollitem_t { void *socket; + void *poller; #if defined _WIN32 SOCKET fd; #else @@ -408,7 +410,9 @@ typedef struct zmq_pollitem_t #define ZMQ_POLLITEMS_DFLT 16 -ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout); +ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout); +ZMQ_EXPORT void *zmq_poller_new (); +ZMQ_EXPORT int zmq_poller_close (void *p); /******************************************************************************/ /* Message proxying */ diff --git a/src/socket_base.cpp b/src/socket_base.cpp index f53bc59f..47289c61 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -441,6 +441,38 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_, return rc; } +int zmq::socket_base_t::add_signaler(signaler_t *s_) +{ + ENTER_MUTEX(); + + if (!thread_safe) { + errno = EINVAL; + EXIT_MUTEX(); + return -1; + } + + ((mailbox_safe_t*)mailbox)->add_signaler(s_); + + EXIT_MUTEX(); + return 0; +} + +int zmq::socket_base_t::remove_signaler(signaler_t *s_) +{ + ENTER_MUTEX(); + + if (!thread_safe) { + errno = EINVAL; + EXIT_MUTEX(); + return -1; + } + + ((mailbox_safe_t*)mailbox)->remove_signaler(s_); + + EXIT_MUTEX(); + return 0; +} + int zmq::socket_base_t::bind (const char *addr_) { ENTER_MUTEX(); diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 56062b5b..bed81f2f 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -90,6 +90,8 @@ namespace zmq int term_endpoint (const char *addr_); int send (zmq::msg_t *msg_, int flags_); int recv (zmq::msg_t *msg_, int flags_); + int add_signaler (signaler_t *s); + int remove_signaler (signaler_t *s); int close (); // These functions are used by the polling mechanism to determine diff --git a/src/zmq.cpp b/src/zmq.cpp index 5f58b370..339c486c 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -74,6 +74,7 @@ struct iovec { #include "msg.hpp" #include "fd.hpp" #include "metadata.hpp" +#include "signaler.hpp" #if !defined ZMQ_HAVE_WINDOWS #include @@ -561,6 +562,34 @@ int zmq_recviov (void *s_, iovec *a_, size_t *count_, int flags_) return nread; } +// Add/remove poller from a socket + +int zmq_add_poller (void *s_, void *p_) +{ + if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) { + errno = ENOTSOCK; + return -1; + } + zmq::socket_base_t *s = (zmq::socket_base_t *) s_; + zmq::signaler_t *p = (zmq::signaler_t *) p_; + + return s->add_signaler(p); +} + +int zmq_remove_poller (void *s_, void *p_) +{ + if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) { + errno = ENOTSOCK; + return -1; + } + zmq::socket_base_t *s = (zmq::socket_base_t *) s_; + zmq::signaler_t *p = (zmq::signaler_t *) p_; + + return s->remove_signaler(p); +} + + + // Message manipulators. int zmq_msg_init (zmq_msg_t *msg_) @@ -680,6 +709,31 @@ const char *zmq_msg_gets (zmq_msg_t *msg_, const char *property_) } } +// Create poller + +void *zmq_poller_new () +{ + return new zmq::signaler_t (); +} + +// Close poller + +int zmq_poller_close (void* p) +{ + zmq::signaler_t *s = (zmq::signaler_t*)p; + delete s; + + return 0; +} + +// Get poller fd + +zmq::fd_t zmq_poller_get_fd (void *p) +{ + zmq::signaler_t *s = (zmq::signaler_t*)p; + return s->get_fd (); +} + // Polling. int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)