Merge pull request #32 from hintjens/master

Cherry picking changes from libzmq master
This commit is contained in:
Pieter Hintjens 2013-11-09 00:56:52 -08:00
commit 30309d660e
37 changed files with 340 additions and 195 deletions

1
.gitignore vendored
View File

@ -76,6 +76,7 @@ tests/test_reqrep_device_tipc
tests/test_reqrep_tipc tests/test_reqrep_tipc
tests/test_router_handover tests/test_router_handover
tests/test_router_mandatory_tipc tests/test_router_mandatory_tipc
tests/test_router_raw_empty
tests/test_shutdown_stress_tipc tests/test_shutdown_stress_tipc
tests/test_sub_forward_tipc tests/test_sub_forward_tipc
tests/test_term_endpoint_tipc tests/test_term_endpoint_tipc

View File

@ -269,7 +269,7 @@ if(MSVC)
-DWIN32 -DWIN32
-DDLL_EXPORT -DDLL_EXPORT
# NB: May require tweaking for highly connected applications. # NB: May require tweaking for highly connected applications.
-DFD_SETSIZE=1024 -DFD_SETSIZE=1025
-D_CRT_SECURE_NO_WARNINGS) -D_CRT_SECURE_NO_WARNINGS)
# Parallel make. # Parallel make.
@ -611,7 +611,9 @@ set(tests
test_term_endpoint test_term_endpoint
test_timeo test_timeo
test_inproc_connect test_inproc_connect
test_issue_566) test_issue_566
test_many_sockets
)
if(NOT WIN32) if(NOT WIN32)
list(APPEND tests list(APPEND tests
test_monitor test_monitor

13
NEWS
View File

@ -1,17 +1,18 @@
0MQ version 4.0.1 stable, released on 2013/xx/xx 0MQ version 4.0.2 stable, released on 2013/xx/xx
================================================ ================================================
Bug Fixes Bug Fixes
--------- ---------
* Fixed LIBZMQ-583 - improved low-res timer for Windows
* Fixed LIBZMQ-578 - z85_decode was extremely slow * Fixed LIBZMQ-578 - z85_decode was extremely slow
* Fixed LIBZMQ-577 - fault in man pages.
* Fixed LIBZMQ-569 - Socket server crashes with random client data * Fixed LIBZMQ-574 - assertion failure when ran out of system file handles
* Fixed LIBZMQ-571 - test_stream failing in some cases * Fixed LIBZMQ-571 - test_stream failing in some cases
* Fixed LIBZMQ-569 - Socket server crashes with random client data
* Fixed LIBZMQ-39 - Bad file descriptor during shutdown
* Pulled expected failing test_linger.cpp from release * Pulled expected failing test_linger.cpp from release
* Reduced pause time in tests to allow "make check" to run faster
0MQ version 4.0.1 stable, released on 2013/10/08 0MQ version 4.0.1 stable, released on 2013/10/08
================================================ ================================================

View File

@ -1,5 +1,5 @@
CC=gcc CC=gcc
CFLAGS=-Wall -Os -g -DDLL_EXPORT -DFD_SETSIZE=1024 -I. CFLAGS=-Wall -Os -g -DDLL_EXPORT -DFD_SETSIZE=1025 -I.
LIBS=-lws2_32 LIBS=-lws2_32
OBJS = ctx.o reaper.o dist.o err.o \ OBJS = ctx.o reaper.o dist.o err.o \

View File

@ -40,7 +40,7 @@
/> />
<Tool <Tool
Name="VCCLCompilerTool" Name="VCCLCompilerTool"
AdditionalOptions="-DDLL_EXPORT -DFD_SETSIZE=1024 -D_CRT_SECURE_NO_WARNINGS" AdditionalOptions="-DDLL_EXPORT -DFD_SETSIZE=1025 -D_CRT_SECURE_NO_WARNINGS"
Optimization="0" Optimization="0"
PreprocessorDefinitions="NOMINMAX" PreprocessorDefinitions="NOMINMAX"
MinimalRebuild="true" MinimalRebuild="true"
@ -114,7 +114,7 @@
/> />
<Tool <Tool
Name="VCCLCompilerTool" Name="VCCLCompilerTool"
AdditionalOptions="-DDLL_EXPORT -DFD_SETSIZE=1024 -D_CRT_SECURE_NO_WARNINGS" AdditionalOptions="-DDLL_EXPORT -DFD_SETSIZE=1025 -D_CRT_SECURE_NO_WARNINGS"
Optimization="2" Optimization="2"
EnableIntrinsicFunctions="true" EnableIntrinsicFunctions="true"
RuntimeLibrary="2" RuntimeLibrary="2"
@ -188,7 +188,7 @@
/> />
<Tool <Tool
Name="VCCLCompilerTool" Name="VCCLCompilerTool"
AdditionalOptions="-DZMQ_STATIC -DFD_SETSIZE=1024 -D_CRT_SECURE_NO_WARNINGS" AdditionalOptions="-DZMQ_STATIC -DFD_SETSIZE=1025 -D_CRT_SECURE_NO_WARNINGS"
Optimization="0" Optimization="0"
PreprocessorDefinitions="NOMINMAX" PreprocessorDefinitions="NOMINMAX"
MinimalRebuild="true" MinimalRebuild="true"
@ -254,7 +254,7 @@
/> />
<Tool <Tool
Name="VCCLCompilerTool" Name="VCCLCompilerTool"
AdditionalOptions="-DZMQ_STATIC -DFD_SETSIZE=1024 -D_CRT_SECURE_NO_WARNINGS" AdditionalOptions="-DZMQ_STATIC -DFD_SETSIZE=1025 -D_CRT_SECURE_NO_WARNINGS"
Optimization="2" Optimization="2"
EnableIntrinsicFunctions="true" EnableIntrinsicFunctions="true"
RuntimeLibrary="2" RuntimeLibrary="2"
@ -319,7 +319,7 @@
/> />
<Tool <Tool
Name="VCCLCompilerTool" Name="VCCLCompilerTool"
AdditionalOptions="-DDLL_EXPORT -DFD_SETSIZE=1024 -D_CRT_SECURE_NO_WARNINGS" AdditionalOptions="-DDLL_EXPORT -DFD_SETSIZE=1025 -D_CRT_SECURE_NO_WARNINGS"
Optimization="2" Optimization="2"
EnableIntrinsicFunctions="true" EnableIntrinsicFunctions="true"
AdditionalIncludeDirectories="../../../../OpenPGM/include" AdditionalIncludeDirectories="../../../../OpenPGM/include"

View File

@ -13,7 +13,7 @@
<Command>copy ..\platform.hpp ..\..\..\src</Command> <Command>copy ..\platform.hpp ..\..\..\src</Command>
</PreBuildEvent> </PreBuildEvent>
<ClCompile> <ClCompile>
<PreprocessorDefinitions>_CRT_SECURE_NO_WARNINGS;DLL_EXPORT;FD_SETSIZE=1024;%(PreprocessorDefinitions)</PreprocessorDefinitions> <PreprocessorDefinitions>_CRT_SECURE_NO_WARNINGS;DLL_EXPORT;FD_SETSIZE=1025;%(PreprocessorDefinitions)</PreprocessorDefinitions>
</ClCompile> </ClCompile>
<Link> <Link>
<AdditionalDependencies>Ws2_32.lib;Rpcrt4.lib;%(AdditionalDependencies)</AdditionalDependencies> <AdditionalDependencies>Ws2_32.lib;Rpcrt4.lib;%(AdditionalDependencies)</AdditionalDependencies>

View File

@ -13,7 +13,7 @@
<Command>copy ..\platform.hpp ..\..\..\src</Command> <Command>copy ..\platform.hpp ..\..\..\src</Command>
</PreBuildEvent> </PreBuildEvent>
<ClCompile> <ClCompile>
<PreprocessorDefinitions>_CRT_SECURE_NO_WARNINGS;ZMQ_STATIC;FD_SETSIZE=1024;%(PreprocessorDefinitions)</PreprocessorDefinitions> <PreprocessorDefinitions>_CRT_SECURE_NO_WARNINGS;ZMQ_STATIC;FD_SETSIZE=1025;%(PreprocessorDefinitions)</PreprocessorDefinitions>
</ClCompile> </ClCompile>
<Lib> <Lib>
<AdditionalDependencies>Ws2_32.lib;Rpcrt4.lib;%(AdditionalDependencies)</AdditionalDependencies> <AdditionalDependencies>Ws2_32.lib;Rpcrt4.lib;%(AdditionalDependencies)</AdditionalDependencies>

View File

@ -22,6 +22,7 @@
#include "likely.hpp" #include "likely.hpp"
#include "config.hpp" #include "config.hpp"
#include "err.hpp" #include "err.hpp"
#include "mutex.hpp"
#include <stddef.h> #include <stddef.h>
@ -41,9 +42,49 @@
#include <time.h> #include <time.h>
#endif #endif
#ifdef ZMQ_HAVE_WINDOWS
typedef ULONGLONG (*f_compatible_get_tick_count64)();
static zmq::mutex_t compatible_get_tick_count64_mutex;
ULONGLONG compatible_get_tick_count64()
{
compatible_get_tick_count64_mutex.lock();
static DWORD s_wrap = 0;
static DWORD s_last_tick = 0;
const DWORD current_tick = ::GetTickCount();
if (current_tick < s_last_tick)
++s_wrap;
s_last_tick = current_tick;
const ULONGLONG result = (static_cast<ULONGLONG>(s_wrap) << 32) + static_cast<ULONGLONG>(current_tick);
compatible_get_tick_count64_mutex.unlock();
return result;
}
f_compatible_get_tick_count64 init_compatible_get_tick_count64()
{
f_compatible_get_tick_count64 func = NULL;
HMODULE module = ::LoadLibraryA("Kernel32.dll");
if (module != NULL)
func = reinterpret_cast<f_compatible_get_tick_count64>(::GetProcAddress(module, "GetTickCount64"));
if (func == NULL)
func = compatible_get_tick_count64;
return func;
}
static f_compatible_get_tick_count64 my_get_tick_count64 = init_compatible_get_tick_count64();
#endif
zmq::clock_t::clock_t () : zmq::clock_t::clock_t () :
last_tsc (rdtsc ()), last_tsc (rdtsc ()),
#ifdef ZMQ_HAVE_WINDOWS
last_time (static_cast<uint64_t>((*my_get_tick_count64)()))
#else
last_time (now_us () / 1000) last_time (now_us () / 1000)
#endif
{ {
} }
@ -106,7 +147,18 @@ uint64_t zmq::clock_t::now_ms ()
// If TSC is not supported, get precise time and chop off the microseconds. // If TSC is not supported, get precise time and chop off the microseconds.
if (!tsc) if (!tsc)
{
#ifdef ZMQ_HAVE_WINDOWS
// Under Windows, now_us is not so reliable since QueryPerformanceCounter
// does not guarantee that it will use a hardware that offers a monotonic timer.
// So, lets use GetTickCount when GetTickCount64 is not available with an workaround
// to its 32 bit limitation.
static_assert(sizeof(uint64_t) >= sizeof(ULONGLONG), "Loosing timer information");
return static_cast<uint64_t>((*my_get_tick_count64)());
#else
return now_us () / 1000; return now_us () / 1000;
#endif
}
// If TSC haven't jumped back (in case of migration to a different // If TSC haven't jumped back (in case of migration to a different
// CPU core) and if not too much time elapsed since last measurement, // CPU core) and if not too much time elapsed since last measurement,
@ -115,7 +167,11 @@ uint64_t zmq::clock_t::now_ms ()
return last_time; return last_time;
last_tsc = tsc; last_tsc = tsc;
#ifdef ZMQ_HAVE_WINDOWS
last_time = static_cast<uint64_t>((*my_get_tick_count64)());
#else
last_time = now_us () / 1000; last_time = now_us () / 1000;
#endif
return last_time; return last_time;
} }

