0
0
mirror of https://github.com/zeromq/libzmq.git synced 2025-01-16 12:16:33 +08:00

Merge pull request #34 from hintjens/master

Backporting fixes from master
This commit is contained in:
Pieter Hintjens 2013-11-19 02:01:23 -08:00
commit e1939155ff
11 changed files with 117 additions and 69 deletions

1
.gitignore vendored
View File

@ -80,6 +80,7 @@ tests/test_router_raw_empty
tests/test_shutdown_stress_tipc tests/test_shutdown_stress_tipc
tests/test_sub_forward_tipc tests/test_sub_forward_tipc
tests/test_term_endpoint_tipc tests/test_term_endpoint_tipc
tests/test_many_sockets
tests/test*.log tests/test*.log
tests/test*.trs tests/test*.trs
src/platform.hpp* src/platform.hpp*

View File

@ -269,7 +269,7 @@ if(MSVC)
-DWIN32 -DWIN32
-DDLL_EXPORT -DDLL_EXPORT
# NB: May require tweaking for highly connected applications. # NB: May require tweaking for highly connected applications.
-DFD_SETSIZE=1025 -DFD_SETSIZE=1024
-D_CRT_SECURE_NO_WARNINGS) -D_CRT_SECURE_NO_WARNINGS)
# Parallel make. # Parallel make.

View File

@ -1,5 +1,5 @@
CC=gcc CC=gcc
CFLAGS=-Wall -Os -g -DDLL_EXPORT -DFD_SETSIZE=1025 -I. CFLAGS=-Wall -Os -g -DDLL_EXPORT -DFD_SETSIZE=1024 -I.
LIBS=-lws2_32 LIBS=-lws2_32
OBJS = ctx.o reaper.o dist.o err.o \ OBJS = ctx.o reaper.o dist.o err.o \

View File

@ -40,7 +40,7 @@
/> />
<Tool <Tool
Name="VCCLCompilerTool" Name="VCCLCompilerTool"
AdditionalOptions="-DDLL_EXPORT -DFD_SETSIZE=1025 -D_CRT_SECURE_NO_WARNINGS" AdditionalOptions="-DDLL_EXPORT -DFD_SETSIZE=1024 -D_CRT_SECURE_NO_WARNINGS"
Optimization="0" Optimization="0"
PreprocessorDefinitions="NOMINMAX" PreprocessorDefinitions="NOMINMAX"
MinimalRebuild="true" MinimalRebuild="true"
@ -114,7 +114,7 @@
/> />
<Tool <Tool
Name="VCCLCompilerTool" Name="VCCLCompilerTool"
AdditionalOptions="-DDLL_EXPORT -DFD_SETSIZE=1025 -D_CRT_SECURE_NO_WARNINGS" AdditionalOptions="-DDLL_EXPORT -DFD_SETSIZE=1024 -D_CRT_SECURE_NO_WARNINGS"
Optimization="2" Optimization="2"
EnableIntrinsicFunctions="true" EnableIntrinsicFunctions="true"
RuntimeLibrary="2" RuntimeLibrary="2"
@ -188,7 +188,7 @@
/> />
<Tool <Tool
Name="VCCLCompilerTool" Name="VCCLCompilerTool"
AdditionalOptions="-DZMQ_STATIC -DFD_SETSIZE=1025 -D_CRT_SECURE_NO_WARNINGS" AdditionalOptions="-DZMQ_STATIC -DFD_SETSIZE=1024 -D_CRT_SECURE_NO_WARNINGS"
Optimization="0" Optimization="0"
PreprocessorDefinitions="NOMINMAX" PreprocessorDefinitions="NOMINMAX"
MinimalRebuild="true" MinimalRebuild="true"
@ -254,7 +254,7 @@
/> />
<Tool <Tool
Name="VCCLCompilerTool" Name="VCCLCompilerTool"
AdditionalOptions="-DZMQ_STATIC -DFD_SETSIZE=1025 -D_CRT_SECURE_NO_WARNINGS" AdditionalOptions="-DZMQ_STATIC -DFD_SETSIZE=1024 -D_CRT_SECURE_NO_WARNINGS"
Optimization="2" Optimization="2"
EnableIntrinsicFunctions="true" EnableIntrinsicFunctions="true"
RuntimeLibrary="2" RuntimeLibrary="2"
@ -319,7 +319,7 @@
/> />
<Tool <Tool
Name="VCCLCompilerTool" Name="VCCLCompilerTool"
AdditionalOptions="-DDLL_EXPORT -DFD_SETSIZE=1025 -D_CRT_SECURE_NO_WARNINGS" AdditionalOptions="-DDLL_EXPORT -DFD_SETSIZE=1024 -D_CRT_SECURE_NO_WARNINGS"
Optimization="2" Optimization="2"
EnableIntrinsicFunctions="true" EnableIntrinsicFunctions="true"
AdditionalIncludeDirectories="../../../../OpenPGM/include" AdditionalIncludeDirectories="../../../../OpenPGM/include"

