Merge remote-tracking branch 'upstream/master'

This commit is contained in:
Steven McCoy 2013-07-13 12:26:11 -04:00
commit 703c1a6e04
12 changed files with 203 additions and 132 deletions

1
.gitignore vendored
View File

@ -42,6 +42,7 @@ tests/test_connect_resolve
tests/test_connect_delay tests/test_connect_delay
tests/test_term_endpoint tests/test_term_endpoint
tests/test_router_mandatory tests/test_router_mandatory
tests/test_disconnect_inproc
src/platform.hpp* src/platform.hpp*
src/stamp-h1 src/stamp-h1
perf/local_lat perf/local_lat

2
.travis.yml Normal file
View File

@ -0,0 +1,2 @@
script: ./autogen.sh && ./configure && make && make check
language: c

2
NEWS
View File

@ -1,7 +1,9 @@
0MQ version 3.2.4 stable, released on 2013/xx/xx 0MQ version 3.2.4 stable, released on 2013/xx/xx
================================================ ================================================
* LIBZMQ-532 (Windows) critical section not released on error
* LIBZMQ-456 ZMQ_XPUB_VERBOSE does not propagate in a tree of XPUB/XSUB devices * LIBZMQ-456 ZMQ_XPUB_VERBOSE does not propagate in a tree of XPUB/XSUB devices
* LIBZMQ-84 (Windows) Assertion failed: Address already in use at signaler.cpp:80
0MQ version 3.2.3 stable, released on 2013/05/02 0MQ version 3.2.3 stable, released on 2013/05/02

View File

@ -0,0 +1,31 @@
CC=gcc
CFLAGS=-Wall -Os -g -DDLL_EXPORT -DFD_SETSIZE=1024 -I.
LIBS=-lws2_32
OBJS = address.o clock.o ctx.o dealer.o decoder.o devpoll.o dist.o encoder.o epoll.o err.o fq.o \
io_object.o io_thread.o ip.o ipc_address.o ipc_connecter.o ipc_listener.o kqueue.o lb.o \
mailbox.o msg.o mtrie.o object.o options.o own.o pair.o pgm_receiver.o pgm_sender.o \
pgm_socket.o pipe.o poll.o poller_base.o precompiled.o proxy.o pub.o pull.o push.o \
random.o reaper.o rep.o req.o router.o select.o session_base.o \
signaler.o socket_base.o stream_engine.o sub.o tcp.o tcp_address.o tcp_connecter.o tcp_listener.o \
thread.o trie.o v1_decoder.o v1_encoder.o xpub.o xsub.o zmq.o zmq_utils.o
%.o: ../../src/%.cpp
$(CC) -c -o $@ $< $(CFLAGS)
%.o: ../../perf/%.cpp
$(CC) -c -o $@ $< $(CFLAGS)
all: libzmq.dll
perf: inproc_lat.exe inproc_thr.exe local_lat.exe local_thr.exe remote_lat.exe remote_thr.exe
libzmq.dll: $(OBJS)
g++ -shared -o $@ $^ -Wl,--out-implib,$@.a $(LIBS)
%.exe: %.o libzmq.dll
g++ -o $@ $^
clean:
del *.o *.a *.dll *.exe

View File

