From 2742c4afd416cb206976fbc2d1a1e81117252ea7 Mon Sep 17 00:00:00 2001 From: Richard Newton Date: Wed, 6 Nov 2013 15:19:04 +0000 Subject: [PATCH 1/4] Fix race condition on shutdown --- src/ctx.cpp | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/ctx.cpp b/src/ctx.cpp index 552ffec6..972dd53c 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -91,7 +91,6 @@ int zmq::ctx_t::terminate () // restarted. bool restarted = terminating; terminating = true; - slot_sync.unlock (); // First attempt to terminate the context. if (!restarted) { @@ -99,13 +98,12 @@ int zmq::ctx_t::terminate () // First send stop command to sockets so that any blocking calls // can be interrupted. If there are no sockets we can ask reaper // thread to stop. - slot_sync.lock (); for (sockets_t::size_type i = 0; i != sockets.size (); i++) sockets [i]->stop (); if (sockets.empty ()) reaper->stop (); - slot_sync.unlock (); } + slot_sync.unlock(); // Wait till reaper thread closes all the sockets. command_t cmd; From aca1fce1cc008a6cb2177dbbaf6e112366bbba32 Mon Sep 17 00:00:00 2001 From: Pieter Hintjens Date: Wed, 6 Nov 2013 20:18:33 +0100 Subject: [PATCH 2/4] Backported LIBZMQ-39 fix --- NEWS | 2 ++ 1 file changed, 2 insertions(+) diff --git a/NEWS b/NEWS index 7ecf28a4..0729934e 100644 --- a/NEWS +++ b/NEWS @@ -1,6 +1,8 @@ 0MQ version 3.2.5 stable, released on 2014/xx/xx ================================================ +* LIBZMQ-39 Assertion failure "Bad file descriptor" during shutdown (rare) + 0MQ version 3.2.4 stable, released on 2013/09/20 ================================================ From b5312d38cc5251c3962686ff3bfbea4dfc75b4cc Mon Sep 17 00:00:00 2001 From: Pieter Hintjens Date: Wed, 6 Nov 2013 20:21:55 +0100 Subject: [PATCH 3/4] Revert "zmq: add TIPC transport tests" This reverts commit d0aa05fac49ef3640a99c67d323bb3d062f6c355. --- AUTHORS | 1 - configure.in | 2 +- tests/Makefile.am | 22 --- tests/test_connect_delay_tipc.cpp | 238 --------------------------- tests/test_pair_tipc.cpp | 54 ------ tests/test_reqrep_device_tipc.cpp | 143 ---------------- tests/test_reqrep_tipc.cpp | 54 ------ tests/test_router_mandatory_tipc.cpp | 62 ------- tests/test_shutdown_stress_tipc.cpp | 93 ----------- tests/test_sub_forward_tipc.cpp | 99 ----------- tests/test_term_endpoint_tipc.cpp | 119 -------------- 11 files changed, 1 insertion(+), 886 deletions(-) delete mode 100644 tests/test_connect_delay_tipc.cpp delete mode 100644 tests/test_pair_tipc.cpp delete mode 100644 tests/test_reqrep_device_tipc.cpp delete mode 100644 tests/test_reqrep_tipc.cpp delete mode 100644 tests/test_router_mandatory_tipc.cpp delete mode 100644 tests/test_shutdown_stress_tipc.cpp delete mode 100644 tests/test_sub_forward_tipc.cpp delete mode 100644 tests/test_term_endpoint_tipc.cpp diff --git a/AUTHORS b/AUTHORS index 9b4d4c07..97da6a02 100644 --- a/AUTHORS +++ b/AUTHORS @@ -26,7 +26,6 @@ Dirk O. Kaar Douglas Creager Erich Heine Erik Rigtorp -Erik Hugne Fabien Ninoles Frank Denis George Neill diff --git a/configure.in b/configure.in index b3486acb..1de6778b 100644 --- a/configure.in +++ b/configure.in @@ -93,7 +93,7 @@ case "${host_os}" in CPPFLAGS="-D_GNU_SOURCE $CPPFLAGS" fi AC_DEFINE(ZMQ_HAVE_LINUX, 1, [Have Linux OS]) - AM_CONDITIONAL(ON_LINUX, true) + case "${host_os}" in *android*) AC_DEFINE(ZMQ_HAVE_ANDROID, 1, [Have Android OS]) diff --git a/tests/Makefile.am b/tests/Makefile.am index b2877b87..e4b8fe70 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -28,17 +28,6 @@ noinst_PROGRAMS += test_shutdown_stress \ test_timeo endif -if ON_LINUX -noinst_PROGRAMS += test_connect_delay_tipc \ - test_pair_tipc \ - test_reqrep_device_tipc \ - test_reqrep_tipc \ - test_router_mandatory_tipc \ - test_shutdown_stress_tipc \ - test_sub_forward_tipc \ - test_term_endpoint_tipc -endif - test_pair_inproc_SOURCES = test_pair_inproc.cpp testutil.hpp test_pair_tcp_SOURCES = test_pair_tcp.cpp testutil.hpp test_reqrep_inproc_SOURCES = test_reqrep_inproc.cpp testutil.hpp @@ -56,17 +45,6 @@ test_monitor_SOURCES = test_monitor.cpp test_disconnect_inproc_SOURCES = test_disconnect_inproc.cpp test_router_mandatory_SOURCES = test_router_mandatory.cpp -if ON_LINUX -test_connect_delay_tipc_SOURCES = test_connect_delay_tipc.cpp -test_pair_tipc_SOURCES = test_pair_tipc.cpp -test_reqrep_device_tipc_SOURCES = test_reqrep_device_tipc.cpp -test_reqrep_tipc_SOURCES = test_reqrep_tipc.cpp -test_router_mandatory_tipc_SOURCES = test_router_mandatory_tipc.cpp -test_shutdown_stress_tipc_SOURCES = test_shutdown_stress_tipc.cpp -test_sub_forward_tipc_SOURCES = test_sub_forward_tipc.cpp -test_term_endpoint_tipc_SOURCES = test_term_endpoint_tipc.cpp -endif - if !ON_MINGW test_shutdown_stress_SOURCES = test_shutdown_stress.cpp test_pair_ipc_SOURCES = test_pair_ipc.cpp testutil.hpp diff --git a/tests/test_connect_delay_tipc.cpp b/tests/test_connect_delay_tipc.cpp deleted file mode 100644 index 4093ef33..00000000 --- a/tests/test_connect_delay_tipc.cpp +++ /dev/null @@ -1,238 +0,0 @@ -/* - Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see . -*/ - -#include "../include/zmq.h" -#include -#include -#include -#include -#include - -#undef NDEBUG -#include - -int main (void) -{ - int val; - int rc; - char buffer[16]; - // TEST 1. - // First we're going to attempt to send messages to two - // pipes, one connected, the other not. We should see - // the PUSH load balancing to both pipes, and hence half - // of the messages getting queued, as connect() creates a - // pipe immediately. - - void *context = zmq_ctx_new(); - assert (context); - void *to = zmq_socket(context, ZMQ_PULL); - assert (to); - - // Bind the one valid receiver - val = 0; - rc = zmq_setsockopt(to, ZMQ_LINGER, &val, sizeof(val)); - assert (rc == 0); - rc = zmq_bind (to, "tipc://{6555,0,0}"); - assert (rc == 0); - - // Create a socket pushing to two endpoints - only 1 message should arrive. - void *from = zmq_socket (context, ZMQ_PUSH); - assert(from); - - val = 0; - zmq_setsockopt (from, ZMQ_LINGER, &val, sizeof (val)); - // This pipe will not connect - rc = zmq_connect (from, "tipc://{5556,0}"); - assert (rc == 0); - // This pipe will - rc = zmq_connect (from, "tipc://{6555,0}"); - assert (rc == 0); - - // We send 10 messages, 5 should just get stuck in the queue - // for the not-yet-connected pipe - for (int i = 0; i < 10; ++i) { - rc = zmq_send (from, "Hello", 5, 0); - assert (rc == 5); - } - - // We now consume from the connected pipe - // - we should see just 5 - int timeout = 100; - rc = zmq_setsockopt (to, ZMQ_RCVTIMEO, &timeout, sizeof (int)); - assert (rc == 0); - - int seen = 0; - while (true) { - rc = zmq_recv (to, &buffer, sizeof (buffer), 0); - if (rc == -1) - break; // Break when we didn't get a message - seen++; - } - assert (seen == 5); - - rc = zmq_close (from); - assert (rc == 0); - - rc = zmq_close (to); - assert (rc == 0); - - rc = zmq_term (context); - assert (rc == 0); - - // TEST 2 - // This time we will do the same thing, connect two pipes, - // one of which will succeed in connecting to a bound - // receiver, the other of which will fail. However, we will - // also set the delay attach on connect flag, which should - // cause the pipe attachment to be delayed until the connection - // succeeds. - context = zmq_ctx_new(); - - // Bind the valid socket - to = zmq_socket (context, ZMQ_PULL); - assert (to); - rc = zmq_bind (to, "tipc://{5560,0,0}"); - assert (rc == 0); - - val = 0; - rc = zmq_setsockopt (to, ZMQ_LINGER, &val, sizeof(val)); - assert (rc == 0); - - // Create a socket pushing to two endpoints - all messages should arrive. - from = zmq_socket (context, ZMQ_PUSH); - assert (from); - - val = 0; - rc = zmq_setsockopt (from, ZMQ_LINGER, &val, sizeof(val)); - assert (rc == 0); - - // Set the key flag - val = 1; - rc = zmq_setsockopt (from, ZMQ_DELAY_ATTACH_ON_CONNECT, &val, sizeof(val)); - assert (rc == 0); - - // Connect to the invalid socket - rc = zmq_connect (from, "tipc://{5561,0}"); - assert (rc == 0); - // Connect to the valid socket - rc = zmq_connect (from, "tipc://{5560,0}"); - assert (rc == 0); - - // Send 10 messages, all should be routed to the connected pipe - for (int i = 0; i < 10; ++i) { - rc = zmq_send (from, "Hello", 5, 0); - assert (rc == 5); - } - rc = zmq_setsockopt (to, ZMQ_RCVTIMEO, &timeout, sizeof (int)); - assert (rc == 0); - - seen = 0; - while (true) { - rc = zmq_recv (to, &buffer, sizeof (buffer), 0); - if (rc == -1) - break; // Break when we didn't get a message - seen++; - } - assert (seen == 10); - - rc = zmq_close (from); - assert (rc == 0); - - rc = zmq_close (to); - assert (rc == 0); - - rc = zmq_term (context); - assert (rc == 0); - - // TEST 3 - // This time we want to validate that the same blocking behaviour - // occurs with an existing connection that is broken. We will send - // messages to a connected pipe, disconnect and verify the messages - // block. Then we reconnect and verify messages flow again. - context = zmq_ctx_new (); - - void *backend = zmq_socket (context, ZMQ_DEALER); - assert (backend); - void *frontend = zmq_socket (context, ZMQ_DEALER); - assert (frontend); - int zero = 0; - rc = zmq_setsockopt (backend, ZMQ_LINGER, &zero, sizeof (zero)); - assert (rc == 0); - rc = zmq_setsockopt (frontend, ZMQ_LINGER, &zero, sizeof (zero)); - assert (rc == 0); - - // Frontend connects to backend using DELAY_ATTACH_ON_CONNECT - int on = 1; - rc = zmq_setsockopt (frontend, ZMQ_DELAY_ATTACH_ON_CONNECT, &on, sizeof (on)); - assert (rc == 0); - rc = zmq_bind (backend, "tipc://{5560,0,0}"); - assert (rc == 0); - rc = zmq_connect (frontend, "tipc://{5560,0}"); - assert (rc == 0); - - // Ping backend to frontend so we know when the connection is up - rc = zmq_send (backend, "Hello", 5, 0); - assert (rc == 5); - rc = zmq_recv (frontend, buffer, 255, 0); - assert (rc == 5); - - // Send message from frontend to backend - rc = zmq_send (frontend, "Hello", 5, ZMQ_DONTWAIT); - assert (rc == 5); - - rc = zmq_close (backend); - assert (rc == 0); - - // Give time to process disconnect - // There's no way to do this except with a sleep - struct timespec t = { 0, 250 * 1000000 }; - nanosleep (&t, NULL); - - // Send a message, should fail - rc = zmq_send (frontend, "Hello", 5, ZMQ_DONTWAIT); - assert (rc == -1); - - // Recreate backend socket - backend = zmq_socket (context, ZMQ_DEALER); - assert (backend); - rc = zmq_setsockopt (backend, ZMQ_LINGER, &zero, sizeof (zero)); - assert (rc == 0); - rc = zmq_bind (backend, "tipc://{5560,0,0}"); - assert (rc == 0); - - // Ping backend to frontend so we know when the connection is up - rc = zmq_send (backend, "Hello", 5, 0); - assert (rc == 5); - rc = zmq_recv (frontend, buffer, 255, 0); - assert (rc == 5); - - // After the reconnect, should succeed - rc = zmq_send (frontend, "Hello", 5, ZMQ_DONTWAIT); - assert (rc == 5); - - rc = zmq_close (backend); - assert (rc == 0); - - rc = zmq_close (frontend); - assert (rc == 0); - - rc = zmq_term (context); - assert (rc == 0); -} - diff --git a/tests/test_pair_tipc.cpp b/tests/test_pair_tipc.cpp deleted file mode 100644 index 9cca6394..00000000 --- a/tests/test_pair_tipc.cpp +++ /dev/null @@ -1,54 +0,0 @@ -/* - Copyright (c) 2010-2011 250bpm s.r.o. - Copyright (c) 2011 iMatix Corporation - Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see . -*/ - -#include -#include "testutil.hpp" - -int main (void) -{ - fprintf (stderr, "test_pair_tipc running...\n"); - - void *ctx = zmq_init (1); - assert (ctx); - - void *sb = zmq_socket (ctx, ZMQ_PAIR); - assert (sb); - int rc = zmq_bind (sb, "tipc://{5560,0,0}"); - assert (rc == 0); - - void *sc = zmq_socket (ctx, ZMQ_PAIR); - assert (sc); - rc = zmq_connect (sc, "tipc://{5560,0}"); - assert (rc == 0); - - bounce (sb, sc); - - rc = zmq_close (sc); - assert (rc == 0); - - rc = zmq_close (sb); - assert (rc == 0); - - rc = zmq_term (ctx); - assert (rc == 0); - - return 0 ; -} diff --git a/tests/test_reqrep_device_tipc.cpp b/tests/test_reqrep_device_tipc.cpp deleted file mode 100644 index ffc6ff9e..00000000 --- a/tests/test_reqrep_device_tipc.cpp +++ /dev/null @@ -1,143 +0,0 @@ -/* - Copyright (c) 2010-2011 250bpm s.r.o. - Copyright (c) 2011 VMware, Inc. - Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see . -*/ - -#include "../include/zmq.h" -#include -#include - -#undef NDEBUG -#include - -int main (void) -{ - fprintf (stderr, "test_reqrep_device_tipc running...\n"); - - void *ctx = zmq_init (1); - assert (ctx); - - // Create a req/rep device. - void *dealer = zmq_socket (ctx, ZMQ_DEALER); - assert (dealer); - int rc = zmq_bind (dealer, "tipc://{5560,0,0}"); - assert (rc == 0); - void *router = zmq_socket (ctx, ZMQ_ROUTER); - assert (router); - rc = zmq_bind (router, "tipc://{5561,0,0}"); - assert (rc == 0); - - // Create a worker. - void *rep = zmq_socket (ctx, ZMQ_REP); - assert (rep); - rc = zmq_connect (rep, "tipc://{5560,0}"); - assert (rc == 0); - - // Create a client. - void *req = zmq_socket (ctx, ZMQ_REQ); - assert (req); - rc = zmq_connect (req, "tipc://{5561,0}"); - assert (rc == 0); - - // Send a request. - rc = zmq_send (req, "ABC", 3, ZMQ_SNDMORE); - assert (rc == 3); - rc = zmq_send (req, "DEF", 3, 0); - assert (rc == 3); - - // Pass the request through the device. - for (int i = 0; i != 4; i++) { - zmq_msg_t msg; - rc = zmq_msg_init (&msg); - assert (rc == 0); - rc = zmq_recvmsg (router, &msg, 0); - assert (rc >= 0); - int rcvmore; - size_t sz = sizeof (rcvmore); - rc = zmq_getsockopt (router, ZMQ_RCVMORE, &rcvmore, &sz); - assert (rc == 0); - rc = zmq_sendmsg (dealer, &msg, rcvmore ? ZMQ_SNDMORE : 0); - assert (rc >= 0); - } - - // Receive the request. - char buff [3]; - rc = zmq_recv (rep, buff, 3, 0); - assert (rc == 3); - assert (memcmp (buff, "ABC", 3) == 0); - int rcvmore; - size_t sz = sizeof (rcvmore); - rc = zmq_getsockopt (rep, ZMQ_RCVMORE, &rcvmore, &sz); - assert (rc == 0); - assert (rcvmore); - rc = zmq_recv (rep, buff, 3, 0); - assert (rc == 3); - assert (memcmp (buff, "DEF", 3) == 0); - rc = zmq_getsockopt (rep, ZMQ_RCVMORE, &rcvmore, &sz); - assert (rc == 0); - assert (!rcvmore); - - // Send the reply. - rc = zmq_send (rep, "GHI", 3, ZMQ_SNDMORE); - assert (rc == 3); - rc = zmq_send (rep, "JKL", 3, 0); - assert (rc == 3); - - // Pass the reply through the device. - for (int i = 0; i != 4; i++) { - zmq_msg_t msg; - rc = zmq_msg_init (&msg); - assert (rc == 0); - rc = zmq_recvmsg (dealer, &msg, 0); - assert (rc >= 0); - int rcvmore; - rc = zmq_getsockopt (dealer, ZMQ_RCVMORE, &rcvmore, &sz); - assert (rc == 0); - rc = zmq_sendmsg (router, &msg, rcvmore ? ZMQ_SNDMORE : 0); - assert (rc >= 0); - } - - // Receive the reply. - rc = zmq_recv (req, buff, 3, 0); - assert (rc == 3); - assert (memcmp (buff, "GHI", 3) == 0); - rc = zmq_getsockopt (req, ZMQ_RCVMORE, &rcvmore, &sz); - assert (rc == 0); - assert (rcvmore); - rc = zmq_recv (req, buff, 3, 0); - assert (rc == 3); - assert (memcmp (buff, "JKL", 3) == 0); - rc = zmq_getsockopt (req, ZMQ_RCVMORE, &rcvmore, &sz); - assert (rc == 0); - assert (!rcvmore); - - // Clean up. - rc = zmq_close (req); - assert (rc == 0); - rc = zmq_close (rep); - assert (rc == 0); - rc = zmq_close (router); - assert (rc == 0); - rc = zmq_close (dealer); - assert (rc == 0); - rc = zmq_term (ctx); - assert (rc == 0); - - return 0 ; -} diff --git a/tests/test_reqrep_tipc.cpp b/tests/test_reqrep_tipc.cpp deleted file mode 100644 index fabe19f3..00000000 --- a/tests/test_reqrep_tipc.cpp +++ /dev/null @@ -1,54 +0,0 @@ -/* - Copyright (c) 2010-2011 250bpm s.r.o. - Copyright (c) 2011 iMatix Corporation - Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see . -*/ - -#include -#include "testutil.hpp" - -int main (void) -{ - fprintf (stderr, "test_reqrep_tipc running...\n"); - - void *ctx = zmq_init (1); - assert (ctx); - - void *sb = zmq_socket (ctx, ZMQ_REP); - assert (sb); - int rc = zmq_bind (sb, "tipc://{5560,0,0}"); - assert (rc == 0); - - void *sc = zmq_socket (ctx, ZMQ_REQ); - assert (sc); - rc = zmq_connect (sc, "tipc://{5560,0}"); - assert (rc == 0); - - bounce (sb, sc); - - rc = zmq_close (sc); - assert (rc == 0); - - rc = zmq_close (sb); - assert (rc == 0); - - rc = zmq_term (ctx); - assert (rc == 0); - - return 0 ; -} diff --git a/tests/test_router_mandatory_tipc.cpp b/tests/test_router_mandatory_tipc.cpp deleted file mode 100644 index 0e972bbe..00000000 --- a/tests/test_router_mandatory_tipc.cpp +++ /dev/null @@ -1,62 +0,0 @@ -/* - Copyright (c) 2010-2011 250bpm s.r.o. - Copyright (c) 2011 iMatix Corporation - Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see . -*/ - -#include -#include "testutil.hpp" - -int main (void) -{ - fprintf (stderr, "test_router_mandatory_tipc running...\n"); - - void *ctx = zmq_init (1); - assert (ctx); - - // Creating the first socket. - void *sa = zmq_socket (ctx, ZMQ_ROUTER); - assert (sa); - - int rc = zmq_bind (sa, "tipc://{15560,0,0}"); - assert (rc == 0); - - // Sending a message to an unknown peer with the default setting - rc = zmq_send (sa, "UNKNOWN", 7, ZMQ_SNDMORE); - assert (rc == 7); - rc = zmq_send (sa, "DATA", 4, 0); - assert (rc == 4); - - int mandatory = 1; - - // Set mandatory routing on socket - rc = zmq_setsockopt (sa, ZMQ_ROUTER_MANDATORY, &mandatory, sizeof (mandatory)); - assert (rc == 0); - - // Send a message and check that it fails - rc = zmq_send (sa, "UNKNOWN", 7, ZMQ_SNDMORE | ZMQ_DONTWAIT); - assert (rc == -1 && errno == EHOSTUNREACH); - - rc = zmq_close (sa); - assert (rc == 0); - - rc = zmq_term (ctx); - assert (rc == 0); - - return 0 ; -} diff --git a/tests/test_shutdown_stress_tipc.cpp b/tests/test_shutdown_stress_tipc.cpp deleted file mode 100644 index 51a5b988..00000000 --- a/tests/test_shutdown_stress_tipc.cpp +++ /dev/null @@ -1,93 +0,0 @@ -/* - Copyright (c) 2010-2011 250bpm s.r.o. - Copyright (c) 2011 iMatix Corporation - Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see . -*/ - -#include "../include/zmq.h" -#include -#include -#include - -#undef NDEBUG -#include - -#define THREAD_COUNT 100 - -extern "C" -{ - static void *worker (void *s) - { - int rc; - - rc = zmq_connect (s, "tipc://{5560,0}"); - assert (rc == 0); - - // Start closing the socket while the connecting process is underway. - rc = zmq_close (s); - assert (rc == 0); - - return NULL; - } -} - -int main (void) -{ - void *ctx; - void *s1; - void *s2; - int i; - int j; - int rc; - pthread_t threads [THREAD_COUNT]; - - fprintf (stderr, "test_shutdown_stress_tipc running...\n"); - - for (j = 0; j != 10; j++) { - - // Check the shutdown with many parallel I/O threads. - ctx = zmq_init (7); - assert (ctx); - - s1 = zmq_socket (ctx, ZMQ_PUB); - assert (s1); - - rc = zmq_bind (s1, "tipc://{5560,0,0}"); - assert (rc == 0); - - for (i = 0; i != THREAD_COUNT; i++) { - s2 = zmq_socket (ctx, ZMQ_SUB); - assert (s2); - rc = pthread_create (&threads [i], NULL, worker, s2); - assert (rc == 0); - } - - for (i = 0; i != THREAD_COUNT; i++) { - rc = pthread_join (threads [i], NULL); - assert (rc == 0); - } - - rc = zmq_close (s1); - assert (rc == 0); - - rc = zmq_term (ctx); - assert (rc == 0); - } - - return 0; -} diff --git a/tests/test_sub_forward_tipc.cpp b/tests/test_sub_forward_tipc.cpp deleted file mode 100644 index 1dda2f3a..00000000 --- a/tests/test_sub_forward_tipc.cpp +++ /dev/null @@ -1,99 +0,0 @@ -/* - Copyright (c) 2010-2011 250bpm s.r.o. - Copyright (c) 2011 iMatix Corporation - Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see . -*/ - -#include "../include/zmq.h" -#include "../include/zmq_utils.h" -#include - -#undef NDEBUG -#include - -int main (void) -{ - fprintf (stderr, "test_sub_forward running...\n"); - - void *ctx = zmq_init (1); - assert (ctx); - - // First, create an intermediate device. - void *xpub = zmq_socket (ctx, ZMQ_XPUB); - assert (xpub); - int rc = zmq_bind (xpub, "tipc://{5560,0,0}"); - assert (rc == 0); - void *xsub = zmq_socket (ctx, ZMQ_XSUB); - assert (xsub); - rc = zmq_bind (xsub, "tipc://{5561,0,0}"); - assert (rc == 0); - - // Create a publisher. - void *pub = zmq_socket (ctx, ZMQ_PUB); - assert (pub); - rc = zmq_connect (pub, "tipc://{5561,0}"); - assert (rc == 0); - - // Create a subscriber. - void *sub = zmq_socket (ctx, ZMQ_SUB); - assert (sub); - rc = zmq_connect (sub, "tipc://{5560,0}"); - assert (rc == 0); - - // Subscribe for all messages. - rc = zmq_setsockopt (sub, ZMQ_SUBSCRIBE, "", 0); - assert (rc == 0); - - // Pass the subscription upstream through the device. - char buff [32]; - rc = zmq_recv (xpub, buff, sizeof (buff), 0); - assert (rc >= 0); - rc = zmq_send (xsub, buff, rc, 0); - assert (rc >= 0); - - // Wait a bit till the subscription gets to the publisher. - zmq_sleep (1); - - // Send an empty message. - rc = zmq_send (pub, NULL, 0, 0); - assert (rc == 0); - - // Pass the message downstream through the device. - rc = zmq_recv (xsub, buff, sizeof (buff), 0); - assert (rc >= 0); - rc = zmq_send (xpub, buff, rc, 0); - assert (rc >= 0); - - // Receive the message in the subscriber. - rc = zmq_recv (sub, buff, sizeof (buff), 0); - assert (rc == 0); - - // Clean up. - rc = zmq_close (xpub); - assert (rc == 0); - rc = zmq_close (xsub); - assert (rc == 0); - rc = zmq_close (pub); - assert (rc == 0); - rc = zmq_close (sub); - assert (rc == 0); - rc = zmq_term (ctx); - assert (rc == 0); - - return 0 ; -} diff --git a/tests/test_term_endpoint_tipc.cpp b/tests/test_term_endpoint_tipc.cpp deleted file mode 100644 index c338ade4..00000000 --- a/tests/test_term_endpoint_tipc.cpp +++ /dev/null @@ -1,119 +0,0 @@ -/* - Copyright (c) 2010-2011 250bpm s.r.o. - Copyright (c) 2011 iMatix Corporation - Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see . -*/ - -#include "../include/zmq.h" -#include "../include/zmq_utils.h" -#include -#include - -#undef NDEBUG -#include - -int main (void) -{ - int rc; - char buf[32]; - const char *ep = "tipc://{5560,0,0}"; - const char *name = "tipc://{5560,0}"; - - fprintf (stderr, "unbind endpoint test running...\n"); - - // Create infrastructure. - void *ctx = zmq_init (1); - assert (ctx); - void *push = zmq_socket (ctx, ZMQ_PUSH); - assert (push); - rc = zmq_bind (push, ep); - assert (rc == 0); - void *pull = zmq_socket (ctx, ZMQ_PULL); - assert (pull); - rc = zmq_connect (pull, name); - assert (rc == 0); - - // Pass one message through to ensure the connection is established. - rc = zmq_send (push, "ABC", 3, 0); - assert (rc == 3); - rc = zmq_recv (pull, buf, sizeof (buf), 0); - assert (rc == 3); - - // Unbind the lisnening endpoint - rc = zmq_unbind (push, ep); - assert (rc == 0); - - // Let events some time - zmq_sleep (1); - - // Check that sending would block (there's no outbound connection). - rc = zmq_send (push, "ABC", 3, ZMQ_DONTWAIT); - assert (rc == -1 && zmq_errno () == EAGAIN); - - // Clean up. - rc = zmq_close (pull); - assert (rc == 0); - rc = zmq_close (push); - assert (rc == 0); - rc = zmq_term (ctx); - assert (rc == 0); - - - // Now the other way round. - fprintf (stderr, "disconnect endpoint test running...\n"); - - - // Create infrastructure. - ctx = zmq_init (1); - assert (ctx); - push = zmq_socket (ctx, ZMQ_PUSH); - assert (push); - rc = zmq_connect (push, name); - assert (rc == 0); - pull = zmq_socket (ctx, ZMQ_PULL); - assert (pull); - rc = zmq_bind (pull, ep); - assert (rc == 0); - - // Pass one message through to ensure the connection is established. - rc = zmq_send (push, "ABC", 3, 0); - assert (rc == 3); - rc = zmq_recv (pull, buf, sizeof (buf), 0); - assert (rc == 3); - - // Disconnect the bound endpoint - rc = zmq_disconnect (push, name); - assert (rc == 0); - - // Let events some time - zmq_sleep (1); - - // Check that sending would block (there's no inbound connections). - rc = zmq_send (push, "ABC", 3, ZMQ_DONTWAIT); - assert (rc == -1 && zmq_errno () == EAGAIN); - - // Clean up. - rc = zmq_close (pull); - assert (rc == 0); - rc = zmq_close (push); - assert (rc == 0); - rc = zmq_term (ctx); - assert (rc == 0); - - return 0; -} From 84f00c06d62338e78fb1e24d12ed78945ca08ad9 Mon Sep 17 00:00:00 2001 From: Pieter Hintjens Date: Wed, 6 Nov 2013 20:22:06 +0100 Subject: [PATCH 4/4] Revert "zmq: add support for TIPC transport" This reverts commit dbc42ee9af098183714845a75139997dcc34c7a0. --- src/Makefile.am | 8 +- src/address.cpp | 17 --- src/address.hpp | 6 - src/session_base.cpp | 11 -- src/socket_base.cpp | 43 +------ src/tipc_address.cpp | 105 ----------------- src/tipc_address.hpp | 67 ----------- src/tipc_connecter.cpp | 257 ----------------------------------------- src/tipc_connecter.hpp | 127 -------------------- src/tipc_listener.cpp | 178 ---------------------------- src/tipc_listener.hpp | 96 --------------- 11 files changed, 3 insertions(+), 912 deletions(-) delete mode 100644 src/tipc_address.cpp delete mode 100644 src/tipc_address.hpp delete mode 100644 src/tipc_connecter.cpp delete mode 100644 src/tipc_connecter.hpp delete mode 100644 src/tipc_listener.cpp delete mode 100644 src/tipc_listener.hpp diff --git a/src/Makefile.am b/src/Makefile.am index 3df44a40..340c997b 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -146,13 +146,7 @@ libzmq_la_SOURCES = \ v1_protocol.hpp \ xsub.cpp \ zmq.cpp \ - zmq_utils.cpp \ - tipc_address.cpp \ - tipc_address.hpp \ - tipc_listener.cpp \ - tipc_listener.hpp \ - tipc_connecter.cpp\ - tipc_connecter.hpp + zmq_utils.cpp if ON_MINGW libzmq_la_LDFLAGS = -no-undefined -avoid-version -version-info @LTVER@ @LIBZMQ_EXTRA_LDFLAGS@ diff --git a/src/address.cpp b/src/address.cpp index a38c9870..6dc86dd1 100644 --- a/src/address.cpp +++ b/src/address.cpp @@ -18,12 +18,10 @@ along with this program. If not, see . */ -#include "platform.hpp" #include "address.hpp" #include "err.hpp" #include "tcp_address.hpp" #include "ipc_address.hpp" -#include "tipc_address.hpp" #include #include @@ -52,14 +50,6 @@ zmq::address_t::~address_t () } } #endif -#if defined ZMQ_HAVE_LINUX - else if (protocol == "tipc") { - if (resolved.tipc_addr) { - delete resolved.tipc_addr; - resolved.tipc_addr = 0; - } - } -#endif } int zmq::address_t::to_string (std::string &addr_) const @@ -76,13 +66,6 @@ int zmq::address_t::to_string (std::string &addr_) const } } #endif -#if defined ZMQ_HAVE_LINUX - else if (protocol == "tipc") { - if (resolved.tipc_addr) { - return resolved.tipc_addr->to_string(addr_); - } - } -#endif if (!protocol.empty () && !address.empty ()) { std::stringstream s; diff --git a/src/address.hpp b/src/address.hpp index 2156ccf3..a8a4765d 100644 --- a/src/address.hpp +++ b/src/address.hpp @@ -28,9 +28,6 @@ namespace zmq class tcp_address_t; #if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS class ipc_address_t; -#endif -#if defined ZMQ_HAVE_LINUX - class tipc_address_t; #endif struct address_t { address_t (const std::string &protocol_, const std::string &address_); @@ -45,9 +42,6 @@ namespace zmq tcp_address_t *tcp_addr; #if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS ipc_address_t *ipc_addr; -#endif -#if defined ZMQ_HAVE_LINUX - tipc_address_t *tipc_addr; #endif } resolved; diff --git a/src/session_base.cpp b/src/session_base.cpp index dc85e6e6..b2676570 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -27,7 +27,6 @@ #include "err.hpp" #include "pipe.hpp" #include "likely.hpp" -#include "tipc_connecter.hpp" #include "tcp_connecter.hpp" #include "ipc_connecter.hpp" #include "pgm_sender.hpp" @@ -457,16 +456,6 @@ void zmq::session_base_t::start_connecting (bool wait_) } #endif -#if defined ZMQ_HAVE_LINUX - if (addr->protocol == "tipc") { - tipc_connecter_t *connecter = new (std::nothrow) tipc_connecter_t ( - io_thread, this, options, addr, wait_); - alloc_assert (connecter); - launch_child(connecter); - return; - } -#endif - #if defined ZMQ_HAVE_OPENPGM // Both PGM and EPGM transports are using the same infrastructure. diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 4fea5448..7fe40089 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -55,8 +55,6 @@ #include "address.hpp" #include "ipc_address.hpp" #include "tcp_address.hpp" -#include "tipc_address.hpp" -#include "tipc_listener.hpp" #ifdef ZMQ_HAVE_OPENPGM #include "pgm_socket.hpp" #endif @@ -184,7 +182,7 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_) { // First check out whether the protcol is something we are aware of. if (protocol_ != "inproc" && protocol_ != "ipc" && protocol_ != "tcp" && - protocol_ != "pgm" && protocol_ != "epgm" && protocol_ != "tipc") { + protocol_ != "pgm" && protocol_ != "epgm") { errno = EPROTONOSUPPORT; return -1; } @@ -206,13 +204,7 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_) return -1; } #endif - // TIPC transport is only available on Linux. -#if !defined ZMQ_HAVE_LINUX - if (protocol_ = "tipc") { - errno = EPROTONOSUPPORT; - return -1; - } -#endif + // Check whether socket type and transport protocol match. // Specifically, multicast protocols can't be combined with // bi-directional messaging patterns (socket types). @@ -395,25 +387,6 @@ int zmq::socket_base_t::bind (const char *addr_) return 0; } #endif -#if defined ZMQ_HAVE_LINUX - if (protocol == "tipc") { - tipc_listener_t *listener = new (std::nothrow) tipc_listener_t ( - io_thread, this, options); - alloc_assert (listener); - int rc = listener->set_address (address.c_str ()); - if (rc != 0) { - delete listener; - event_bind_failed (address, zmq_errno()); - return -1; - } - - // Save last endpoint URI - listener->get_address (options.last_endpoint); - - add_endpoint (addr_, (own_t *) listener); - return 0; - } -#endif zmq_assert (false); return -1; @@ -544,18 +517,6 @@ int zmq::socket_base_t::connect (const char *addr_) } } #endif -#if defined ZMQ_HAVE_LINUX - else - if (protocol == "tipc") { - paddr->resolved.tipc_addr = new (std::nothrow) tipc_address_t (); - alloc_assert (paddr->resolved.tipc_addr); - int rc = paddr->resolved.tipc_addr->resolve (address.c_str()); - if (rc != 0) { - delete paddr; - return -1; - } - } -#endif #ifdef ZMQ_HAVE_OPENPGM if (protocol == "pgm" || protocol == "epgm") { struct pgm_addrinfo_t *res = NULL; diff --git a/src/tipc_address.cpp b/src/tipc_address.cpp deleted file mode 100644 index 555b1917..00000000 --- a/src/tipc_address.cpp +++ /dev/null @@ -1,105 +0,0 @@ -/* - Copyright (c) 2013 Ericsson AB - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see . -*/ - -#include "tipc_address.hpp" - -#if defined ZMQ_HAVE_LINUX - -#include "err.hpp" - -#include -#include - -zmq::tipc_address_t::tipc_address_t () -{ - memset (&address, 0, sizeof (address)); -} - -zmq::tipc_address_t::tipc_address_t (const sockaddr *sa, socklen_t sa_len) -{ - zmq_assert(sa && sa_len > 0); - - memset (&address, 0, sizeof (address)); - if (sa->sa_family == AF_TIPC) { - memcpy(&address, sa, sa_len); - } -} - -zmq::tipc_address_t::~tipc_address_t () -{ -} - -int zmq::tipc_address_t::resolve (const char *name) -{ - int res; - unsigned int type = 0; - unsigned int lower = 0; - unsigned int upper = 0; - - res = sscanf(name, "{%u,%u,%u}", &type, &lower, &upper); - if (res == 3) - goto nameseq; - else if (res == 2 && type > TIPC_RESERVED_TYPES) { - address.family = AF_TIPC; - address.addrtype = TIPC_ADDR_NAME; - address.addr.name.name.type = type; - address.addr.name.name.instance = lower; - address.scope = 0; - return 0; - } - else - return EINVAL; -nameseq: - if (type < TIPC_RESERVED_TYPES || upper < lower) - return EINVAL; - address.family = AF_TIPC; - address.addrtype = TIPC_ADDR_NAMESEQ; - address.addr.nameseq.type = type; - address.addr.nameseq.lower = lower; - address.addr.nameseq.upper = upper; - address.scope = TIPC_ZONE_SCOPE; - - return 0; -} - -int zmq::tipc_address_t::to_string (std::string &addr_) -{ - if (address.family != AF_TIPC) { - addr_.clear (); - return -1; - } - std::stringstream s; - s << "tipc://" << "{" << address.addr.nameseq.type; - s << ", " << address.addr.nameseq.lower; - s << ", " << address.addr.nameseq.upper << "}"; - addr_ = s.str (); - return 0; -} - -const sockaddr *zmq::tipc_address_t::addr () const -{ - return (sockaddr*) &address; -} - -socklen_t zmq::tipc_address_t::addrlen () const -{ - return (socklen_t) sizeof (address); -} - -#endif diff --git a/src/tipc_address.hpp b/src/tipc_address.hpp deleted file mode 100644 index d87dc43c..00000000 --- a/src/tipc_address.hpp +++ /dev/null @@ -1,67 +0,0 @@ -/* - Copyright (c) 2011 250bpm s.r.o. - Copyright (c) 2011 Other contributors as noted in the AUTHORS file - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see . -*/ - -#ifndef __ZMQ_TIPC_ADDRESS_HPP_INCLUDED__ -#define __ZMQ_TIPC_ADDRESS_HPP_INCLUDED__ - -#include - -#include "platform.hpp" - -#if defined ZMQ_HAVE_LINUX - -#include -#include - -namespace zmq -{ - - class tipc_address_t - { - public: - - tipc_address_t (); - tipc_address_t (const sockaddr *sa, socklen_t sa_len); - ~tipc_address_t (); - - // This function sets up the address "{type, lower, upper}" for TIPC transport - int resolve (const char* name); - - // The opposite to resolve() - int to_string (std::string &addr_); - - const sockaddr *addr () const; - socklen_t addrlen () const; - - private: - - struct sockaddr_tipc address; - - tipc_address_t (const tipc_address_t&); - const tipc_address_t &operator = (const tipc_address_t&); - }; - -} - -#endif - -#endif - - diff --git a/src/tipc_connecter.cpp b/src/tipc_connecter.cpp deleted file mode 100644 index 7c927b2f..00000000 --- a/src/tipc_connecter.cpp +++ /dev/null @@ -1,257 +0,0 @@ -/* - Copyright (c) 2013 Ericsson AB - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see . -*/ - -#include "tipc_connecter.hpp" - -#if defined ZMQ_HAVE_LINUX - -#include -#include - -#include "stream_engine.hpp" -#include "io_thread.hpp" -#include "platform.hpp" -#include "random.hpp" -#include "err.hpp" -#include "ip.hpp" -#include "address.hpp" -#include "tipc_address.hpp" -#include "session_base.hpp" - -#include -#include -#include - -zmq::tipc_connecter_t::tipc_connecter_t (class io_thread_t *io_thread_, - class session_base_t *session_, const options_t &options_, - const address_t *addr_, bool delayed_start_) : - own_t (io_thread_, options_), - io_object_t (io_thread_), - addr (addr_), - s (retired_fd), - handle_valid (false), - delayed_start (delayed_start_), - timer_started (false), - session (session_), - current_reconnect_ivl(options.reconnect_ivl) -{ - zmq_assert (addr); - zmq_assert (addr->protocol == "tipc"); - addr->to_string (endpoint); - socket = session-> get_socket(); -} - -zmq::tipc_connecter_t::~tipc_connecter_t () -{ - zmq_assert (!timer_started); - zmq_assert (!handle_valid); - zmq_assert (s == retired_fd); -} - -void zmq::tipc_connecter_t::process_plug () -{ - if (delayed_start) - add_reconnect_timer (); - else - start_connecting (); -} - -void zmq::tipc_connecter_t::process_term (int linger_) -{ - if (timer_started) { - cancel_timer (reconnect_timer_id); - timer_started = false; - } - - if (handle_valid) { - rm_fd (handle); - handle_valid = false; - } - - if (s != retired_fd) - close (); - - own_t::process_term (linger_); -} - -void zmq::tipc_connecter_t::in_event () -{ - // We are not polling for incomming data, so we are actually called - // because of error here. However, we can get error on out event as well - // on some platforms, so we'll simply handle both events in the same way. - out_event (); -} - -void zmq::tipc_connecter_t::out_event () -{ - fd_t fd = connect (); - rm_fd (handle); - handle_valid = false; - - // Handle the error condition by attempt to reconnect. - if (fd == retired_fd) { - close (); - add_reconnect_timer(); - return; - } - // Create the engine object for this connection. - stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options, endpoint); - alloc_assert (engine); - - // Attach the engine to the corresponding session object. - send_attach (session, engine); - - // Shut the connecter down. - terminate (); - - socket->event_connected (endpoint, fd); -} - -void zmq::tipc_connecter_t::timer_event (int id_) -{ - zmq_assert (id_ == reconnect_timer_id); - timer_started = false; - start_connecting (); -} - -void zmq::tipc_connecter_t::start_connecting () -{ - // Open the connecting socket. - int rc = open (); - - // Connect may succeed in synchronous manner. - if (rc == 0) { - handle = add_fd (s); - handle_valid = true; - out_event (); - } - - // Connection establishment may be delayed. Poll for its completion. - else - if (rc == -1 && errno == EINPROGRESS) { - handle = add_fd (s); - handle_valid = true; - set_pollout (handle); - socket->event_connect_delayed (endpoint, zmq_errno()); - } - - // Handle any other error condition by eventual reconnect. - else { - if (s != retired_fd) - close (); - add_reconnect_timer (); - } -} - -void zmq::tipc_connecter_t::add_reconnect_timer() -{ - int rc_ivl = get_new_reconnect_ivl(); - add_timer (rc_ivl, reconnect_timer_id); - socket->event_connect_retried (endpoint, rc_ivl); - timer_started = true; -} - -int zmq::tipc_connecter_t::get_new_reconnect_ivl () -{ - // The new interval is the current interval + random value. - int this_interval = current_reconnect_ivl + - (generate_random () % options.reconnect_ivl); - - // Only change the current reconnect interval if the maximum reconnect - // interval was set and if it's larger than the reconnect interval. - if (options.reconnect_ivl_max > 0 && - options.reconnect_ivl_max > options.reconnect_ivl) { - - // Calculate the next interval - current_reconnect_ivl = current_reconnect_ivl * 2; - if(current_reconnect_ivl >= options.reconnect_ivl_max) { - current_reconnect_ivl = options.reconnect_ivl_max; - } - } - return this_interval; -} - -int zmq::tipc_connecter_t::open () -{ - zmq_assert (s == retired_fd); - - // Create the socket. - s = open_socket (AF_TIPC, SOCK_STREAM, 0); - if (s == -1) - return -1; - - // Set the non-blocking flag. - unblock_socket (s); - // Connect to the remote peer. - int rc = ::connect ( - s, addr->resolved.tipc_addr->addr (), - addr->resolved.tipc_addr->addrlen ()); - - // Connect was successfull immediately. - if (rc == 0) - return 0; - - // Translate other error codes indicating asynchronous connect has been - // launched to a uniform EINPROGRESS. - if (rc == -1 && errno == EINTR) { - errno = EINPROGRESS; - return -1; - } - // Forward the error. - return -1; -} - -void zmq::tipc_connecter_t::close () -{ - zmq_assert (s != retired_fd); - int rc = ::close (s); - errno_assert (rc == 0); - socket->event_closed (endpoint, s); - s = retired_fd; -} - -zmq::fd_t zmq::tipc_connecter_t::connect () -{ - // Following code should handle both Berkeley-derived socket - // implementations and Solaris. - int err = 0; - socklen_t len = sizeof (err); - - int rc = getsockopt (s, SOL_SOCKET, SO_ERROR, (char*) &err, &len); - if (rc == -1) - err = errno; - if (err != 0) { - - // Assert if the error was caused by 0MQ bug. - // Networking problems are OK. No need to assert. - errno = err; - errno_assert (errno == ECONNREFUSED || errno == ECONNRESET || - errno == ETIMEDOUT || errno == EHOSTUNREACH || - errno == ENETUNREACH || errno == ENETDOWN); - - return retired_fd; - } - - fd_t result = s; - s = retired_fd; - return result; -} - -#endif - diff --git a/src/tipc_connecter.hpp b/src/tipc_connecter.hpp deleted file mode 100644 index 5e4a3121..00000000 --- a/src/tipc_connecter.hpp +++ /dev/null @@ -1,127 +0,0 @@ -/* - Copyright (c) 2013 Ericsson AB - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see . -*/ - -#ifndef __TIPC_CONNECTER_HPP_INCLUDED__ -#define __TIPC_CONNECTER_HPP_INCLUDED__ - -#include "platform.hpp" - -#if defined ZMQ_HAVE_LINUX - -#include "fd.hpp" -#include "own.hpp" -#include "stdint.hpp" -#include "io_object.hpp" - -namespace zmq -{ - - class io_thread_t; - class session_base_t; - struct address_t; - - class tipc_connecter_t : public own_t, public io_object_t - { - public: - - // If 'delayed_start' is true connecter first waits for a while, - // then starts connection process. - tipc_connecter_t (zmq::io_thread_t *io_thread_, - zmq::session_base_t *session_, const options_t &options_, - const address_t *addr_, bool delayed_start_); - ~tipc_connecter_t (); - - private: - - // ID of the timer used to delay the reconnection. - enum {reconnect_timer_id = 1}; - - // Handlers for incoming commands. - void process_plug (); - void process_term (int linger_); - - // Handlers for I/O events. - void in_event (); - void out_event (); - void timer_event (int id_); - - // Internal function to start the actual connection establishment. - void start_connecting (); - - // Internal function to add a reconnect timer - void add_reconnect_timer(); - - // Internal function to return a reconnect backoff delay. - // Will modify the current_reconnect_ivl used for next call - // Returns the currently used interval - int get_new_reconnect_ivl (); - - // Open IPC connecting socket. Returns -1 in case of error, - // 0 if connect was successfull immediately. Returns -1 with - // EAGAIN errno if async connect was launched. - int open (); - - // Close the connecting socket. - void close (); - - // Get the file descriptor of newly created connection. Returns - // retired_fd if the connection was unsuccessfull. - fd_t connect (); - - // Address to connect to. Owned by session_base_t. - const address_t *addr; - - // Underlying socket. - fd_t s; - - // Handle corresponding to the listening socket. - handle_t handle; - - // If true file descriptor is registered with the poller and 'handle' - // contains valid value. - bool handle_valid; - - // If true, connecter is waiting a while before trying to connect. - const bool delayed_start; - - // True iff a timer has been started. - bool timer_started; - - // Reference to the session we belong to. - zmq::session_base_t *session; - - // Current reconnect ivl, updated for backoff strategy - int current_reconnect_ivl; - - // String representation of endpoint to connect to - std::string endpoint; - - // Socket - zmq::socket_base_t *socket; - - tipc_connecter_t (const tipc_connecter_t&); - const tipc_connecter_t &operator = (const tipc_connecter_t&); - }; - -} - -#endif - -#endif - diff --git a/src/tipc_listener.cpp b/src/tipc_listener.cpp deleted file mode 100644 index 0b1a1b09..00000000 --- a/src/tipc_listener.cpp +++ /dev/null @@ -1,178 +0,0 @@ -/* - Copyright (c) 2011 250bpm s.r.o. - Copyright (c) 2011 Other contributors as noted in the AUTHORS file - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see . -*/ - -#include "tipc_listener.hpp" - -#if defined ZMQ_HAVE_LINUX - -#include - -#include - -#include "stream_engine.hpp" -#include "tipc_address.hpp" -#include "io_thread.hpp" -#include "session_base.hpp" -#include "config.hpp" -#include "err.hpp" -#include "ip.hpp" -#include "socket_base.hpp" - -#include -#include -#include -#include - -zmq::tipc_listener_t::tipc_listener_t (io_thread_t *io_thread_, - socket_base_t *socket_, const options_t &options_) : - own_t (io_thread_, options_), - io_object_t (io_thread_), - s (retired_fd), - socket (socket_) -{ -} - -zmq::tipc_listener_t::~tipc_listener_t () -{ - zmq_assert (s == retired_fd); -} - -void zmq::tipc_listener_t::process_plug () -{ - // Start polling for incoming connections. - handle = add_fd (s); - set_pollin (handle); -} - -void zmq::tipc_listener_t::process_term (int linger_) -{ - rm_fd (handle); - close (); - own_t::process_term (linger_); -} - -void zmq::tipc_listener_t::in_event () -{ - fd_t fd = accept (); - - // If connection was reset by the peer in the meantime, just ignore it. - // TODO: Handle specific errors like ENFILE/EMFILE etc. - if (fd == retired_fd) { - socket->event_accept_failed (endpoint, zmq_errno()); - return; - } - - // Create the engine object for this connection. - stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options, endpoint); - alloc_assert (engine); - - // Choose I/O thread to run connecter in. Given that we are already - // running in an I/O thread, there must be at least one available. - io_thread_t *io_thread = choose_io_thread (options.affinity); - zmq_assert (io_thread); - - // Create and launch a session object. - session_base_t *session = session_base_t::create (io_thread, false, socket, - options, NULL); - errno_assert (session); - session->inc_seqnum (); - launch_child (session); - send_attach (session, engine, false); - socket->event_accepted (endpoint, fd); -} - -int zmq::tipc_listener_t::get_address (std::string &addr_) -{ - struct sockaddr_storage ss; - socklen_t sl = sizeof (ss); - - int rc = getsockname (s, (sockaddr *) &ss, &sl); - if (rc != 0) { - addr_.clear (); - return rc; - } - - tipc_address_t addr ((struct sockaddr *) &ss, sl); - return addr.to_string (addr_); -} - -int zmq::tipc_listener_t::set_address (const char *addr_) -{ - //convert str to address struct - int rc = address.resolve(addr_); - if (rc != 0) - return -1; - // Create a listening socket. - s = open_socket (AF_TIPC, SOCK_STREAM, 0); - if (s == -1) - return -1; - - address.to_string (endpoint); - - // Bind the socket to tipc name. - rc = bind (s, address.addr (), address.addrlen ()); - if (rc != 0) - goto error; - - // Listen for incomming connections. - rc = listen (s, options.backlog); - if (rc != 0) - goto error; - - socket->event_listening (endpoint, s); - return 0; - -error: - int err = errno; - close (); - errno = err; - return -1; -} - -void zmq::tipc_listener_t::close () -{ - zmq_assert (s != retired_fd); - int rc = ::close (s); - errno_assert (rc == 0); - s = retired_fd; - socket->event_closed (endpoint, s); -} - -zmq::fd_t zmq::tipc_listener_t::accept () -{ - // Accept one connection and deal with different failure modes. - // The situation where connection cannot be accepted due to insufficient - // resources is considered valid and treated by ignoring the connection. - struct sockaddr_storage ss = {}; - socklen_t ss_len = sizeof(ss); - - zmq_assert (s != retired_fd); - fd_t sock = ::accept (s, (struct sockaddr *) &ss, &ss_len); - if (sock == -1) { - errno_assert (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS || - errno == EINTR || errno == ECONNABORTED || errno == EPROTO || errno == EMFILE || - errno == ENFILE); - return retired_fd; - } - /*FIXME Accept filters?*/ - return sock; -} - -#endif diff --git a/src/tipc_listener.hpp b/src/tipc_listener.hpp deleted file mode 100644 index d5d3e293..00000000 --- a/src/tipc_listener.hpp +++ /dev/null @@ -1,96 +0,0 @@ -/* - Copyright (c) 2013 Ericsson AB - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see . -*/ - -#ifndef __ZMQ_TIPC_LISTENER_HPP_INCLUDED__ -#define __ZMQ_TIPC_LISTENER_HPP_INCLUDED__ - -#include "platform.hpp" - -#if defined ZMQ_HAVE_LINUX - -#include - -#include "fd.hpp" -#include "own.hpp" -#include "stdint.hpp" -#include "io_object.hpp" -#include "tipc_address.hpp" - -namespace zmq -{ - - class io_thread_t; - class socket_base_t; - - class tipc_listener_t : public own_t, public io_object_t - { - public: - - tipc_listener_t (zmq::io_thread_t *io_thread_, - zmq::socket_base_t *socket_, const options_t &options_); - ~tipc_listener_t (); - - // Set address to listen on. - int set_address (const char *addr_); - - // Get the bound address for use with wildcards - int get_address (std::string &addr_); - - private: - - // Handlers for incoming commands. - void process_plug (); - void process_term (int linger_); - - // Handlers for I/O events. - void in_event (); - - // Close the listening socket. - void close (); - - // Accept the new connection. Returns the file descriptor of the - // newly created connection. The function may return retired_fd - // if the connection was dropped while waiting in the listen backlog. - fd_t accept (); - - // Address to listen on - tipc_address_t address; - - // Underlying socket. - fd_t s; - - // Handle corresponding to the listening socket. - handle_t handle; - - // Socket the listerner belongs to. - zmq::socket_base_t *socket; - - // String representation of endpoint to bind to - std::string endpoint; - - tipc_listener_t (const tipc_listener_t&); - const tipc_listener_t &operator = (const tipc_listener_t&); - }; - -} - -#endif - -#endif -