View File

@ -13,7 +13,7 @@
<Command>copy ..\platform.hpp ..\..\..\src</Command> <Command>copy ..\platform.hpp ..\..\..\src</Command>
</PreBuildEvent> </PreBuildEvent>
<ClCompile> <ClCompile>
<PreprocessorDefinitions>_CRT_SECURE_NO_WARNINGS;DLL_EXPORT;FD_SETSIZE=1025;%(PreprocessorDefinitions)</PreprocessorDefinitions> <PreprocessorDefinitions>_CRT_SECURE_NO_WARNINGS;DLL_EXPORT;FD_SETSIZE=1024;%(PreprocessorDefinitions)</PreprocessorDefinitions>
</ClCompile> </ClCompile>
<Link> <Link>
<AdditionalDependencies>Ws2_32.lib;Rpcrt4.lib;%(AdditionalDependencies)</AdditionalDependencies> <AdditionalDependencies>Ws2_32.lib;Rpcrt4.lib;%(AdditionalDependencies)</AdditionalDependencies>

View File

@ -13,7 +13,7 @@
<Command>copy ..\platform.hpp ..\..\..\src</Command> <Command>copy ..\platform.hpp ..\..\..\src</Command>
</PreBuildEvent> </PreBuildEvent>
<ClCompile> <ClCompile>
<PreprocessorDefinitions>_CRT_SECURE_NO_WARNINGS;ZMQ_STATIC;FD_SETSIZE=1025;%(PreprocessorDefinitions)</PreprocessorDefinitions> <PreprocessorDefinitions>_CRT_SECURE_NO_WARNINGS;ZMQ_STATIC;FD_SETSIZE=1024;%(PreprocessorDefinitions)</PreprocessorDefinitions>
</ClCompile> </ClCompile>
<Lib> <Lib>
<AdditionalDependencies>Ws2_32.lib;Rpcrt4.lib;%(AdditionalDependencies)</AdditionalDependencies> <AdditionalDependencies>Ws2_32.lib;Rpcrt4.lib;%(AdditionalDependencies)</AdditionalDependencies>

View File

@ -62,6 +62,7 @@ The request-reply pattern is used for sending requests from a ZMQ_REQ _client_
to one or more ZMQ_REP _services_, and receiving subsequent replies to each to one or more ZMQ_REP _services_, and receiving subsequent replies to each
request sent. request sent.
The request-reply pattern is formally defined by http://rfc.zeromq.org/spec:28.
ZMQ_REQ ZMQ_REQ
^^^^^^^ ^^^^^^^
@ -168,6 +169,7 @@ Publish-subscribe pattern
The publish-subscribe pattern is used for one-to-many distribution of data from The publish-subscribe pattern is used for one-to-many distribution of data from
a single _publisher_ to multiple _subscribers_ in a fan out fashion. a single _publisher_ to multiple _subscribers_ in a fan out fashion.
The publish-subscribe pattern is formally defined by http://rfc.zeromq.org/spec:29.
ZMQ_PUB ZMQ_PUB
^^^^^^^ ^^^^^^^
@ -249,6 +251,7 @@ a pipeline. Data always flows down the pipeline, and each stage of the pipeline
is connected to at least one _node_. When a pipeline stage is connected to is connected to at least one _node_. When a pipeline stage is connected to
multiple _nodes_ data is round-robined among all connected _nodes_. multiple _nodes_ data is round-robined among all connected _nodes_.
The pipeline pattern is formally defined by http://rfc.zeromq.org/spec:30.
ZMQ_PUSH ZMQ_PUSH
^^^^^^^^ ^^^^^^^^
@ -296,6 +299,7 @@ The exclusive pair pattern is used to connect a peer to precisely one other
peer. This pattern is used for inter-thread communication across the inproc peer. This pattern is used for inter-thread communication across the inproc
transport. transport.
The exclusive pair pattern is formally defined by http://rfc.zeromq.org/spec:31.
ZMQ_PAIR ZMQ_PAIR
^^^^^^^^ ^^^^^^^^

View File

