mirror of
https://github.com/zeromq/libzmq.git
synced 2025-03-09 15:26:04 +00:00
Merge branch 'master' of https://github.com/zeromq/libzmq
This commit is contained in:
commit
b6c97230ed
@ -64,6 +64,33 @@ extern "C" {
|
||||
#define ZMQ_VERSION \
|
||||
ZMQ_MAKE_VERSION(ZMQ_VERSION_MAJOR, ZMQ_VERSION_MINOR, ZMQ_VERSION_PATCH)
|
||||
|
||||
/* ensure one of ZMQ_TYPE_SAFE/UNSAFE is defined */
|
||||
/* Choose default based on version */
|
||||
|
||||
/* Uncomment to test */
|
||||
/* #define ZMQ_EMULATE_TYPE_SAFE */
|
||||
|
||||
#if !defined(ZMQ_TYPE_SAFE) && !defined(ZMQ_TYPE_UNSAFE)
|
||||
# if ZMQ_VERSION_MAJOR <= 3
|
||||
# if defined ZMQ_EMULATE_TYPE_SAFE
|
||||
# else
|
||||
# define ZMQ_TYPE_UNSAFE
|
||||
# endif
|
||||
# else
|
||||
# define ZMQ_TYPE_SAFE
|
||||
# endif
|
||||
#elif defined(ZMQ_TYPE_SAFE) && defined(ZMQ_TYPE_UNSAFE)
|
||||
# error "BOTH ZMQ_TYPE_SAFE and ZMQ_TYPE_UNSAFE are defined!"
|
||||
#endif
|
||||
|
||||
#ifdef ZMQ_TYPE_UNSAFE
|
||||
typedef void *zmq_socket_t;
|
||||
typedef void *zmq_ctx_t;
|
||||
#else
|
||||
typedef struct zmq_socket_t { void *data; } zmq_socket_t;
|
||||
typedef struct zmq_ctx_t { void *data; } zmq_ctx_t;
|
||||
#endif
|
||||
|
||||
/* Run-time API version detection */
|
||||
ZMQ_EXPORT void zmq_version (int *major, int *minor, int *patch);
|
||||
|
||||
@ -146,9 +173,9 @@ ZMQ_EXPORT int zmq_getmsgopt (zmq_msg_t *msg, int option, void *optval,
|
||||
/* 0MQ infrastructure (a.k.a. context) initialisation & termination. */
|
||||
/******************************************************************************/
|
||||
|
||||
ZMQ_EXPORT void *zmq_init (int io_threads);
|
||||
ZMQ_EXPORT void *zmq_init_thread_safe (int io_threads);
|
||||
ZMQ_EXPORT int zmq_term (void *context);
|
||||
ZMQ_EXPORT zmq_ctx_t zmq_init (int io_threads);
|
||||
ZMQ_EXPORT zmq_ctx_t zmq_init_thread_safe (int io_threads);
|
||||
ZMQ_EXPORT int zmq_term (zmq_ctx_t context);
|
||||
|
||||
/******************************************************************************/
|
||||
/* 0MQ socket definition. */
|
||||
@ -203,21 +230,21 @@ ZMQ_EXPORT int zmq_term (void *context);
|
||||
#define ZMQ_DONTWAIT 1
|
||||
#define ZMQ_SNDMORE 2
|
||||
|
||||
ZMQ_EXPORT void *zmq_socket (void *context, int type);
|
||||
ZMQ_EXPORT int zmq_close (void *s);
|
||||
ZMQ_EXPORT int zmq_setsockopt (void *s, int option, const void *optval,
|
||||
ZMQ_EXPORT zmq_socket_t zmq_socket (zmq_ctx_t context, int type);
|
||||
ZMQ_EXPORT int zmq_close (zmq_socket_t s);
|
||||
ZMQ_EXPORT int zmq_setsockopt (zmq_socket_t s, int option, const void *optval,
|
||||
size_t optvallen);
|
||||
ZMQ_EXPORT int zmq_getsockopt (void *s, int option, void *optval,
|
||||
ZMQ_EXPORT int zmq_getsockopt (zmq_socket_t s, int option, void *optval,
|
||||
size_t *optvallen);
|
||||
ZMQ_EXPORT int zmq_bind (void *s, const char *addr);
|
||||
ZMQ_EXPORT int zmq_connect (void *s, const char *addr);
|
||||
ZMQ_EXPORT int zmq_send (void *s, const void *buf, size_t len, int flags);
|
||||
ZMQ_EXPORT int zmq_recv (void *s, void *buf, size_t len, int flags);
|
||||
ZMQ_EXPORT int zmq_sendmsg (void *s, zmq_msg_t *msg, int flags);
|
||||
ZMQ_EXPORT int zmq_recvmsg (void *s, zmq_msg_t *msg, int flags);
|
||||
ZMQ_EXPORT int zmq_bind (zmq_socket_t s, const char *addr);
|
||||
ZMQ_EXPORT int zmq_connect (zmq_socket_t s, const char *addr);
|
||||
ZMQ_EXPORT int zmq_send (zmq_socket_t s, const void *buf, size_t len, int flags);
|
||||
ZMQ_EXPORT int zmq_recv (zmq_socket_t s, void *buf, size_t len, int flags);
|
||||
ZMQ_EXPORT int zmq_sendmsg (zmq_socket_t s, zmq_msg_t *msg, int flags);
|
||||
ZMQ_EXPORT int zmq_recvmsg (zmq_socket_t s, zmq_msg_t *msg, int flags);
|
||||
|
||||
ZMQ_EXPORT int zmq_sendv (void *s, struct iovec *iov, size_t count, int flags);
|
||||
ZMQ_EXPORT int zmq_recvmmsg (void *s, struct iovec *iov, size_t *count, int flags);
|
||||
ZMQ_EXPORT int zmq_sendv (zmq_socket_t s, struct iovec *iov, size_t count, int flags);
|
||||
ZMQ_EXPORT int zmq_recvmmsg (zmq_socket_t s, struct iovec *iov, size_t *count, int flags);
|
||||
|
||||
/******************************************************************************/
|
||||
/* I/O multiplexing. */
|
||||
@ -229,7 +256,7 @@ ZMQ_EXPORT int zmq_recvmmsg (void *s, struct iovec *iov, size_t *count, int flag
|
||||
|
||||
typedef struct
|
||||
{
|
||||
void *socket;
|
||||
zmq_socket_t socket;
|
||||
#if defined _WIN32
|
||||
SOCKET fd;
|
||||
#else
|
||||
|
98
src/zmq.cpp
98
src/zmq.cpp
@ -18,6 +18,8 @@
|
||||
You should have received a copy of the GNU Lesser General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
#define ZMQ_TYPE_UNSAFE
|
||||
#include "../include/zmq.h"
|
||||
|
||||
#include "platform.hpp"
|
||||
|
||||
@ -235,6 +237,10 @@ int zmq_setsockopt (void *s_, int option_, const void *optval_,
|
||||
errno = ENOTSOCK;
|
||||
return -1;
|
||||
}
|
||||
if (optvallen_ && !(optval_)) {
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
}
|
||||
zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
|
||||
if(s->thread_safe()) s->lock();
|
||||
int result = s->setsockopt (option_, optval_, optvallen_);
|
||||
@ -248,6 +254,10 @@ int zmq_getsockopt (void *s_, int option_, void *optval_, size_t *optvallen_)
|
||||
errno = ENOTSOCK;
|
||||
return -1;
|
||||
}
|
||||
if (optvallen_ && *optvallen_ && !(optval_)) {
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
}
|
||||
zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
|
||||
if(s->thread_safe()) s->lock();
|
||||
int result = s->getsockopt (option_, optval_, optvallen_);
|
||||
@ -285,6 +295,14 @@ int zmq_connect (void *s_, const char *addr_)
|
||||
|
||||
static int inner_sendmsg (zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_)
|
||||
{
|
||||
if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
|
||||
errno = ENOTSOCK;
|
||||
return -1;
|
||||
}
|
||||
if (!msg_) {
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
}
|
||||
int sz = (int) zmq_msg_size (msg_);
|
||||
int rc = s_->send ((zmq::msg_t*) msg_, flags_);
|
||||
if (unlikely (rc < 0))
|
||||
@ -298,6 +316,10 @@ int zmq_sendmsg (void *s_, zmq_msg_t *msg_, int flags_)
|
||||
errno = ENOTSOCK;
|
||||
return -1;
|
||||
}
|
||||
if (!msg_) {
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
}
|
||||
zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
|
||||
if(s->thread_safe()) s->lock();
|
||||
int result = inner_sendmsg (s, msg_, flags_);
|
||||
@ -311,6 +333,10 @@ int zmq_send (void *s_, const void *buf_, size_t len_, int flags_)
|
||||
errno = ENOTSOCK;
|
||||
return -1;
|
||||
}
|
||||
if (!buf_ || !len_) {
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
}
|
||||
zmq_msg_t msg;
|
||||
int rc = zmq_msg_init_size (&msg, len_);
|
||||
if (rc != 0)
|
||||
@ -346,6 +372,11 @@ int zmq_sendv (void *s_, iovec *a_, size_t count_, int flags_)
|
||||
errno = ENOTSOCK;
|
||||
return -1;
|
||||
}
|
||||
if (!a_) {
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
}
|
||||
|
||||
int rc = 0;
|
||||
zmq_msg_t msg;
|
||||
zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
|
||||
@ -377,9 +408,14 @@ int zmq_sendv (void *s_, iovec *a_, size_t count_, int flags_)
|
||||
|
||||
// Receiving functions.
|
||||
|
||||
static int inner_recvmsg (zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_)
|
||||
int inner_recvmsg (void *s_, zmq_msg_t *msg_, int flags_)
|
||||
{
|
||||
int rc = s_->recv ((zmq::msg_t*) msg_, flags_);
|
||||
if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
|
||||
errno = ENOTSOCK;
|
||||
return -1;
|
||||
}
|
||||
zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
|
||||
int rc = s->recv ((zmq::msg_t*) msg_, flags_);
|
||||
if (unlikely (rc < 0))
|
||||
return -1;
|
||||
return (int) zmq_msg_size (msg_);
|
||||
@ -405,6 +441,11 @@ int zmq_recv (void *s_, void *buf_, size_t len_, int flags_)
|
||||
errno = ENOTSOCK;
|
||||
return -1;
|
||||
}
|
||||
if (!buf_ || !len_) {
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
}
|
||||
|
||||
zmq_msg_t msg;
|
||||
int rc = zmq_msg_init (&msg);
|
||||
errno_assert (rc == 0);
|
||||
@ -458,6 +499,11 @@ int zmq_recvmmsg (void *s_, iovec *a_, size_t *count_, int flags_)
|
||||
errno = ENOTSOCK;
|
||||
return -1;
|
||||
}
|
||||
if (!a_ || !count_ || (a_ && !(*count_))) {
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
}
|
||||
|
||||
zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
|
||||
if(s->thread_safe()) s->lock();
|
||||
|
||||
@ -482,7 +528,7 @@ int zmq_recvmmsg (void *s_, iovec *a_, size_t *count_, int flags_)
|
||||
nread = -1;
|
||||
break;
|
||||
}
|
||||
++*count_;
|
||||
++(*count_);
|
||||
++nread;
|
||||
|
||||
// Cheat: acquire zmq_msg buffer.
|
||||
@ -500,48 +546,84 @@ int zmq_recvmmsg (void *s_, iovec *a_, size_t *count_, int flags_)
|
||||
|
||||
int zmq_msg_init (zmq_msg_t *msg_)
|
||||
{
|
||||
if (!msg_) {
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
}
|
||||
return ((zmq::msg_t*) msg_)->init ();
|
||||
}
|
||||
|
||||
int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_)
|
||||
{
|
||||
if (!msg_ || !size_) {
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
}
|
||||
return ((zmq::msg_t*) msg_)->init_size (size_);
|
||||
}
|
||||
|
||||
int zmq_msg_init_data (zmq_msg_t *msg_, void *data_, size_t size_,
|
||||
zmq_free_fn *ffn_, void *hint_)
|
||||
{
|
||||
if (!msg_ || !data_ || !size_) {
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
}
|
||||
return ((zmq::msg_t*) msg_)->init_data (data_, size_, ffn_, hint_);
|
||||
}
|
||||
|
||||
int zmq_msg_close (zmq_msg_t *msg_)
|
||||
{
|
||||
if (!msg_) {
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
}
|
||||
return ((zmq::msg_t*) msg_)->close ();
|
||||
}
|
||||
|
||||
int zmq_msg_move (zmq_msg_t *dest_, zmq_msg_t *src_)
|
||||
{
|
||||
if (!dest_ || !src_) {
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
}
|
||||
return ((zmq::msg_t*) dest_)->move (*(zmq::msg_t*) src_);
|
||||
}
|
||||
|
||||
int zmq_msg_copy (zmq_msg_t *dest_, zmq_msg_t *src_)
|
||||
{
|
||||
if (!dest_ || !src_) {
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
}
|
||||
return ((zmq::msg_t*) dest_)->copy (*(zmq::msg_t*) src_);
|
||||
}
|
||||
|
||||
void *zmq_msg_data (zmq_msg_t *msg_)
|
||||
{
|
||||
if (!msg_) {
|
||||
errno = EINVAL;
|
||||
return NULL;
|
||||
}
|
||||
return ((zmq::msg_t*) msg_)->data ();
|
||||
}
|
||||
|
||||
size_t zmq_msg_size (zmq_msg_t *msg_)
|
||||
{
|
||||
if (!msg_) {
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
}
|
||||
return ((zmq::msg_t*) msg_)->size ();
|
||||
}
|
||||
|
||||
int zmq_getmsgopt (zmq_msg_t *msg_, int option_, void *optval_,
|
||||
size_t *optvallen_)
|
||||
{
|
||||
if (!msg_ || !optval_ || !optvallen_) {
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
}
|
||||
switch (option_) {
|
||||
case ZMQ_MORE:
|
||||
if (*optvallen_ < sizeof (int)) {
|
||||
@ -562,6 +644,11 @@ int zmq_getmsgopt (zmq_msg_t *msg_, int option_, void *optval_,
|
||||
|
||||
int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
|
||||
{
|
||||
if (!items_) {
|
||||
errno = EFAULT;
|
||||
return -1;
|
||||
}
|
||||
|
||||
#if defined ZMQ_POLL_BASED_ON_POLL
|
||||
if (unlikely (nitems_ < 0)) {
|
||||
errno = EINVAL;
|
||||
@ -581,11 +668,6 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
|
||||
#endif
|
||||
}
|
||||
|
||||
if (!items_) {
|
||||
errno = EFAULT;
|
||||
return -1;
|
||||
}
|
||||
|
||||
zmq::clock_t clock;
|
||||
uint64_t now = 0;
|
||||
uint64_t end = 0;
|
||||
|
Loading…
x
Reference in New Issue
Block a user