@ -0,0 +1,32 @@
/*
Copyright (c) 2007-2011 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_PLATFORM_HPP_INCLUDED__
#define __ZMQ_PLATFORM_HPP_INCLUDED__
// This is the platform definition for the MSVC platform.
// As a first step of the build process it is copied to
// zmq directory to take place of platform.hpp generated from
// platform.hpp.in on platforms supported by GNU autotools.
// Place any MSVC-specific definitions here.
#define ZMQ_HAVE_WINDOWS
#endif

View File

@ -35,8 +35,6 @@
namespace zmq namespace zmq
{ {
class i_msg_sink;
// Helper base class for decoders that know the amount of data to read // Helper base class for decoders that know the amount of data to read
// in advance at any moment. Knowing the amount in advance is a property // in advance at any moment. Knowing the amount in advance is a property
// of the protocol used. 0MQ framing protocol is based size-prefixed // of the protocol used. 0MQ framing protocol is based size-prefixed

View File

@ -40,8 +40,6 @@
namespace zmq namespace zmq
{ {
class i_msg_source;
// Helper base class for encoders. It implements the state machine that // Helper base class for encoders. It implements the state machine that
// fills the outgoing buffer. Derived classes should implement individual // fills the outgoing buffer. Derived classes should implement individual
// state machine actions. // state machine actions.

View File

@ -26,7 +26,8 @@
namespace zmq namespace zmq
{ {
class i_msg_sink; // Forward declaration
struct i_msg_sink;
// Interface to be implemented by message decoder. // Interface to be implemented by message decoder.

View File

@ -27,7 +27,7 @@ namespace zmq
{ {
// Forward declaration // Forward declaration
class i_msg_source; struct i_msg_source;
// Interface to be implemented by message encoder. // Interface to be implemented by message encoder.

View File

@ -95,7 +95,11 @@ zmq::signaler_t::~signaler_t ()
int rc = close (r); int rc = close (r);
errno_assert (rc == 0); errno_assert (rc == 0);
#elif defined ZMQ_HAVE_WINDOWS #elif defined ZMQ_HAVE_WINDOWS
int rc = closesocket (w); struct linger so_linger = { 1, 0 };
int rc = setsockopt (w, SOL_SOCKET, SO_LINGER,
(char *)&so_linger, sizeof (so_linger));
wsa_assert (rc != SOCKET_ERROR);
rc = closesocket (w);
wsa_assert (rc != SOCKET_ERROR); wsa_assert (rc != SOCKET_ERROR);
rc = closesocket (r); rc = closesocket (r);
wsa_assert (rc != SOCKET_ERROR); wsa_assert (rc != SOCKET_ERROR);
@ -279,7 +283,7 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
(char *)&tcp_nodelay, sizeof (tcp_nodelay)); (char *)&tcp_nodelay, sizeof (tcp_nodelay));
wsa_assert (rc != SOCKET_ERROR); wsa_assert (rc != SOCKET_ERROR);
// Bind listening socket to any free local port. // Bind listening socket to signaler port.
struct sockaddr_in addr; struct sockaddr_in addr;
memset (&addr, 0, sizeof (addr)); memset (&addr, 0, sizeof (addr));
addr.sin_family = AF_INET; addr.sin_family = AF_INET;
@ -307,15 +311,19 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
// Connect writer to the listener. // Connect writer to the listener.
rc = connect (*w_, (struct sockaddr*) &addr, sizeof (addr)); rc = connect (*w_, (struct sockaddr*) &addr, sizeof (addr));
wsa_assert (rc != SOCKET_ERROR);
// Save errno if connection fails
int conn_errno = 0;
if (rc == SOCKET_ERROR) {
conn_errno = WSAGetLastError ();
} else {
// Accept connection from writer. // Accept connection from writer.
*r_ = accept (listener, NULL, NULL); *r_ = accept (listener, NULL, NULL);
wsa_assert (*r_ != INVALID_SOCKET);
// On Windows, preventing sockets to be inherited by child processes. if (*r_ == INVALID_SOCKET) {
brc = SetHandleInformation ((HANDLE) *r_, HANDLE_FLAG_INHERIT, 0); conn_errno = WSAGetLastError ();
win_assert (brc); }
}
// We don't need the listening socket anymore. Close it. // We don't need the listening socket anymore. Close it.
rc = closesocket (listener); rc = closesocket (listener);
@ -329,7 +337,29 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
brc = CloseHandle (sync); brc = CloseHandle (sync);
win_assert (brc != 0); win_assert (brc != 0);
if (*r_ != INVALID_SOCKET) {
// On Windows, preventing sockets to be inherited by child processes.
brc = SetHandleInformation ((HANDLE) *r_, HANDLE_FLAG_INHERIT, 0);
win_assert (brc);
return 0; return 0;
} else {
// Cleanup writer if connection failed
rc = closesocket (*w_);
wsa_assert (rc != SOCKET_ERROR);
*w_ = INVALID_SOCKET;
// Set errno from saved value
errno = wsa_error_to_errno (conn_errno);
// Ideally, we would return errno to the caller signaler_t()
// Unfortunately, it uses errno_assert() which gives "Unknown error"
// We might as well assert here and print the actual error message
wsa_assert_no (conn_errno);
return -1;
}
#elif defined ZMQ_HAVE_OPENVMS #elif defined ZMQ_HAVE_OPENVMS

View File

@ -29,8 +29,6 @@
namespace zmq namespace zmq
{ {
class i_msg_source;
// Encoder for 0MQ framing protocol. Converts messages into data stream. // Encoder for 0MQ framing protocol. Converts messages into data stream.
class v1_encoder_t : public encoder_base_t <v1_encoder_t> class v1_encoder_t : public encoder_base_t <v1_encoder_t>

View File

@ -1,6 +1,5 @@
/* /*
Copyright (c) 2012 Ian Barber Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file
Copyright (c) 2012 Other contributors as noted in the AUTHORS file
This file is part of 0MQ. This file is part of 0MQ.
@ -19,7 +18,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "../include/zmq.h" #include "../include/zmq.h"
#include "../include/zmq_utils.h"
#include <errno.h> #include <errno.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
@ -31,12 +29,9 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
int main (void) int main (void)
{ {
fprintf (stderr, "test_connect_delay running...\n");
int val; int val;
int rc; int rc;
char buffer[16]; char buffer[16];
int seen = 0;
// TEST 1. // TEST 1.
// First we're going to attempt to send messages to two // First we're going to attempt to send messages to two
// pipes, one connected, the other not. We should see // pipes, one connected, the other not. We should see
@ -53,7 +48,7 @@ int main (void)
val = 0; val = 0;
rc = zmq_setsockopt(to, ZMQ_LINGER, &val, sizeof(val)); rc = zmq_setsockopt(to, ZMQ_LINGER, &val, sizeof(val));
assert (rc == 0); assert (rc == 0);
rc = zmq_bind(to, "tcp://*:5555"); rc = zmq_bind (to, "tcp://*:6555");
assert (rc == 0); assert (rc == 0);
// Create a socket pushing to two endpoints - only 1 message should arrive. // Create a socket pushing to two endpoints - only 1 message should arrive.
@ -71,26 +66,22 @@ int main (void)
// We send 10 messages, 5 should just get stuck in the queue // We send 10 messages, 5 should just get stuck in the queue
// for the not-yet-connected pipe // for the not-yet-connected pipe
for (int i = 0; i < 10; ++i) for (int i = 0; i < 10; ++i) {
{ rc = zmq_send (from, "Hello", 5, 0);
std::string message("message "); assert (rc == 5);
message += ('0' + i);
rc = zmq_send (from, message.data(), message.size(), 0);
assert(rc >= 0);
} }
// Sleep to allow the messages to be delivered
zmq_sleep (1);
// We now consume from the connected pipe // We now consume from the connected pipe
// - we should see just 5 // - we should see just 5
seen = 0; int timeout = 100;
for (int i = 0; i < 10; ++i) rc = zmq_setsockopt (to, ZMQ_RCVTIMEO, &timeout, sizeof (int));
{ assert (rc == 0);
memset (&buffer, 0, sizeof(buffer));
rc = zmq_recv (to, &buffer, sizeof(buffer), ZMQ_DONTWAIT); int seen = 0;
while (true) {
rc = zmq_recv (to, &buffer, sizeof (buffer), 0);
if (rc == -1) if (rc == -1)
break; break; // Break when we didn't get a message
seen++; seen++;
} }
assert (seen == 5); assert (seen == 5);
@ -101,7 +92,7 @@ int main (void)
rc = zmq_close (to); rc = zmq_close (to);
assert (rc == 0); assert (rc == 0);
rc = zmq_ctx_destroy(context); rc = zmq_term (context);
assert (rc == 0); assert (rc == 0);
// TEST 2 // TEST 2
@ -144,26 +135,21 @@ int main (void)
assert (rc == 0); assert (rc == 0);
// Send 10 messages, all should be routed to the connected pipe // Send 10 messages, all should be routed to the connected pipe
for (int i = 0; i < 10; ++i) for (int i = 0; i < 10; ++i) {
{ rc = zmq_send (from, "Hello", 5, 0);
std::string message("message "); assert (rc == 5);
message += ('0' + i);
rc = zmq_send (from, message.data(), message.size(), 0);
assert (rc >= 0);
} }
rc = zmq_setsockopt (to, ZMQ_RCVTIMEO, &timeout, sizeof (int));
assert (rc == 0);
// Sleep to allow the messages to be delivered
zmq_sleep (1);
// Send 10 messages, all should arrive.
seen = 0; seen = 0;
for (int i = 0; i < 10; ++i) while (true) {
{ rc = zmq_recv (to, &buffer, sizeof (buffer), 0);
memset(&buffer, 0, sizeof(buffer)); if (rc == -1)
rc = zmq_recv (to, &buffer, sizeof(buffer), ZMQ_DONTWAIT); break; // Break when we didn't get a message
// If there is a failed delivery, assert! seen++;
assert (rc != -1);
} }
assert (seen == 10);
rc = zmq_close (from); rc = zmq_close (from);
assert (rc == 0); assert (rc == 0);
@ -171,89 +157,81 @@ int main (void)
rc = zmq_close (to); rc = zmq_close (to);
assert (rc == 0); assert (rc == 0);
rc = zmq_ctx_destroy(context); rc = zmq_term (context);
assert (rc == 0); assert (rc == 0);
// TEST 3 // TEST 3
// This time we want to validate that the same blocking behaviour // This time we want to validate that the same blocking behaviour
// occurs with an existing connection that is broken. We will send // occurs with an existing connection that is broken. We will send
// messaages to a connected pipe, disconnect and verify the messages // messages to a connected pipe, disconnect and verify the messages
// block. Then we reconnect and verify messages flow again. // block. Then we reconnect and verify messages flow again.
context = zmq_ctx_new (); context = zmq_ctx_new ();
void *context2 = zmq_ctx_new();
to = zmq_socket (context2, ZMQ_PULL); void *backend = zmq_socket (context, ZMQ_DEALER);
assert (to); assert (backend);
rc = zmq_bind (to, "tcp://*:5560"); void *frontend = zmq_socket (context, ZMQ_DEALER);
assert (frontend);
int zero = 0;
rc = zmq_setsockopt (backend, ZMQ_LINGER, &zero, sizeof (zero));
assert (rc == 0);
rc = zmq_setsockopt (frontend, ZMQ_LINGER, &zero, sizeof (zero));
assert (rc == 0); assert (rc == 0);
val = 0; // Frontend connects to backend using DELAY_ATTACH_ON_CONNECT
rc = zmq_setsockopt (to, ZMQ_LINGER, &val, sizeof(val)); int on = 1;
rc = zmq_setsockopt (frontend, ZMQ_DELAY_ATTACH_ON_CONNECT, &on, sizeof (on));
assert (rc == 0);
rc = zmq_bind (backend, "tcp://*:5560");
assert (rc == 0);
rc = zmq_connect (frontend, "tcp://localhost:5560");
assert (rc == 0); assert (rc == 0);
// Create a socket pushing // Ping backend to frontend so we know when the connection is up
from = zmq_socket (context, ZMQ_PUSH); rc = zmq_send (backend, "Hello", 5, 0);
assert (from); assert (rc == 5);
rc = zmq_recv (frontend, buffer, 255, 0);
assert (rc == 5);
val = 0; // Send message from frontend to backend
rc = zmq_setsockopt (from, ZMQ_LINGER, &val, sizeof(val)); rc = zmq_send (frontend, "Hello", 5, ZMQ_DONTWAIT);
assert (rc == 0); assert (rc == 5);
val = 1;
rc = zmq_setsockopt (from, ZMQ_DELAY_ATTACH_ON_CONNECT, &val, sizeof(val));
assert (rc == 0);
// Connect to the valid socket socket rc = zmq_close (backend);
rc = zmq_connect (from, "tcp://localhost:5560");
assert (rc == 0);
// Allow connections to stabilise
zmq_sleep(1);
// Send a message, should succeed
std::string message("message ");
rc = zmq_send (from, message.data(), message.size(), 0);
assert (rc >= 0);
rc = zmq_close (to);
assert (rc == 0);
rc = zmq_ctx_destroy(context2);
assert (rc == 0); assert (rc == 0);
// Give time to process disconnect // Give time to process disconnect
zmq_sleep(1); // There's no way to do this except with a sleep
struct timespec t = { 0, 250 * 1000000 };
nanosleep (&t, NULL);
// Send a message, should fail // Send a message, should fail
rc = zmq_send (from, message.data(), message.size(), ZMQ_DONTWAIT); rc = zmq_send (frontend, "Hello", 5, ZMQ_DONTWAIT);
assert (rc == -1); assert (rc == -1);
context2 = zmq_ctx_new(); // Recreate backend socket
to = zmq_socket (context2, ZMQ_PULL); backend = zmq_socket (context, ZMQ_DEALER);
assert (to); assert (backend);
rc = zmq_bind (to, "tcp://*:5560"); rc = zmq_setsockopt (backend, ZMQ_LINGER, &zero, sizeof (zero));
assert (rc == 0);
rc = zmq_bind (backend, "tcp://*:5560");
assert (rc == 0); assert (rc == 0);
val = 0; // Ping backend to frontend so we know when the connection is up
rc = zmq_setsockopt (to, ZMQ_LINGER, &val, sizeof(val)); rc = zmq_send (backend, "Hello", 5, 0);
assert (rc == 0); assert (rc == 5);
rc = zmq_recv (frontend, buffer, 255, 0);
// Allow connections to stabilise assert (rc == 5);
zmq_sleep(1);
// After the reconnect, should succeed // After the reconnect, should succeed
rc = zmq_send (from, message.data(), message.size(), 0); rc = zmq_send (frontend, "Hello", 5, ZMQ_DONTWAIT);
assert (rc >= 0); assert (rc == 5);
rc = zmq_close (to); rc = zmq_close (backend);
assert (rc == 0); assert (rc == 0);
rc = zmq_close (from); rc = zmq_close (frontend);
assert (rc == 0); assert (rc == 0);
rc = zmq_ctx_destroy(context); rc = zmq_term (context);
assert (rc == 0);
rc = zmq_ctx_destroy(context2);
assert (rc == 0); assert (rc == 0);
} }