Merge pull request #18 from hintjens/master

Cherry picking changes from libzmq master
This commit is contained in:
Pieter Hintjens 2013-10-08 00:20:40 -07:00
commit c852620f5f
11 changed files with 159 additions and 127 deletions

1
.gitignore vendored
View File

@ -69,6 +69,7 @@ tests/test_inproc_connect
tests/test_linger tests/test_linger
tests/test_security_null tests/test_security_null
tests/test_security_plain tests/test_security_plain
tests/test_abstract_ipc
tests/test*.log tests/test*.log
tests/test*.trs tests/test*.trs
src/platform.hpp* src/platform.hpp*

View File

@ -48,6 +48,11 @@ NOTE: the endpoint pathname must be writable by the process. When the endpoint
starts with '/', e.g., `ipc:///pathname`, this will be an _absolute_ pathname. starts with '/', e.g., `ipc:///pathname`, this will be an _absolute_ pathname.
If the endpoint specifies a directory that does not exist, the bind shall fail. If the endpoint specifies a directory that does not exist, the bind shall fail.
NOTE: on Linux only, when the endpoint pathname starts with `@`, the abstract
namespace shall be used. The abstract namespace is independent of the
filesystem and if a process attempts to bind an endpoint already bound by a
process, it will fail. See unix(7) for details.
Connecting a socket Connecting a socket
~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~
When connecting a 'socket' to a peer address using _zmq_connect()_ with the When connecting a 'socket' to a peer address using _zmq_connect()_ with the

View File

