0
0
mirror of https://github.com/zeromq/libzmq.git synced 2024-12-27 07:31:03 +08:00

Problem: Source files contain mixed tabs and spaces.

Solution: Convert to spaces and remove trailing whitespace in these files.
This commit is contained in:
Joe Eli McIlvain 2015-08-20 07:46:34 -07:00
parent 6aa5c20b3d
commit 61217a2686
22 changed files with 397 additions and 398 deletions

View File

@ -397,7 +397,7 @@ ZMQ_EXPORT int zmq_remove_pollfd (void *s, void *p);
typedef struct zmq_pollitem_t typedef struct zmq_pollitem_t
{ {
void *socket; void *socket;
#if defined _WIN32 #if defined _WIN32
SOCKET fd; SOCKET fd;
#else #else
@ -426,7 +426,7 @@ ZMQ_EXPORT SOCKET zmq_pollfd_fd (void *p);
#else #else
ZMQ_EXPORT int zmq_pollfd_fd (void *p); ZMQ_EXPORT int zmq_pollfd_fd (void *p);
#endif #endif
/******************************************************************************/ /******************************************************************************/
/* Message proxying */ /* Message proxying */
/******************************************************************************/ /******************************************************************************/
@ -517,8 +517,8 @@ ZMQ_EXPORT void zmq_threadclose (void* thread);
#define ZMQ_UNUSED(object) (void)object #define ZMQ_UNUSED(object) (void)object
#define LIBZMQ_DELETE(p_object) {\ #define LIBZMQ_DELETE(p_object) {\
delete p_object; \ delete p_object; \
p_object = 0; \ p_object = 0; \
} }
#undef ZMQ_EXPORT #undef ZMQ_EXPORT

View File

@ -45,10 +45,10 @@ namespace std
{ {
typedef unsigned char char_type; typedef unsigned char char_type;
// Unsigned as wint_t in unsigned. // Unsigned as wint_t in unsigned.
typedef unsigned long int_type; typedef unsigned long int_type;
typedef streampos pos_type; typedef streampos pos_type;
typedef streamoff off_type; typedef streamoff off_type;
typedef mbstate_t state_type; typedef mbstate_t state_type;
static void static void
assign(char_type& __c1, const char_type& __c2) assign(char_type& __c1, const char_type& __c2)

View File

@ -149,16 +149,16 @@ uint64_t zmq::clock_t::now_us ()
// Use POSIX clock_gettime function to get precise monotonic time. // Use POSIX clock_gettime function to get precise monotonic time.
struct timespec tv; struct timespec tv;
int rc = clock_gettime (CLOCK_MONOTONIC, &tv); int rc = clock_gettime (CLOCK_MONOTONIC, &tv);
// Fix case where system has clock_gettime but CLOCK_MONOTONIC is not supported. // Fix case where system has clock_gettime but CLOCK_MONOTONIC is not supported.
// This should be a configuration check, but I looked into it and writing an // This should be a configuration check, but I looked into it and writing an
// AC_FUNC_CLOCK_MONOTONIC seems beyond my powers. // AC_FUNC_CLOCK_MONOTONIC seems beyond my powers.
if( rc != 0) { if( rc != 0) {
// Use POSIX gettimeofday function to get precise time. // Use POSIX gettimeofday function to get precise time.
struct timeval tv; struct timeval tv;
int rc = gettimeofday (&tv, NULL); int rc = gettimeofday (&tv, NULL);
errno_assert (rc == 0); errno_assert (rc == 0);
return (tv.tv_sec * (uint64_t) 1000000 + tv.tv_usec); return (tv.tv_sec * (uint64_t) 1000000 + tv.tv_usec);
} }
return (tv.tv_sec * (uint64_t) 1000000 + tv.tv_nsec / 1000); return (tv.tv_sec * (uint64_t) 1000000 + tv.tv_nsec / 1000);
#elif defined HAVE_GETHRTIME #elif defined HAVE_GETHRTIME

View File

@ -47,36 +47,36 @@
namespace zmq namespace zmq
{ {
class condition_variable_t class condition_variable_t
{ {
public: public:
inline condition_variable_t () inline condition_variable_t ()
{ {
zmq_assert(false); zmq_assert(false);
} }
inline ~condition_variable_t () inline ~condition_variable_t ()
{ {
} }
inline int wait (mutex_t* mutex_, int timeout_ ) inline int wait (mutex_t* mutex_, int timeout_ )
{ {
zmq_assert(false); zmq_assert(false);
return -1; return -1;
} }
inline void broadcast () inline void broadcast ()
{ {
zmq_assert(false); zmq_assert(false);
} }
private: private:
// Disable copy construction and assignment. // Disable copy construction and assignment.
condition_variable_t (const condition_variable_t&); condition_variable_t (const condition_variable_t&);
void operator = (const condition_variable_t&); void operator = (const condition_variable_t&);
}; };
} }
@ -95,7 +95,7 @@ namespace zmq
inline ~condition_variable_t () inline ~condition_variable_t ()
{ {
} }
inline int wait (mutex_t* mutex_, int timeout_ ) inline int wait (mutex_t* mutex_, int timeout_ )
@ -110,7 +110,7 @@ namespace zmq
if (rc != ERROR_TIMEOUT) if (rc != ERROR_TIMEOUT)
win_assert(rc); win_assert(rc);
errno = EAGAIN; errno = EAGAIN;
return -1; return -1;
} }
@ -161,9 +161,9 @@ namespace zmq
if (timeout_ != -1) { if (timeout_ != -1) {
struct timespec timeout; struct timespec timeout;
clock_gettime(CLOCK_REALTIME, &timeout); clock_gettime(CLOCK_REALTIME, &timeout);
timeout.tv_sec += timeout_ / 1000; timeout.tv_sec += timeout_ / 1000;
timeout.tv_nsec += (timeout_ % 1000) * 1000000; timeout.tv_nsec += (timeout_ % 1000) * 1000000;
rc = pthread_cond_timedwait (&cond, mutex_->get_mutex (), &timeout); rc = pthread_cond_timedwait (&cond, mutex_->get_mutex (), &timeout);
} }
else else

View File

@ -86,7 +86,7 @@ namespace zmq
// (except zmq_close). // (except zmq_close).
// This function is non-blocking. // This function is non-blocking.
// terminate must still be called afterwards. // terminate must still be called afterwards.
// This function is optional, terminate will unblock any current // This function is optional, terminate will unblock any current
// operations as well. // operations as well.
int shutdown(); int shutdown();
@ -98,7 +98,7 @@ namespace zmq
zmq::socket_base_t *create_socket (int type_); zmq::socket_base_t *create_socket (int type_);
void destroy_socket (zmq::socket_base_t *socket_); void destroy_socket (zmq::socket_base_t *socket_);
// Start a new thread with proper scheduling parameters. // Start a new thread with proper scheduling parameters.
void start_thread (thread_t &thread_, thread_fn *tfn_, void *arg_) const; void start_thread (thread_t &thread_, thread_fn *tfn_, void *arg_) const;
// Send command to the destination thread. // Send command to the destination thread.
@ -203,7 +203,7 @@ namespace zmq
// Is IPv6 enabled on this context? // Is IPv6 enabled on this context?
bool ipv6; bool ipv6;
// Thread scheduling parameters. // Thread scheduling parameters.
int thread_priority; int thread_priority;
int thread_sched_policy; int thread_sched_policy;

View File

@ -178,7 +178,7 @@ int zmq::gssapi_mechanism_base_t::decode_message (msg_t *msg_)
const uint8_t flags = static_cast <char *> (plaintext.value)[0]; const uint8_t flags = static_cast <char *> (plaintext.value)[0];
if (flags & 0x01) if (flags & 0x01)
msg_->set_flags (msg_t::more); msg_->set_flags (msg_t::more);
if (flags & 0x02) if (flags & 0x02)
msg_->set_flags (msg_t::command); msg_->set_flags (msg_t::command);

View File

