From 4726f7262d076ed0b20111cfcdc0df6b33cf7c82 Mon Sep 17 00:00:00 2001 From: Manuel Segura Date: Sat, 10 Mar 2018 03:03:02 -0800 Subject: [PATCH] Pull request to merge porting to WindRiver VxWorks 6.x (#2966) * Problem: Still need to port over more files to VxWorks 6.x Solution: Port more files to VxWorks 6.x * Problem: Need to port over remaining files to VxWorks 6.x. Also remove POSIX thread dependency for VxWorks (because of priority inversion problem in POSIX mutexes with VxWorks 6.x processes) Solution: Port over remaining files to VxWorks 6.x. Also removed POSIX thread dependency for VxWorks * Problem: Needed to modify TCP, UDP, TIPC classes with #ifdefs to be compatible with VxWorks 6.x. Solution: Modify TCP, UDP, TIPC classes with #ifdefs to be compatible with VxWorks 6.x --- builds/vxworks/platform.hpp | 306 ++++++++++++++++++++++++++++++++++++ src/address.cpp | 6 +- src/address.hpp | 7 +- src/atomic_ptr.hpp | 4 + src/clock.cpp | 9 +- src/condition_variable.hpp | 84 ++++++++++ src/ip.cpp | 71 +++++++-- src/ipc_address.cpp | 3 +- src/ipc_address.hpp | 3 +- src/ipc_connecter.cpp | 4 +- src/ipc_connecter.hpp | 3 +- src/ipc_listener.cpp | 6 +- src/ipc_listener.hpp | 3 +- src/mutex.hpp | 39 +++++ src/options.cpp | 8 +- src/select.cpp | 4 + src/session_base.cpp | 3 +- src/signaler.cpp | 39 +++++ src/socket_base.cpp | 17 +- src/socket_poller.cpp | 6 + src/socket_poller.hpp | 4 + src/socks_connecter.cpp | 15 +- src/tcp.cpp | 9 +- src/tcp_address.cpp | 14 +- src/tcp_connecter.cpp | 23 ++- src/tcp_listener.cpp | 14 +- src/thread.cpp | 62 ++++++++ src/thread.hpp | 22 ++- src/tipc_address.hpp | 4 + src/tipc_connecter.cpp | 14 +- src/tipc_listener.cpp | 21 +++ src/udp_engine.cpp | 20 +++ src/ypipe.hpp | 6 +- src/ypipe_conflate.hpp | 6 +- src/zmq.cpp | 10 +- 35 files changed, 809 insertions(+), 60 deletions(-) create mode 100644 builds/vxworks/platform.hpp diff --git a/builds/vxworks/platform.hpp b/builds/vxworks/platform.hpp new file mode 100644 index 00000000..df18f138 --- /dev/null +++ b/builds/vxworks/platform.hpp @@ -0,0 +1,306 @@ +/* src/platform.hpp. Generated from platform.hpp.in by configure. */ +/* src/platform.hpp.in. Generated from configure.ac by autoheader. */ + +/* Define to 1 if you have the header file. */ +/* #undef HAVE_ALLOCA_H */ + +/* Define to 1 if you have the header file. */ +#define HAVE_ARPA_INET_H 1 + +/* Define to 1 if you have the `clock_gettime' function. */ +#define HAVE_CLOCK_GETTIME 1 + +/* Define to 1 if you have the declaration of `LOCAL_PEERCRED', and to 0 if + you don't. */ +#define HAVE_DECL_LOCAL_PEERCRED 0 + +/* Define to 1 if you have the declaration of `SO_PEERCRED', and to 0 if you + don't. */ +#define HAVE_DECL_SO_PEERCRED 0 + +/* Define to 1 if you have the header file. */ +#define HAVE_DLFCN_H 1 + +/* Define to 1 if you have the header file. */ +#define HAVE_ERRNO_H 1 + +/* Define to 1 if you have the `fork' function. */ +#define HAVE_FORK 1 + +/* Define to 1 if you have the `freeifaddrs' function. */ +#define HAVE_FREEIFADDRS 1 + +/* Define to 1 if you have the `gethrtime' function. */ +/* #undef HAVE_GETHRTIME */ + +/* Define to 1 if you have the `getifaddrs' function. */ +#define HAVE_GETIFADDRS 1 + +/* Define to 1 if you have the `gettimeofday' function. */ +#define HAVE_GETTIMEOFDAY 1 + +/* Define to 1 if you have the header file. */ +//#define HAVE_IFADDRS_H 1 + +/* Define to 1 if you have the header file. */ +#define HAVE_INTTYPES_H 1 + +/* Define to 1 if you have the `gssapi_krb5' library (-lgssapi_krb5). */ +/* #undef HAVE_LIBGSSAPI_KRB5 */ + +/* Define to 1 if you have the `iphlpapi' library (-liphlpapi). */ +/* #undef HAVE_LIBIPHLPAPI */ + +/* Define to 1 if you have the `nsl' library (-lnsl). */ +/* #undef HAVE_LIBNSL */ + +/* Define to 1 if you have the `pthread' library (-lpthread). */ +/* #undef HAVE_LIBPTHREAD */ + +/* Define to 1 if you have the `rpcrt4' library (-lrpcrt4). */ +/* #undef HAVE_LIBRPCRT4 */ + +/* Define to 1 if you have the `rt' library (-lrt). */ +/* #undef HAVE_LIBRT */ + +/* Define to 1 if you have the `socket' library (-lsocket). */ +/* #undef HAVE_LIBSOCKET */ + +/* Define to 1 if you have the `sodium' library (-lsodium). */ +/* #undef HAVE_LIBSODIUM */ + +/* Define to 1 if you have the `ws2_32' library (-lws2_32). */ +/* #undef HAVE_LIBWS2_32 */ + +/* Define to 1 if you have the header file. */ +#define HAVE_LIMITS_H 1 + +/* Define to 1 if you have the header file. */ +#define HAVE_MEMORY_H 1 + +/* Define to 1 if you have the `memset' function. */ +#define HAVE_MEMSET 1 + +/* Define to 1 if you have the header file. */ +#define HAVE_NETINET_IN_H 1 + +/* Define to 1 if you have the header file. */ +#define HAVE_NETINET_TCP_H 1 + +/* Define to 1 if you have the `perror' function. */ +#define HAVE_PERROR 1 + +/* Define to 1 if you have the `socket' function. */ +#define HAVE_SOCKET 1 + +/* Define to 1 if stdbool.h conforms to C99. */ +#define HAVE_STDBOOL_H 1 + +/* Define to 1 if you have the header file. */ +#define HAVE_STDDEF_H 1 + +/* Define to 1 if you have the header file. */ +#define HAVE_STDINT_H 1 + +/* Define to 1 if you have the header file. */ +#define HAVE_STDLIB_H 1 + +/* Define to 1 if you have the header file. */ +#define HAVE_STRINGS_H 1 + +/* Define to 1 if you have the header file. */ +#define HAVE_STRING_H 1 + +/* Define to 1 if you have the header file. */ +/* #undef HAVE_SYS_EVENTFD_H */ + +/* Define to 1 if you have the header file. */ +#define HAVE_SYS_SOCKET_H 1 + +/* Define to 1 if you have the header file. */ +#define HAVE_SYS_STAT_H 1 + +/* Define to 1 if you have the header file. */ +#define HAVE_SYS_TIME_H 1 + +/* Define to 1 if you have the header file. */ +#define HAVE_SYS_TYPES_H 1 + +/* Define to 1 if you have the header file. */ +#define HAVE_SYS_UIO_H 1 + +/* Define to 1 if you have the header file. */ +#define HAVE_TIME_H 1 + +/* Define to 1 if you have the header file. */ +#define HAVE_UNISTD_H 1 + +/* Define to 1 if you have the header file. */ +/* #undef HAVE_WINDOWS_H */ + +/* Define to 1 if the system has the type `_Bool'. */ +/* #undef HAVE__BOOL */ + +/* Define to the sub-directory in which libtool stores uninstalled libraries. + */ +#define LT_OBJDIR ".libs/" + +/* Name of package */ +#define PACKAGE "zeromq" + +/* Define to the address where bug reports for this package should be sent. */ +#define PACKAGE_BUGREPORT "zeromq-dev@lists.zeromq.org" + +/* Define to the full name of this package. */ +#define PACKAGE_NAME "zeromq" + +/* Define to the full name and version of this package. */ +#define PACKAGE_STRING "zeromq 4.1.0" + +/* Define to the one symbol short name of this package. */ +#define PACKAGE_TARNAME "zeromq" + +/* Define to the home page for this package. */ +#define PACKAGE_URL "" + +/* Define to the version of this package. */ +#define PACKAGE_VERSION "4.1.0" + +/* Define as the return type of signal handlers (`int' or `void'). */ +#define RETSIGTYPE void + +/* Define to 1 if you have the ANSI C header files. */ +#define STDC_HEADERS 1 + +/* Define to 1 if you can safely include both and . */ +#define TIME_WITH_SYS_TIME 1 + +/* Version number of package */ +#define VERSION "4.1.0" + +/* Enable militant API assertions */ +/* #undef ZMQ_ACT_MILITANT */ + +/* Force to use mutexes */ +/* #undef ZMQ_FORCE_MUTEXES */ + +/* Have VxWorks OS */ +#define ZMQ_HAVE_VXWORKS 1 + +/* Have AIX OS */ +/* #undef ZMQ_HAVE_AIX */ + +/* Have Android OS */ +/* #undef ZMQ_HAVE_ANDROID */ + +/* Have Cygwin */ +/* #undef ZMQ_HAVE_CYGWIN */ + +/* Have eventfd extension. */ +/* #undef ZMQ_HAVE_EVENTFD */ + +/* Have FreeBSD OS */ +/* #undef ZMQ_HAVE_FREEBSD */ + +/* Have HPUX OS */ +/* #undef ZMQ_HAVE_HPUX */ + +/* Have ifaddrs.h header. */ +//#define ZMQ_HAVE_IFADDRS 1 + +/* Have Linux OS */ +/* #undef ZMQ_HAVE_LINUX */ + +/* Have LOCAL_PEERCRED socket option */ +/* #undef ZMQ_HAVE_LOCAL_PEERCRED */ + +/* Have MinGW32 */ +/* #undef ZMQ_HAVE_MINGW32 */ + +/* Have NetBSD OS */ +/* #undef ZMQ_HAVE_NETBSD */ + +/* Have NORM protocol extension */ +/* #undef ZMQ_HAVE_NORM */ + +/* Have OpenBSD OS */ +/* #undef ZMQ_HAVE_OPENBSD */ + +/* Have OpenPGM extension */ +/* #undef ZMQ_HAVE_OPENPGM */ + +/* Have DarwinOSX OS */ +/* #undef ZMQ_HAVE_OSX */ + +/* Have QNX Neutrino OS */ +/* #undef ZMQ_HAVE_QNXNTO */ + +/* Whether SOCK_CLOEXEC is defined and functioning. */ +/* #undef ZMQ_HAVE_SOCK_CLOEXEC */ + +/* Have Solaris OS */ +/* #undef ZMQ_HAVE_SOLARIS */ + +/* Whether SO_KEEPALIVE is supported. */ +#define ZMQ_HAVE_SO_KEEPALIVE 1 + +/* Have SO_PEERCRED socket option */ +/* #undef ZMQ_HAVE_SO_PEERCRED */ + +/* Whether TCP_KEEPALIVE is supported. */ +/* #undef ZMQ_HAVE_TCP_KEEPALIVE */ + +/* Whether TCP_KEEPCNT is supported. */ +/* #undef ZMQ_HAVE_TCP_KEEPCNT */ + +/* Whether TCP_KEEPIDLE is supported. */ +/* #undef ZMQ_HAVE_TCP_KEEPIDLE */ + +/* Whether TCP_KEEPINTVL is supported. */ +/* #undef ZMQ_HAVE_TCP_KEEPINTVL */ + +/* Have TIPC support */ +#define ZMQ_HAVE_TIPC 1 + +/* Have uio.h header. */ +//#define ZMQ_HAVE_UIO 1 + +#define ZMQ_USE_SELECT 1 + +/* Have Windows OS */ +/* #undef ZMQ_HAVE_WINDOWS */ + +/* Define for Solaris 2.5.1 so the uint32_t typedef from , + , or is not used. If the typedef were allowed, the + #define below would cause a syntax error. */ +/* #undef _UINT32_T */ + +/* Define to empty if `const' does not conform to ANSI C. */ +/* #undef const */ + +/* Define to `__inline__' or `__inline' if that's what the C compiler + calls it, or to nothing if 'inline' is not supported under any name. */ +#ifndef __cplusplus +/* #undef inline */ +#endif + +/* Define to `unsigned int' if does not define. */ +/* #undef size_t */ + +/* Define to `int' if does not define. */ +/* #undef ssize_t */ + +/* Define to the type of an unsigned integer type of width exactly 32 bits if + such a type exists and the standard includes do not define it. */ +/* #undef uint32_t */ + +/* Define to empty if the keyword `volatile' does not work. Warning: valid + code using `volatile' can become incorrect without. Disable with care. */ +/* #undef volatile */ + +/* ---- Special case for z/OS Unix Services: openedition ---- */ +#include + +#ifndef NI_MAXHOST +#define NI_MAXHOST 1025 +#endif diff --git a/src/address.cpp b/src/address.cpp index a14e5d07..ed98cdca 100644 --- a/src/address.cpp +++ b/src/address.cpp @@ -66,7 +66,8 @@ zmq::address_t::~address_t () LIBZMQ_DELETE (resolved.udp_addr); } } -#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS +#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \ + && !defined ZMQ_HAVE_VXWORKS else if (protocol == "ipc") { if (resolved.ipc_addr) { LIBZMQ_DELETE (resolved.ipc_addr); @@ -99,7 +100,8 @@ int zmq::address_t::to_string (std::string &addr_) const if (resolved.udp_addr) return resolved.udp_addr->to_string (addr_); } -#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS +#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \ + && !defined ZMQ_HAVE_VXWORKS else if (protocol == "ipc") { if (resolved.ipc_addr) return resolved.ipc_addr->to_string (addr_); diff --git a/src/address.hpp b/src/address.hpp index 7c767b7e..61b6d717 100644 --- a/src/address.hpp +++ b/src/address.hpp @@ -40,7 +40,7 @@ class udp_address_t; #if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS class ipc_address_t; #endif -#if defined ZMQ_HAVE_LINUX +#if defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_VXWORKS class tipc_address_t; #endif #if defined ZMQ_HAVE_VMCI @@ -63,10 +63,11 @@ struct address_t { tcp_address_t *tcp_addr; udp_address_t *udp_addr; -#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS +#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \ + && !defined ZMQ_HAVE_VXWORKS ipc_address_t *ipc_addr; #endif -#if defined ZMQ_HAVE_LINUX +#if defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_VXWORKS tipc_address_t *tipc_addr; #endif #if defined ZMQ_HAVE_VMCI diff --git a/src/atomic_ptr.hpp b/src/atomic_ptr.hpp index a4912bb3..ed6d7b41 100644 --- a/src/atomic_ptr.hpp +++ b/src/atomic_ptr.hpp @@ -280,7 +280,11 @@ struct atomic_value_t #endif #if defined ZMQ_ATOMIC_PTR_MUTEX +#if defined ZMQ_HAVE_VXWORKS + mutable mutex_t sync; +#else mutex_t sync; +#endif #endif private: diff --git a/src/clock.cpp b/src/clock.cpp index f868baca..b1aae587 100644 --- a/src/clock.cpp +++ b/src/clock.cpp @@ -52,6 +52,10 @@ #include #endif +#if defined ZMQ_HAVE_VXWORKS +#include "timers.h" +#endif + #if defined ZMQ_HAVE_OSX #include #include @@ -154,7 +158,8 @@ uint64_t zmq::clock_t::now_us () double ticks_div = ticksPerSecond.QuadPart / 1000000.0; return (uint64_t) (tick.QuadPart / ticks_div); -#elif defined HAVE_CLOCK_GETTIME && defined CLOCK_MONOTONIC +#elif defined HAVE_CLOCK_GETTIME \ + && (defined CLOCK_MONOTONIC || defined ZMQ_HAVE_VXWORKS) // Use POSIX clock_gettime function to get precise monotonic time. struct timespec tv; @@ -169,11 +174,13 @@ uint64_t zmq::clock_t::now_us () // This should be a configuration check, but I looked into it and writing an // AC_FUNC_CLOCK_MONOTONIC seems beyond my powers. if (rc != 0) { +#ifndef ZMQ_HAVE_VXWORKS // Use POSIX gettimeofday function to get precise time. struct timeval tv; int rc = gettimeofday (&tv, NULL); errno_assert (rc == 0); return (tv.tv_sec * (uint64_t) 1000000 + tv.tv_usec); +#endif } return (tv.tv_sec * (uint64_t) 1000000 + tv.tv_nsec / 1000); diff --git a/src/condition_variable.hpp b/src/condition_variable.hpp index e01ad410..8108777b 100644 --- a/src/condition_variable.hpp +++ b/src/condition_variable.hpp @@ -170,6 +170,90 @@ class condition_variable_t #endif +#elif defined ZMQ_HAVE_VXWORKS + +#include + +namespace zmq +{ +class condition_variable_t +{ + public: + inline condition_variable_t () {} + + inline ~condition_variable_t () + { + scoped_lock_t l (m_listenersMutex); + for (size_t i = 0; i < m_listeners.size (); i++) { + semDelete (m_listeners[i]); + } + } + + inline int wait (mutex_t *mutex_, int timeout_) + { + //Atomically releases lock, blocks the current executing thread, + //and adds it to the list of threads waiting on *this. The thread + //will be unblocked when broadcast() is executed. + //It may also be unblocked spuriously. When unblocked, regardless + //of the reason, lock is reacquired and wait exits. + + SEM_ID sem = semBCreate (SEM_Q_PRIORITY, SEM_EMPTY); + { + scoped_lock_t l (m_listenersMutex); + m_listeners.push_back (sem); + } + mutex_->unlock (); + + int rc; + if (timeout_ < 0) + rc = semTake (sem, WAIT_FOREVER); + else { + int ticksPerSec = sysClkRateGet (); + int timeoutTicks = (timeout_ * ticksPerSec) / 1000 + 1; + rc = semTake (sem, timeoutTicks); + } + + { + scoped_lock_t l (m_listenersMutex); + // remove sem from listeners + for (size_t i = 0; i < m_listeners.size (); i++) { + if (m_listeners[i] == sem) { + m_listeners.erase (m_listeners.begin () + i); + break; + } + } + semDelete (sem); + } + mutex_->lock (); + + if (rc == 0) + return 0; + + if (rc == S_objLib_OBJ_TIMEOUT) { + errno = EAGAIN; + return -1; + } + + return -1; + } + + inline void broadcast () + { + scoped_lock_t l (m_listenersMutex); + for (size_t i = 0; i < m_listeners.size (); i++) { + semGive (m_listeners[i]); + } + } + + private: + mutex_t m_listenersMutex; + std::vector m_listeners; + + // Disable copy construction and assignment. + condition_variable_t (const condition_variable_t &); + const condition_variable_t &operator= (const condition_variable_t &); +}; +} #else #include diff --git a/src/ip.cpp b/src/ip.cpp index 0e07f780..f47852df 100644 --- a/src/ip.cpp +++ b/src/ip.cpp @@ -44,10 +44,16 @@ #include "tcp.hpp" #endif -#if defined ZMQ_HAVE_OPENVMS +#if defined ZMQ_HAVE_OPENVMS || defined ZMQ_HAVE_VXWORKS #include #endif +#if defined ZMQ_HAVE_VXWORKS +#include +#include +#include +#endif + #if defined ZMQ_HAVE_EVENTFD #include #endif @@ -106,7 +112,7 @@ void zmq::unblock_socket (fd_t s_) u_long nonblock = 1; int rc = ioctlsocket (s_, FIONBIO, &nonblock); wsa_assert (rc != SOCKET_ERROR); -#elif defined ZMQ_HAVE_OPENVMS +#elif defined ZMQ_HAVE_OPENVMS || defined ZMQ_HAVE_VXWORKS int nonblock = 1; int rc = ioctl (s_, FIONBIO, &nonblock); errno_assert (rc != -1); @@ -129,8 +135,8 @@ void zmq::enable_ipv4_mapping (fd_t s_) #else int flag = 0; #endif - int rc = setsockopt (s_, IPPROTO_IPV6, IPV6_V6ONLY, (const char *) &flag, - sizeof (flag)); + int rc = + setsockopt (s_, IPPROTO_IPV6, IPV6_V6ONLY, (char *) &flag, sizeof (flag)); #ifdef ZMQ_HAVE_WINDOWS wsa_assert (rc != SOCKET_ERROR); #else @@ -144,7 +150,8 @@ int zmq::get_peer_ip_address (fd_t sockfd_, std::string &ip_addr_) int rc; struct sockaddr_storage ss; -#if defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_WINDOWS +#if defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_WINDOWS \ + || defined ZMQ_HAVE_VXWORKS int addrlen = static_cast (sizeof ss); #else socklen_t addrlen = sizeof ss; @@ -185,9 +192,8 @@ int zmq::get_peer_ip_address (fd_t sockfd_, std::string &ip_addr_) void zmq::set_ip_type_of_service (fd_t s_, int iptos) { - int rc = - setsockopt (s_, IPPROTO_IP, IP_TOS, - reinterpret_cast (&iptos), sizeof (iptos)); + int rc = setsockopt (s_, IPPROTO_IP, IP_TOS, + reinterpret_cast (&iptos), sizeof (iptos)); #ifdef ZMQ_HAVE_WINDOWS wsa_assert (rc != SOCKET_ERROR); @@ -198,7 +204,7 @@ void zmq::set_ip_type_of_service (fd_t s_, int iptos) // Windows and Hurd do not support IPV6_TCLASS #if !defined(ZMQ_HAVE_WINDOWS) && defined(IPV6_TCLASS) rc = setsockopt (s_, IPPROTO_IPV6, IPV6_TCLASS, - reinterpret_cast (&iptos), sizeof (iptos)); + reinterpret_cast (&iptos), sizeof (iptos)); // If IPv6 is not enabled ENOPROTOOPT will be returned on Linux and // EINVAL on OSX @@ -577,7 +583,48 @@ int zmq::make_fdpair (fd_t *r_, fd_t *w_) close (listener); return 0; +#elif defined ZMQ_HAVE_VXWORKS + struct sockaddr_in lcladdr; + memset (&lcladdr, 0, sizeof lcladdr); + lcladdr.sin_family = AF_INET; + lcladdr.sin_addr.s_addr = htonl (INADDR_LOOPBACK); + lcladdr.sin_port = 0; + int listener = open_socket (AF_INET, SOCK_STREAM, 0); + errno_assert (listener != -1); + + int on = 1; + int rc = + setsockopt (listener, IPPROTO_TCP, TCP_NODELAY, (char *) &on, sizeof on); + errno_assert (rc != -1); + + rc = bind (listener, (struct sockaddr *) &lcladdr, sizeof lcladdr); + errno_assert (rc != -1); + + socklen_t lcladdr_len = sizeof lcladdr; + + rc = getsockname (listener, (struct sockaddr *) &lcladdr, + (int *) &lcladdr_len); + errno_assert (rc != -1); + + rc = listen (listener, 1); + errno_assert (rc != -1); + + *w_ = open_socket (AF_INET, SOCK_STREAM, 0); + errno_assert (*w_ != -1); + + rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY, (char *) &on, sizeof on); + errno_assert (rc != -1); + + rc = connect (*w_, (struct sockaddr *) &lcladdr, sizeof lcladdr); + errno_assert (rc != -1); + + *r_ = accept (listener, NULL, NULL); + errno_assert (*r_ != -1); + + close (listener); + + return 0; #else // All other implementations support socketpair() int sv[2]; @@ -594,9 +641,9 @@ int zmq::make_fdpair (fd_t *r_, fd_t *w_) *w_ = *r_ = -1; return -1; } else { - // If there's no SOCK_CLOEXEC, let's try the second best option. Note that - // race condition can cause socket not to be closed (if fork happens - // between socket creation and this point). + // If there's no SOCK_CLOEXEC, let's try the second best option. Note that + // race condition can cause socket not to be closed (if fork happens + // between socket creation and this point). #if !defined ZMQ_HAVE_SOCK_CLOEXEC && defined FD_CLOEXEC rc = fcntl (sv[0], F_SETFD, FD_CLOEXEC); errno_assert (rc != -1); diff --git a/src/ipc_address.cpp b/src/ipc_address.cpp index a26d519b..0f4cb829 100644 --- a/src/ipc_address.cpp +++ b/src/ipc_address.cpp @@ -30,7 +30,8 @@ #include "precompiled.hpp" #include "ipc_address.hpp" -#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS +#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \ + && !defined ZMQ_HAVE_VXWORKS #include "err.hpp" diff --git a/src/ipc_address.hpp b/src/ipc_address.hpp index e6ce271a..e9b74c30 100644 --- a/src/ipc_address.hpp +++ b/src/ipc_address.hpp @@ -32,7 +32,8 @@ #include -#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS +#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \ + && !defined ZMQ_HAVE_VXWORKS #include #include diff --git a/src/ipc_connecter.cpp b/src/ipc_connecter.cpp index 7ec35381..eaebfe9f 100644 --- a/src/ipc_connecter.cpp +++ b/src/ipc_connecter.cpp @@ -30,7 +30,8 @@ #include "precompiled.hpp" #include "ipc_connecter.hpp" -#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS +#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \ + && !defined ZMQ_HAVE_VXWORKS #include #include @@ -49,6 +50,7 @@ #include #include + zmq::ipc_connecter_t::ipc_connecter_t (class io_thread_t *io_thread_, class session_base_t *session_, const options_t &options_, diff --git a/src/ipc_connecter.hpp b/src/ipc_connecter.hpp index 1506d3c2..39f41c0f 100644 --- a/src/ipc_connecter.hpp +++ b/src/ipc_connecter.hpp @@ -30,7 +30,8 @@ #ifndef __IPC_CONNECTER_HPP_INCLUDED__ #define __IPC_CONNECTER_HPP_INCLUDED__ -#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS +#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \ + && !defined ZMQ_HAVE_VXWORKS #include "fd.hpp" #include "own.hpp" diff --git a/src/ipc_listener.cpp b/src/ipc_listener.cpp index 3a688122..470aef75 100644 --- a/src/ipc_listener.cpp +++ b/src/ipc_listener.cpp @@ -30,7 +30,8 @@ #include "precompiled.hpp" #include "ipc_listener.hpp" -#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS +#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \ + && !defined ZMQ_HAVE_VXWORKS #include @@ -69,6 +70,7 @@ const char *zmq::ipc_listener_t::tmp_env_vars[] = { 0 // Sentinel }; + int zmq::ipc_listener_t::create_wildcard_address (std::string &path_, std::string &file_) { @@ -263,7 +265,7 @@ int zmq::ipc_listener_t::set_address (const char *addr_) } // Bind the socket to the file path. - rc = bind (s, address.addr (), address.addrlen ()); + rc = bind (s, (sockaddr *) address.addr (), address.addrlen ()); if (rc != 0) goto error; diff --git a/src/ipc_listener.hpp b/src/ipc_listener.hpp index 4c5d53c5..7db42865 100644 --- a/src/ipc_listener.hpp +++ b/src/ipc_listener.hpp @@ -30,7 +30,8 @@ #ifndef __ZMQ_IPC_LISTENER_HPP_INCLUDED__ #define __ZMQ_IPC_LISTENER_HPP_INCLUDED__ -#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS +#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \ + && !defined ZMQ_HAVE_VXWORKS #include diff --git a/src/mutex.hpp b/src/mutex.hpp index fd0cfa6c..3f591543 100644 --- a/src/mutex.hpp +++ b/src/mutex.hpp @@ -67,6 +67,45 @@ class mutex_t }; } +#elif defined ZMQ_HAVE_VXWORKS + +#include +#include + +namespace zmq +{ +class mutex_t +{ + public: + inline mutex_t () + { + m_semId = + semMCreate (SEM_Q_PRIORITY | SEM_INVERSION_SAFE | SEM_DELETE_SAFE); + } + + inline ~mutex_t () { semDelete (m_semId); } + + inline void lock () { semTake (m_semId, WAIT_FOREVER); } + + inline bool try_lock () + { + if (semTake (m_semId, NO_WAIT) == OK) { + return true; + } + return false; + } + + inline void unlock () { semGive (m_semId); } + + private: + SEM_ID m_semId; + + // Disable copy construction and assignment. + mutex_t (const mutex_t &); + const mutex_t &operator= (const mutex_t &); +}; +} + #else #include diff --git a/src/options.cpp b/src/options.cpp index 754f6256..0f1a06de 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -717,10 +717,10 @@ int zmq::options_t::setsockopt (int option_, break; } - // TODO mechanism should either be set explicitly, or determined when - // connecting. currently, it depends on the order of setsockopt calls - // if there is some inconsistency, which is confusing. in addition, - // the assumed or set mechanism should be queryable (as a socket option) + // TODO mechanism should either be set explicitly, or determined when + // connecting. currently, it depends on the order of setsockopt calls + // if there is some inconsistency, which is confusing. in addition, + // the assumed or set mechanism should be queryable (as a socket option) #if defined(ZMQ_ACT_MILITANT) // There is no valid use case for passing an error back to the application diff --git a/src/select.cpp b/src/select.cpp index 625f6c53..af8f0d7e 100644 --- a/src/select.cpp +++ b/src/select.cpp @@ -39,6 +39,10 @@ #elif defined ZMQ_HAVE_OPENVMS #include #include +#elif defined ZMQ_HAVE_VXWORKS +#include +#include +#include #else #include #endif diff --git a/src/session_base.cpp b/src/session_base.cpp index 87a08772..dfb2c205 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -576,7 +576,8 @@ void zmq::session_base_t::start_connecting (bool wait_) return; } -#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS +#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \ + && !defined ZMQ_HAVE_VXWORKS if (addr->protocol == "ipc") { ipc_connecter_t *connecter = new (std::nothrow) ipc_connecter_t (io_thread, this, options, addr, wait_); diff --git a/src/signaler.cpp b/src/signaler.cpp index fb8c5e61..f63e55e5 100644 --- a/src/signaler.cpp +++ b/src/signaler.cpp @@ -53,6 +53,11 @@ #elif defined ZMQ_HAVE_OPENVMS #include #include +#elif defined ZMQ_HAVE_VXWORKS +#include +#include +#include +#include #else #include #endif @@ -87,6 +92,11 @@ static int sleep_ms (unsigned int ms_) #elif defined ZMQ_HAVE_ANDROID usleep (ms_ * 1000); return 0; +#elif defined ZMQ_HAVE_VXWORKS + struct timespec ns_; + ns_.tv_sec = ms_ / 1000; + ns_.tv_nsec = ms_ % 1000 * 1000000; + return nanosleep (&ns_, 0); #else return usleep (ms_ * 1000); #endif @@ -194,6 +204,22 @@ void zmq::signaler_t::send () zmq_assert (nbytes == sizeof (dummy)); break; } +#elif defined ZMQ_HAVE_VXWORKS + unsigned char dummy = 0; + while (true) { + ssize_t nbytes = ::send (w, (char *) &dummy, sizeof (dummy), 0); + if (unlikely (nbytes == -1 && errno == EINTR)) + continue; +#if defined(HAVE_FORK) + if (unlikely (pid != getpid ())) { + //printf("Child process %d signaler_t::send returning without sending #2\n", getpid()); + errno = EINTR; + break; + } +#endif + zmq_assert (nbytes == sizeof dummy); + break; + } #else unsigned char dummy = 0; while (true) { @@ -305,6 +331,9 @@ void zmq::signaler_t::recv () #if defined ZMQ_HAVE_WINDOWS int nbytes = ::recv (r, (char *) &dummy, sizeof (dummy), 0); wsa_assert (nbytes != SOCKET_ERROR); +#elif defined ZMQ_HAVE_VXWORKS + ssize_t nbytes = ::recv (r, (char *) &dummy, sizeof (dummy), 0); + errno_assert (nbytes >= 0); #else ssize_t nbytes = ::recv (r, &dummy, sizeof (dummy), 0); errno_assert (nbytes >= 0); @@ -349,6 +378,16 @@ int zmq::signaler_t::recv_failable () } wsa_assert (last_error == WSAEWOULDBLOCK); } +#elif defined ZMQ_HAVE_VXWORKS + ssize_t nbytes = ::recv (r, (char *) &dummy, sizeof (dummy), 0); + if (nbytes == -1) { + if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { + errno = EAGAIN; + return -1; + } + errno_assert (errno == EAGAIN || errno == EWOULDBLOCK + || errno == EINTR); + } #else ssize_t nbytes = ::recv (r, &dummy, sizeof (dummy), 0); if (nbytes == -1) { diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 41c2b6a3..17531e0f 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -288,7 +288,8 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_) { // First check out whether the protocol is something we are aware of. if (protocol_ != "inproc" -#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS +#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \ + && !defined ZMQ_HAVE_VXWORKS && protocol_ != "ipc" #endif && protocol_ != "tcp" @@ -312,9 +313,9 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_) return -1; } - // Check whether socket type and transport protocol match. - // Specifically, multicast protocols can't be combined with - // bi-directional messaging patterns (socket types). + // Check whether socket type and transport protocol match. + // Specifically, multicast protocols can't be combined with + // bi-directional messaging patterns (socket types). #if defined ZMQ_HAVE_OPENPGM || defined ZMQ_HAVE_NORM if ((protocol_ == "pgm" || protocol_ == "epgm" || protocol_ == "norm") && options.type != ZMQ_PUB && options.type != ZMQ_SUB @@ -591,7 +592,8 @@ int zmq::socket_base_t::bind (const char *addr_) return 0; } -#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS +#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \ + && !defined ZMQ_HAVE_VXWORKS if (protocol == "ipc") { ipc_listener_t *listener = new (std::nothrow) ipc_listener_t (io_thread, this, options); @@ -847,7 +849,8 @@ int zmq::socket_base_t::connect (const char *addr_) // Defer resolution until a socket is opened paddr->resolved.tcp_addr = NULL; } -#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS +#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \ + && !defined ZMQ_HAVE_VXWORKS else if (protocol == "ipc") { paddr->resolved.ipc_addr = new (std::nothrow) ipc_address_t (); alloc_assert (paddr->resolved.ipc_addr); @@ -875,7 +878,7 @@ int zmq::socket_base_t::connect (const char *addr_) } } - // TBD - Should we check address for ZMQ_HAVE_NORM??? + // TBD - Should we check address for ZMQ_HAVE_NORM??? #ifdef ZMQ_HAVE_OPENPGM if (protocol == "pgm" || protocol == "epgm") { diff --git a/src/socket_poller.cpp b/src/socket_poller.cpp index 5e9dea30..e45bec9e 100644 --- a/src/socket_poller.cpp +++ b/src/socket_poller.cpp @@ -541,6 +541,12 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, usleep (timeout_ * 1000); errno = EAGAIN; return -1; +#elif defined ZMQ_HAVE_VXWORKS + struct timespec ns_; + ns_.tv_sec = timeout_ / 1000; + ns_.tv_nsec = timeout_ % 1000 * 1000000; + nanosleep (&ns_, 0); + return -1; #else usleep (timeout_ * 1000); return -1; diff --git a/src/socket_poller.hpp b/src/socket_poller.hpp index 67599d92..eb79a7ea 100644 --- a/src/socket_poller.hpp +++ b/src/socket_poller.hpp @@ -38,6 +38,10 @@ #if defined ZMQ_HAVE_WINDOWS #include "windows.hpp" +#elif defined ZMQ_HAVE_VXWORKS +#include +#include +#include #else #include #endif diff --git a/src/socks_connecter.cpp b/src/socks_connecter.cpp index 85206673..c16bd91e 100644 --- a/src/socks_connecter.cpp +++ b/src/socks_connecter.cpp @@ -47,6 +47,9 @@ #include #include #include +#if defined ZMQ_HAVE_VXWORKS +#include +#endif #endif zmq::socks_connecter_t::socks_connecter_t (class io_thread_t *io_thread_, @@ -342,7 +345,12 @@ int zmq::socks_connecter_t::connect_to_proxy () // Set a source address for conversations if (tcp_addr->has_src_addr ()) { +#if defined ZMQ_HAVE_VXWORKS + rc = ::bind (s, (sockaddr *) tcp_addr->src_addr (), + tcp_addr->src_addrlen ()); +#else rc = ::bind (s, tcp_addr->src_addr (), tcp_addr->src_addrlen ()); +#endif if (rc == -1) { close (); return -1; @@ -350,8 +358,11 @@ int zmq::socks_connecter_t::connect_to_proxy () } // Connect to the remote peer. +#if defined ZMQ_HAVE_VXWORKS + rc = ::connect (s, (sockaddr *) tcp_addr->addr (), tcp_addr->addrlen ()); +#else rc = ::connect (s, tcp_addr->addr (), tcp_addr->addrlen ()); - +#endif // Connect was successful immediately. if (rc == 0) return 0; @@ -377,7 +388,7 @@ zmq::fd_t zmq::socks_connecter_t::check_proxy_connection () { // Async connect has finished. Check whether an error occurred int err = 0; -#ifdef ZMQ_HAVE_HPUX +#if defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_VXWORKS int len = sizeof err; #else socklen_t len = sizeof err; diff --git a/src/tcp.cpp b/src/tcp.cpp index a183c9f1..fd9b6ad0 100644 --- a/src/tcp.cpp +++ b/src/tcp.cpp @@ -39,6 +39,9 @@ #include #include #include +#ifdef ZMQ_HAVE_VXWORKS +#include +#endif #endif #if defined ZMQ_HAVE_OPENVMS @@ -224,7 +227,7 @@ int zmq::tcp_write (fd_t s_, const void *data_, size_t size_) return nbytes; #else - ssize_t nbytes = send (s_, data_, size_, 0); + ssize_t nbytes = send (s_, (char *) data_, size_, 0); // Several errors are OK. When speculative write is being done we may not // be able to write a single byte from the socket. Also, SIGSTOP issued @@ -273,7 +276,7 @@ int zmq::tcp_read (fd_t s_, void *data_, size_t size_) #else - const ssize_t rc = recv (s_, data_, size_, 0); + const ssize_t rc = recv (s_, (char *) data_, size_, 0); // Several errors are OK. When speculative read is being done we may not // be able to read a single byte from the socket. Also, SIGSTOP issued @@ -297,7 +300,7 @@ void zmq::tcp_assert_tuning_error (zmq::fd_t s_, int rc_) // Check whether an error occurred int err = 0; -#ifdef ZMQ_HAVE_HPUX +#if defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_VXWORKS int len = sizeof err; #else socklen_t len = sizeof err; diff --git a/src/tcp_address.cpp b/src/tcp_address.cpp index deeb3d03..95eb889c 100644 --- a/src/tcp_address.cpp +++ b/src/tcp_address.cpp @@ -113,8 +113,12 @@ int zmq::tcp_address_t::resolve_nic_name (const char *nic_, return 0; } -#elif defined ZMQ_HAVE_AIX || defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_ANDROID +#elif defined ZMQ_HAVE_AIX || defined ZMQ_HAVE_HPUX \ + || defined ZMQ_HAVE_ANDROID || defined ZMQ_HAVE_VXWORKS #include +#ifdef ZMQ_HAVE_VXWORKS +#include +#endif int zmq::tcp_address_t::resolve_nic_name (const char *nic_, bool ipv6_, @@ -413,7 +417,12 @@ int zmq::tcp_address_t::resolve_interface (const char *interface_, sockaddr_in6 ip6_addr; memset (&ip6_addr, 0, sizeof (ip6_addr)); ip6_addr.sin6_family = AF_INET6; +#ifdef ZMQ_HAVE_VXWORKS + struct in6_addr newaddr = IN6ADDR_ANY_INIT; + memcpy (&ip6_addr.sin6_addr, &newaddr, sizeof (in6_addr)); +#else memcpy (&ip6_addr.sin6_addr, &in6addr_any, sizeof (in6addr_any)); +#endif out_addrlen = sizeof (ip6_addr); memcpy (out_addr, &ip6_addr, out_addrlen); } else { @@ -646,7 +655,8 @@ int zmq::tcp_address_t::resolve (const char *name_, std::string if_str = addr_str.substr (pos + 1); addr_str = addr_str.substr (0, pos); if (isalpha (if_str.at (0))) -#if !defined ZMQ_HAVE_WINDOWS_TARGET_XP && !defined ZMQ_HAVE_WINDOWS_UWP +#if !defined ZMQ_HAVE_WINDOWS_TARGET_XP && !defined ZMQ_HAVE_WINDOWS_UWP \ + && !defined ZMQ_HAVE_VXWORKS zone_id = if_nametoindex (if_str.c_str ()); #else // The function 'if_nametoindex' is not supported on Windows XP. diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp index ae9c6088..65b8226d 100644 --- a/src/tcp_connecter.cpp +++ b/src/tcp_connecter.cpp @@ -52,6 +52,9 @@ #include #include #include +#ifdef ZMQ_HAVE_VXWORKS +#include +#endif #ifdef ZMQ_HAVE_OPENVMS #include #endif @@ -325,26 +328,38 @@ int zmq::tcp_connecter_t::open () rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, (const char *) &flag, sizeof (int)); wsa_assert (rc != SOCKET_ERROR); +#elif defined ZMQ_HAVE_VXWORKS + rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, (char *) &flag, + sizeof (int)); + errno_assert (rc == 0); #else rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int)); errno_assert (rc == 0); #endif +#if defined ZMQ_HAVE_VXWORKS + rc = ::bind (s, (sockaddr *) tcp_addr->src_addr (), + tcp_addr->src_addrlen ()); +#else rc = ::bind (s, tcp_addr->src_addr (), tcp_addr->src_addrlen ()); +#endif if (rc == -1) return -1; } // Connect to the remote peer. +#if defined ZMQ_HAVE_VXWORKS + rc = ::connect (s, (sockaddr *) tcp_addr->addr (), tcp_addr->addrlen ()); +#else rc = ::connect (s, tcp_addr->addr (), tcp_addr->addrlen ()); - +#endif // Connect was successful immediately. if (rc == 0) { return 0; } - // Translate error codes indicating asynchronous connect has been - // launched to a uniform EINPROGRESS. + // Translate error codes indicating asynchronous connect has been + // launched to a uniform EINPROGRESS. #ifdef ZMQ_HAVE_WINDOWS const int last_error = WSAGetLastError (); if (last_error == WSAEINPROGRESS || last_error == WSAEWOULDBLOCK) @@ -362,7 +377,7 @@ zmq::fd_t zmq::tcp_connecter_t::connect () { // Async connect has finished. Check whether an error occurred int err = 0; -#ifdef ZMQ_HAVE_HPUX +#if defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_VXWORKS int len = sizeof err; #else socklen_t len = sizeof err; diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp index 9a54521f..2b81b684 100644 --- a/src/tcp_listener.cpp +++ b/src/tcp_listener.cpp @@ -51,6 +51,9 @@ #include #include #include +#ifdef ZMQ_HAVE_VXWORKS +#include +#endif #endif #ifdef ZMQ_HAVE_OPENVMS @@ -149,7 +152,7 @@ int zmq::tcp_listener_t::get_address (std::string &addr_) { // Get the details of the TCP socket struct sockaddr_storage ss; -#ifdef ZMQ_HAVE_HPUX +#if defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_VXWORKS int sl = sizeof (ss); #else socklen_t sl = sizeof (ss); @@ -236,13 +239,20 @@ int zmq::tcp_listener_t::set_address (const char *addr_) rc = setsockopt (s, SOL_SOCKET, SO_EXCLUSIVEADDRUSE, (const char *) &flag, sizeof (int)); wsa_assert (rc != SOCKET_ERROR); +#elif defined ZMQ_HAVE_VXWORKS + rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, (char *) &flag, sizeof (int)); + errno_assert (rc == 0); #else rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int)); errno_assert (rc == 0); #endif // Bind the socket to the network interface and port. +#if defined ZMQ_HAVE_VXWORKS + rc = bind (s, (sockaddr *) address.addr (), address.addrlen ()); +#else rc = bind (s, address.addr (), address.addrlen ()); +#endif #ifdef ZMQ_HAVE_WINDOWS if (rc == SOCKET_ERROR) { errno = wsa_error_to_errno (WSAGetLastError ()); @@ -284,7 +294,7 @@ zmq::fd_t zmq::tcp_listener_t::accept () struct sockaddr_storage ss; memset (&ss, 0, sizeof (ss)); -#ifdef ZMQ_HAVE_HPUX +#if defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_VXWORKS int ss_len = sizeof (ss); #else socklen_t ss_len = sizeof (ss); diff --git a/src/thread.cpp b/src/thread.cpp index 288f4091..a1086b0c 100644 --- a/src/thread.cpp +++ b/src/thread.cpp @@ -97,6 +97,66 @@ void zmq::thread_t::setThreadName (const char *name_) LIBZMQ_UNUSED (name_); } +#elif defined ZMQ_HAVE_VXWORKS + +extern "C" { +static void *thread_routine (void *arg_) +{ + zmq::thread_t *self = (zmq::thread_t *) arg_; + self->applySchedulingParameters (); + self->tfn (self->arg); + return NULL; +} +} + +void zmq::thread_t::start (thread_fn *tfn_, void *arg_) +{ + tfn = tfn_; + arg = arg_; + descriptor = taskSpawn (NULL, DEFAULT_PRIORITY, DEFAULT_OPTIONS, + DEFAULT_STACK_SIZE, (FUNCPTR) thread_routine, + (int) this, 0, 0, 0, 0, 0, 0, 0, 0, 0); + if (descriptor != NULL || descriptor > 0) + started = true; +} + +void zmq::thread_t::stop () +{ + if (started) + while ((descriptor != NULL || descriptor > 0) + && taskIdVerify (descriptor) == 0) { + } +} + +bool zmq::thread_t::is_current_thread () const +{ + return taskIdSelf () == descriptor; +} + +void zmq::thread_t::setSchedulingParameters ( + int priority_, int schedulingPolicy_, const std::set &affinity_cpus_) +{ + thread_priority = priority_; + thread_sched_policy = schedulingPolicy_; + thread_affinity_cpus = affinity_cpus_; +} + +void zmq::thread_t:: + applySchedulingParameters () // to be called in secondary thread context +{ + int priority = (thread_priority >= 0 ? thread_priority : DEFAULT_PRIORITY); + priority = (priority < 255 ? priority : DEFAULT_PRIORITY); + if (descriptor != NULL || descriptor > 0) { + taskPrioritySet (descriptor, priority); + } +} + +void zmq::thread_t::setThreadName (const char *name_) +{ + // not implemented + LIBZMQ_UNUSED (name_); +} + #else #include @@ -206,6 +266,7 @@ void zmq::thread_t:: posix_assert (rc); +#if !defined ZMQ_HAVE_VXWORKS if (use_nice_instead_priority && thread_priority != ZMQ_THREAD_PRIORITY_DFLT) { // assume the user wants to decrease the thread's nice value @@ -217,6 +278,7 @@ void zmq::thread_t:: // IMPORTANT: EPERM is typically returned for unprivileged processes: that's because // CAP_SYS_NICE capability is required or RLIMIT_NICE resource limit should be changed to avoid EPERM! } +#endif #ifdef ZMQ_HAVE_PTHREAD_SET_AFFINITY if (!thread_affinity_cpus.empty ()) { diff --git a/src/thread.hpp b/src/thread.hpp index 1f8258b6..b818232f 100644 --- a/src/thread.hpp +++ b/src/thread.hpp @@ -30,7 +30,10 @@ #ifndef __ZMQ_THREAD_HPP_INCLUDED__ #define __ZMQ_THREAD_HPP_INCLUDED__ -#ifndef ZMQ_HAVE_WINDOWS +#if defined ZMQ_HAVE_VXWORKS +#include +#include +#elif !defined ZMQ_HAVE_WINDOWS #include #endif #include @@ -58,6 +61,15 @@ class thread_t { } +#ifdef ZMQ_HAVE_VXWORKS + ~thread_t () + { + if (descriptor != NULL || descriptor > 0) { + taskDelete (descriptor); + } + } +#endif + // Creates OS thread. 'tfn' is main thread function. It'll be passed // 'arg' as an argument. void start (thread_fn *tfn_, void *arg_); @@ -93,6 +105,14 @@ class thread_t #ifdef ZMQ_HAVE_WINDOWS HANDLE descriptor; +#elif defined ZMQ_HAVE_VXWORKS + int descriptor; + enum + { + DEFAULT_PRIORITY = 100, + DEFAULT_OPTIONS = 0, + DEFAULT_STACK_SIZE = 4000 + }; #else pthread_t descriptor; #endif diff --git a/src/tipc_address.hpp b/src/tipc_address.hpp index c6a8d277..8d7cbdd7 100644 --- a/src/tipc_address.hpp +++ b/src/tipc_address.hpp @@ -37,7 +37,11 @@ #if defined ZMQ_HAVE_TIPC #include +#if defined ZMQ_HAVE_VXWORKS +#include +#else #include +#endif namespace zmq { diff --git a/src/tipc_connecter.cpp b/src/tipc_connecter.cpp index ad900a06..4f0a2992 100644 --- a/src/tipc_connecter.cpp +++ b/src/tipc_connecter.cpp @@ -49,6 +49,9 @@ #include #include #include +#ifdef ZMQ_HAVE_VXWORKS +#include +#endif zmq::tipc_connecter_t::tipc_connecter_t (class io_thread_t *io_thread_, class session_base_t *session_, @@ -217,9 +220,13 @@ int zmq::tipc_connecter_t::open () // Set the non-blocking flag. unblock_socket (s); // Connect to the remote peer. +#ifdef ZMQ_HAVE_VXWORKS + int rc = ::connect (s, (sockaddr *) addr->resolved.tipc_addr->addr (), + addr->resolved.tipc_addr->addrlen ()); +#else int rc = ::connect (s, addr->resolved.tipc_addr->addr (), addr->resolved.tipc_addr->addrlen ()); - +#endif // Connect was successful immediately. if (rc == 0) return 0; @@ -248,8 +255,11 @@ zmq::fd_t zmq::tipc_connecter_t::connect () // Following code should handle both Berkeley-derived socket // implementations and Solaris. int err = 0; +#if ZMQ_HAVE_VXWORKS + int len = sizeof (err); +#else socklen_t len = sizeof (err); - +#endif int rc = getsockopt (s, SOL_SOCKET, SO_ERROR, (char *) &err, &len); if (rc == -1) err = errno; diff --git a/src/tipc_listener.cpp b/src/tipc_listener.cpp index 4cbdc122..9c05f765 100644 --- a/src/tipc_listener.cpp +++ b/src/tipc_listener.cpp @@ -49,7 +49,12 @@ #include #include #include +#if defined ZMQ_HAVE_VXWORKS +#include +#include +#else #include +#endif zmq::tipc_listener_t::tipc_listener_t (io_thread_t *io_thread_, socket_base_t *socket_, @@ -116,7 +121,11 @@ int zmq::tipc_listener_t::get_address (std::string &addr_) struct sockaddr_storage ss; socklen_t sl = sizeof (ss); +#ifdef ZMQ_HAVE_VXWORKS + int rc = getsockname (s, (sockaddr *) &ss, (int *) &sl); +#else int rc = getsockname (s, (sockaddr *) &ss, &sl); +#endif if (rc != 0) { addr_.clear (); return rc; @@ -148,7 +157,11 @@ int zmq::tipc_listener_t::set_address (const char *addr_) // If random Port Identity, update address object to reflect the assigned address if (address.is_random ()) { struct sockaddr_storage ss; +#ifdef ZMQ_HAVE_VXWORKS + int sl = sizeof (ss); +#else socklen_t sl = sizeof (ss); +#endif int rc = getsockname (s, (sockaddr *) &ss, &sl); if (rc != 0) goto error; @@ -161,7 +174,11 @@ int zmq::tipc_listener_t::set_address (const char *addr_) // Bind the socket to tipc name if (address.is_service ()) { +#ifdef ZMQ_HAVE_VXWORKS + rc = bind (s, (sockaddr *) address.addr (), address.addrlen ()); +#else rc = bind (s, address.addr (), address.addrlen ()); +#endif if (rc != 0) goto error; } @@ -199,7 +216,11 @@ zmq::fd_t zmq::tipc_listener_t::accept () socklen_t ss_len = sizeof (ss); zmq_assert (s != retired_fd); +#ifdef ZMQ_HAVE_VXWORKS + fd_t sock = ::accept (s, (struct sockaddr *) &ss, (int *) &ss_len); +#else fd_t sock = ::accept (s, (struct sockaddr *) &ss, &ss_len); +#endif if (sock == -1) { errno_assert (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS || errno == EINTR diff --git a/src/udp_engine.cpp b/src/udp_engine.cpp index 2dcb0753..5317ad86 100644 --- a/src/udp_engine.cpp +++ b/src/udp_engine.cpp @@ -35,6 +35,9 @@ along with this program. If not, see . #include #include #include +#ifdef ZMQ_HAVE_VXWORKS +#include +#endif #endif #include "udp_engine.hpp" @@ -128,8 +131,13 @@ void zmq::udp_engine_t::plug (io_thread_t *io_thread_, session_base_t *session_) errno_assert (rc == 0); #endif +#ifdef ZMQ_HAVE_VXWORKS + rc = bind (fd, (sockaddr *) address->resolved.udp_addr->bind_addr (), + address->resolved.udp_addr->bind_addrlen ()); +#else rc = bind (fd, address->resolved.udp_addr->bind_addr (), address->resolved.udp_addr->bind_addrlen ()); +#endif #ifdef ZMQ_HAVE_WINDOWS wsa_assert (rc != SOCKET_ERROR); #else @@ -282,6 +290,10 @@ void zmq::udp_engine_t::out_event () rc = sendto (fd, (const char *) out_buffer, (int) size, 0, out_address, (int) out_addrlen); wsa_assert (rc != SOCKET_ERROR); +#elif defined ZMQ_HAVE_VXWORKS + rc = sendto (fd, (caddr_t) out_buffer, size, 0, + (sockaddr *) out_address, (int) out_addrlen); + errno_assert (rc != -1); #else rc = sendto (fd, out_buffer, size, 0, out_address, out_addrlen); errno_assert (rc != -1); @@ -321,6 +333,14 @@ void zmq::udp_engine_t::in_event () || last_error == WSAEWOULDBLOCK); return; } +#elif defined ZMQ_HAVE_VXWORKS + int nbytes = recvfrom (fd, (char *) in_buffer, MAX_UDP_MSG, 0, + (sockaddr *) &in_address, (int *) &in_addrlen); + if (nbytes == -1) { + errno_assert (errno != EBADF && errno != EFAULT && errno != ENOMEM + && errno != ENOTSOCK); + return; + } #else int nbytes = recvfrom (fd, in_buffer, MAX_UDP_MSG, 0, (sockaddr *) &in_address, &in_addrlen); diff --git a/src/ypipe.hpp b/src/ypipe.hpp index 146bd6d0..935bf183 100644 --- a/src/ypipe.hpp +++ b/src/ypipe.hpp @@ -62,9 +62,9 @@ template class ypipe_t : public ypipe_base_t // just to keep ICC and code checking tools from complaining. inline virtual ~ypipe_t () {} - // Following function (write) deliberately copies uninitialised data - // when used with zmq_msg. Initialising the VSM body for - // non-VSM messages won't be good for performance. + // Following function (write) deliberately copies uninitialised data + // when used with zmq_msg. Initialising the VSM body for + // non-VSM messages won't be good for performance. #ifdef ZMQ_HAVE_OPENVMS #pragma message save diff --git a/src/ypipe_conflate.hpp b/src/ypipe_conflate.hpp index 88cd5064..dd9d22aa 100644 --- a/src/ypipe_conflate.hpp +++ b/src/ypipe_conflate.hpp @@ -53,9 +53,9 @@ template class ypipe_conflate_t : public ypipe_base_t // just to keep ICC and code checking tools from complaining. inline virtual ~ypipe_conflate_t () {} - // Following function (write) deliberately copies uninitialised data - // when used with zmq_msg. Initialising the VSM body for - // non-VSM messages won't be good for performance. + // Following function (write) deliberately copies uninitialised data + // when used with zmq_msg. Initialising the VSM body for + // non-VSM messages won't be good for performance. #ifdef ZMQ_HAVE_OPENVMS #pragma message save diff --git a/src/zmq.cpp b/src/zmq.cpp index eec06492..864139da 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -55,6 +55,9 @@ #if !defined ZMQ_HAVE_WINDOWS #include +#ifdef ZMQ_HAVE_VXWORKS +#include +#endif #endif // XSI vector I/O @@ -682,7 +685,7 @@ const char *zmq_msg_gets (const zmq_msg_t *msg_, const char *property_) } } - // Polling. +// Polling. #if defined ZMQ_HAVE_POLLER inline int zmq_poller_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) @@ -967,6 +970,11 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) #if defined ZMQ_HAVE_WINDOWS Sleep (timeout_ > 0 ? timeout_ : INFINITE); return 0; +#elif defined ZMQ_HAVE_VXWORKS + struct timespec ns_; + ns_.tv_sec = timeout_ / 1000; + ns_.tv_nsec = timeout_ % 1000 * 1000000; + return nanosleep (&ns_, 0); #else return usleep (timeout_ * 1000); #endif