@ -179,13 +179,18 @@ ZMQ_EXPORT const char *zmq_strerror (int errnum);
/******************************************************************************/ /******************************************************************************/
/* New API */ /* New API */
/* Context options */
#define ZMQ_IO_THREADS 1
#define ZMQ_MAX_SOCKETS 2
/* Default for new contexts */ enum zmq_ctx_opts_t {
#define ZMQ_IO_THREADS_DFLT 1 /* Context options */
#define ZMQ_MAX_SOCKETS_DFLT 1024 ZMQ_IO_THREADS = 1,
ZMQ_MAX_SOCKETS = 2
};
enum zmq_ctx_defaults_t {
/* Default for new contexts */
ZMQ_IO_THREADS_DFLT = 1,
ZMQ_MAX_SOCKETS_DFLT = 1024
};
ZMQ_EXPORT void *zmq_ctx_new (void); ZMQ_EXPORT void *zmq_ctx_new (void);
ZMQ_EXPORT int zmq_ctx_term (void *context); ZMQ_EXPORT int zmq_ctx_term (void *context);
@ -228,109 +233,110 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval);
/******************************************************************************/ /******************************************************************************/
/* Socket types. */ /* Socket types. */
#define ZMQ_PAIR 0 enum zmq_socket_types_t {
#define ZMQ_PUB 1 ZMQ_PAIR = 0,
#define ZMQ_SUB 2 ZMQ_PUB = 1,
#define ZMQ_REQ 3 ZMQ_SUB = 2,
#define ZMQ_REP 4 ZMQ_REQ = 3,
#define ZMQ_DEALER 5 ZMQ_REP = 4,
#define ZMQ_ROUTER 6 ZMQ_DEALER = 5,
#define ZMQ_PULL 7 ZMQ_ROUTER = 6,
#define ZMQ_PUSH 8 ZMQ_PULL = 7,
#define ZMQ_XPUB 9 ZMQ_PUSH = 8,
#define ZMQ_XSUB 10 ZMQ_XPUB = 9,
#define ZMQ_STREAM 11 ZMQ_XSUB = 10,
ZMQ_STREAM = 11
/* Deprecated aliases */ };
#define ZMQ_XREQ ZMQ_DEALER
#define ZMQ_XREP ZMQ_ROUTER
/* Socket options. */ /* Socket options. */
#define ZMQ_AFFINITY 4 enum zmq_socket_opts_t {
#define ZMQ_IDENTITY 5 ZMQ_AFFINITY = 4,
#define ZMQ_SUBSCRIBE 6 ZMQ_IDENTITY = 5,
#define ZMQ_UNSUBSCRIBE 7 ZMQ_SUBSCRIBE = 6,
#define ZMQ_RATE 8 ZMQ_UNSUBSCRIBE = 7,
#define ZMQ_RECOVERY_IVL 9 ZMQ_RATE = 8,
#define ZMQ_SNDBUF 11 ZMQ_RECOVERY_IVL = 9,
#define ZMQ_RCVBUF 12 ZMQ_SNDBUF = 11,
#define ZMQ_RCVMORE 13 ZMQ_RCVBUF = 12,
#define ZMQ_FD 14 ZMQ_RCVMORE = 13,
#define ZMQ_EVENTS 15 ZMQ_FD = 14,
#define ZMQ_TYPE 16 ZMQ_EVENTS = 15,
#define ZMQ_LINGER 17 ZMQ_TYPE = 16,
#define ZMQ_RECONNECT_IVL 18 ZMQ_LINGER = 17,
#define ZMQ_BACKLOG 19 ZMQ_RECONNECT_IVL = 18,
#define ZMQ_RECONNECT_IVL_MAX 21 ZMQ_BACKLOG = 19,
#define ZMQ_MAXMSGSIZE 22 ZMQ_RECONNECT_IVL_MAX = 21,
#define ZMQ_SNDHWM 23 ZMQ_MAXMSGSIZE = 22,
#define ZMQ_RCVHWM 24 ZMQ_SNDHWM = 23,
#define ZMQ_MULTICAST_HOPS 25 ZMQ_RCVHWM = 24,
#define ZMQ_RCVTIMEO 27 ZMQ_MULTICAST_HOPS = 25,
#define ZMQ_SNDTIMEO 28 ZMQ_RCVTIMEO = 27,
#define ZMQ_LAST_ENDPOINT 32 ZMQ_SNDTIMEO = 28,
#define ZMQ_ROUTER_MANDATORY 33 ZMQ_LAST_ENDPOINT = 32,
#define ZMQ_TCP_KEEPALIVE 34 ZMQ_ROUTER_MANDATORY = 33,
#define ZMQ_TCP_KEEPALIVE_CNT 35 ZMQ_TCP_KEEPALIVE = 34,
#define ZMQ_TCP_KEEPALIVE_IDLE 36 ZMQ_TCP_KEEPALIVE_CNT = 35,
#define ZMQ_TCP_KEEPALIVE_INTVL 37 ZMQ_TCP_KEEPALIVE_IDLE = 36,
#define ZMQ_TCP_ACCEPT_FILTER 38 ZMQ_TCP_KEEPALIVE_INTVL = 37,
#define ZMQ_IMMEDIATE 39 ZMQ_TCP_ACCEPT_FILTER = 38,
#define ZMQ_XPUB_VERBOSE 40 ZMQ_IMMEDIATE = 39,
#define ZMQ_ROUTER_RAW 41 ZMQ_XPUB_VERBOSE = 40,
#define ZMQ_IPV6 42 ZMQ_ROUTER_RAW = 41,
#define ZMQ_MECHANISM 43 ZMQ_IPV6 = 42,
#define ZMQ_PLAIN_SERVER 44 ZMQ_MECHANISM = 43,
#define ZMQ_PLAIN_USERNAME 45 ZMQ_PLAIN_SERVER = 44,
#define ZMQ_PLAIN_PASSWORD 46 ZMQ_PLAIN_USERNAME = 45,
#define ZMQ_CURVE_SERVER 47 ZMQ_PLAIN_PASSWORD = 46,
#define ZMQ_CURVE_PUBLICKEY 48 ZMQ_CURVE_SERVER = 47,
#define ZMQ_CURVE_SECRETKEY 49 ZMQ_CURVE_PUBLICKEY = 48,
#define ZMQ_CURVE_SERVERKEY 50 ZMQ_CURVE_SECRETKEY = 49,
#define ZMQ_PROBE_ROUTER 51 ZMQ_CURVE_SERVERKEY = 50,
#define ZMQ_REQ_CORRELATE 52 ZMQ_PROBE_ROUTER = 51,
#define ZMQ_REQ_RELAXED 53 ZMQ_REQ_CORRELATE = 52,
#define ZMQ_CONFLATE 54 ZMQ_REQ_RELAXED = 53,
#define ZMQ_ZAP_DOMAIN 55 ZMQ_CONFLATE = 54,
ZMQ_ZAP_DOMAIN = 55
};
/* Message options */ /* Message options */
#define ZMQ_MORE 1 enum zmq_msg_opts_t {
ZMQ_MORE = 1
};
/* Send/recv options. */ /* Send/recv options. */
#define ZMQ_DONTWAIT 1 enum zmq_send_recv_opts_t {
#define ZMQ_SNDMORE 2 ZMQ_DONTWAIT = 1,
ZMQ_SNDMORE = 2
};
/* Security mechanisms */ /* Security mechanisms */
#define ZMQ_NULL 0 enum zmq_security_types_t {
#define ZMQ_PLAIN 1 ZMQ_NULL = 0,
#define ZMQ_CURVE 2 ZMQ_PLAIN = 1,
ZMQ_CURVE = 2
/* Deprecated options and aliases */ };
#define ZMQ_IPV4ONLY 31
#define ZMQ_DELAY_ATTACH_ON_CONNECT ZMQ_IMMEDIATE
#define ZMQ_NOBLOCK ZMQ_DONTWAIT
#define ZMQ_FAIL_UNROUTABLE ZMQ_ROUTER_MANDATORY
#define ZMQ_ROUTER_BEHAVIOR ZMQ_ROUTER_MANDATORY
/******************************************************************************/ /******************************************************************************/
/* 0MQ socket events and monitoring */ /* 0MQ socket events and monitoring */
/******************************************************************************/ /******************************************************************************/
/* Socket transport events (tcp and ipc only) */ /* Socket transport events (tcp and ipc only) */
#define ZMQ_EVENT_CONNECTED 1 enum zmq_transport_events_t {
#define ZMQ_EVENT_CONNECT_DELAYED 2 ZMQ_EVENT_CONNECTED = 1,
#define ZMQ_EVENT_CONNECT_RETRIED 4 ZMQ_EVENT_CONNECT_DELAYED = 2,
ZMQ_EVENT_CONNECT_RETRIED = 4,
#define ZMQ_EVENT_LISTENING 8 ZMQ_EVENT_LISTENING = 8,
#define ZMQ_EVENT_BIND_FAILED 16 ZMQ_EVENT_BIND_FAILED = 16,
#define ZMQ_EVENT_ACCEPTED 32 ZMQ_EVENT_ACCEPTED = 32,
#define ZMQ_EVENT_ACCEPT_FAILED 64 ZMQ_EVENT_ACCEPT_FAILED = 64,
#define ZMQ_EVENT_CLOSED 128 ZMQ_EVENT_CLOSED = 128,
#define ZMQ_EVENT_CLOSE_FAILED 256 ZMQ_EVENT_CLOSE_FAILED = 256,
#define ZMQ_EVENT_DISCONNECTED 512 ZMQ_EVENT_DISCONNECTED = 512,
#define ZMQ_EVENT_MONITOR_STOPPED 1024 ZMQ_EVENT_MONITOR_STOPPED = 1024
};
#define ZMQ_EVENT_ALL ( ZMQ_EVENT_CONNECTED | ZMQ_EVENT_CONNECT_DELAYED | \ #define ZMQ_EVENT_ALL ( ZMQ_EVENT_CONNECTED | ZMQ_EVENT_CONNECT_DELAYED | \
ZMQ_EVENT_CONNECT_RETRIED | ZMQ_EVENT_LISTENING | \ ZMQ_EVENT_CONNECT_RETRIED | ZMQ_EVENT_LISTENING | \
@ -373,9 +379,15 @@ ZMQ_EXPORT int zmq_recviov (void *s, struct iovec *iov, size_t *count, int flags
/* I/O multiplexing. */ /* I/O multiplexing. */
/******************************************************************************/ /******************************************************************************/
#define ZMQ_POLLIN 1 enum zmq_poll_types_t {
#define ZMQ_POLLOUT 2 ZMQ_POLLIN = 1,
#define ZMQ_POLLERR 4 ZMQ_POLLOUT = 2,
ZMQ_POLLERR = 4
};
enum zmq_poll_defaults_t {
ZMQ_POLLITEMS_DFLT = 16
};
typedef struct typedef struct
{ {
@ -389,8 +401,6 @@ typedef struct
short revents; short revents;
} zmq_pollitem_t; } zmq_pollitem_t;
#define ZMQ_POLLITEMS_DFLT 16
ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout); ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout);
/* Built-in message proxy (3-way) */ /* Built-in message proxy (3-way) */
@ -403,13 +413,28 @@ ZMQ_EXPORT char *zmq_z85_encode (char *dest, uint8_t *data, size_t size);
/* Encode a binary key from printable text per ZMQ RFC 32 */ /* Encode a binary key from printable text per ZMQ RFC 32 */
ZMQ_EXPORT uint8_t *zmq_z85_decode (uint8_t *dest, char *string); ZMQ_EXPORT uint8_t *zmq_z85_decode (uint8_t *dest, char *string);
/* Deprecated aliases */
#define ZMQ_STREAMER 1
#define ZMQ_FORWARDER 2
#define ZMQ_QUEUE 3
/* Deprecated method */ /* Deprecated method */
ZMQ_EXPORT int zmq_device (int type, void *frontend, void *backend); ZMQ_EXPORT int zmq_device (int type, void *frontend, void *backend);
/* Deprecated options and aliases */
enum zmq_deprecated_t {
/* Misc */
ZMQ_IPV4ONLY = 31,
ZMQ_DELAY_ATTACH_ON_CONNECT = ZMQ_IMMEDIATE,
ZMQ_NOBLOCK = ZMQ_DONTWAIT,
ZMQ_FAIL_UNROUTABLE = ZMQ_ROUTER_MANDATORY,
ZMQ_ROUTER_BEHAVIOR = ZMQ_ROUTER_MANDATORY,
/* Socket aliases */
ZMQ_XREQ=ZMQ_DEALER,
ZMQ_XREP=ZMQ_ROUTER,
/* I/O aliasses */
ZMQ_STREAMER = 1,
ZMQ_FORWARDER = 2,
ZMQ_QUEUE = 3
};
#undef ZMQ_EXPORT #undef ZMQ_EXPORT
#ifdef __cplusplus #ifdef __cplusplus

