diff --git a/.gitignore b/.gitignore index 138475ac..053e644d 100644 --- a/.gitignore +++ b/.gitignore @@ -76,6 +76,7 @@ tests/test_reqrep_device_tipc tests/test_reqrep_tipc tests/test_router_handover tests/test_router_mandatory_tipc +tests/test_router_raw_empty tests/test_shutdown_stress_tipc tests/test_sub_forward_tipc tests/test_term_endpoint_tipc diff --git a/CMakeLists.txt b/CMakeLists.txt index 106d1123..afab40cf 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -269,7 +269,7 @@ if(MSVC) -DWIN32 -DDLL_EXPORT # NB: May require tweaking for highly connected applications. - -DFD_SETSIZE=1024 + -DFD_SETSIZE=1025 -D_CRT_SECURE_NO_WARNINGS) # Parallel make. @@ -611,7 +611,9 @@ set(tests test_term_endpoint test_timeo test_inproc_connect - test_issue_566) + test_issue_566 + test_many_sockets +) if(NOT WIN32) list(APPEND tests test_monitor diff --git a/NEWS b/NEWS index 6506e0b7..bb7f3ec5 100644 --- a/NEWS +++ b/NEWS @@ -1,17 +1,18 @@ -0MQ version 4.0.1 stable, released on 2013/xx/xx +0MQ version 4.0.2 stable, released on 2013/xx/xx ================================================ Bug Fixes --------- +* Fixed LIBZMQ-583 - improved low-res timer for Windows * Fixed LIBZMQ-578 - z85_decode was extremely slow - -* Fixed LIBZMQ-569 - Socket server crashes with random client data - +* Fixed LIBZMQ-577 - fault in man pages. +* Fixed LIBZMQ-574 - assertion failure when ran out of system file handles * Fixed LIBZMQ-571 - test_stream failing in some cases - +* Fixed LIBZMQ-569 - Socket server crashes with random client data +* Fixed LIBZMQ-39 - Bad file descriptor during shutdown * Pulled expected failing test_linger.cpp from release - +* Reduced pause time in tests to allow "make check" to run faster 0MQ version 4.0.1 stable, released on 2013/10/08 ================================================ diff --git a/builds/mingw32/Makefile.mingw32 b/builds/mingw32/Makefile.mingw32 index 447cf504..95cea7d3 100644 --- a/builds/mingw32/Makefile.mingw32 +++ b/builds/mingw32/Makefile.mingw32 @@ -1,5 +1,5 @@ CC=gcc -CFLAGS=-Wall -Os -g -DDLL_EXPORT -DFD_SETSIZE=1024 -I. +CFLAGS=-Wall -Os -g -DDLL_EXPORT -DFD_SETSIZE=1025 -I. LIBS=-lws2_32 OBJS = ctx.o reaper.o dist.o err.o \ diff --git a/builds/msvc/libzmq/libzmq.vcproj b/builds/msvc/libzmq/libzmq.vcproj index 9cdef064..01adfd1a 100644 --- a/builds/msvc/libzmq/libzmq.vcproj +++ b/builds/msvc/libzmq/libzmq.vcproj @@ -40,7 +40,7 @@ /> copy ..\platform.hpp ..\..\..\src - _CRT_SECURE_NO_WARNINGS;DLL_EXPORT;FD_SETSIZE=1024;%(PreprocessorDefinitions) + _CRT_SECURE_NO_WARNINGS;DLL_EXPORT;FD_SETSIZE=1025;%(PreprocessorDefinitions) Ws2_32.lib;Rpcrt4.lib;%(AdditionalDependencies) diff --git a/builds/msvc/properties/ZeroMQ_Static.props b/builds/msvc/properties/ZeroMQ_Static.props index dec17f85..9cdb107a 100644 --- a/builds/msvc/properties/ZeroMQ_Static.props +++ b/builds/msvc/properties/ZeroMQ_Static.props @@ -13,7 +13,7 @@ copy ..\platform.hpp ..\..\..\src - _CRT_SECURE_NO_WARNINGS;ZMQ_STATIC;FD_SETSIZE=1024;%(PreprocessorDefinitions) + _CRT_SECURE_NO_WARNINGS;ZMQ_STATIC;FD_SETSIZE=1025;%(PreprocessorDefinitions) Ws2_32.lib;Rpcrt4.lib;%(AdditionalDependencies) diff --git a/src/clock.cpp b/src/clock.cpp index a3fc0d16..de1ea867 100644 --- a/src/clock.cpp +++ b/src/clock.cpp @@ -22,6 +22,7 @@ #include "likely.hpp" #include "config.hpp" #include "err.hpp" +#include "mutex.hpp" #include @@ -41,9 +42,49 @@ #include #endif +#ifdef ZMQ_HAVE_WINDOWS +typedef ULONGLONG (*f_compatible_get_tick_count64)(); + +static zmq::mutex_t compatible_get_tick_count64_mutex; + +ULONGLONG compatible_get_tick_count64() +{ + compatible_get_tick_count64_mutex.lock(); + static DWORD s_wrap = 0; + static DWORD s_last_tick = 0; + const DWORD current_tick = ::GetTickCount(); + if (current_tick < s_last_tick) + ++s_wrap; + + s_last_tick = current_tick; + const ULONGLONG result = (static_cast(s_wrap) << 32) + static_cast(current_tick); + compatible_get_tick_count64_mutex.unlock(); + return result; +} + +f_compatible_get_tick_count64 init_compatible_get_tick_count64() +{ + f_compatible_get_tick_count64 func = NULL; + HMODULE module = ::LoadLibraryA("Kernel32.dll"); + if (module != NULL) + func = reinterpret_cast(::GetProcAddress(module, "GetTickCount64")); + + if (func == NULL) + func = compatible_get_tick_count64; + + return func; +} + +static f_compatible_get_tick_count64 my_get_tick_count64 = init_compatible_get_tick_count64(); +#endif + zmq::clock_t::clock_t () : last_tsc (rdtsc ()), +#ifdef ZMQ_HAVE_WINDOWS + last_time (static_cast((*my_get_tick_count64)())) +#else last_time (now_us () / 1000) +#endif { } @@ -65,7 +106,7 @@ uint64_t zmq::clock_t::now_us () // Convert the tick number into the number of seconds // since the system was started. - double ticks_div = ticksPerSecond.QuadPart / 1000000.0; + double ticks_div = ticksPerSecond.QuadPart / 1000000.0; return (uint64_t) (tick.QuadPart / ticks_div); #elif defined HAVE_CLOCK_GETTIME && defined CLOCK_MONOTONIC @@ -74,7 +115,7 @@ uint64_t zmq::clock_t::now_us () struct timespec tv; int rc = clock_gettime (CLOCK_MONOTONIC, &tv); // Fix case where system has clock_gettime but CLOCK_MONOTONIC is not supported. - // This should be a configuration check, but I looked into it and writing an + // This should be a configuration check, but I looked into it and writing an // AC_FUNC_CLOCK_MONOTONIC seems beyond my powers. if( rc != 0) { // Use POSIX gettimeofday function to get precise time. @@ -106,7 +147,18 @@ uint64_t zmq::clock_t::now_ms () // If TSC is not supported, get precise time and chop off the microseconds. if (!tsc) + { +#ifdef ZMQ_HAVE_WINDOWS + // Under Windows, now_us is not so reliable since QueryPerformanceCounter + // does not guarantee that it will use a hardware that offers a monotonic timer. + // So, lets use GetTickCount when GetTickCount64 is not available with an workaround + // to its 32 bit limitation. + static_assert(sizeof(uint64_t) >= sizeof(ULONGLONG), "Loosing timer information"); + return static_cast((*my_get_tick_count64)()); +#else return now_us () / 1000; +#endif + } // If TSC haven't jumped back (in case of migration to a different // CPU core) and if not too much time elapsed since last measurement, @@ -115,7 +167,11 @@ uint64_t zmq::clock_t::now_ms () return last_time; last_tsc = tsc; +#ifdef ZMQ_HAVE_WINDOWS + last_time = static_cast((*my_get_tick_count64)()); +#else last_time = now_us () / 1000; +#endif return last_time; } diff --git a/src/ctx.cpp b/src/ctx.cpp index b078c7c5..6d934cdc 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -38,6 +38,14 @@ #define ZMQ_CTX_TAG_VALUE_GOOD 0xabadcafe #define ZMQ_CTX_TAG_VALUE_BAD 0xdeadbeef +int clipped_maxsocket(int max_requested) +{ + if (max_requested >= zmq::poller_t::max_fds () && zmq::poller_t::max_fds () != -1) + max_requested = zmq::poller_t::max_fds () - 1; // -1 because we need room for the repear mailbox. + + return max_requested; +} + zmq::ctx_t::ctx_t () : tag (ZMQ_CTX_TAG_VALUE_GOOD), starting (true), @@ -45,7 +53,7 @@ zmq::ctx_t::ctx_t () : reaper (NULL), slot_count (0), slots (NULL), - max_sockets (ZMQ_MAX_SOCKETS_DFLT), + max_sockets (clipped_maxsocket (ZMQ_MAX_SOCKETS_DFLT)), io_thread_count (ZMQ_IO_THREADS_DFLT), ipv6 (false) { @@ -161,7 +169,7 @@ int zmq::ctx_t::shutdown () int zmq::ctx_t::set (int option_, int optval_) { int rc = 0; - if (option_ == ZMQ_MAX_SOCKETS && optval_ >= 1) { + if (option_ == ZMQ_MAX_SOCKETS && optval_ >= 1 && optval_ == clipped_maxsocket (optval_)) { opt_sync.lock (); max_sockets = optval_; opt_sync.unlock (); diff --git a/src/devpoll.cpp b/src/devpoll.cpp index dfa3cddf..02b4a9c6 100644 --- a/src/devpoll.cpp +++ b/src/devpoll.cpp @@ -133,6 +133,11 @@ void zmq::devpoll_t::stop () stopping = true; } +int zmq::devpoll_t::max_fds () +{ + return -1; +} + void zmq::devpoll_t::loop () { while (!stopping) { diff --git a/src/devpoll.hpp b/src/devpoll.hpp index f47691ce..fbf93287 100644 --- a/src/devpoll.hpp +++ b/src/devpoll.hpp @@ -56,6 +56,8 @@ namespace zmq void start (); void stop (); + static int max_fds (); + private: // Main worker thread routine. diff --git a/src/epoll.cpp b/src/epoll.cpp index bcedd758..492e06cd 100644 --- a/src/epoll.cpp +++ b/src/epoll.cpp @@ -126,6 +126,11 @@ void zmq::epoll_t::stop () stopping = true; } +int zmq::epoll_t::max_fds () +{ + return -1; +} + void zmq::epoll_t::loop () { epoll_event ev_buf [max_io_events]; diff --git a/src/epoll.hpp b/src/epoll.hpp index d8caf9fe..0c89cb5b 100644 --- a/src/epoll.hpp +++ b/src/epoll.hpp @@ -58,6 +58,8 @@ namespace zmq void start (); void stop (); + static int max_fds (); + private: // Main worker thread routine. diff --git a/src/kqueue.cpp b/src/kqueue.cpp index 266c418c..8ecf8137 100644 --- a/src/kqueue.cpp +++ b/src/kqueue.cpp @@ -152,6 +152,11 @@ void zmq::kqueue_t::stop () stopping = true; } +int zmq::kqueue_t::max_fds () +{ + return -1; +} + void zmq::kqueue_t::loop () { while (!stopping) { diff --git a/src/kqueue.hpp b/src/kqueue.hpp index 04e24776..20d256cc 100644 --- a/src/kqueue.hpp +++ b/src/kqueue.hpp @@ -58,6 +58,8 @@ namespace zmq void start (); void stop (); + static int max_fds (); + private: // Main worker thread routine. diff --git a/src/poll.cpp b/src/poll.cpp index f8fe198f..cf23afc0 100644 --- a/src/poll.cpp +++ b/src/poll.cpp @@ -114,6 +114,11 @@ void zmq::poll_t::stop () stopping = true; } +int zmq::poll_t::max_fds () +{ + return -1; +} + void zmq::poll_t::loop () { while (!stopping) { diff --git a/src/poll.hpp b/src/poll.hpp index 3bf23ba6..a3311688 100644 --- a/src/poll.hpp +++ b/src/poll.hpp @@ -59,6 +59,8 @@ namespace zmq void start (); void stop (); + static int max_fds (); + private: // Main worker thread routine. diff --git a/src/select.cpp b/src/select.cpp index ba60e8e8..b5cb972c 100644 --- a/src/select.cpp +++ b/src/select.cpp @@ -144,6 +144,11 @@ void zmq::select_t::stop () stopping = true; } +int zmq::select_t::max_fds () +{ + return FD_SETSIZE; +} + void zmq::select_t::loop () { while (!stopping) { diff --git a/src/select.hpp b/src/select.hpp index cb38489f..9c3ef472 100644 --- a/src/select.hpp +++ b/src/select.hpp @@ -69,6 +69,8 @@ namespace zmq void start (); void stop (); + static int max_fds (); + private: // Main worker thread routine. diff --git a/src/signaler.cpp b/src/signaler.cpp index 30deb1ec..4b13329b 100644 --- a/src/signaler.cpp +++ b/src/signaler.cpp @@ -80,13 +80,10 @@ zmq::signaler_t::signaler_t () { // Create the socketpair for signaling. - int rc = make_fdpair (&r, &w); - errno_assert (rc == 0); - - // Set both fds to non-blocking mode. - unblock_socket (w); - unblock_socket (r); - + if (make_fdpair (&r, &w) == 0) { + unblock_socket (w); + unblock_socket (r); + } #ifdef HAVE_FORK pid = getpid(); #endif @@ -184,8 +181,7 @@ int zmq::signaler_t::wait (int timeout_) return -1; } #ifdef HAVE_FORK - if (unlikely(pid != getpid())) - { + if (unlikely(pid != getpid())) { // we have forked and the file descriptor is closed. Emulate an interupt // response. //printf("Child process %d signaler_t::wait returning simulating interrupt #2\n", getpid()); @@ -266,42 +262,30 @@ void zmq::signaler_t::recv () #ifdef HAVE_FORK void zmq::signaler_t::forked() { - int oldr = r; -#if !defined ZMQ_HAVE_EVENTFD - int oldw = w; -#endif - - // replace the file descriptors created in the parent with new - // ones, and close the inherited ones - make_fdpair(&r, &w); -#if defined ZMQ_HAVE_EVENTFD - int rc = close (oldr); - errno_assert (rc == 0); -#else - int rc = close (oldw); - errno_assert (rc == 0); - rc = close (oldr); - errno_assert (rc == 0); -#endif + // Close file descriptors created in the parent and create new pair + close (r); + close (w); + make_fdpair (&r, &w); } #endif - - - +// Returns -1 if we could not make the socket pair successfully int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_) { #if defined ZMQ_HAVE_EVENTFD - - // Create eventfd object. fd_t fd = eventfd (0, 0); - errno_assert (fd != -1); - *w_ = fd; - *r_ = fd; - return 0; + if (fd == -1) { + errno_assert (errno == ENFILE || errno == EMFILE); + *w_ = *r_ = -1; + return -1; + } + else { + *w_ = *r_ = fd; + return 0; + } #elif defined ZMQ_HAVE_WINDOWS -#if !defined _WIN32_WCE +# if !defined _WIN32_WCE // Windows CE does not manage security attributes SECURITY_DESCRIPTOR sd; SECURITY_ATTRIBUTES sa; @@ -313,7 +297,7 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_) sa.nLength = sizeof(SECURITY_ATTRIBUTES); sa.lpSecurityDescriptor = &sd; -#endif +# endif // This function has to be in a system-wide critical section so that // two instances of the library don't accidentally create signaler @@ -322,13 +306,14 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_) // Note that if the event object already exists, the CreateEvent requests // EVENT_ALL_ACCESS access right. If this fails, we try to open // the event object asking for SYNCHRONIZE access only. -#if !defined _WIN32_WCE +# if !defined _WIN32_WCE HANDLE sync = CreateEvent (&sa, FALSE, TRUE, TEXT ("Global\\zmq-signaler-port-sync")); -#else +# else HANDLE sync = CreateEvent (NULL, FALSE, TRUE, TEXT ("Global\\zmq-signaler-port-sync")); -#endif +# endif if (sync == NULL && GetLastError () == ERROR_ACCESS_DENIED) - sync = OpenEvent (SYNCHRONIZE | EVENT_MODIFY_STATE, FALSE, TEXT ("Global\\zmq-signaler-port-sync")); + sync = OpenEvent (SYNCHRONIZE | EVENT_MODIFY_STATE, + FALSE, TEXT ("Global\\zmq-signaler-port-sync")); win_assert (sync != NULL); @@ -373,13 +358,13 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_) *w_ = WSASocket (AF_INET, SOCK_STREAM, 0, NULL, 0, 0); wsa_assert (*w_ != INVALID_SOCKET); -#if !defined _WIN32_WCE +# if !defined _WIN32_WCE // On Windows, preventing sockets to be inherited by child processes. BOOL brc = SetHandleInformation ((HANDLE) *w_, HANDLE_FLAG_INHERIT, 0); win_assert (brc); -#else +# else BOOL brc; -#endif +# endif // Set TCP_NODELAY on writer socket. rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY, @@ -391,17 +376,14 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_) // Save errno if connection fails int conn_errno = 0; - if (rc == SOCKET_ERROR) { + if (rc == SOCKET_ERROR) conn_errno = WSAGetLastError (); - } else { + else { // Accept connection from writer. *r_ = accept (listener, NULL, NULL); - - if (*r_ == INVALID_SOCKET) { + if (*r_ == INVALID_SOCKET) conn_errno = WSAGetLastError (); - } } - // We don't need the listening socket anymore. Close it. rc = closesocket (listener); wsa_assert (rc != SOCKET_ERROR); @@ -415,27 +397,20 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_) win_assert (brc != 0); if (*r_ != INVALID_SOCKET) { -#if !defined _WIN32_WCE +# if !defined _WIN32_WCE // On Windows, preventing sockets to be inherited by child processes. brc = SetHandleInformation ((HANDLE) *r_, HANDLE_FLAG_INHERIT, 0); win_assert (brc); -#endif +# endif return 0; - } else { + } + else { // Cleanup writer if connection failed rc = closesocket (*w_); wsa_assert (rc != SOCKET_ERROR); - *w_ = INVALID_SOCKET; - // Set errno from saved value errno = wsa_error_to_errno (conn_errno); - - // Ideally, we would return errno to the caller signaler_t() - // Unfortunately, it uses errno_assert() which gives "Unknown error" - // We might as well assert here and print the actual error message - wsa_assert_no (conn_errno); - return -1; } @@ -463,7 +438,7 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_) rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELACK, &on, sizeof (on)); errno_assert (rc != -1); - rc = bind(listener, (struct sockaddr*) &lcladdr, sizeof (lcladdr)); + rc = bind (listener, (struct sockaddr*) &lcladdr, sizeof (lcladdr)); errno_assert (rc != -1); socklen_t lcladdr_len = sizeof (lcladdr); @@ -493,15 +468,20 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_) return 0; -#else // All other implementations support socketpair() - +#else + // All other implementations support socketpair() int sv [2]; int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv); - errno_assert (rc == 0); - *w_ = sv [0]; - *r_ = sv [1]; - return 0; - + if (rc == -1) { + errno_assert (errno == ENFILE || errno == EMFILE); + sv [0] = sv [1] = -1; + return -1; + } + else { + *w_ = sv [0]; + *r_ = sv [1]; + return 0; + } #endif } diff --git a/src/signaler.hpp b/src/signaler.hpp index b951011f..3e0d4f6d 100644 --- a/src/signaler.hpp +++ b/src/signaler.hpp @@ -58,7 +58,8 @@ namespace zmq // to pass the signals. static int make_fdpair (fd_t *r_, fd_t *w_); - // Underlying write & read file descriptor. + // Underlying write & read file descriptor + // Will be -1 if we exceeded number of available handles fd_t w; fd_t r; @@ -74,7 +75,6 @@ namespace zmq void close_internal(); #endif }; - } #endif diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 7b4d7cc4..935c0099 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -79,47 +79,49 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_, { socket_base_t *s = NULL; switch (type_) { - - case ZMQ_PAIR: - s = new (std::nothrow) pair_t (parent_, tid_, sid_); - break; - case ZMQ_PUB: - s = new (std::nothrow) pub_t (parent_, tid_, sid_); - break; - case ZMQ_SUB: - s = new (std::nothrow) sub_t (parent_, tid_, sid_); - break; - case ZMQ_REQ: - s = new (std::nothrow) req_t (parent_, tid_, sid_); - break; - case ZMQ_REP: - s = new (std::nothrow) rep_t (parent_, tid_, sid_); - break; - case ZMQ_DEALER: - s = new (std::nothrow) dealer_t (parent_, tid_, sid_); - break; - case ZMQ_ROUTER: - s = new (std::nothrow) router_t (parent_, tid_, sid_); - break; - case ZMQ_PULL: - s = new (std::nothrow) pull_t (parent_, tid_, sid_); - break; - case ZMQ_PUSH: - s = new (std::nothrow) push_t (parent_, tid_, sid_); - break; - case ZMQ_XPUB: - s = new (std::nothrow) xpub_t (parent_, tid_, sid_); - break; - case ZMQ_XSUB: - s = new (std::nothrow) xsub_t (parent_, tid_, sid_); - break; - case ZMQ_STREAM: - s = new (std::nothrow) stream_t (parent_, tid_, sid_); - break; - default: - errno = EINVAL; - return NULL; + case ZMQ_PAIR: + s = new (std::nothrow) pair_t (parent_, tid_, sid_); + break; + case ZMQ_PUB: + s = new (std::nothrow) pub_t (parent_, tid_, sid_); + break; + case ZMQ_SUB: + s = new (std::nothrow) sub_t (parent_, tid_, sid_); + break; + case ZMQ_REQ: + s = new (std::nothrow) req_t (parent_, tid_, sid_); + break; + case ZMQ_REP: + s = new (std::nothrow) rep_t (parent_, tid_, sid_); + break; + case ZMQ_DEALER: + s = new (std::nothrow) dealer_t (parent_, tid_, sid_); + break; + case ZMQ_ROUTER: + s = new (std::nothrow) router_t (parent_, tid_, sid_); + break; + case ZMQ_PULL: + s = new (std::nothrow) pull_t (parent_, tid_, sid_); + break; + case ZMQ_PUSH: + s = new (std::nothrow) push_t (parent_, tid_, sid_); + break; + case ZMQ_XPUB: + s = new (std::nothrow) xpub_t (parent_, tid_, sid_); + break; + case ZMQ_XSUB: + s = new (std::nothrow) xsub_t (parent_, tid_, sid_); + break; + case ZMQ_STREAM: + s = new (std::nothrow) stream_t (parent_, tid_, sid_); + break; + default: + errno = EINVAL; + return NULL; } + if (s->mailbox.get_fd () == -1) + return NULL; + alloc_assert (s); return s; } diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index 2c078890..68780d28 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -217,17 +217,22 @@ void zmq::stream_engine_t::in_event () // Note that buffer can be arbitrarily large. However, we assume // the underlying TCP layer has fixed buffer size and thus the // number of bytes read will be always limited. - decoder->get_buffer (&inpos, &insize); - const int bytes_read = read (inpos, insize); + size_t bufsize = 0; + decoder->get_buffer (&inpos, &bufsize); - // Check whether the peer has closed the connection. - if (bytes_read == -1) { + int const rc = read (inpos, bufsize); + if (rc == 0) { error (); return; } + if (rc == -1) { + if (errno != EAGAIN) + error (); + return; + } // Adjust input size - insize = static_cast (bytes_read); + insize = static_cast (rc); } int rc = 0; @@ -396,12 +401,15 @@ bool zmq::stream_engine_t::handshake () while (greeting_bytes_read < greeting_size) { const int n = read (greeting_recv + greeting_bytes_read, greeting_size - greeting_bytes_read); - if (n == -1) { + if (n == 0) { error (); return false; } - if (n == 0) + if (n == -1) { + if (errno != EAGAIN) + error (); return false; + } greeting_bytes_read += n; @@ -792,58 +800,45 @@ int zmq::stream_engine_t::read (void *data_, size_t size_) { #ifdef ZMQ_HAVE_WINDOWS - int nbytes = recv (s, (char*) data_, (int) size_, 0); + const int rc = recv (s, (char*) data_, (int) size_, 0); // If not a single byte can be read from the socket in non-blocking mode // we'll get an error (this may happen during the speculative read). - if (nbytes == SOCKET_ERROR && WSAGetLastError () == WSAEWOULDBLOCK) - return 0; + if (rc == SOCKET_ERROR) { + if (WSAGetLastError () == WSAEWOULDBLOCK) + errno = EAGAIN; + else { + wsa_assert (WSAGetLastError () == WSAENETDOWN + || WSAGetLastError () == WSAENETRESET + || WSAGetLastError () == WSAECONNABORTED + || WSAGetLastError () == WSAETIMEDOUT + || WSAGetLastError () == WSAECONNRESET + || WSAGetLastError () == WSAECONNREFUSED + || WSAGetLastError () == WSAENOTCONN); + errno = wsa_error_to_errno (WSAGetLastError ()); + } + } - // Connection failure. - if (nbytes == SOCKET_ERROR && ( - WSAGetLastError () == WSAENETDOWN || - WSAGetLastError () == WSAENETRESET || - WSAGetLastError () == WSAECONNABORTED || - WSAGetLastError () == WSAETIMEDOUT || - WSAGetLastError () == WSAECONNRESET || - WSAGetLastError () == WSAECONNREFUSED || - WSAGetLastError () == WSAENOTCONN)) - return -1; - - wsa_assert (nbytes != SOCKET_ERROR); - - // Orderly shutdown by the other peer. - if (nbytes == 0) - return -1; - - return nbytes; + return rc == SOCKET_ERROR? -1: rc; #else - ssize_t nbytes = recv (s, data_, size_, 0); + const ssize_t rc = recv (s, data_, size_, 0); // Several errors are OK. When speculative read is being done we may not // be able to read a single byte from the socket. Also, SIGSTOP issued // by a debugging tool can result in EINTR error. - if (nbytes == -1 && (errno == EAGAIN || errno == EWOULDBLOCK || - errno == EINTR)) - return 0; - - // Signalise peer failure. - if (nbytes == -1) { + if (rc == -1) { errno_assert (errno != EBADF && errno != EFAULT && errno != EINVAL && errno != ENOMEM && errno != ENOTSOCK); - return -1; + if (errno == EWOULDBLOCK || errno == EINTR) + errno = EAGAIN; } - // Orderly shutdown by the peer. - if (nbytes == 0) - return -1; - - return static_cast (nbytes); + return static_cast (rc); #endif } diff --git a/src/stream_engine.hpp b/src/stream_engine.hpp index 583a8608..2b3857d5 100644 --- a/src/stream_engine.hpp +++ b/src/stream_engine.hpp @@ -87,10 +87,9 @@ namespace zmq // of error or orderly shutdown by the other peer -1 is returned. int write (const void *data_, size_t size_); - // Reads data from the socket (up to 'size' bytes). Returns the number - // of bytes actually read (even zero is to be considered to be - // a success). In case of error or orderly shutdown by the other - // peer -1 is returned. + // Reads data from the socket (up to 'size' bytes). + // Returns the number of bytes actually read or -1 on error. + // Zero indicates the peer has closed the connection. int read (void *data_, size_t size_); int read_identity (msg_t *msg_); diff --git a/tests/Makefile.am b/tests/Makefile.am index ccc4b64a..0cfe4e8c 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -39,7 +39,8 @@ noinst_PROGRAMS = test_system \ test_conflate \ test_inproc_connect \ test_issue_566 \ - test_abstract_ipc + test_abstract_ipc \ + test_many_sockets if !ON_MINGW noinst_PROGRAMS += test_shutdown_stress \ @@ -86,6 +87,7 @@ test_conflate_SOURCES = test_conflate.cpp test_inproc_connect_SOURCES = test_inproc_connect.cpp test_issue_566_SOURCES = test_issue_566.cpp test_abstract_ipc_SOURCES = test_abstract_ipc.cpp +test_many_sockets_SOURCES = test_many_sockets.cpp if !ON_MINGW test_shutdown_stress_SOURCES = test_shutdown_stress.cpp test_pair_ipc_SOURCES = test_pair_ipc.cpp testutil.hpp diff --git a/tests/test_conflate.cpp b/tests/test_conflate.cpp index b7e3331b..9c3ba5ae 100644 --- a/tests/test_conflate.cpp +++ b/tests/test_conflate.cpp @@ -45,7 +45,6 @@ int main (int argc, char *argv []) assert (rc == 0); int message_count = 20; - for (int j = 0; j < message_count; ++j) { rc = zmq_send(s_out, (void*)&j, sizeof(int), 0); if (rc < 0) { @@ -53,15 +52,13 @@ int main (int argc, char *argv []) return -1; } } - - zmq_sleep (1); + msleep (SETTLE_TIME); int payload_recved = 0; - rc = zmq_recv(s_in, (void*)&payload_recved, sizeof(int), 0); + rc = zmq_recv (s_in, (void*)&payload_recved, sizeof(int), 0); assert (rc > 0); assert (payload_recved == message_count - 1); - rc = zmq_close (s_in); assert (rc == 0); diff --git a/tests/test_ctx_destroy.cpp b/tests/test_ctx_destroy.cpp index 7b3c5691..a269b2bd 100644 --- a/tests/test_ctx_destroy.cpp +++ b/tests/test_ctx_destroy.cpp @@ -61,7 +61,7 @@ void test_ctx_shutdown() void *receiver_thread = zmq_threadstart (&receiver, socket); // Wait for thread to start up and block - zmq_sleep (1); + msleep (SETTLE_TIME); // Shutdown context, if we used destroy here we would deadlock. rc = zmq_ctx_shutdown (ctx); diff --git a/tests/test_immediate.cpp b/tests/test_immediate.cpp index f385774b..2ac4aa3e 100644 --- a/tests/test_immediate.cpp +++ b/tests/test_immediate.cpp @@ -193,8 +193,7 @@ int main (void) assert (rc == 0); // Give time to process disconnect - // There's no way to do this except with a sleep - zmq_sleep(1); + msleep (SETTLE_TIME); // Send a message, should fail rc = zmq_send (frontend, "Hello", 5, ZMQ_DONTWAIT); diff --git a/tests/test_inproc_connect.cpp b/tests/test_inproc_connect.cpp index e3090fd2..10d828f4 100644 --- a/tests/test_inproc_connect.cpp +++ b/tests/test_inproc_connect.cpp @@ -142,7 +142,7 @@ void test_connect_before_bind_pub_sub() assert (rc == 0); // Wait for pub-sub connection to happen - zmq_sleep (1); + msleep (SETTLE_TIME); // Queue up some data, this not will be dropped rc = zmq_send_const (connectSocket, "after", 6, 0); diff --git a/tests/test_iov.cpp b/tests/test_iov.cpp index 2eef04ac..718b318f 100644 --- a/tests/test_iov.cpp +++ b/tests/test_iov.cpp @@ -80,7 +80,7 @@ int main (void) rc = zmq_bind (sb, "inproc://a"); assert (rc == 0); - zmq_sleep(1); + msleep (SETTLE_TIME); void *sc = zmq_socket (ctx, ZMQ_PUSH); rc = zmq_connect (sc, "inproc://a"); diff --git a/tests/test_many_sockets.cpp b/tests/test_many_sockets.cpp new file mode 100644 index 00000000..d7d85d7e --- /dev/null +++ b/tests/test_many_sockets.cpp @@ -0,0 +1,51 @@ +/* + Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "testutil.hpp" +#include +#include +#include + +const int no_of_sockets = 5000; + +int main(void) +{ + setup_test_environment(); + + void *ctx = zmq_ctx_new(); + void *sockets[no_of_sockets]; + + int sockets_created = 0; + + for ( int i = 0; i < no_of_sockets; ++i ) + { + sockets[i] = zmq_socket(ctx, ZMQ_PAIR); + if (sockets[i]) + ++sockets_created; + } + + assert(sockets_created < no_of_sockets); + + for ( int i = 0; i < no_of_sockets; ++i ) + if (sockets[i]) + zmq_close (sockets[i]); + + zmq_ctx_destroy (ctx); + return 0; +} diff --git a/tests/test_monitor.cpp b/tests/test_monitor.cpp index 36b835a0..84376193 100644 --- a/tests/test_monitor.cpp +++ b/tests/test_monitor.cpp @@ -211,7 +211,7 @@ int main (void) rc = zmq_socket_monitor (req, "inproc://monitor.req", ZMQ_EVENT_ALL); assert (rc == 0); threads [1] = zmq_threadstart(&req_socket_monitor, ctx); - zmq_sleep(1); + msleep (SETTLE_TIME); // Bind REQ and REP rc = zmq_bind (rep, addr.c_str()); @@ -238,8 +238,8 @@ int main (void) rc = zmq_close (rep); assert (rc == 0); - // Allow some time for detecting error states - zmq_sleep(1); + // Allow enough time for detecting error states + msleep (250); // Close the REQ socket rc = zmq_close (req); diff --git a/tests/test_req_relaxed.cpp b/tests/test_req_relaxed.cpp index d4ce0468..7970ed41 100644 --- a/tests/test_req_relaxed.cpp +++ b/tests/test_req_relaxed.cpp @@ -54,7 +54,7 @@ int main (void) // We have to give the connects time to finish otherwise the requests // will not properly round-robin. We could alternatively connect the // REQ sockets to the REP sockets. - zmq_sleep(1); + msleep (SETTLE_TIME); // Case 1: Second send() before a reply arrives in a pipe. diff --git a/tests/test_spec_req.cpp b/tests/test_spec_req.cpp index 83619acd..8e3e7218 100644 --- a/tests/test_spec_req.cpp +++ b/tests/test_spec_req.cpp @@ -46,7 +46,7 @@ void test_round_robin_out (void *ctx) // We have to give the connects time to finish otherwise the requests // will not properly round-robin. We could alternatively connect the // REQ sockets to the REP sockets. - zmq_sleep(1); + msleep (SETTLE_TIME); // Send our peer-replies, and expect every REP it used once in order for (size_t peer = 0; peer < services; peer++) { diff --git a/tests/test_sub_forward.cpp b/tests/test_sub_forward.cpp index 53533086..e77ed3b8 100644 --- a/tests/test_sub_forward.cpp +++ b/tests/test_sub_forward.cpp @@ -59,7 +59,7 @@ int main (void) assert (rc >= 0); // Wait a bit till the subscription gets to the publisher - zmq_sleep(1); + msleep (SETTLE_TIME); // Send an empty message rc = zmq_send (pub, NULL, 0, 0); diff --git a/tests/test_term_endpoint.cpp b/tests/test_term_endpoint.cpp index 6d9a5df0..186d3773 100644 --- a/tests/test_term_endpoint.cpp +++ b/tests/test_term_endpoint.cpp @@ -49,7 +49,7 @@ int main (void) assert (rc == 0); // Allow unbind to settle - zmq_sleep(1); + msleep (SETTLE_TIME); // Check that sending would block (there's no outbound connection) rc = zmq_send (push, "ABC", 3, ZMQ_DONTWAIT); @@ -86,7 +86,7 @@ int main (void) assert (rc == 0); // Allow disconnect to settle - zmq_sleep(1); + msleep (SETTLE_TIME); // Check that sending would block (there's no inbound connections). rc = zmq_send (push, "ABC", 3, ZMQ_DONTWAIT); diff --git a/tests/testutil.hpp b/tests/testutil.hpp index 6a9fb1a4..31db1e40 100644 --- a/tests/testutil.hpp +++ b/tests/testutil.hpp @@ -24,6 +24,11 @@ #include "../include/zmq_utils.h" #include "platform.hpp" +// This defines the settle time used in tests; raise this if we +// get test failures on slower systems due to binds/connects not +// settled. Tested to work reliably at 1 msec on a fast PC. +#define SETTLE_TIME 10 // In msec + #undef NDEBUG #include #include @@ -259,4 +264,16 @@ void setup_test_environment() #endif } +// Provide portable millisecond sleep +// http://www.cplusplus.com/forum/unices/60161/ http://en.cppreference.com/w/cpp/thread/sleep_for +void msleep (int milliseconds) +{ +#ifdef ZMQ_HAVE_WINDOWS + Sleep (milliseconds); +#else + usleep (static_cast (milliseconds) * 1000); +#endif +} + + #endif