From 573a1eab4b2df369a23b11576ba23dee4bf9eaae Mon Sep 17 00:00:00 2001 From: KIU Shueng Chuan Date: Fri, 15 Feb 2013 10:45:43 +0800 Subject: [PATCH 1/8] release critical section on failure to create signaler fdpair --- src/signaler.cpp | 46 ++++++++++++++++++++++++++++++++++++---------- 1 file changed, 36 insertions(+), 10 deletions(-) diff --git a/src/signaler.cpp b/src/signaler.cpp index eba0fab6..dd5c1fc6 100644 --- a/src/signaler.cpp +++ b/src/signaler.cpp @@ -279,7 +279,7 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_) (char *)&tcp_nodelay, sizeof (tcp_nodelay)); wsa_assert (rc != SOCKET_ERROR); - // Bind listening socket to any free local port. + // Bind listening socket to signaler port. struct sockaddr_in addr; memset (&addr, 0, sizeof (addr)); addr.sin_family = AF_INET; @@ -307,15 +307,19 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_) // Connect writer to the listener. rc = connect (*w_, (struct sockaddr*) &addr, sizeof (addr)); - wsa_assert (rc != SOCKET_ERROR); - // Accept connection from writer. - *r_ = accept (listener, NULL, NULL); - wsa_assert (*r_ != INVALID_SOCKET); + // Save errno if connection fails + int conn_errno = 0; + if (rc == SOCKET_ERROR) { + conn_errno = WSAGetLastError (); + } else { + // Accept connection from writer. + *r_ = accept (listener, NULL, NULL); - // On Windows, preventing sockets to be inherited by child processes. - brc = SetHandleInformation ((HANDLE) *r_, HANDLE_FLAG_INHERIT, 0); - win_assert (brc); + if (*r_ == INVALID_SOCKET) { + conn_errno = WSAGetLastError (); + } + } // We don't need the listening socket anymore. Close it. rc = closesocket (listener); @@ -325,11 +329,33 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_) brc = SetEvent (sync); win_assert (brc != 0); - // Release the kernel object + // Release the kernel object brc = CloseHandle (sync); win_assert (brc != 0); - return 0; + if (*r_ != INVALID_SOCKET) { + // On Windows, preventing sockets to be inherited by child processes. + brc = SetHandleInformation ((HANDLE) *r_, HANDLE_FLAG_INHERIT, 0); + win_assert (brc); + + return 0; + } else { + // Cleanup writer if connection failed + rc = closesocket (*w_); + wsa_assert (rc != SOCKET_ERROR); + + *w_ = INVALID_SOCKET; + + // Set errno from saved value + errno = wsa_error_to_errno (conn_errno); + + // Ideally, we would return errno to the caller signaler_t() + // Unfortunately, it uses errno_assert() which gives "Unknown error" + // We might as well assert here and print the actual error message + wsa_assert_no (conn_errno); + + return -1; + } #elif defined ZMQ_HAVE_OPENVMS From d7cad1b52afc85550cdfc6dcd69fbbd25f0f6310 Mon Sep 17 00:00:00 2001 From: KIU Shueng Chuan Date: Sat, 29 Dec 2012 18:05:15 +0800 Subject: [PATCH 2/8] set SO_LINGER on first signaler socket to close in order to avoid TIME_WAIT state. --- src/signaler.cpp | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/signaler.cpp b/src/signaler.cpp index dd5c1fc6..19eafce1 100644 --- a/src/signaler.cpp +++ b/src/signaler.cpp @@ -95,7 +95,11 @@ zmq::signaler_t::~signaler_t () int rc = close (r); errno_assert (rc == 0); #elif defined ZMQ_HAVE_WINDOWS - int rc = closesocket (w); + struct linger so_linger = { 1, 0 }; + int rc = setsockopt (w, SOL_SOCKET, SO_LINGER, + (char *)&so_linger, sizeof (so_linger)); + wsa_assert (rc != SOCKET_ERROR); + rc = closesocket (w); wsa_assert (rc != SOCKET_ERROR); rc = closesocket (r); wsa_assert (rc != SOCKET_ERROR); From 26b182fe76d0f5ce21b541dc89f43be865ec73fa Mon Sep 17 00:00:00 2001 From: Pieter Hintjens Date: Sat, 18 May 2013 07:17:00 +0100 Subject: [PATCH 3/8] Backported #84 and #532 --- NEWS | 2 ++ 1 file changed, 2 insertions(+) diff --git a/NEWS b/NEWS index a01a7096..20cd9642 100644 --- a/NEWS +++ b/NEWS @@ -1,7 +1,9 @@ 0MQ version 3.2.4 stable, released on 2013/xx/xx ================================================ +* LIBZMQ-532 (Windows) critical section not released on error * LIBZMQ-456 ZMQ_XPUB_VERBOSE does not propagate in a tree of XPUB/XSUB devices +* LIBZMQ-84 (Windows) Assertion failed: Address already in use at signaler.cpp:80 0MQ version 3.2.3 stable, released on 2013/05/02 From 96fbeea5f64fbf7fa84920bfae44ce749563b180 Mon Sep 17 00:00:00 2001 From: Pieter Hintjens Date: Tue, 21 May 2013 09:22:13 +0200 Subject: [PATCH 4/8] Added support for Travis CI --- .gitignore | 1 + .travis.yaml | 2 ++ 2 files changed, 3 insertions(+) create mode 100644 .travis.yaml diff --git a/.gitignore b/.gitignore index 8f1f00f3..0309a519 100644 --- a/.gitignore +++ b/.gitignore @@ -42,6 +42,7 @@ tests/test_connect_resolve tests/test_connect_delay tests/test_term_endpoint tests/test_router_mandatory +tests/test_disconnect_inproc src/platform.hpp* src/stamp-h1 perf/local_lat diff --git a/.travis.yaml b/.travis.yaml new file mode 100644 index 00000000..aafbbd1d --- /dev/null +++ b/.travis.yaml @@ -0,0 +1,2 @@ +script: ./autogen.sh && ./configure && make && make check +language: c From 37a0ec59360144704c66d116c0d7b4ddf43baaa8 Mon Sep 17 00:00:00 2001 From: Pieter Hintjens Date: Tue, 21 May 2013 09:32:39 +0200 Subject: [PATCH 5/8] Fixed name of Travis file --- .travis.yaml => .travis.yml | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename .travis.yaml => .travis.yml (100%) diff --git a/.travis.yaml b/.travis.yml similarity index 100% rename from .travis.yaml rename to .travis.yml From 3d353f830699eb8e1b0cd0ce8b939e4356de1556 Mon Sep 17 00:00:00 2001 From: Pieter Hintjens Date: Tue, 21 May 2013 10:25:02 +0200 Subject: [PATCH 6/8] Backported test_connect_delay.cpp from libzmq --- tests/test_connect_delay.cpp | 204 ++++++++++++++++------------------- 1 file changed, 91 insertions(+), 113 deletions(-) diff --git a/tests/test_connect_delay.cpp b/tests/test_connect_delay.cpp index 92c0d682..a0563512 100644 --- a/tests/test_connect_delay.cpp +++ b/tests/test_connect_delay.cpp @@ -1,25 +1,23 @@ /* -Copyright (c) 2012 Ian Barber -Copyright (c) 2012 Other contributors as noted in the AUTHORS file + Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file + + This file is part of 0MQ. -This file is part of 0MQ. + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. -0MQ is free software; you can redistribute it and/or modify it under -the terms of the GNU Lesser General Public License as published by -the Free Software Foundation; either version 3 of the License, or -(at your option) any later version. + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. -0MQ is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU Lesser General Public License for more details. - -You should have received a copy of the GNU Lesser General Public License -along with this program. If not, see . + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . */ #include "../include/zmq.h" -#include "../include/zmq_utils.h" #include #include #include @@ -31,12 +29,9 @@ along with this program. If not, see . int main (void) { - fprintf (stderr, "test_connect_delay running...\n"); int val; int rc; char buffer[16]; - int seen = 0; - // TEST 1. // First we're going to attempt to send messages to two // pipes, one connected, the other not. We should see @@ -53,7 +48,7 @@ int main (void) val = 0; rc = zmq_setsockopt(to, ZMQ_LINGER, &val, sizeof(val)); assert (rc == 0); - rc = zmq_bind(to, "tcp://*:5555"); + rc = zmq_bind (to, "tcp://*:6555"); assert (rc == 0); // Create a socket pushing to two endpoints - only 1 message should arrive. @@ -71,26 +66,22 @@ int main (void) // We send 10 messages, 5 should just get stuck in the queue // for the not-yet-connected pipe - for (int i = 0; i < 10; ++i) - { - std::string message("message "); - message += ('0' + i); - rc = zmq_send (from, message.data(), message.size(), 0); - assert(rc >= 0); + for (int i = 0; i < 10; ++i) { + rc = zmq_send (from, "Hello", 5, 0); + assert (rc == 5); } - // Sleep to allow the messages to be delivered - zmq_sleep (1); - // We now consume from the connected pipe // - we should see just 5 - seen = 0; - for (int i = 0; i < 10; ++i) - { - memset (&buffer, 0, sizeof(buffer)); - rc = zmq_recv (to, &buffer, sizeof(buffer), ZMQ_DONTWAIT); - if( rc == -1) - break; + int timeout = 100; + rc = zmq_setsockopt (to, ZMQ_RCVTIMEO, &timeout, sizeof (int)); + assert (rc == 0); + + int seen = 0; + while (true) { + rc = zmq_recv (to, &buffer, sizeof (buffer), 0); + if (rc == -1) + break; // Break when we didn't get a message seen++; } assert (seen == 5); @@ -101,7 +92,7 @@ int main (void) rc = zmq_close (to); assert (rc == 0); - rc = zmq_ctx_destroy(context); + rc = zmq_term (context); assert (rc == 0); // TEST 2 @@ -144,26 +135,21 @@ int main (void) assert (rc == 0); // Send 10 messages, all should be routed to the connected pipe - for (int i = 0; i < 10; ++i) - { - std::string message("message "); - message += ('0' + i); - rc = zmq_send (from, message.data(), message.size(), 0); - assert (rc >= 0); + for (int i = 0; i < 10; ++i) { + rc = zmq_send (from, "Hello", 5, 0); + assert (rc == 5); } - - // Sleep to allow the messages to be delivered - zmq_sleep (1); - - // Send 10 messages, all should arrive. + rc = zmq_setsockopt (to, ZMQ_RCVTIMEO, &timeout, sizeof (int)); + assert (rc == 0); + seen = 0; - for (int i = 0; i < 10; ++i) - { - memset(&buffer, 0, sizeof(buffer)); - rc = zmq_recv (to, &buffer, sizeof(buffer), ZMQ_DONTWAIT); - // If there is a failed delivery, assert! - assert (rc != -1); + while (true) { + rc = zmq_recv (to, &buffer, sizeof (buffer), 0); + if (rc == -1) + break; // Break when we didn't get a message + seen++; } + assert (seen == 10); rc = zmq_close (from); assert (rc == 0); @@ -171,89 +157,81 @@ int main (void) rc = zmq_close (to); assert (rc == 0); - rc = zmq_ctx_destroy(context); + rc = zmq_term (context); assert (rc == 0); // TEST 3 // This time we want to validate that the same blocking behaviour // occurs with an existing connection that is broken. We will send - // messaages to a connected pipe, disconnect and verify the messages + // messages to a connected pipe, disconnect and verify the messages // block. Then we reconnect and verify messages flow again. - context = zmq_ctx_new(); - void *context2 = zmq_ctx_new(); + context = zmq_ctx_new (); - to = zmq_socket (context2, ZMQ_PULL); - assert (to); - rc = zmq_bind (to, "tcp://*:5560"); + void *backend = zmq_socket (context, ZMQ_DEALER); + assert (backend); + void *frontend = zmq_socket (context, ZMQ_DEALER); + assert (frontend); + int zero = 0; + rc = zmq_setsockopt (backend, ZMQ_LINGER, &zero, sizeof (zero)); + assert (rc == 0); + rc = zmq_setsockopt (frontend, ZMQ_LINGER, &zero, sizeof (zero)); assert (rc == 0); - val = 0; - rc = zmq_setsockopt (to, ZMQ_LINGER, &val, sizeof(val)); + // Frontend connects to backend using DELAY_ATTACH_ON_CONNECT + int on = 1; + rc = zmq_setsockopt (frontend, ZMQ_DELAY_ATTACH_ON_CONNECT, &on, sizeof (on)); + assert (rc == 0); + rc = zmq_bind (backend, "tcp://*:5560"); + assert (rc == 0); + rc = zmq_connect (frontend, "tcp://localhost:5560"); assert (rc == 0); - // Create a socket pushing - from = zmq_socket (context, ZMQ_PUSH); - assert (from); - - val = 0; - rc = zmq_setsockopt (from, ZMQ_LINGER, &val, sizeof(val)); - assert (rc == 0); - val = 1; - rc = zmq_setsockopt (from, ZMQ_DELAY_ATTACH_ON_CONNECT, &val, sizeof(val)); - assert (rc == 0); - - // Connect to the valid socket socket - rc = zmq_connect (from, "tcp://localhost:5560"); + // Ping backend to frontend so we know when the connection is up + rc = zmq_send (backend, "Hello", 5, 0); + assert (rc == 5); + rc = zmq_recv (frontend, buffer, 255, 0); + assert (rc == 5); + + // Send message from frontend to backend + rc = zmq_send (frontend, "Hello", 5, ZMQ_DONTWAIT); + assert (rc == 5); + + rc = zmq_close (backend); assert (rc == 0); - // Allow connections to stabilise - zmq_sleep(1); - - // Send a message, should succeed - std::string message("message "); - rc = zmq_send (from, message.data(), message.size(), 0); - assert (rc >= 0); - - rc = zmq_close (to); - assert (rc == 0); - - rc = zmq_ctx_destroy(context2); - assert (rc == 0); - - // Give time to process disconnect - zmq_sleep(1); + // Give time to process disconnect + // There's no way to do this except with a sleep + struct timespec t = { 0, 250 * 1000000 }; + nanosleep (&t, NULL); // Send a message, should fail - rc = zmq_send (from, message.data(), message.size(), ZMQ_DONTWAIT); + rc = zmq_send (frontend, "Hello", 5, ZMQ_DONTWAIT); assert (rc == -1); - - context2 = zmq_ctx_new(); - to = zmq_socket (context2, ZMQ_PULL); - assert (to); - rc = zmq_bind (to, "tcp://*:5560"); + + // Recreate backend socket + backend = zmq_socket (context, ZMQ_DEALER); + assert (backend); + rc = zmq_setsockopt (backend, ZMQ_LINGER, &zero, sizeof (zero)); + assert (rc == 0); + rc = zmq_bind (backend, "tcp://*:5560"); assert (rc == 0); - val = 0; - rc = zmq_setsockopt (to, ZMQ_LINGER, &val, sizeof(val)); - assert (rc == 0); - - // Allow connections to stabilise - zmq_sleep(1); - + // Ping backend to frontend so we know when the connection is up + rc = zmq_send (backend, "Hello", 5, 0); + assert (rc == 5); + rc = zmq_recv (frontend, buffer, 255, 0); + assert (rc == 5); + // After the reconnect, should succeed - rc = zmq_send (from, message.data(), message.size(), 0); - assert (rc >= 0); + rc = zmq_send (frontend, "Hello", 5, ZMQ_DONTWAIT); + assert (rc == 5); - rc = zmq_close (to); + rc = zmq_close (backend); assert (rc == 0); - rc = zmq_close (from); + rc = zmq_close (frontend); assert (rc == 0); - rc = zmq_ctx_destroy(context); - assert (rc == 0); - - rc = zmq_ctx_destroy(context2); + rc = zmq_term (context); assert (rc == 0); } - From 20cce750fabb13e5ad9399ecc4d4c71162b68394 Mon Sep 17 00:00:00 2001 From: Christophe Juniet Date: Wed, 3 Jul 2013 21:34:58 +0200 Subject: [PATCH 7/8] Fix a few invalid forward declarations A few forward declarations use mismatched struct and class types. Clang won't compile this with -Werror. --- src/decoder.hpp | 2 -- src/encoder.hpp | 2 -- src/i_decoder.hpp | 3 ++- src/i_encoder.hpp | 2 +- src/v1_encoder.hpp | 2 -- 5 files changed, 3 insertions(+), 8 deletions(-) diff --git a/src/decoder.hpp b/src/decoder.hpp index 4ed2eb31..c1280c5c 100644 --- a/src/decoder.hpp +++ b/src/decoder.hpp @@ -35,8 +35,6 @@ namespace zmq { - class i_msg_sink; - // Helper base class for decoders that know the amount of data to read // in advance at any moment. Knowing the amount in advance is a property // of the protocol used. 0MQ framing protocol is based size-prefixed diff --git a/src/encoder.hpp b/src/encoder.hpp index e30f7d9c..b77adaf0 100644 --- a/src/encoder.hpp +++ b/src/encoder.hpp @@ -40,8 +40,6 @@ namespace zmq { - class i_msg_source; - // Helper base class for encoders. It implements the state machine that // fills the outgoing buffer. Derived classes should implement individual // state machine actions. diff --git a/src/i_decoder.hpp b/src/i_decoder.hpp index 99eddeb0..bcfd05c6 100644 --- a/src/i_decoder.hpp +++ b/src/i_decoder.hpp @@ -26,7 +26,8 @@ namespace zmq { - class i_msg_sink; + // Forward declaration + struct i_msg_sink; // Interface to be implemented by message decoder. diff --git a/src/i_encoder.hpp b/src/i_encoder.hpp index ae491e25..9a0812fb 100644 --- a/src/i_encoder.hpp +++ b/src/i_encoder.hpp @@ -27,7 +27,7 @@ namespace zmq { // Forward declaration - class i_msg_source; + struct i_msg_source; // Interface to be implemented by message encoder. diff --git a/src/v1_encoder.hpp b/src/v1_encoder.hpp index bd09f6fa..0b7ec172 100644 --- a/src/v1_encoder.hpp +++ b/src/v1_encoder.hpp @@ -29,8 +29,6 @@ namespace zmq { - class i_msg_source; - // Encoder for 0MQ framing protocol. Converts messages into data stream. class v1_encoder_t : public encoder_base_t From 51066c9767ad8139fe13c9d8f6599b7046396d13 Mon Sep 17 00:00:00 2001 From: KIU Shueng Chuan Date: Thu, 11 Jul 2013 21:49:45 +0800 Subject: [PATCH 8/8] simple Makefile for mingw32 --- builds/mingw32/Makefile.mingw32 | 31 +++++++++++++++++++++++++++++++ builds/mingw32/platform.hpp | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 63 insertions(+) create mode 100644 builds/mingw32/Makefile.mingw32 create mode 100644 builds/mingw32/platform.hpp diff --git a/builds/mingw32/Makefile.mingw32 b/builds/mingw32/Makefile.mingw32 new file mode 100644 index 00000000..e5d752bf --- /dev/null +++ b/builds/mingw32/Makefile.mingw32 @@ -0,0 +1,31 @@ +CC=gcc +CFLAGS=-Wall -Os -g -DDLL_EXPORT -DFD_SETSIZE=1024 -I. +LIBS=-lws2_32 + +OBJS = address.o clock.o ctx.o dealer.o decoder.o devpoll.o dist.o encoder.o epoll.o err.o fq.o \ + io_object.o io_thread.o ip.o ipc_address.o ipc_connecter.o ipc_listener.o kqueue.o lb.o \ + mailbox.o msg.o mtrie.o object.o options.o own.o pair.o pgm_receiver.o pgm_sender.o \ + pgm_socket.o pipe.o poll.o poller_base.o precompiled.o proxy.o pub.o pull.o push.o \ + random.o reaper.o rep.o req.o router.o select.o session_base.o \ + signaler.o socket_base.o stream_engine.o sub.o tcp.o tcp_address.o tcp_connecter.o tcp_listener.o \ + thread.o trie.o v1_decoder.o v1_encoder.o xpub.o xsub.o zmq.o zmq_utils.o + +%.o: ../../src/%.cpp + $(CC) -c -o $@ $< $(CFLAGS) + +%.o: ../../perf/%.cpp + $(CC) -c -o $@ $< $(CFLAGS) + +all: libzmq.dll + +perf: inproc_lat.exe inproc_thr.exe local_lat.exe local_thr.exe remote_lat.exe remote_thr.exe + +libzmq.dll: $(OBJS) + g++ -shared -o $@ $^ -Wl,--out-implib,$@.a $(LIBS) + +%.exe: %.o libzmq.dll + g++ -o $@ $^ + +clean: + del *.o *.a *.dll *.exe + diff --git a/builds/mingw32/platform.hpp b/builds/mingw32/platform.hpp new file mode 100644 index 00000000..4af872cd --- /dev/null +++ b/builds/mingw32/platform.hpp @@ -0,0 +1,32 @@ +/* + Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_PLATFORM_HPP_INCLUDED__ +#define __ZMQ_PLATFORM_HPP_INCLUDED__ + +// This is the platform definition for the MSVC platform. +// As a first step of the build process it is copied to +// zmq directory to take place of platform.hpp generated from +// platform.hpp.in on platforms supported by GNU autotools. +// Place any MSVC-specific definitions here. + +#define ZMQ_HAVE_WINDOWS + +#endif