View File

@ -74,14 +74,12 @@ zmq::ctx_t::~ctx_t ()
delete io_threads [i]; delete io_threads [i];
// Deallocate the reaper thread object. // Deallocate the reaper thread object.
if (reaper) delete reaper;
delete reaper;
// Deallocate the array of mailboxes. No special work is // Deallocate the array of mailboxes. No special work is
// needed as mailboxes themselves were deallocated with their // needed as mailboxes themselves were deallocated with their
// corresponding io_thread/socket objects. // corresponding io_thread/socket objects.
if (slots) free (slots);
free (slots);
// Remove the tag, so that the object is considered dead. // Remove the tag, so that the object is considered dead.
tag = ZMQ_CTX_TAG_VALUE_BAD; tag = ZMQ_CTX_TAG_VALUE_BAD;

View File

@ -132,8 +132,8 @@ namespace zmq
sockets_t sockets; sockets_t sockets;
// List of unused thread slots. // List of unused thread slots.
typedef std::vector <uint32_t> emtpy_slots_t; typedef std::vector <uint32_t> empty_slots_t;
emtpy_slots_t empty_slots; empty_slots_t empty_slots;
// If true, zmq_init has been called but no socket has been created // If true, zmq_init has been called but no socket has been created
// yet. Launching of I/O threads is delayed. // yet. Launching of I/O threads is delayed.