@ -185,7 +185,7 @@ ZMQ_EXPORT const char *zmq_strerror (int errnum);
/* Default for new contexts */ /* Default for new contexts */
#define ZMQ_IO_THREADS_DFLT 1 #define ZMQ_IO_THREADS_DFLT 1
#define ZMQ_MAX_SOCKETS_DFLT 1024 #define ZMQ_MAX_SOCKETS_DFLT 1023
ZMQ_EXPORT void *zmq_ctx_new (void); ZMQ_EXPORT void *zmq_ctx_new (void);
ZMQ_EXPORT int zmq_ctx_term (void *context); ZMQ_EXPORT int zmq_ctx_term (void *context);

View File

@ -153,7 +153,6 @@ uint64_t zmq::clock_t::now_ms ()
// does not guarantee that it will use a hardware that offers a monotonic timer. // does not guarantee that it will use a hardware that offers a monotonic timer.
// So, lets use GetTickCount when GetTickCount64 is not available with an workaround // So, lets use GetTickCount when GetTickCount64 is not available with an workaround
// to its 32 bit limitation. // to its 32 bit limitation.
static_assert(sizeof(uint64_t) >= sizeof(ULONGLONG), "Loosing timer information");
return static_cast<uint64_t>((*my_get_tick_count64)()); return static_cast<uint64_t>((*my_get_tick_count64)());
#else #else
return now_us () / 1000; return now_us () / 1000;

View File