View File

@ -38,6 +38,14 @@
#define ZMQ_CTX_TAG_VALUE_GOOD 0xabadcafe #define ZMQ_CTX_TAG_VALUE_GOOD 0xabadcafe
#define ZMQ_CTX_TAG_VALUE_BAD 0xdeadbeef #define ZMQ_CTX_TAG_VALUE_BAD 0xdeadbeef
int clipped_maxsocket(int max_requested)
{
if (max_requested >= zmq::poller_t::max_fds () && zmq::poller_t::max_fds () != -1)
max_requested = zmq::poller_t::max_fds () - 1; // -1 because we need room for the repear mailbox.
return max_requested;
}
zmq::ctx_t::ctx_t () : zmq::ctx_t::ctx_t () :
tag (ZMQ_CTX_TAG_VALUE_GOOD), tag (ZMQ_CTX_TAG_VALUE_GOOD),
starting (true), starting (true),
@ -45,7 +53,7 @@ zmq::ctx_t::ctx_t () :
reaper (NULL), reaper (NULL),
slot_count (0), slot_count (0),
slots (NULL), slots (NULL),
max_sockets (ZMQ_MAX_SOCKETS_DFLT), max_sockets (clipped_maxsocket (ZMQ_MAX_SOCKETS_DFLT)),
io_thread_count (ZMQ_IO_THREADS_DFLT), io_thread_count (ZMQ_IO_THREADS_DFLT),
ipv6 (false) ipv6 (false)
{ {
@ -161,7 +169,7 @@ int zmq::ctx_t::shutdown ()
int zmq::ctx_t::set (int option_, int optval_) int zmq::ctx_t::set (int option_, int optval_)
{ {
int rc = 0; int rc = 0;
if (option_ == ZMQ_MAX_SOCKETS && optval_ >= 1) { if (option_ == ZMQ_MAX_SOCKETS && optval_ >= 1 && optval_ == clipped_maxsocket (optval_)) {
opt_sync.lock (); opt_sync.lock ();
max_sockets = optval_; max_sockets = optval_;
opt_sync.unlock (); opt_sync.unlock ();

View File

@ -133,6 +133,11 @@ void zmq::devpoll_t::stop ()
stopping = true; stopping = true;
} }
int zmq::devpoll_t::max_fds ()
{
return -1;
}
void zmq::devpoll_t::loop () void zmq::devpoll_t::loop ()
{ {
while (!stopping) { while (!stopping) {

View File

@ -56,6 +56,8 @@ namespace zmq
void start (); void start ();
void stop (); void stop ();
static int max_fds ();
private: private:
// Main worker thread routine. // Main worker thread routine.

View File

@ -126,6 +126,11 @@ void zmq::epoll_t::stop ()
stopping = true; stopping = true;
} }
int zmq::epoll_t::max_fds ()
{
return -1;
}
void zmq::epoll_t::loop () void zmq::epoll_t::loop ()
{ {
epoll_event ev_buf [max_io_events]; epoll_event ev_buf [max_io_events];

View File

@ -58,6 +58,8 @@ namespace zmq
void start (); void start ();
void stop (); void stop ();
static int max_fds ();
private: private:
// Main worker thread routine. // Main worker thread routine.

View File

@ -152,6 +152,11 @@ void zmq::kqueue_t::stop ()
stopping = true; stopping = true;
} }
int zmq::kqueue_t::max_fds ()
{
return -1;
}
void zmq::kqueue_t::loop () void zmq::kqueue_t::loop ()
{ {
while (!stopping) { while (!stopping) {

View File

@ -58,6 +58,8 @@ namespace zmq
void start (); void start ();
void stop (); void stop ();
static int max_fds ();
private: private:
// Main worker thread routine. // Main worker thread routine.

View File

@ -114,6 +114,11 @@ void zmq::poll_t::stop ()
stopping = true; stopping = true;
} }
int zmq::poll_t::max_fds ()
{
return -1;
}
void zmq::poll_t::loop () void zmq::poll_t::loop ()
{ {
while (!stopping) { while (!stopping) {

View File

@ -59,6 +59,8 @@ namespace zmq
void start (); void start ();
void stop (); void stop ();
static int max_fds ();
private: private:
// Main worker thread routine. // Main worker thread routine.

View File

@ -144,6 +144,11 @@ void zmq::select_t::stop ()
stopping = true; stopping = true;
} }
int zmq::select_t::max_fds ()
{
return FD_SETSIZE;
}
void zmq::select_t::loop () void zmq::select_t::loop ()
{ {
while (!stopping) { while (!stopping) {

View File

@ -69,6 +69,8 @@ namespace zmq
void start (); void start ();
void stop (); void stop ();
static int max_fds ();
private: private:
// Main worker thread routine. // Main worker thread routine.

View File

@ -80,13 +80,10 @@
zmq::signaler_t::signaler_t () zmq::signaler_t::signaler_t ()
{ {
// Create the socketpair for signaling. // Create the socketpair for signaling.
int rc = make_fdpair (&r, &w); if (make_fdpair (&r, &w) == 0) {
errno_assert (rc == 0);
// Set both fds to non-blocking mode.
unblock_socket (w); unblock_socket (w);
unblock_socket (r); unblock_socket (r);
}
#ifdef HAVE_FORK #ifdef HAVE_FORK
pid = getpid(); pid = getpid();
#endif #endif
@ -184,8 +181,7 @@ int zmq::signaler_t::wait (int timeout_)
return -1; return -1;
} }
#ifdef HAVE_FORK #ifdef HAVE_FORK
if (unlikely(pid != getpid())) if (unlikely(pid != getpid())) {
{
// we have forked and the file descriptor is closed. Emulate an interupt // we have forked and the file descriptor is closed. Emulate an interupt
// response. // response.
//printf("Child process %d signaler_t::wait returning simulating interrupt #2\n", getpid()); //printf("Child process %d signaler_t::wait returning simulating interrupt #2\n", getpid());
@ -266,39 +262,27 @@ void zmq::signaler_t::recv ()
#ifdef HAVE_FORK #ifdef HAVE_FORK
void zmq::signaler_t::forked() void zmq::signaler_t::forked()
{ {
int oldr = r; // Close file descriptors created in the parent and create new pair
#if !defined ZMQ_HAVE_EVENTFD close (r);
int oldw = w; close (w);
#endif
// replace the file descriptors created in the parent with new
// ones, and close the inherited ones
make_fdpair (&r, &w); make_fdpair (&r, &w);
#if defined ZMQ_HAVE_EVENTFD
int rc = close (oldr);
errno_assert (rc == 0);
#else
int rc = close (oldw);
errno_assert (rc == 0);
rc = close (oldr);
errno_assert (rc == 0);
#endif
} }
#endif #endif
// Returns -1 if we could not make the socket pair successfully
int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_) int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
{ {
#if defined ZMQ_HAVE_EVENTFD #if defined ZMQ_HAVE_EVENTFD
// Create eventfd object.
fd_t fd = eventfd (0, 0); fd_t fd = eventfd (0, 0);
errno_assert (fd != -1); if (fd == -1) {
*w_ = fd; errno_assert (errno == ENFILE || errno == EMFILE);
*r_ = fd; *w_ = *r_ = -1;
return -1;
}
else {
*w_ = *r_ = fd;
return 0; return 0;
}
#elif defined ZMQ_HAVE_WINDOWS #elif defined ZMQ_HAVE_WINDOWS
# if !defined _WIN32_WCE # if !defined _WIN32_WCE
@ -328,7 +312,8 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
HANDLE sync = CreateEvent (NULL, FALSE, TRUE, TEXT ("Global\\zmq-signaler-port-sync")); HANDLE sync = CreateEvent (NULL, FALSE, TRUE, TEXT ("Global\\zmq-signaler-port-sync"));
# endif # endif
if (sync == NULL && GetLastError () == ERROR_ACCESS_DENIED) if (sync == NULL && GetLastError () == ERROR_ACCESS_DENIED)
sync = OpenEvent (SYNCHRONIZE | EVENT_MODIFY_STATE, FALSE, TEXT ("Global\\zmq-signaler-port-sync")); sync = OpenEvent (SYNCHRONIZE | EVENT_MODIFY_STATE,
FALSE, TEXT ("Global\\zmq-signaler-port-sync"));
win_assert (sync != NULL); win_assert (sync != NULL);
@ -391,17 +376,14 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
// Save errno if connection fails // Save errno if connection fails
int conn_errno = 0; int conn_errno = 0;
if (rc == SOCKET_ERROR) { if (rc == SOCKET_ERROR)
conn_errno = WSAGetLastError (); conn_errno = WSAGetLastError ();
} else { else {
// Accept connection from writer. // Accept connection from writer.
*r_ = accept (listener, NULL, NULL); *r_ = accept (listener, NULL, NULL);
if (*r_ == INVALID_SOCKET)
if (*r_ == INVALID_SOCKET) {
conn_errno = WSAGetLastError (); conn_errno = WSAGetLastError ();
} }
}
// We don't need the listening socket anymore. Close it. // We don't need the listening socket anymore. Close it.
rc = closesocket (listener); rc = closesocket (listener);
wsa_assert (rc != SOCKET_ERROR); wsa_assert (rc != SOCKET_ERROR);
@ -421,21 +403,14 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
win_assert (brc); win_assert (brc);
# endif # endif
return 0; return 0;
} else { }
else {
// Cleanup writer if connection failed // Cleanup writer if connection failed
rc = closesocket (*w_); rc = closesocket (*w_);
wsa_assert (rc != SOCKET_ERROR); wsa_assert (rc != SOCKET_ERROR);
*w_ = INVALID_SOCKET; *w_ = INVALID_SOCKET;
// Set errno from saved value // Set errno from saved value
errno = wsa_error_to_errno (conn_errno); errno = wsa_error_to_errno (conn_errno);
// Ideally, we would return errno to the caller signaler_t()
// Unfortunately, it uses errno_assert() which gives "Unknown error"
// We might as well assert here and print the actual error message
wsa_assert_no (conn_errno);
return -1; return -1;
} }
@ -493,15 +468,20 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
return 0; return 0;
#else // All other implementations support socketpair() #else
// All other implementations support socketpair()
int sv [2]; int sv [2];
int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv); int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv);
errno_assert (rc == 0); if (rc == -1) {
errno_assert (errno == ENFILE || errno == EMFILE);
sv [0] = sv [1] = -1;
return -1;
}
else {
*w_ = sv [0]; *w_ = sv [0];
*r_ = sv [1]; *r_ = sv [1];
return 0; return 0;
}
#endif #endif
} }

View File

@ -58,7 +58,8 @@ namespace zmq
// to pass the signals. // to pass the signals.
static int make_fdpair (fd_t *r_, fd_t *w_); static int make_fdpair (fd_t *r_, fd_t *w_);
// Underlying write & read file descriptor. // Underlying write & read file descriptor
// Will be -1 if we exceeded number of available handles
fd_t w; fd_t w;
fd_t r; fd_t r;
@ -74,7 +75,6 @@ namespace zmq
void close_internal(); void close_internal();
#endif #endif
}; };
} }
#endif #endif

View File

@ -79,7 +79,6 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_,
{ {
socket_base_t *s = NULL; socket_base_t *s = NULL;
switch (type_) { switch (type_) {
case ZMQ_PAIR: case ZMQ_PAIR:
s = new (std::nothrow) pair_t (parent_, tid_, sid_); s = new (std::nothrow) pair_t (parent_, tid_, sid_);
break; break;
@ -120,6 +119,9 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_,
errno = EINVAL; errno = EINVAL;
return NULL; return NULL;
} }
if (s->mailbox.get_fd () == -1)
return NULL;
alloc_assert (s); alloc_assert (s);
return s; return s;
} }

View File

@ -217,17 +217,22 @@ void zmq::stream_engine_t::in_event ()
// Note that buffer can be arbitrarily large. However, we assume // Note that buffer can be arbitrarily large. However, we assume
// the underlying TCP layer has fixed buffer size and thus the // the underlying TCP layer has fixed buffer size and thus the
// number of bytes read will be always limited. // number of bytes read will be always limited.
decoder->get_buffer (&inpos, &insize); size_t bufsize = 0;
const int bytes_read = read (inpos, insize); decoder->get_buffer (&inpos, &bufsize);
// Check whether the peer has closed the connection. int const rc = read (inpos, bufsize);
if (bytes_read == -1) { if (rc == 0) {
error ();
return;
}
if (rc == -1) {
if (errno != EAGAIN)
error (); error ();
return; return;
} }
// Adjust input size // Adjust input size
insize = static_cast <size_t> (bytes_read); insize = static_cast <size_t> (rc);
} }
int rc = 0; int rc = 0;
@ -396,12 +401,15 @@ bool zmq::stream_engine_t::handshake ()
while (greeting_bytes_read < greeting_size) { while (greeting_bytes_read < greeting_size) {
const int n = read (greeting_recv + greeting_bytes_read, const int n = read (greeting_recv + greeting_bytes_read,
greeting_size - greeting_bytes_read); greeting_size - greeting_bytes_read);
if (n == -1) { if (n == 0) {
error (); error ();
return false; return false;
} }
if (n == 0) if (n == -1) {
if (errno != EAGAIN)
error ();
return false; return false;
}
greeting_bytes_read += n; greeting_bytes_read += n;
@ -792,58 +800,45 @@ int zmq::stream_engine_t::read (void *data_, size_t size_)
{ {
#ifdef ZMQ_HAVE_WINDOWS #ifdef ZMQ_HAVE_WINDOWS
int nbytes = recv (s, (char*) data_, (int) size_, 0); const int rc = recv (s, (char*) data_, (int) size_, 0);
// If not a single byte can be read from the socket in non-blocking mode // If not a single byte can be read from the socket in non-blocking mode
// we'll get an error (this may happen during the speculative read). // we'll get an error (this may happen during the speculative read).
if (nbytes == SOCKET_ERROR && WSAGetLastError () == WSAEWOULDBLOCK) if (rc == SOCKET_ERROR) {
return 0; if (WSAGetLastError () == WSAEWOULDBLOCK)
errno = EAGAIN;
else {
wsa_assert (WSAGetLastError () == WSAENETDOWN
|| WSAGetLastError () == WSAENETRESET
|| WSAGetLastError () == WSAECONNABORTED
|| WSAGetLastError () == WSAETIMEDOUT
|| WSAGetLastError () == WSAECONNRESET
|| WSAGetLastError () == WSAECONNREFUSED
|| WSAGetLastError () == WSAENOTCONN);
errno = wsa_error_to_errno (WSAGetLastError ());
}
}
// Connection failure. return rc == SOCKET_ERROR? -1: rc;
if (nbytes == SOCKET_ERROR && (
WSAGetLastError () == WSAENETDOWN ||
WSAGetLastError () == WSAENETRESET ||
WSAGetLastError () == WSAECONNABORTED ||
WSAGetLastError () == WSAETIMEDOUT ||
WSAGetLastError () == WSAECONNRESET ||
WSAGetLastError () == WSAECONNREFUSED ||
WSAGetLastError () == WSAENOTCONN))
return -1;
wsa_assert (nbytes != SOCKET_ERROR);
// Orderly shutdown by the other peer.
if (nbytes == 0)
return -1;
return nbytes;
#else #else
ssize_t nbytes = recv (s, data_, size_, 0); const ssize_t rc = recv (s, data_, size_, 0);
// Several errors are OK. When speculative read is being done we may not // Several errors are OK. When speculative read is being done we may not
// be able to read a single byte from the socket. Also, SIGSTOP issued // be able to read a single byte from the socket. Also, SIGSTOP issued
// by a debugging tool can result in EINTR error. // by a debugging tool can result in EINTR error.
if (nbytes == -1 && (errno == EAGAIN || errno == EWOULDBLOCK || if (rc == -1) {
errno == EINTR))
return 0;
// Signalise peer failure.
if (nbytes == -1) {
errno_assert (errno != EBADF errno_assert (errno != EBADF
&& errno != EFAULT && errno != EFAULT
&& errno != EINVAL && errno != EINVAL
&& errno != ENOMEM && errno != ENOMEM
&& errno != ENOTSOCK); && errno != ENOTSOCK);
return -1; if (errno == EWOULDBLOCK || errno == EINTR)
errno = EAGAIN;
} }
// Orderly shutdown by the peer. return static_cast <int> (rc);
if (nbytes == 0)
return -1;
return static_cast <int> (nbytes);
#endif #endif
} }

View File

@ -87,10 +87,9 @@ namespace zmq
// of error or orderly shutdown by the other peer -1 is returned. // of error or orderly shutdown by the other peer -1 is returned.
int write (const void *data_, size_t size_); int write (const void *data_, size_t size_);
// Reads data from the socket (up to 'size' bytes). Returns the number // Reads data from the socket (up to 'size' bytes).
// of bytes actually read (even zero is to be considered to be // Returns the number of bytes actually read or -1 on error.
// a success). In case of error or orderly shutdown by the other // Zero indicates the peer has closed the connection.
// peer -1 is returned.
int read (void *data_, size_t size_); int read (void *data_, size_t size_);
int read_identity (msg_t *msg_); int read_identity (msg_t *msg_);

View File

@ -39,7 +39,8 @@ noinst_PROGRAMS = test_system \
test_conflate \ test_conflate \
test_inproc_connect \ test_inproc_connect \
test_issue_566 \ test_issue_566 \
test_abstract_ipc test_abstract_ipc \
test_many_sockets
if !ON_MINGW if !ON_MINGW
noinst_PROGRAMS += test_shutdown_stress \ noinst_PROGRAMS += test_shutdown_stress \
@ -86,6 +87,7 @@ test_conflate_SOURCES = test_conflate.cpp
test_inproc_connect_SOURCES = test_inproc_connect.cpp test_inproc_connect_SOURCES = test_inproc_connect.cpp
test_issue_566_SOURCES = test_issue_566.cpp test_issue_566_SOURCES = test_issue_566.cpp
test_abstract_ipc_SOURCES = test_abstract_ipc.cpp test_abstract_ipc_SOURCES = test_abstract_ipc.cpp
test_many_sockets_SOURCES = test_many_sockets.cpp
if !ON_MINGW if !ON_MINGW
test_shutdown_stress_SOURCES = test_shutdown_stress.cpp test_shutdown_stress_SOURCES = test_shutdown_stress.cpp
test_pair_ipc_SOURCES = test_pair_ipc.cpp testutil.hpp test_pair_ipc_SOURCES = test_pair_ipc.cpp testutil.hpp

View File

@ -45,7 +45,6 @@ int main (int argc, char *argv [])
assert (rc == 0); assert (rc == 0);
int message_count = 20; int message_count = 20;
for (int j = 0; j < message_count; ++j) { for (int j = 0; j < message_count; ++j) {
rc = zmq_send(s_out, (void*)&j, sizeof(int), 0); rc = zmq_send(s_out, (void*)&j, sizeof(int), 0);
if (rc < 0) { if (rc < 0) {
@ -53,15 +52,13 @@ int main (int argc, char *argv [])
return -1; return -1;
} }
} }
msleep (SETTLE_TIME);
zmq_sleep (1);
int payload_recved = 0; int payload_recved = 0;
rc = zmq_recv (s_in, (void*)&payload_recved, sizeof(int), 0); rc = zmq_recv (s_in, (void*)&payload_recved, sizeof(int), 0);
assert (rc > 0); assert (rc > 0);
assert (payload_recved == message_count - 1); assert (payload_recved == message_count - 1);
rc = zmq_close (s_in); rc = zmq_close (s_in);
assert (rc == 0); assert (rc == 0);

View File

@ -61,7 +61,7 @@ void test_ctx_shutdown()
void *receiver_thread = zmq_threadstart (&receiver, socket); void *receiver_thread = zmq_threadstart (&receiver, socket);
// Wait for thread to start up and block // Wait for thread to start up and block
zmq_sleep (1); msleep (SETTLE_TIME);
// Shutdown context, if we used destroy here we would deadlock. // Shutdown context, if we used destroy here we would deadlock.
rc = zmq_ctx_shutdown (ctx); rc = zmq_ctx_shutdown (ctx);

View File

@ -193,8 +193,7 @@ int main (void)
assert (rc == 0); assert (rc == 0);
// Give time to process disconnect // Give time to process disconnect
// There's no way to do this except with a sleep msleep (SETTLE_TIME);
zmq_sleep(1);
// Send a message, should fail // Send a message, should fail
rc = zmq_send (frontend, "Hello", 5, ZMQ_DONTWAIT); rc = zmq_send (frontend, "Hello", 5, ZMQ_DONTWAIT);

View File

@ -142,7 +142,7 @@ void test_connect_before_bind_pub_sub()
assert (rc == 0); assert (rc == 0);
// Wait for pub-sub connection to happen // Wait for pub-sub connection to happen
zmq_sleep (1); msleep (SETTLE_TIME);
// Queue up some data, this not will be dropped // Queue up some data, this not will be dropped
rc = zmq_send_const (connectSocket, "after", 6, 0); rc = zmq_send_const (connectSocket, "after", 6, 0);

View File

@ -80,7 +80,7 @@ int main (void)
rc = zmq_bind (sb, "inproc://a"); rc = zmq_bind (sb, "inproc://a");
assert (rc == 0); assert (rc == 0);
zmq_sleep(1); msleep (SETTLE_TIME);
void *sc = zmq_socket (ctx, ZMQ_PUSH); void *sc = zmq_socket (ctx, ZMQ_PUSH);
rc = zmq_connect (sc, "inproc://a"); rc = zmq_connect (sc, "inproc://a");

View File

@ -0,0 +1,51 @@
/*
Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "testutil.hpp"
#include <zmq.h>
#include <stdio.h>
#include <stdlib.h>
const int no_of_sockets = 5000;
int main(void)
{
setup_test_environment();
void *ctx = zmq_ctx_new();
void *sockets[no_of_sockets];
int sockets_created = 0;
for ( int i = 0; i < no_of_sockets; ++i )
{
sockets[i] = zmq_socket(ctx, ZMQ_PAIR);
if (sockets[i])
++sockets_created;
}
assert(sockets_created < no_of_sockets);
for ( int i = 0; i < no_of_sockets; ++i )
if (sockets[i])
zmq_close (sockets[i]);
zmq_ctx_destroy (ctx);
return 0;
}

View File

@ -211,7 +211,7 @@ int main (void)
rc = zmq_socket_monitor (req, "inproc://monitor.req", ZMQ_EVENT_ALL); rc = zmq_socket_monitor (req, "inproc://monitor.req", ZMQ_EVENT_ALL);
assert (rc == 0); assert (rc == 0);
threads [1] = zmq_threadstart(&req_socket_monitor, ctx); threads [1] = zmq_threadstart(&req_socket_monitor, ctx);
zmq_sleep(1); msleep (SETTLE_TIME);
// Bind REQ and REP // Bind REQ and REP
rc = zmq_bind (rep, addr.c_str()); rc = zmq_bind (rep, addr.c_str());
@ -238,8 +238,8 @@ int main (void)
rc = zmq_close (rep); rc = zmq_close (rep);
assert (rc == 0); assert (rc == 0);
// Allow some time for detecting error states // Allow enough time for detecting error states
zmq_sleep(1); msleep (250);
// Close the REQ socket // Close the REQ socket
rc = zmq_close (req); rc = zmq_close (req);

View File

@ -54,7 +54,7 @@ int main (void)
// We have to give the connects time to finish otherwise the requests // We have to give the connects time to finish otherwise the requests
// will not properly round-robin. We could alternatively connect the // will not properly round-robin. We could alternatively connect the
// REQ sockets to the REP sockets. // REQ sockets to the REP sockets.
zmq_sleep(1); msleep (SETTLE_TIME);
// Case 1: Second send() before a reply arrives in a pipe. // Case 1: Second send() before a reply arrives in a pipe.

View File

@ -46,7 +46,7 @@ void test_round_robin_out (void *ctx)
// We have to give the connects time to finish otherwise the requests // We have to give the connects time to finish otherwise the requests
// will not properly round-robin. We could alternatively connect the // will not properly round-robin. We could alternatively connect the
// REQ sockets to the REP sockets. // REQ sockets to the REP sockets.
zmq_sleep(1); msleep (SETTLE_TIME);
// Send our peer-replies, and expect every REP it used once in order // Send our peer-replies, and expect every REP it used once in order
for (size_t peer = 0; peer < services; peer++) { for (size_t peer = 0; peer < services; peer++) {

View File

@ -59,7 +59,7 @@ int main (void)
assert (rc >= 0); assert (rc >= 0);
// Wait a bit till the subscription gets to the publisher // Wait a bit till the subscription gets to the publisher
zmq_sleep(1); msleep (SETTLE_TIME);
// Send an empty message // Send an empty message
rc = zmq_send (pub, NULL, 0, 0); rc = zmq_send (pub, NULL, 0, 0);

View File

@ -49,7 +49,7 @@ int main (void)
assert (rc == 0); assert (rc == 0);
// Allow unbind to settle // Allow unbind to settle
zmq_sleep(1); msleep (SETTLE_TIME);
// Check that sending would block (there's no outbound connection) // Check that sending would block (there's no outbound connection)
rc = zmq_send (push, "ABC", 3, ZMQ_DONTWAIT); rc = zmq_send (push, "ABC", 3, ZMQ_DONTWAIT);
@ -86,7 +86,7 @@ int main (void)
assert (rc == 0); assert (rc == 0);
// Allow disconnect to settle // Allow disconnect to settle
zmq_sleep(1); msleep (SETTLE_TIME);
// Check that sending would block (there's no inbound connections). // Check that sending would block (there's no inbound connections).
rc = zmq_send (push, "ABC", 3, ZMQ_DONTWAIT); rc = zmq_send (push, "ABC", 3, ZMQ_DONTWAIT);

View File

@ -24,6 +24,11 @@
#include "../include/zmq_utils.h" #include "../include/zmq_utils.h"
#include "platform.hpp" #include "platform.hpp"
// This defines the settle time used in tests; raise this if we
// get test failures on slower systems due to binds/connects not
// settled. Tested to work reliably at 1 msec on a fast PC.
#define SETTLE_TIME 10 // In msec
#undef NDEBUG #undef NDEBUG
#include <time.h> #include <time.h>
#include <assert.h> #include <assert.h>
@ -259,4 +264,16 @@ void setup_test_environment()
#endif #endif
} }
// Provide portable millisecond sleep
// http://www.cplusplus.com/forum/unices/60161/ http://en.cppreference.com/w/cpp/thread/sleep_for
void msleep (int milliseconds)
{
#ifdef ZMQ_HAVE_WINDOWS
Sleep (milliseconds);
#else
usleep (static_cast <useconds_t> (milliseconds) * 1000);
#endif
}
#endif #endif