View File

@ -51,11 +51,18 @@ int zmq::ipc_address_t::resolve (const char *path_)
errno = ENAMETOOLONG; errno = ENAMETOOLONG;
return -1; return -1;
} }
#if defined ZMQ_HAVE_LINUX
if (path_[0] == '@' && !path_[1]) {
errno = EINVAL;
return -1;
}
#endif
address.sun_family = AF_UNIX; address.sun_family = AF_UNIX;
strcpy (address.sun_path, path_); strcpy (address.sun_path, path_);
#if defined ZMQ_HAVE_LINUX #if defined ZMQ_HAVE_LINUX
if (*path_ == '@') /* Abstract sockets on Linux start with '\0' */
if (path_[0] == '@')
*address.sun_path = '\0'; *address.sun_path = '\0';
#endif #endif
return 0; return 0;
@ -73,10 +80,10 @@ int zmq::ipc_address_t::to_string (std::string &addr_)
s << "ipc://" << address.sun_path; s << "ipc://" << address.sun_path;
#else #else
s << "ipc://"; s << "ipc://";
if (*address.sun_path) if (!address.sun_path[0] && address.sun_path[1])
s << address.sun_path;
else
s << "@" << address.sun_path + 1; s << "@" << address.sun_path + 1;
else
s << address.sun_path;
#endif #endif
addr_ = s.str (); addr_ = s.str ();
return 0; return 0;
@ -89,6 +96,10 @@ const sockaddr *zmq::ipc_address_t::addr () const
socklen_t zmq::ipc_address_t::addrlen () const socklen_t zmq::ipc_address_t::addrlen () const
{ {
#if defined ZMQ_HAVE_LINUX
if (!address.sun_path[0] && address.sun_path[1])
return (socklen_t) strlen(address.sun_path + 1) + sizeof (sa_family_t) + 1;
#endif
return (socklen_t) sizeof (address); return (socklen_t) sizeof (address);
} }