@ -123,7 +123,7 @@ int zmq::gssapi_server_t::process_handshake_command (msg_t *msg_)
} }
if (security_context_established) { if (security_context_established) {
// Use ZAP protocol (RFC 27) to authenticate the user. // Use ZAP protocol (RFC 27) to authenticate the user.
bool expecting_zap_reply = false; bool expecting_zap_reply = false;
int rc = session->zap_connect (); int rc = session->zap_connect ();
if (rc == 0) { if (rc == 0) {

View File

@ -85,8 +85,8 @@ namespace zmq
void accept_context (); void accept_context ();
int produce_next_token (msg_t *msg_); int produce_next_token (msg_t *msg_);
int process_next_token (msg_t *msg_); int process_next_token (msg_t *msg_);
void send_zap_request (); void send_zap_request ();
int receive_and_process_zap_reply(); int receive_and_process_zap_reply();
}; };
} }

View File

@ -96,7 +96,7 @@ bool zmq::mtrie_t::add_helper (unsigned char *prefix_, size_t size_,
count = 1; count = 1;
next.node = NULL; next.node = NULL;
} }
else else
if (count == 1) { if (count == 1) {
unsigned char oldc = min; unsigned char oldc = min;
mtrie_t *oldp = next.node; mtrie_t *oldp = next.node;
@ -109,7 +109,7 @@ bool zmq::mtrie_t::add_helper (unsigned char *prefix_, size_t size_,
min = std::min (min, c); min = std::min (min, c);
next.table [oldc - min] = oldp; next.table [oldc - min] = oldp;
} }
else else
if (min < c) { if (min < c) {
// The new character is above the current character range. // The new character is above the current character range.
unsigned short old_count = count; unsigned short old_count = count;
@ -252,7 +252,7 @@ void zmq::mtrie_t::rm_helper (pipe_t *pipe_, unsigned char **buff_,
count = 0; count = 0;
} }
// Compact the node table if possible // Compact the node table if possible
else else
if (live_nodes == 1) { if (live_nodes == 1) {
// If there's only one live node in the table we can // If there's only one live node in the table we can
// switch to using the more compact single-node // switch to using the more compact single-node
@ -412,16 +412,16 @@ void zmq::mtrie_t::match (unsigned char *data_, size_t size_,
break; break;
// If there's one subnode (optimisation). // If there's one subnode (optimisation).
if (current->count == 1) { if (current->count == 1) {
if (data_ [0] != current->min) if (data_ [0] != current->min)
break; break;
current = current->next.node; current = current->next.node;
data_++; data_++;
size_--; size_--;
continue; continue;
} }
// If there are multiple subnodes. // If there are multiple subnodes.
if (data_ [0] < current->min || data_ [0] >= if (data_ [0] < current->min || data_ [0] >=
current->min + current->count) current->min + current->count)
break; break;

View File

@ -82,7 +82,7 @@ int zmq::pgm_socket_t::init_address (const char *network_,
} }
*port_number = atoi (port_delim + 1); *port_number = atoi (port_delim + 1);
char network [256]; char network [256];
if (port_delim - network_ >= (int) sizeof (network) - 1) { if (port_delim - network_ >= (int) sizeof (network) - 1) {
errno = EINVAL; errno = EINVAL;
@ -195,24 +195,24 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
} }
{ {
const int rcvbuf = (int) options.rcvbuf; const int rcvbuf = (int) options.rcvbuf;
if (rcvbuf >= 0) { if (rcvbuf >= 0) {
if (!pgm_setsockopt (sock, SOL_SOCKET, SO_RCVBUF, &rcvbuf, if (!pgm_setsockopt (sock, SOL_SOCKET, SO_RCVBUF, &rcvbuf,
sizeof (rcvbuf))) sizeof (rcvbuf)))
goto err_abort; goto err_abort;
} }
const int sndbuf = (int) options.sndbuf; const int sndbuf = (int) options.sndbuf;
if (sndbuf >= 0) { if (sndbuf >= 0) {
if (!pgm_setsockopt (sock, SOL_SOCKET, SO_SNDBUF, &sndbuf, if (!pgm_setsockopt (sock, SOL_SOCKET, SO_SNDBUF, &sndbuf,
sizeof (sndbuf))) sizeof (sndbuf)))
goto err_abort; goto err_abort;
} }
const int max_tpdu = (int) pgm_max_tpdu; const int max_tpdu = (int) pgm_max_tpdu;
if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_MTU, &max_tpdu, if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_MTU, &max_tpdu,
sizeof (max_tpdu))) sizeof (max_tpdu)))
goto err_abort; goto err_abort;
} }
if (receiver) { if (receiver) {
@ -334,28 +334,28 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
// Set IP level parameters. // Set IP level parameters.
{ {
// Multicast loopback disabled by default // Multicast loopback disabled by default
const int multicast_loop = 0; const int multicast_loop = 0;
if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_MULTICAST_LOOP, if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_MULTICAST_LOOP,
&multicast_loop, sizeof (multicast_loop))) &multicast_loop, sizeof (multicast_loop)))
goto err_abort; goto err_abort;
const int multicast_hops = options.multicast_hops; const int multicast_hops = options.multicast_hops;
if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_MULTICAST_HOPS, if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_MULTICAST_HOPS,
&multicast_hops, sizeof (multicast_hops))) &multicast_hops, sizeof (multicast_hops)))
goto err_abort; goto err_abort;
// Expedited Forwarding PHB for network elements, no ECN. // Expedited Forwarding PHB for network elements, no ECN.
// Ignore return value due to varied runtime support. // Ignore return value due to varied runtime support.
const int dscp = 0x2e << 2; const int dscp = 0x2e << 2;
if (AF_INET6 != sa_family) if (AF_INET6 != sa_family)
pgm_setsockopt (sock, IPPROTO_PGM, PGM_TOS, pgm_setsockopt (sock, IPPROTO_PGM, PGM_TOS,
&dscp, sizeof (dscp)); &dscp, sizeof (dscp));
const int nonblocking = 1; const int nonblocking = 1;
if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_NOBLOCK, if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_NOBLOCK,
&nonblocking, sizeof (nonblocking))) &nonblocking, sizeof (nonblocking)))
goto err_abort; goto err_abort;
} }
// Connect PGM transport to start state machine. // Connect PGM transport to start state machine.
@ -402,13 +402,13 @@ zmq::pgm_socket_t::~pgm_socket_t ()
{ {
if (pgm_msgv) if (pgm_msgv)
free (pgm_msgv); free (pgm_msgv);
if (sock) if (sock)
pgm_close (sock, TRUE); pgm_close (sock, TRUE);
} }
// Get receiver fds. receive_fd_ is signaled for incoming packets, // Get receiver fds. receive_fd_ is signaled for incoming packets,
// waiting_pipe_fd_ is signaled for state driven events and data. // waiting_pipe_fd_ is signaled for state driven events and data.
void zmq::pgm_socket_t::get_receiver_fds (fd_t *receive_fd_, void zmq::pgm_socket_t::get_receiver_fds (fd_t *receive_fd_,
fd_t *waiting_pipe_fd_) fd_t *waiting_pipe_fd_)
{ {
socklen_t socklen; socklen_t socklen;
@ -430,12 +430,12 @@ void zmq::pgm_socket_t::get_receiver_fds (fd_t *receive_fd_,
zmq_assert (socklen == sizeof (*waiting_pipe_fd_)); zmq_assert (socklen == sizeof (*waiting_pipe_fd_));
} }
// Get fds and store them into user allocated memory. // Get fds and store them into user allocated memory.
// send_fd is for non-blocking send wire notifications. // send_fd is for non-blocking send wire notifications.
// receive_fd_ is for incoming back-channel protocol packets. // receive_fd_ is for incoming back-channel protocol packets.
// rdata_notify_fd_ is raised for waiting repair transmissions. // rdata_notify_fd_ is raised for waiting repair transmissions.
// pending_notify_fd_ is for state driven events. // pending_notify_fd_ is for state driven events.
void zmq::pgm_socket_t::get_sender_fds (fd_t *send_fd_, fd_t *receive_fd_, void zmq::pgm_socket_t::get_sender_fds (fd_t *send_fd_, fd_t *receive_fd_,
fd_t *rdata_notify_fd_, fd_t *pending_notify_fd_) fd_t *rdata_notify_fd_, fd_t *pending_notify_fd_)
{ {
socklen_t socklen; socklen_t socklen;
@ -475,7 +475,7 @@ void zmq::pgm_socket_t::get_sender_fds (fd_t *send_fd_, fd_t *receive_fd_,
size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_) size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_)
{ {
size_t nbytes = 0; size_t nbytes = 0;
const int status = pgm_send (sock, data_, data_len_, &nbytes); const int status = pgm_send (sock, data_, data_len_, &nbytes);
// We have to write all data as one packet. // We have to write all data as one packet.
@ -551,7 +551,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
{ {
size_t raw_data_len = 0; size_t raw_data_len = 0;
// We just sent all data from pgm_transport_recvmsgv up // We just sent all data from pgm_transport_recvmsgv up
// and have to return 0 that another engine in this thread is scheduled. // and have to return 0 that another engine in this thread is scheduled.
if (nbytes_rec == nbytes_processed && nbytes_rec > 0) { if (nbytes_rec == nbytes_processed && nbytes_rec > 0) {
@ -572,7 +572,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
zmq_assert (nbytes_processed == 0); zmq_assert (nbytes_processed == 0);
zmq_assert (nbytes_rec == 0); zmq_assert (nbytes_rec == 0);
// Receive a vector of Application Protocol Domain Unit's (APDUs) // Receive a vector of Application Protocol Domain Unit's (APDUs)
// from the transport. // from the transport.
pgm_error_t *pgm_error = NULL; pgm_error_t *pgm_error = NULL;
@ -590,7 +590,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
zmq_assert (nbytes_rec == 0); zmq_assert (nbytes_rec == 0);
// In case if no RDATA/ODATA caused POLLIN 0 is // In case if no RDATA/ODATA caused POLLIN 0 is
// returned. // returned.
nbytes_rec = 0; nbytes_rec = 0;
errno = EBUSY; errno = EBUSY;
@ -646,8 +646,8 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
// Only one APDU per pgm_msgv_t structure is allowed. // Only one APDU per pgm_msgv_t structure is allowed.
zmq_assert (pgm_msgv [pgm_msgv_processed].msgv_len == 1); zmq_assert (pgm_msgv [pgm_msgv_processed].msgv_len == 1);
struct pgm_sk_buff_t* skb = struct pgm_sk_buff_t* skb =
pgm_msgv [pgm_msgv_processed].msgv_skb [0]; pgm_msgv [pgm_msgv_processed].msgv_skb [0];
// Take pointers from pgm_msgv_t structure. // Take pointers from pgm_msgv_t structure.
@ -679,7 +679,7 @@ void zmq::pgm_socket_t::process_upstream ()
zmq_assert (status != PGM_IO_STATUS_ERROR); zmq_assert (status != PGM_IO_STATUS_ERROR);
// No data should be returned. // No data should be returned.
zmq_assert (dummy_bytes == 0 && (status == PGM_IO_STATUS_TIMER_PENDING || zmq_assert (dummy_bytes == 0 && (status == PGM_IO_STATUS_TIMER_PENDING ||
status == PGM_IO_STATUS_RATE_LIMITED || status == PGM_IO_STATUS_RATE_LIMITED ||
status == PGM_IO_STATUS_WOULD_BLOCK)); status == PGM_IO_STATUS_WOULD_BLOCK));
@ -698,7 +698,7 @@ int zmq::pgm_socket_t::compute_sqns (int tpdu_)
{ {
// Convert rate into B/ms. // Convert rate into B/ms.
uint64_t rate = uint64_t (options.rate) / 8; uint64_t rate = uint64_t (options.rate) / 8;
// Compute the size of the buffer in bytes. // Compute the size of the buffer in bytes.
uint64_t size = uint64_t (options.recovery_ivl) * rate; uint64_t size = uint64_t (options.recovery_ivl) * rate;

View File

@ -515,13 +515,13 @@ void zmq::pipe_t::set_hwms (int inhwm_, int outhwm_)
// if either send or recv side has hwm <= 0 it means infinite so we should set hwms infinite // if either send or recv side has hwm <= 0 it means infinite so we should set hwms infinite
if (inhwm_ <= 0 || inhwmboost <= 0) if (inhwm_ <= 0 || inhwmboost <= 0)
in = 0; in = 0;
if (outhwm_ <= 0 || outhwmboost <= 0)
out = 0;
lwm = compute_lwm(in); if (outhwm_ <= 0 || outhwmboost <= 0)
hwm = out; out = 0;
lwm = compute_lwm(in);
hwm = out;
} }
void zmq::pipe_t::set_hwms_boost(int inhwmboost_, int outhwmboost_) void zmq::pipe_t::set_hwms_boost(int inhwmboost_, int outhwmboost_)

View File

@ -299,7 +299,7 @@ void zmq::socket_base_t::attach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
// First, register the pipe so that we can terminate it later on. // First, register the pipe so that we can terminate it later on.
pipe_->set_event_sink (this); pipe_->set_event_sink (this);
pipes.push_back (pipe_); pipes.push_back (pipe_);
// Let the derived socket type know about new pipe. // Let the derived socket type know about new pipe.
xattach_pipe (pipe_, subscribe_to_all_); xattach_pipe (pipe_, subscribe_to_all_);
@ -316,12 +316,11 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
{ {
ENTER_MUTEX(); ENTER_MUTEX();
if (!options.is_valid(option_)) { if (!options.is_valid(option_)) {
errno = EINVAL; errno = EINVAL;
EXIT_MUTEX(); EXIT_MUTEX();
return -1; return -1;
} }
if (unlikely (ctx_terminated)) { if (unlikely (ctx_terminated)) {
errno = ETERM; errno = ETERM;
@ -339,7 +338,7 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
// If the socket type doesn't support the option, pass it to // If the socket type doesn't support the option, pass it to
// the generic option parser. // the generic option parser.
rc = options.setsockopt (option_, optval_, optvallen_); rc = options.setsockopt (option_, optval_, optvallen_);
update_pipe_options(option_); update_pipe_options(option_);
EXIT_MUTEX(); EXIT_MUTEX();
return rc; return rc;
@ -382,10 +381,10 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_,
EXIT_MUTEX(); EXIT_MUTEX();
return -1; return -1;
} }
*((fd_t*)optval_) = ((mailbox_t*)mailbox)->get_fd(); *((fd_t*)optval_) = ((mailbox_t*)mailbox)->get_fd();
*optvallen_ = sizeof(fd_t); *optvallen_ = sizeof(fd_t);
EXIT_MUTEX(); EXIT_MUTEX();
return 0; return 0;
} }
@ -809,9 +808,9 @@ int zmq::socket_base_t::connect (const char *addr_)
} }
} }
#endif #endif
// TBD - Should we check address for ZMQ_HAVE_NORM??? // TBD - Should we check address for ZMQ_HAVE_NORM???
#ifdef ZMQ_HAVE_OPENPGM #ifdef ZMQ_HAVE_OPENPGM
if (protocol == "pgm" || protocol == "epgm") { if (protocol == "pgm" || protocol == "epgm") {
struct pgm_addrinfo_t *res = NULL; struct pgm_addrinfo_t *res = NULL;
@ -1027,7 +1026,7 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_)
if (unlikely (process_commands (timeout, false) != 0)) { if (unlikely (process_commands (timeout, false) != 0)) {
EXIT_MUTEX(); EXIT_MUTEX();
return -1; return -1;
} }
rc = xsend (msg_); rc = xsend (msg_);
if (rc == 0) if (rc == 0)
break; break;
@ -1167,7 +1166,7 @@ int zmq::socket_base_t::close ()
{ {
// Mark the socket as dead // Mark the socket as dead
tag = 0xdeadbeef; tag = 0xdeadbeef;
// Transfer the ownership of the socket from this application thread // Transfer the ownership of the socket from this application thread
// to the reaper thread which will take care of the rest of shutdown // to the reaper thread which will take care of the rest of shutdown
// process. // process.
@ -1195,13 +1194,13 @@ void zmq::socket_base_t::start_reaping (poller_t *poller_)
if (!thread_safe) if (!thread_safe)
fd = ((mailbox_t*)mailbox)->get_fd(); fd = ((mailbox_t*)mailbox)->get_fd();
else { else {
ENTER_MUTEX(); ENTER_MUTEX();
reaper_signaler = new signaler_t(); reaper_signaler = new signaler_t();
// Add signaler to the safe mailbox // Add signaler to the safe mailbox
fd = reaper_signaler->get_fd(); fd = reaper_signaler->get_fd();
((mailbox_safe_t*)mailbox)->add_signaler(reaper_signaler); ((mailbox_safe_t*)mailbox)->add_signaler(reaper_signaler);
// Send a signal to make sure reaper handle existing commands // Send a signal to make sure reaper handle existing commands
@ -1308,13 +1307,13 @@ void zmq::socket_base_t::process_term (int linger_)
void zmq::socket_base_t::update_pipe_options(int option_) void zmq::socket_base_t::update_pipe_options(int option_)
{ {
if (option_ == ZMQ_SNDHWM || option_ == ZMQ_RCVHWM) if (option_ == ZMQ_SNDHWM || option_ == ZMQ_RCVHWM)
{ {
for (pipes_t::size_type i = 0; i != pipes.size(); ++i) for (pipes_t::size_type i = 0; i != pipes.size(); ++i)
{ {
pipes[i]->set_hwms(options.rcvhwm, options.sndhwm); pipes[i]->set_hwms(options.rcvhwm, options.sndhwm);
} }
} }
} }
@ -1378,11 +1377,11 @@ void zmq::socket_base_t::in_event ()
// be destroyed. // be destroyed.
ENTER_MUTEX(); ENTER_MUTEX();
// If the socket is thread safe we need to unsignal the reaper signaler // If the socket is thread safe we need to unsignal the reaper signaler
if (thread_safe) if (thread_safe)
reaper_signaler->recv(); reaper_signaler->recv();
process_commands (0, false); process_commands (0, false);
EXIT_MUTEX(); EXIT_MUTEX();
check_destroy (); check_destroy ();

View File

@ -36,7 +36,7 @@
extern "C" extern "C"
{ {
#if defined _WIN32_WCE #if defined _WIN32_WCE
static DWORD thread_routine (LPVOID arg_) static DWORD thread_routine (LPVOID arg_)
#else #else
static unsigned int __stdcall thread_routine (void *arg_) static unsigned int __stdcall thread_routine (void *arg_)
#endif #endif
@ -58,7 +58,7 @@ void zmq::thread_t::start (thread_fn *tfn_, void *arg_)
descriptor = (HANDLE) _beginthreadex (NULL, 0, descriptor = (HANDLE) _beginthreadex (NULL, 0,
&::thread_routine, this, 0 , NULL); &::thread_routine, this, 0 , NULL);
#endif #endif
win_assert (descriptor != NULL); win_assert (descriptor != NULL);
} }
void zmq::thread_t::stop () void zmq::thread_t::stop ()
@ -92,7 +92,7 @@ extern "C"
posix_assert (rc); posix_assert (rc);
#endif #endif
zmq::thread_t *self = (zmq::thread_t*) arg_; zmq::thread_t *self = (zmq::thread_t*) arg_;
self->tfn (self->arg); self->tfn (self->arg);
return NULL; return NULL;
} }

View File

@ -40,39 +40,39 @@ zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
verbose_unsubs (false), verbose_unsubs (false),
more (false), more (false),
lossy (true), lossy (true),
manual(false), manual(false),
welcome_msg () welcome_msg ()
{ {
last_pipe = NULL; last_pipe = NULL;
options.type = ZMQ_XPUB; options.type = ZMQ_XPUB;
welcome_msg.init(); welcome_msg.init();
} }
zmq::xpub_t::~xpub_t () zmq::xpub_t::~xpub_t ()
{ {
welcome_msg.close(); welcome_msg.close();
} }
void zmq::xpub_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) void zmq::xpub_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
{ {
zmq_assert (pipe_); zmq_assert (pipe_);
dist.attach (pipe_); dist.attach (pipe_);
// If subscribe_to_all_ is specified, the caller would like to subscribe // If subscribe_to_all_ is specified, the caller would like to subscribe
// to all data on this pipe, implicitly. // to all data on this pipe, implicitly.
if (subscribe_to_all_) if (subscribe_to_all_)
subscriptions.add (NULL, 0, pipe_); subscriptions.add (NULL, 0, pipe_);
// if welcome message exist // if welcome message exist
if (welcome_msg.size() > 0) if (welcome_msg.size() > 0)
{ {
msg_t copy; msg_t copy;
copy.init(); copy.init();
copy.copy(welcome_msg); copy.copy(welcome_msg);
pipe_->write(&copy); pipe_->write(&copy);
pipe_->flush(); pipe_->flush();
} }
// The pipe is active when attached. Let's read the subscriptions from // The pipe is active when attached. Let's read the subscriptions from
// it, if any. // it, if any.
@ -87,32 +87,32 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_)
// Apply the subscription to the trie // Apply the subscription to the trie
unsigned char *const data = (unsigned char *) sub.data (); unsigned char *const data = (unsigned char *) sub.data ();
const size_t size = sub.size (); const size_t size = sub.size ();
if (size > 0 && (*data == 0 || *data == 1)) { if (size > 0 && (*data == 0 || *data == 1)) {
if (manual) if (manual)
{ {
last_pipe = pipe_; last_pipe = pipe_;
pending_data.push_back(blob_t(data, size)); pending_data.push_back(blob_t(data, size));
pending_metadata.push_back(sub.metadata()); pending_metadata.push_back(sub.metadata());
pending_flags.push_back(0); pending_flags.push_back(0);
} }
else else
{ {
bool unique; bool unique;
if (*data == 0) if (*data == 0)
unique = subscriptions.rm(data + 1, size - 1, pipe_); unique = subscriptions.rm(data + 1, size - 1, pipe_);
else else
unique = subscriptions.add(data + 1, size - 1, pipe_); unique = subscriptions.add(data + 1, size - 1, pipe_);
// If the (un)subscription is not a duplicate store it so that it can be // If the (un)subscription is not a duplicate store it so that it can be
// passed to the user on next recv call unless verbose mode is enabled // passed to the user on next recv call unless verbose mode is enabled
// which makes to pass always these messages. // which makes to pass always these messages.
if (options.type == ZMQ_XPUB && (unique || (*data == 1 && verbose_subs) || if (options.type == ZMQ_XPUB && (unique || (*data == 1 && verbose_subs) ||
(*data == 0 && verbose_unsubs && verbose_subs))) { (*data == 0 && verbose_unsubs && verbose_subs))) {
pending_data.push_back(blob_t(data, size)); pending_data.push_back(blob_t(data, size));
pending_metadata.push_back(sub.metadata()); pending_metadata.push_back(sub.metadata());
pending_flags.push_back(0); pending_flags.push_back(0);
} }
} }
} }
else { else {
// Process user message coming upstream from xsub socket // Process user message coming upstream from xsub socket
@ -131,46 +131,46 @@ void zmq::xpub_t::xwrite_activated (pipe_t *pipe_)
int zmq::xpub_t::xsetsockopt (int option_, const void *optval_, int zmq::xpub_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_) size_t optvallen_)
{ {
if (option_ == ZMQ_XPUB_VERBOSE || option_ == ZMQ_XPUB_VERBOSE_UNSUBSCRIBE || if (option_ == ZMQ_XPUB_VERBOSE || option_ == ZMQ_XPUB_VERBOSE_UNSUBSCRIBE ||
option_ == ZMQ_XPUB_NODROP || option_ == ZMQ_XPUB_MANUAL) option_ == ZMQ_XPUB_NODROP || option_ == ZMQ_XPUB_MANUAL)
{ {
if (optvallen_ != sizeof(int) || *static_cast <const int*> (optval_) < 0) { if (optvallen_ != sizeof(int) || *static_cast <const int*> (optval_) < 0) {
errno = EINVAL; errno = EINVAL;
return -1; return -1;
} }
if (option_ == ZMQ_XPUB_VERBOSE) if (option_ == ZMQ_XPUB_VERBOSE)
verbose_subs = (*static_cast <const int*> (optval_) != 0); verbose_subs = (*static_cast <const int*> (optval_) != 0);
else else
if (option_ == ZMQ_XPUB_VERBOSE_UNSUBSCRIBE) if (option_ == ZMQ_XPUB_VERBOSE_UNSUBSCRIBE)
verbose_unsubs = (*static_cast <const int*> (optval_) != 0); verbose_unsubs = (*static_cast <const int*> (optval_) != 0);
else else
if (option_ == ZMQ_XPUB_NODROP) if (option_ == ZMQ_XPUB_NODROP)
lossy = (*static_cast <const int*> (optval_) == 0); lossy = (*static_cast <const int*> (optval_) == 0);
else else
if (option_ == ZMQ_XPUB_MANUAL) if (option_ == ZMQ_XPUB_MANUAL)
manual = (*static_cast <const int*> (optval_) != 0); manual = (*static_cast <const int*> (optval_) != 0);
} }
else else
if (option_ == ZMQ_SUBSCRIBE && manual && last_pipe != NULL) if (option_ == ZMQ_SUBSCRIBE && manual && last_pipe != NULL)
subscriptions.add((unsigned char *)optval_, optvallen_, last_pipe); subscriptions.add((unsigned char *)optval_, optvallen_, last_pipe);
else else
if (option_ == ZMQ_UNSUBSCRIBE && manual && last_pipe != NULL) if (option_ == ZMQ_UNSUBSCRIBE && manual && last_pipe != NULL)
subscriptions.rm((unsigned char *)optval_, optvallen_, last_pipe); subscriptions.rm((unsigned char *)optval_, optvallen_, last_pipe);
else else
if (option_ == ZMQ_XPUB_WELCOME_MSG) { if (option_ == ZMQ_XPUB_WELCOME_MSG) {
welcome_msg.close(); welcome_msg.close();
if (optvallen_ > 0) { if (optvallen_ > 0) {
welcome_msg.init_size(optvallen_); welcome_msg.init_size(optvallen_);
unsigned char *data = (unsigned char*)welcome_msg.data(); unsigned char *data = (unsigned char*)welcome_msg.data();
memcpy(data, optval_, optvallen_); memcpy(data, optval_, optvallen_);
} }
else else
welcome_msg.init(); welcome_msg.init();
} }
else { else {
errno = EINVAL; errno = EINVAL;
return -1; return -1;
@ -211,7 +211,7 @@ int zmq::xpub_t::xsend (msg_t *msg_)
int rc = -1; // Assume we fail int rc = -1; // Assume we fail
if (lossy || dist.check_hwm ()) { if (lossy || dist.check_hwm ()) {
if (dist.send_to_matching (msg_) == 0) { if (dist.send_to_matching (msg_) == 0) {
// If we are at the end of multi-part message we can mark // If we are at the end of multi-part message we can mark
// all the pipes as non-matching. // all the pipes as non-matching.
if (!msg_more) if (!msg_more)
dist.unmatch (); dist.unmatch ();
@ -244,11 +244,11 @@ int zmq::xpub_t::xrecv (msg_t *msg_)
memcpy (msg_->data (), memcpy (msg_->data (),
pending_data.front ().data (), pending_data.front ().data (),
pending_data.front ().size ()); pending_data.front ().size ());
// set metadata only if there is some // set metadata only if there is some
if (metadata_t* metadata = pending_metadata.front ()) { if (metadata_t* metadata = pending_metadata.front ()) {
msg_->set_metadata (metadata); msg_->set_metadata (metadata);
} }
msg_->set_flags (pending_flags.front ()); msg_->set_flags (pending_flags.front ());
pending_data.pop_front (); pending_data.pop_front ();

View File

@ -96,14 +96,14 @@ namespace zmq
// Drop messages if HWM reached, otherwise return with EAGAIN // Drop messages if HWM reached, otherwise return with EAGAIN
bool lossy; bool lossy;
// Subscriptions will not bed added automatically, only after calling set option with ZMQ_SUBSCRIBE or ZMQ_UNSUBSCRIBE // Subscriptions will not bed added automatically, only after calling set option with ZMQ_SUBSCRIBE or ZMQ_UNSUBSCRIBE
bool manual; bool manual;
// Last pipe send subscription message, only used if xpub is on manual // Last pipe send subscription message, only used if xpub is on manual
pipe_t *last_pipe; pipe_t *last_pipe;
// Welcome message to send to pipe when attached // Welcome message to send to pipe when attached
msg_t welcome_msg; msg_t welcome_msg;
// List of pending (un)subscriptions, ie. those that were already // List of pending (un)subscriptions, ie. those that were already
// applied to the trie, but not yet received by the user. // applied to the trie, but not yet received by the user.

View File

@ -1065,11 +1065,11 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
errno = ENOTSUP; errno = ENOTSUP;
return -1; return -1;
#endif #endif
} }
// Create pollfd // Create pollfd
void *zmq_pollfd_new () void *zmq_pollfd_new ()
{ {
return new zmq::signaler_t (); return new zmq::signaler_t ();
} }
@ -1080,7 +1080,7 @@ int zmq_pollfd_close (void* p_)
{ {
zmq::signaler_t *s = (zmq::signaler_t*)p_; zmq::signaler_t *s = (zmq::signaler_t*)p_;
LIBZMQ_DELETE(s); LIBZMQ_DELETE(s);
return 0; return 0;
} }
// Recv signal from pollfd // Recv signal from pollfd
@ -1088,7 +1088,7 @@ int zmq_pollfd_close (void* p_)
void zmq_pollfd_recv(void *p_) void zmq_pollfd_recv(void *p_)
{ {
zmq::signaler_t *s = (zmq::signaler_t*)p_; zmq::signaler_t *s = (zmq::signaler_t*)p_;
s->recv (); s->recv ();
} }
// Wait until pollfd is signalled // Wait until pollfd is signalled
@ -1096,7 +1096,7 @@ void zmq_pollfd_recv(void *p_)
int zmq_pollfd_wait(void *p_, int timeout_) int zmq_pollfd_wait(void *p_, int timeout_)
{ {
zmq::signaler_t *s = (zmq::signaler_t*)p_; zmq::signaler_t *s = (zmq::signaler_t*)p_;
return s->wait (timeout_); return s->wait (timeout_);
} }
// Get pollfd fd // Get pollfd fd
@ -1108,7 +1108,7 @@ int zmq_pollfd_fd (void *p_)
#endif #endif
{ {
zmq::signaler_t *s = (zmq::signaler_t*)p_; zmq::signaler_t *s = (zmq::signaler_t*)p_;
return s->get_fd (); return s->get_fd ();
} }
// Polling thread safe sockets version // Polling thread safe sockets version
@ -1153,27 +1153,27 @@ int zmq_pollfd_poll (void* p_, zmq_pollitem_t *items_, int nitems_, long timeout
int thread_safe; int thread_safe;
size_t thread_safe_size = sizeof(int); size_t thread_safe_size = sizeof(int);
if (zmq_getsockopt (items_ [i].socket, ZMQ_THREAD_SAFE, &thread_safe, if (zmq_getsockopt (items_ [i].socket, ZMQ_THREAD_SAFE, &thread_safe,
&thread_safe_size) == -1) { &thread_safe_size) == -1) {
return -1; return -1;
} }
// All thread safe sockets share same fd // All thread safe sockets share same fd
if (thread_safe) { if (thread_safe) {
// if poll fd is not set yet and events are set for this socket // if poll fd is not set yet and events are set for this socket
if (!use_pollfd && items_ [i].events) { if (!use_pollfd && items_ [i].events) {
use_pollfd = true; use_pollfd = true;
pollfds_size++; pollfds_size++;
} }
} }
else else
pollfds_size++; pollfds_size++;
} }
else else
pollfds_size++; pollfds_size++;
} }
if (pollfds_size > ZMQ_POLLITEMS_DFLT) { if (pollfds_size > ZMQ_POLLITEMS_DFLT) {
pollfds = (pollfd*) malloc (pollfds_size * sizeof (pollfd)); pollfds = (pollfd*) malloc (pollfds_size * sizeof (pollfd));
alloc_assert (pollfds); alloc_assert (pollfds);
@ -1195,7 +1195,7 @@ int zmq_pollfd_poll (void* p_, zmq_pollitem_t *items_, int nitems_, long timeout
int thread_safe; int thread_safe;
size_t thread_safe_size = sizeof(int); size_t thread_safe_size = sizeof(int);
if (zmq_getsockopt (items_ [i].socket, ZMQ_THREAD_SAFE, &thread_safe, if (zmq_getsockopt (items_ [i].socket, ZMQ_THREAD_SAFE, &thread_safe,
&thread_safe_size) == -1) { &thread_safe_size) == -1) {
if (pollfds != spollfds) if (pollfds != spollfds)
free (pollfds); free (pollfds);
@ -1212,7 +1212,7 @@ int zmq_pollfd_poll (void* p_, zmq_pollitem_t *items_, int nitems_, long timeout
return -1; return -1;
} }
pollfds [pollfds_index].events = items_ [i].events ? POLLIN : 0; pollfds [pollfds_index].events = items_ [i].events ? POLLIN : 0;
pollfds_index++; pollfds_index++;
} }
} }
// Else, the poll item is a raw file descriptor. Just convert the // Else, the poll item is a raw file descriptor. Just convert the
@ -1374,16 +1374,16 @@ int zmq_pollfd_poll (void* p_, zmq_pollitem_t *items_, int nitems_, long timeout
int thread_safe; int thread_safe;
size_t thread_safe_size = sizeof(int); size_t thread_safe_size = sizeof(int);
if (zmq_getsockopt (items_ [i].socket, ZMQ_THREAD_SAFE, &thread_safe, if (zmq_getsockopt (items_ [i].socket, ZMQ_THREAD_SAFE, &thread_safe,
&thread_safe_size) == -1) &thread_safe_size) == -1)
return -1; return -1;
if (thread_safe && items_ [i].events) { if (thread_safe && items_ [i].events) {
use_pollfd = true; use_pollfd = true;
FD_SET (zmq_pollfd_fd (p_), &pollset_in); FD_SET (zmq_pollfd_fd (p_), &pollset_in);
break; break;
} }
} }
} }
zmq::fd_t maxfd = 0; zmq::fd_t maxfd = 0;
@ -1397,17 +1397,17 @@ int zmq_pollfd_poll (void* p_, zmq_pollitem_t *items_, int nitems_, long timeout
int thread_safe; int thread_safe;
size_t thread_safe_size = sizeof(int); size_t thread_safe_size = sizeof(int);
if (zmq_getsockopt (items_ [i].socket, ZMQ_THREAD_SAFE, &thread_safe, if (zmq_getsockopt (items_ [i].socket, ZMQ_THREAD_SAFE, &thread_safe,
&thread_safe_size) == -1) &thread_safe_size) == -1)
return -1; return -1;
if (!thread_safe) { if (!thread_safe) {
zmq::fd_t notify_fd; zmq::fd_t notify_fd;
size_t zmq_fd_size = sizeof (zmq::fd_t); size_t zmq_fd_size = sizeof (zmq::fd_t);
if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, &notify_fd, if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, &notify_fd,
&zmq_fd_size) == -1) &zmq_fd_size) == -1)
return -1; return -1;
if (items_ [i].events) { if (items_ [i].events) {
FD_SET (notify_fd, &pollset_in); FD_SET (notify_fd, &pollset_in);
if (maxfd < notify_fd) if (maxfd < notify_fd)

View File

@ -55,7 +55,7 @@ void test_stream_2_stream(){
assert (rconn1); assert (rconn1);
ret = zmq_setsockopt (rconn1, ZMQ_LINGER, &zero, sizeof (zero)); ret = zmq_setsockopt (rconn1, ZMQ_LINGER, &zero, sizeof (zero));
assert (0 == ret); assert (0 == ret);
// Do the connection. // Do the connection.
ret = zmq_setsockopt (rconn1, ZMQ_CONNECT_RID, "conn1", 6); ret = zmq_setsockopt (rconn1, ZMQ_CONNECT_RID, "conn1", 6);
assert (0 == ret); assert (0 == ret);
@ -67,7 +67,7 @@ void test_stream_2_stream(){
assert (0 == ret); assert (0 == ret);
ret = zmq_connect (rconn1, bindip); ret = zmq_connect (rconn1, bindip);
assert (0 == ret); assert (0 == ret);
*/ */
// Send data to the bound stream. // Send data to the bound stream.
ret = zmq_send (rconn1, "conn1", 6, ZMQ_SNDMORE); ret = zmq_send (rconn1, "conn1", 6, ZMQ_SNDMORE);
assert (6 == ret); assert (6 == ret);
@ -112,7 +112,7 @@ void test_router_2_router(bool named){
// Create connection socket. // Create connection socket.
rconn1 = zmq_socket (ctx, ZMQ_ROUTER); rconn1 = zmq_socket (ctx, ZMQ_ROUTER);
assert (rconn1); assert (rconn1);
ret = zmq_setsockopt (rconn1, ZMQ_LINGER, &zero, sizeof (zero)); ret = zmq_setsockopt (rconn1, ZMQ_LINGER, &zero, sizeof (zero));
assert (0 == ret); assert (0 == ret);
@ -122,12 +122,12 @@ void test_router_2_router(bool named){
ret = zmq_setsockopt (rconn1, ZMQ_IDENTITY, "Y", 1); ret = zmq_setsockopt (rconn1, ZMQ_IDENTITY, "Y", 1);
} }
// Make call to connect using a connect_rid. // Make call to connect using a connect_rid.
ret = zmq_setsockopt (rconn1, ZMQ_CONNECT_RID, "conn1", 6); ret = zmq_setsockopt (rconn1, ZMQ_CONNECT_RID, "conn1", 6);
assert (0 == ret); assert (0 == ret);
ret = zmq_connect (rconn1, bindip); ret = zmq_connect (rconn1, bindip);
assert (0 == ret); assert (0 == ret);
/* Uncomment to test assert on duplicate rid /* Uncomment to test assert on duplicate rid
// Test duplicate connect attempt. // Test duplicate connect attempt.
ret = zmq_setsockopt (rconn1, ZMQ_CONNECT_RID, "conn1", 6); ret = zmq_setsockopt (rconn1, ZMQ_CONNECT_RID, "conn1", 6);
assert (0 == ret); assert (0 == ret);
@ -142,9 +142,9 @@ void test_router_2_router(bool named){
// Receive the name. // Receive the name.
ret = zmq_recv (rbind, buff, 256, 0); ret = zmq_recv (rbind, buff, 256, 0);
if (named) if (named)
assert (ret && 'Y' == buff[0]); assert (ret && 'Y' == buff[0]);
else else
assert (ret && 0 == buff[0]); assert (ret && 0 == buff[0]);
// Receive the data. // Receive the data.
@ -162,7 +162,7 @@ void test_router_2_router(bool named){
} }
ret = zmq_send_const (rbind, "ok", 3, 0); ret = zmq_send_const (rbind, "ok", 3, 0);
assert (3 == ret); assert (3 == ret);
// If bound socket identity naming a problem, we'll likely see something funky here. // If bound socket identity naming a problem, we'll likely see something funky here.
ret = zmq_recv (rconn1, buff, 256, 0); ret = zmq_recv (rconn1, buff, 256, 0);
assert ('c' == buff[0] && 6 == ret); assert ('c' == buff[0] && 6 == ret);
@ -183,7 +183,7 @@ int main (void)
{ {
setup_test_environment (); setup_test_environment ();
test_stream_2_stream (); test_stream_2_stream ();
test_router_2_router (false); test_router_2_router (false);
test_router_2_router (true); test_router_2_router (true);

View File

@ -44,7 +44,7 @@
int main (void) int main (void)
{ {
int rc; int rc;
setup_test_environment(); setup_test_environment();
// Create the infrastructure // Create the infrastructure
void *ctx = zmq_ctx_new (); void *ctx = zmq_ctx_new ();
@ -70,15 +70,15 @@ int main (void)
zmq_recvmsg(rep, &msg, 0); zmq_recvmsg(rep, &msg, 0);
assert(zmq_msg_size(&msg) == MSG_SIZE); assert(zmq_msg_size(&msg) == MSG_SIZE);
// get the messages source file descriptor // get the messages source file descriptor
int srcFd = zmq_msg_get(&msg, ZMQ_SRCFD); int srcFd = zmq_msg_get(&msg, ZMQ_SRCFD);
assert(srcFd >= 0); assert(srcFd >= 0);
rc = zmq_msg_close(&msg); rc = zmq_msg_close(&msg);
assert (rc == 0); assert (rc == 0);
// get the remote endpoint // get the remote endpoint
struct sockaddr_storage ss; struct sockaddr_storage ss;
#ifdef ZMQ_HAVE_HPUX #ifdef ZMQ_HAVE_HPUX
int addrlen = sizeof ss; int addrlen = sizeof ss;
@ -92,7 +92,7 @@ int main (void)
rc = getnameinfo ((struct sockaddr*) &ss, addrlen, host, sizeof host, NULL, 0, NI_NUMERICHOST); rc = getnameinfo ((struct sockaddr*) &ss, addrlen, host, sizeof host, NULL, 0, NI_NUMERICHOST);
assert (rc == 0); assert (rc == 0);
// assert it is localhost which connected // assert it is localhost which connected
assert (strcmp(host, "127.0.0.1") == 0); assert (strcmp(host, "127.0.0.1") == 0);
rc = zmq_close (rep); rc = zmq_close (rep);
@ -100,14 +100,14 @@ int main (void)
rc = zmq_close (req); rc = zmq_close (req);
assert (rc == 0); assert (rc == 0);
// sleep a bit for the socket to be freed // sleep a bit for the socket to be freed
usleep(30000); usleep(30000);
// getting name from closed socket will fail // getting name from closed socket will fail
rc = getpeername (srcFd, (struct sockaddr*) &ss, &addrlen); rc = getpeername (srcFd, (struct sockaddr*) &ss, &addrlen);
assert (rc == -1); assert (rc == -1);
assert (errno == EBADF); assert (errno == EBADF);
rc = zmq_ctx_term (ctx); rc = zmq_ctx_term (ctx);
assert (rc == 0); assert (rc == 0);

View File

@ -50,7 +50,7 @@ bool has_more (void* socket)
int more = 0; int more = 0;
size_t more_size = sizeof(more); size_t more_size = sizeof(more);
int rc = zmq_getsockopt (socket, ZMQ_RCVMORE, &more, &more_size); int rc = zmq_getsockopt (socket, ZMQ_RCVMORE, &more, &more_size);
if (rc != 0) if (rc != 0)
return false; return false;
return more != 0; return more != 0;
} }
@ -165,18 +165,18 @@ int main(int, char**)
// Grab the 1st frame (peer identity). // Grab the 1st frame (peer identity).
zmq_msg_t peer_frame; zmq_msg_t peer_frame;
rc = zmq_msg_init (&peer_frame); rc = zmq_msg_init (&peer_frame);
assert (rc == 0); assert (rc == 0);
rc = zmq_msg_recv (&peer_frame, sockets [SERVER], 0); rc = zmq_msg_recv (&peer_frame, sockets [SERVER], 0);
assert (rc != -1); assert (rc != -1);
assert(zmq_msg_size (&peer_frame) > 0); assert(zmq_msg_size (&peer_frame) > 0);
assert (has_more (sockets [SERVER])); assert (has_more (sockets [SERVER]));
// Grab the 2nd frame (actual payload). // Grab the 2nd frame (actual payload).
zmq_msg_t data_frame; zmq_msg_t data_frame;
rc = zmq_msg_init (&data_frame); rc = zmq_msg_init (&data_frame);
assert (rc == 0); assert (rc == 0);
rc = zmq_msg_recv (&data_frame, sockets [SERVER], 0); rc = zmq_msg_recv (&data_frame, sockets [SERVER], 0);
assert (rc != -1); assert (rc != -1);
// Make sure payload matches what we expect. // Make sure payload matches what we expect.
const char * const data = (const char*)zmq_msg_data (&data_frame); const char * const data = (const char*)zmq_msg_data (&data_frame);
@ -184,39 +184,39 @@ int main(int, char**)
// 0-length frame is a disconnection notification. The server // 0-length frame is a disconnection notification. The server
// should receive it as the last step in the dialogue. // should receive it as the last step in the dialogue.
if (size == 0) { if (size == 0) {
++step; ++step;
assert (step == steps); assert (step == steps);
} }
else { else {
assert ((size_t) size == strlen (dialog [step].text)); assert ((size_t) size == strlen (dialog [step].text));
int cmp = memcmp (dialog [step].text, data, size); int cmp = memcmp (dialog [step].text, data, size);
assert (cmp == 0); assert (cmp == 0);
++step; ++step;
assert (step < steps); assert (step < steps);
// Prepare the response. // Prepare the response.
rc = zmq_msg_close (&data_frame); rc = zmq_msg_close (&data_frame);
assert (rc == 0); assert (rc == 0);
rc = zmq_msg_init_size (&data_frame, rc = zmq_msg_init_size (&data_frame,
strlen (dialog [step].text)); strlen (dialog [step].text));
assert (rc == 0); assert (rc == 0);
memcpy (zmq_msg_data (&data_frame), dialog [step].text, memcpy (zmq_msg_data (&data_frame), dialog [step].text,
zmq_msg_size (&data_frame)); zmq_msg_size (&data_frame));
// Send the response. // Send the response.
rc = zmq_msg_send (&peer_frame, sockets [SERVER], ZMQ_SNDMORE); rc = zmq_msg_send (&peer_frame, sockets [SERVER], ZMQ_SNDMORE);
assert (rc != -1); assert (rc != -1);
rc = zmq_msg_send (&data_frame, sockets [SERVER], ZMQ_SNDMORE); rc = zmq_msg_send (&data_frame, sockets [SERVER], ZMQ_SNDMORE);
assert (rc != -1); assert (rc != -1);
} }
// Release resources. // Release resources.
rc = zmq_msg_close (&peer_frame); rc = zmq_msg_close (&peer_frame);
assert (rc == 0); assert (rc == 0);
rc = zmq_msg_close (&data_frame); rc = zmq_msg_close (&data_frame);
assert (rc == 0); assert (rc == 0);
} }
// Check for data received by the client. // Check for data received by the client.
@ -226,24 +226,24 @@ int main(int, char**)
// Grab the 1st frame (peer identity). // Grab the 1st frame (peer identity).
zmq_msg_t peer_frame; zmq_msg_t peer_frame;
rc = zmq_msg_init (&peer_frame); rc = zmq_msg_init (&peer_frame);
assert (rc == 0); assert (rc == 0);
rc = zmq_msg_recv (&peer_frame, sockets [CLIENT], 0); rc = zmq_msg_recv (&peer_frame, sockets [CLIENT], 0);
assert (rc != -1); assert (rc != -1);
assert(zmq_msg_size (&peer_frame) > 0); assert(zmq_msg_size (&peer_frame) > 0);
assert (has_more (sockets [CLIENT])); assert (has_more (sockets [CLIENT]));
// Grab the 2nd frame (actual payload). // Grab the 2nd frame (actual payload).
zmq_msg_t data_frame; zmq_msg_t data_frame;
rc = zmq_msg_init (&data_frame); rc = zmq_msg_init (&data_frame);
assert (rc == 0); assert (rc == 0);
rc = zmq_msg_recv (&data_frame, sockets [CLIENT], 0); rc = zmq_msg_recv (&data_frame, sockets [CLIENT], 0);
assert (rc != -1); assert (rc != -1);
assert(zmq_msg_size (&data_frame) > 0); assert(zmq_msg_size (&data_frame) > 0);
// Make sure payload matches what we expect. // Make sure payload matches what we expect.
const char * const data = (const char*)zmq_msg_data (&data_frame); const char * const data = (const char*)zmq_msg_data (&data_frame);
const int size = zmq_msg_size (&data_frame); const int size = zmq_msg_size (&data_frame);
assert ((size_t)size == strlen(dialog [step].text)); assert ((size_t)size == strlen(dialog [step].text));
int cmp = memcmp(dialog [step].text, data, size); int cmp = memcmp(dialog [step].text, data, size);
assert (cmp == 0); assert (cmp == 0);
@ -252,22 +252,22 @@ int main(int, char**)
// Prepare the response (next line in the dialog). // Prepare the response (next line in the dialog).
assert (step < steps); assert (step < steps);
rc = zmq_msg_close (&data_frame); rc = zmq_msg_close (&data_frame);
assert (rc == 0); assert (rc == 0);
rc = zmq_msg_init_size (&data_frame, strlen (dialog [step].text)); rc = zmq_msg_init_size (&data_frame, strlen (dialog [step].text));
assert (rc == 0); assert (rc == 0);
memcpy (zmq_msg_data (&data_frame), dialog [step].text, zmq_msg_size (&data_frame)); memcpy (zmq_msg_data (&data_frame), dialog [step].text, zmq_msg_size (&data_frame));
// Send the response. // Send the response.
rc = zmq_msg_send (&peer_frame, sockets [CLIENT], ZMQ_SNDMORE); rc = zmq_msg_send (&peer_frame, sockets [CLIENT], ZMQ_SNDMORE);
assert (rc != -1); assert (rc != -1);
rc = zmq_msg_send (&data_frame, sockets [CLIENT], ZMQ_SNDMORE); rc = zmq_msg_send (&data_frame, sockets [CLIENT], ZMQ_SNDMORE);
assert (rc != -1); assert (rc != -1);
// Release resources. // Release resources.
rc = zmq_msg_close (&peer_frame); rc = zmq_msg_close (&peer_frame);
assert (rc == 0); assert (rc == 0);
rc = zmq_msg_close (&data_frame); rc = zmq_msg_close (&data_frame);
assert (rc == 0); assert (rc == 0);
} }
} }
assert (step == steps); assert (step == steps);

View File

@ -56,42 +56,42 @@ int main (void)
rc = zmq_connect (client2, "tcp://127.0.0.1:5560"); rc = zmq_connect (client2, "tcp://127.0.0.1:5560");
assert (rc == 0); assert (rc == 0);
void* t1 = zmq_threadstart(worker1, client2); void* t1 = zmq_threadstart(worker1, client2);
void* t2 = zmq_threadstart(worker2, client2); void* t2 = zmq_threadstart(worker2, client2);
char data[1]; char data[1];
data[0] = 0; data[0] = 0;
for (int i=0; i < 10; i++) { for (int i=0; i < 10; i++) {
rc = zmq_send_const(client, data, 1, 0); rc = zmq_send_const(client, data, 1, 0);
assert (rc == 1); assert (rc == 1);
rc = zmq_send_const(client, data, 1, 0); rc = zmq_send_const(client, data, 1, 0);
assert(rc == 1); assert(rc == 1);
char a, b; char a, b;
rc = zmq_recv(client, &a, 1, 0); rc = zmq_recv(client, &a, 1, 0);
assert(rc == 1); assert(rc == 1);
rc = zmq_recv(client, &b, 1, 0); rc = zmq_recv(client, &b, 1, 0);
assert(rc == 1); assert(rc == 1);
// make sure they came from different threads // make sure they came from different threads
assert((a == 1 && b == 2) || (a == 2 && b == 1)); assert((a == 1 && b == 2) || (a == 2 && b == 1));
} }
// make the thread exit // make the thread exit
data[0] = 1; data[0] = 1;
rc = zmq_send_const(client, data, 1, 0); rc = zmq_send_const(client, data, 1, 0);
assert (rc == 1); assert (rc == 1);
rc = zmq_send_const(client, data, 1, 0); rc = zmq_send_const(client, data, 1, 0);
assert(rc == 1); assert(rc == 1);
zmq_threadclose(t1); zmq_threadclose(t1);
zmq_threadclose(t2); zmq_threadclose(t2);
rc = zmq_close (client2); rc = zmq_close (client2);
assert (rc == 0); assert (rc == 0);
@ -107,52 +107,52 @@ int main (void)
void worker1(void* s) void worker1(void* s)
{ {
const char worker_id = 1; const char worker_id = 1;
char c; char c;
while (true) while (true)
{ {
int rc = zmq_recv(s, &c,1, 0); int rc = zmq_recv(s, &c,1, 0);
assert(rc == 1); assert(rc == 1);
if (c == 0) if (c == 0)
{ {
msleep(100); msleep(100);
rc = zmq_send_const(s,&worker_id, 1, 0); rc = zmq_send_const(s,&worker_id, 1, 0);
assert(rc == 1); assert(rc == 1);
} }
else else
{ {
// we got exit request // we got exit request
break; break;
} }
} }
} }
void worker2(void* s) void worker2(void* s)
{ {
const char worker_id = 2; const char worker_id = 2;
char c; char c;
while (true) while (true)
{ {
int rc = zmq_recv(s, &c,1, 0); int rc = zmq_recv(s, &c,1, 0);
assert(rc == 1); assert(rc == 1);
assert(c == 1 || c == 0); assert(c == 1 || c == 0);
if (c == 0) if (c == 0)
{ {
msleep(100); msleep(100);
rc = zmq_send_const(s,&worker_id, 1, 0); rc = zmq_send_const(s,&worker_id, 1, 0);
assert(rc == 1); assert(rc == 1);
} }
else else
{ {
// we got exit request // we got exit request
break; break;
} }
} }
} }

View File

@ -43,42 +43,42 @@ int main (void)
// set pub socket options // set pub socket options
int manual = 1; int manual = 1;
rc = zmq_setsockopt(pub, ZMQ_XPUB_MANUAL, &manual, 4); rc = zmq_setsockopt(pub, ZMQ_XPUB_MANUAL, &manual, 4);
assert (rc == 0); assert (rc == 0);
// Create a subscriber // Create a subscriber
void *sub = zmq_socket (ctx, ZMQ_XSUB); void *sub = zmq_socket (ctx, ZMQ_XSUB);
assert (sub); assert (sub);
rc = zmq_connect (sub, "inproc://soname"); rc = zmq_connect (sub, "inproc://soname");
assert (rc == 0); assert (rc == 0);
// Subscribe for A // Subscribe for A
char subscription[2] = { 1, 'A'}; char subscription[2] = { 1, 'A'};
rc = zmq_send_const(sub, subscription, 2, 0); rc = zmq_send_const(sub, subscription, 2, 0);
assert (rc == 2); assert (rc == 2);
char buffer[2]; char buffer[2];
// Receive subscriptions from subscriber
rc = zmq_recv(pub, buffer, 2, 0);
assert(rc == 2);
assert(buffer[0] == 1);
assert(buffer[1] == 'A');
// Subscribe socket for B instead // Receive subscriptions from subscriber
rc = zmq_setsockopt(pub, ZMQ_SUBSCRIBE, "B", 1); rc = zmq_recv(pub, buffer, 2, 0);
assert(rc == 0); assert(rc == 2);
assert(buffer[0] == 1);
assert(buffer[1] == 'A');
// Sending A message and B Message // Subscribe socket for B instead
rc = zmq_send_const(pub, "A", 1, 0); rc = zmq_setsockopt(pub, ZMQ_SUBSCRIBE, "B", 1);
assert(rc == 1); assert(rc == 0);
rc = zmq_send_const(pub, "B", 1, 0); // Sending A message and B Message
assert(rc == 1); rc = zmq_send_const(pub, "A", 1, 0);
assert(rc == 1);
rc = zmq_recv(sub, buffer, 1, ZMQ_DONTWAIT); rc = zmq_send_const(pub, "B", 1, 0);
assert(rc == 1); assert(rc == 1);
assert(buffer[0] == 'B');
rc = zmq_recv(sub, buffer, 1, ZMQ_DONTWAIT);
assert(rc == 1);
assert(buffer[0] == 'B');
// Clean up. // Clean up.
rc = zmq_close (pub); rc = zmq_close (pub);

View File

@ -41,20 +41,20 @@ int main (void)
int rc = zmq_bind (pub, "inproc://soname"); int rc = zmq_bind (pub, "inproc://soname");
assert (rc == 0); assert (rc == 0);
// set pub socket options // set pub socket options
rc = zmq_setsockopt(pub, ZMQ_XPUB_WELCOME_MSG, "W", 1); rc = zmq_setsockopt(pub, ZMQ_XPUB_WELCOME_MSG, "W", 1);
assert (rc == 0); assert (rc == 0);
// Create a subscriber // Create a subscriber
void *sub = zmq_socket (ctx, ZMQ_SUB); void *sub = zmq_socket (ctx, ZMQ_SUB);
// Subscribe to the welcome message // Subscribe to the welcome message
rc = zmq_setsockopt(sub, ZMQ_SUBSCRIBE, "W", 1); rc = zmq_setsockopt(sub, ZMQ_SUBSCRIBE, "W", 1);
assert(rc == 0); assert(rc == 0);
assert (sub); assert (sub);
rc = zmq_connect (sub, "inproc://soname"); rc = zmq_connect (sub, "inproc://soname");
assert (rc == 0); assert (rc == 0);
char buffer[2]; char buffer[2];
@ -63,11 +63,11 @@ int main (void)
assert(rc == 2); assert(rc == 2);
assert(buffer[0] == 1); assert(buffer[0] == 1);
assert(buffer[1] == 'W'); assert(buffer[1] == 'W');
// Receive the welcome message // Receive the welcome message
rc = zmq_recv(sub, buffer, 1, 0); rc = zmq_recv(sub, buffer, 1, 0);
assert(rc == 1); assert(rc == 1);
assert(buffer[0] == 'W'); assert(buffer[0] == 'W');
// Clean up. // Clean up.
rc = zmq_close (pub); rc = zmq_close (pub);