@ -317,10 +317,6 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
win_assert (sync != NULL); win_assert (sync != NULL);
// Enter the critical section.
DWORD dwrc = WaitForSingleObject (sync, INFINITE);
zmq_assert (dwrc == WAIT_OBJECT_0);
// Windows has no 'socketpair' function. CreatePipe is no good as pipe // Windows has no 'socketpair' function. CreatePipe is no good as pipe
// handles cannot be polled on. Here we create the socketpair by hand. // handles cannot be polled on. Here we create the socketpair by hand.
*w_ = INVALID_SOCKET; *w_ = INVALID_SOCKET;
@ -341,55 +337,51 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
(char *)&tcp_nodelay, sizeof (tcp_nodelay)); (char *)&tcp_nodelay, sizeof (tcp_nodelay));
wsa_assert (rc != SOCKET_ERROR); wsa_assert (rc != SOCKET_ERROR);
// Bind listening socket to signaler port. // Init sockaddr to signaler port.
struct sockaddr_in addr; struct sockaddr_in addr;
memset (&addr, 0, sizeof (addr)); memset (&addr, 0, sizeof (addr));
addr.sin_family = AF_INET; addr.sin_family = AF_INET;
addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK); addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
addr.sin_port = htons (signaler_port); addr.sin_port = htons (signaler_port);
rc = bind (listener, (const struct sockaddr*) &addr, sizeof (addr));
wsa_assert (rc != SOCKET_ERROR);
// Listen for incomming connections.
rc = listen (listener, 1);
wsa_assert (rc != SOCKET_ERROR);
// Create the writer socket. // Create the writer socket.
*w_ = WSASocket (AF_INET, SOCK_STREAM, 0, NULL, 0, 0); *w_ = open_socket (AF_INET, SOCK_STREAM, 0);
wsa_assert (*w_ != INVALID_SOCKET); wsa_assert (*w_ != INVALID_SOCKET);
# if !defined _WIN32_WCE
// On Windows, preventing sockets to be inherited by child processes.
BOOL brc = SetHandleInformation ((HANDLE) *w_, HANDLE_FLAG_INHERIT, 0);
win_assert (brc);
# else
BOOL brc;
# endif
// Set TCP_NODELAY on writer socket. // Set TCP_NODELAY on writer socket.
rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY, rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY,
(char *)&tcp_nodelay, sizeof (tcp_nodelay)); (char *)&tcp_nodelay, sizeof (tcp_nodelay));
wsa_assert (rc != SOCKET_ERROR); wsa_assert (rc != SOCKET_ERROR);
// Enter the critical section.
DWORD dwrc = WaitForSingleObject (sync, INFINITE);
zmq_assert (dwrc == WAIT_OBJECT_0);
// Bind listening socket to signaler port.
rc = bind (listener, (const struct sockaddr*) &addr, sizeof (addr));
// Listen for incoming connections.
if (rc != SOCKET_ERROR)
rc = listen (listener, 1);
// Connect writer to the listener. // Connect writer to the listener.
if (rc != SOCKET_ERROR)
rc = connect (*w_, (struct sockaddr*) &addr, sizeof (addr)); rc = connect (*w_, (struct sockaddr*) &addr, sizeof (addr));
// Save errno if connection fails
int conn_errno = 0;
if (rc == SOCKET_ERROR)
conn_errno = WSAGetLastError ();
else {
// Accept connection from writer. // Accept connection from writer.
if (rc != SOCKET_ERROR)
*r_ = accept (listener, NULL, NULL); *r_ = accept (listener, NULL, NULL);
// Save errno if error occurred in bind/listen/connect/accept.
int saved_errno = 0;
if (*r_ == INVALID_SOCKET) if (*r_ == INVALID_SOCKET)
conn_errno = WSAGetLastError (); saved_errno = WSAGetLastError ();
}
// We don't need the listening socket anymore. Close it. // We don't need the listening socket anymore. Close it.
rc = closesocket (listener); closesocket (listener);
wsa_assert (rc != SOCKET_ERROR);
// Exit the critical section. // Exit the critical section.
brc = SetEvent (sync); BOOL brc = SetEvent (sync);
win_assert (brc != 0); win_assert (brc != 0);
// Release the kernel object // Release the kernel object
@ -406,11 +398,13 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
} }
else { else {
// Cleanup writer if connection failed // Cleanup writer if connection failed
if (*w_ != INVALID_SOCKET) {
rc = closesocket (*w_); rc = closesocket (*w_);
wsa_assert (rc != SOCKET_ERROR); wsa_assert (rc != SOCKET_ERROR);
*w_ = INVALID_SOCKET; *w_ = INVALID_SOCKET;
}
// Set errno from saved value // Set errno from saved value
errno = wsa_error_to_errno (conn_errno); errno = wsa_error_to_errno (saved_errno);
return -1; return -1;
} }
@ -474,7 +468,7 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv); int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv);
if (rc == -1) { if (rc == -1) {
errno_assert (errno == ENFILE || errno == EMFILE); errno_assert (errno == ENFILE || errno == EMFILE);
sv [0] = sv [1] = -1; *w_ = *r_ = -1;
return -1; return -1;
} }
else { else {

View File

@ -21,31 +21,81 @@
#include <zmq.h> #include <zmq.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <vector>
const int no_of_sockets = 5000;
void test_system_max ()
{
// Keep allocating sockets until we run out of system resources
const int no_of_sockets = 2 * 65536;
void *ctx = zmq_ctx_new();
zmq_ctx_set(ctx, ZMQ_MAX_SOCKETS, no_of_sockets);
std::vector<void*> sockets;
while (true)
{
void *socket = zmq_socket(ctx, ZMQ_PAIR);
if (!socket)
break;
sockets.push_back(socket);
}
assert((int)sockets.size() < no_of_sockets);
// System is out of resources, further calls to zmq_socket should return NULL.
for (unsigned int i = 0; i < 10; ++i)
{
void *socket = zmq_socket(ctx, ZMQ_PAIR);
assert(socket == NULL);
}
// Clean up.
for (unsigned int i = 0; i < sockets.size(); ++i)
zmq_close(sockets[i]);
zmq_ctx_destroy(ctx);
}
void test_zmq_default_max ()
{
// Keep allocating sockets until we hit the default zeromq limit
void *ctx = zmq_ctx_new();
std::vector<void*> sockets;
while (true)
{
void *socket = zmq_socket(ctx, ZMQ_PAIR);
if (!socket)
break;
sockets.push_back(socket);
}
assert(sockets.size() == ZMQ_MAX_SOCKETS_DFLT);
// At zeromq max, further calls to zmq_socket should return NULL.
for (unsigned int i = 0; i < 10; ++i)
{
void *socket = zmq_socket(ctx, ZMQ_PAIR);
assert(socket == NULL);
}
// Clean up.
for (unsigned int i = 0; i < sockets.size(); ++i)
zmq_close(sockets[i]);
zmq_ctx_destroy(ctx);
}
int main(void) int main(void)
{ {
setup_test_environment(); setup_test_environment();
void *ctx = zmq_ctx_new(); test_system_max ();
void *sockets[no_of_sockets]; test_zmq_default_max ();
int sockets_created = 0;
for ( int i = 0; i < no_of_sockets; ++i )
{
sockets[i] = zmq_socket(ctx, ZMQ_PAIR);
if (sockets[i])
++sockets_created;
}
assert(sockets_created < no_of_sockets);
for ( int i = 0; i < no_of_sockets; ++i )
if (sockets[i])
zmq_close (sockets[i]);
zmq_ctx_destroy (ctx);
return 0; return 0;
} }