View File

@ -54,8 +54,7 @@ zmq::mtrie_t::~mtrie_t ()
else else
if (count > 1) { if (count > 1) {
for (unsigned short i = 0; i != count; ++i) for (unsigned short i = 0; i != count; ++i)
if (next.table [i]) delete next.table [i];
delete next.table [i];
free (next.table); free (next.table);
} }
} }

View File

@ -96,8 +96,7 @@ zmq::session_base_t::~session_base_t ()
if (engine) if (engine)
engine->terminate (); engine->terminate ();
if (addr) delete addr;
delete addr;
} }
void zmq::session_base_t::attach_pipe (pipe_t *pipe_) void zmq::session_base_t::attach_pipe (pipe_t *pipe_)

View File

@ -114,12 +114,9 @@ zmq::stream_engine_t::~stream_engine_t ()
int rc = tx_msg.close (); int rc = tx_msg.close ();
errno_assert (rc == 0); errno_assert (rc == 0);
if (encoder != NULL) delete encoder;
delete encoder; delete decoder;
if (decoder != NULL) delete mechanism;
delete decoder;
if (mechanism != NULL)
delete mechanism;
} }
void zmq::stream_engine_t::plug (io_thread_t *io_thread_, void zmq::stream_engine_t::plug (io_thread_t *io_thread_,

View File

@ -48,8 +48,7 @@ zmq::trie_t::~trie_t ()
else else
if (count > 1) { if (count > 1) {
for (unsigned short i = 0; i != count; ++i) for (unsigned short i = 0; i != count; ++i)
if (next.table [i]) delete next.table [i];
delete next.table [i];
free (next.table); free (next.table);
} }
} }

View File

@ -72,8 +72,7 @@ namespace zmq
} }
chunk_t *sc = spare_chunk.xchg (NULL); chunk_t *sc = spare_chunk.xchg (NULL);
if (sc) free (sc);
free (sc);
} }
// Returns reference to the front element of the queue. // Returns reference to the front element of the queue.
@ -156,8 +155,7 @@ namespace zmq
// so for cache reasons we'll get rid of the spare and // so for cache reasons we'll get rid of the spare and
// use 'o' as the spare. // use 'o' as the spare.
chunk_t *cs = spare_chunk.xchg (o); chunk_t *cs = spare_chunk.xchg (o);
if (cs) free (cs);
free (cs);
} }
} }