diff --git a/Makefile.am b/Makefile.am index d9ad16b6..dc379169 100644 --- a/Makefile.am +++ b/Makefile.am @@ -303,6 +303,12 @@ src_libzmq_la_LDFLAGS = \ @LIBZMQ_EXTRA_LDFLAGS@ \ -Wl,--version-script=$(srcdir)/src/libzmq.vers else +if ON_DEBIAN_KFREEBSD +src_libzmq_la_LDFLAGS = \ + -version-info @LTVER@ \ + @LIBZMQ_EXTRA_LDFLAGS@ \ + -Wl,--version-script=$(srcdir)/src/libzmq.vers +else src_libzmq_la_LDFLAGS = \ -version-info @LTVER@ \ @LIBZMQ_EXTRA_LDFLAGS@ \ @@ -312,6 +318,7 @@ endif endif endif endif +endif src_libzmq_la_CPPFLAGS = $(CODE_COVERAGE_CPPFLAGS) $(LIBUNWIND_CFLAGS) src_libzmq_la_CFLAGS = $(CODE_COVERAGE_CFLAGS) $(LIBUNWIND_CFLAGS) @@ -1014,6 +1021,7 @@ endif if ON_GNU XFAIL_TESTS += tests/test_ipc_wildcard \ + tests/test_reqrep_ipc \ tests/test_term_endpoint endif diff --git a/configure.ac b/configure.ac index 2e323b6b..254005b9 100644 --- a/configure.ac +++ b/configure.ac @@ -155,6 +155,7 @@ libzmq_on_cygwin="no" libzmq_on_android="no" libzmq_on_linux="no" libzmq_on_gnu="no" +libzmq_on_debian_kfreebsd="no" # Set some default features required by ZeroMQ code CPPFLAGS="-D_REENTRANT -D_THREAD_SAFE -Wno-long-long $CPPFLAGS" @@ -210,7 +211,7 @@ case "${host_os}" in case "${host_os}" in # On Debian/kFreeBSD with gnu set the --version-script flag kfreebsd*-gnu*) - libzmq_on_gnu="yes" + libzmq_on_debian_kfreebsd="yes" ;; esac CPPFLAGS="-D__BSD_VISIBLE $CPPFLAGS" @@ -321,10 +322,11 @@ case "${host_os}" in ;; esac -# Sun Studio does not like anonymous structures in unions +# Sun Studio does not like anonymous structures in unions and does not have weak attribute if test "x$libzmq_cv_[]_AC_LANG_ABBREV[]_sun_studio_compiler" = "xyes"; then CXXFLAGS="${CXXFLAGS} -features=extensions" CFLAGS="${CFLAGS} -features=extensions" + CPPFLAGS="${CPPFLAGS} -DUNITY_WEAK_PRAGMA" fi # Checks for libraries @@ -617,6 +619,7 @@ AM_CONDITIONAL(ON_CYGWIN, test "x$libzmq_on_cygwin" = "xyes") AM_CONDITIONAL(ON_ANDROID, test "x$libzmq_on_android" = "xyes") AM_CONDITIONAL(ON_LINUX, test "x$libzmq_on_linux" = "xyes") AM_CONDITIONAL(ON_GNU, test "x$libzmq_on_gnu" = "xyes") +AM_CONDITIONAL(ON_DEBIAN_KFREEBSD, test "x$libzmq_on_debian_kfreebsd" = "xyes") # Check for __atomic_Xxx compiler intrinsics AC_LANG_PUSH([C++]) diff --git a/src/session_base.cpp b/src/session_base.cpp index 35580c8c..48676921 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -550,51 +550,6 @@ void zmq::session_base_t::reconnect () _pipe->hiccup (); } -zmq::session_base_t::connecter_factory_entry_t - zmq::session_base_t::_connecter_factories[] = { - std::make_pair (protocol_name::tcp, - &zmq::session_base_t::create_connecter_tcp), -#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \ - && !defined ZMQ_HAVE_VXWORKS - std::make_pair (protocol_name::ipc, - &zmq::session_base_t::create_connecter_ipc), -#endif -#if defined ZMQ_HAVE_TIPC - std::make_pair (protocol_name::tipc, - &zmq::session_base_t::create_connecter_tipc), -#endif -#if defined ZMQ_HAVE_VMCI - std::make_pair (protocol_name::vmci, - &zmq::session_base_t::create_connecter_vmci), -#endif -}; - -zmq::session_base_t::connecter_factory_map_t - zmq::session_base_t::_connecter_factories_map ( - _connecter_factories, - _connecter_factories - + sizeof (_connecter_factories) / sizeof (_connecter_factories[0])); - -zmq::session_base_t::start_connecting_entry_t - zmq::session_base_t::_start_connecting_entries[] = { - std::make_pair (protocol_name::udp, - &zmq::session_base_t::start_connecting_udp), -#if defined ZMQ_HAVE_OPENPGM - std::make_pair ("pgm", &zmq::session_base_t::start_connecting_pgm), - std::make_pair ("epgm", &zmq::session_base_t::start_connecting_pgm), -#endif -#if defined ZMQ_HAVE_NORM - std::make_pair ("norm", &zmq::session_base_t::start_connecting_norm), -#endif -}; - -zmq::session_base_t::start_connecting_map_t - zmq::session_base_t::_start_connecting_map ( - _start_connecting_entries, - _start_connecting_entries - + sizeof (_start_connecting_entries) - / sizeof (_start_connecting_entries[0])); - void zmq::session_base_t::start_connecting (bool wait_) { zmq_assert (_active); @@ -605,160 +560,145 @@ void zmq::session_base_t::start_connecting (bool wait_) zmq_assert (io_thread); // Create the connecter object. - const connecter_factory_map_t::const_iterator connecter_factories_it = - _connecter_factories_map.find (_addr->protocol); - if (connecter_factories_it != _connecter_factories_map.end ()) { - own_t *connecter = - (this->*connecter_factories_it->second) (io_thread, wait_); - + own_t *connecter = NULL; + if (_addr->protocol == protocol_name::tcp) { + if (!options.socks_proxy_address.empty ()) { + address_t *proxy_address = new (std::nothrow) + address_t (protocol_name::tcp, options.socks_proxy_address, + this->get_ctx ()); + alloc_assert (proxy_address); + connecter = new (std::nothrow) socks_connecter_t ( + io_thread, this, options, _addr, proxy_address, wait_); + } else { + connecter = new (std::nothrow) + tcp_connecter_t (io_thread, this, options, _addr, wait_); + } + } +#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \ + && !defined ZMQ_HAVE_VXWORKS + else if (_addr->protocol == protocol_name::ipc) { + connecter = new (std::nothrow) + ipc_connecter_t (io_thread, this, options, _addr, wait_); + } +#endif +#if defined ZMQ_HAVE_TIPC + else if (_addr->protocol == protocol_name::tipc) { + connecter = new (std::nothrow) + tipc_connecter_t (io_thread, this, options, _addr, wait_); + } +#endif +#if defined ZMQ_HAVE_VMCI + else if (_addr->protocol == protocol_name::vmci) { + connecter = new (std::nothrow) + vmci_connecter_t (io_thread, this, options, _addr, wait_); + } +#endif + if (connecter != NULL) { alloc_assert (connecter); launch_child (connecter); return; } - const start_connecting_map_t::const_iterator start_connecting_it = - _start_connecting_map.find (_addr->protocol); - if (start_connecting_it != _start_connecting_map.end ()) { - (this->*start_connecting_it->second) (io_thread); + + if (_addr->protocol == protocol_name::udp) { + zmq_assert (options.type == ZMQ_DISH || options.type == ZMQ_RADIO + || options.type == ZMQ_DGRAM); + + udp_engine_t *engine = new (std::nothrow) udp_engine_t (options); + alloc_assert (engine); + + bool recv = false; + bool send = false; + + if (options.type == ZMQ_RADIO) { + send = true; + recv = false; + } else if (options.type == ZMQ_DISH) { + send = false; + recv = true; + } else if (options.type == ZMQ_DGRAM) { + send = true; + recv = true; + } + + int rc = engine->init (_addr, send, recv); + errno_assert (rc == 0); + + send_attach (this, engine); + return; } - zmq_assert (false); -} - -#if defined ZMQ_HAVE_VMCI -zmq::own_t *zmq::session_base_t::create_connecter_vmci (io_thread_t *io_thread_, - bool wait_) -{ - return new (std::nothrow) - vmci_connecter_t (io_thread_, this, options, _addr, wait_); -} -#endif - -#if defined ZMQ_HAVE_TIPC -zmq::own_t *zmq::session_base_t::create_connecter_tipc (io_thread_t *io_thread_, - bool wait_) -{ - return new (std::nothrow) - tipc_connecter_t (io_thread_, this, options, _addr, wait_); -} -#endif - -#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \ - && !defined ZMQ_HAVE_VXWORKS -zmq::own_t *zmq::session_base_t::create_connecter_ipc (io_thread_t *io_thread_, - bool wait_) -{ - return new (std::nothrow) - ipc_connecter_t (io_thread_, this, options, _addr, wait_); -} -#endif - -zmq::own_t *zmq::session_base_t::create_connecter_tcp (io_thread_t *io_thread_, - bool wait_) -{ - if (!options.socks_proxy_address.empty ()) { - address_t *proxy_address = new (std::nothrow) address_t ( - protocol_name::tcp, options.socks_proxy_address, this->get_ctx ()); - alloc_assert (proxy_address); - return new (std::nothrow) socks_connecter_t ( - io_thread_, this, options, _addr, proxy_address, wait_); - } - return new (std::nothrow) - tcp_connecter_t (io_thread_, this, options, _addr, wait_); -} - #ifdef ZMQ_HAVE_OPENPGM -void zmq::session_base_t::start_connecting_pgm (io_thread_t *io_thread_) -{ - zmq_assert (options.type == ZMQ_PUB || options.type == ZMQ_XPUB - || options.type == ZMQ_SUB || options.type == ZMQ_XSUB); - // For EPGM transport with UDP encapsulation of PGM is used. - bool const udp_encapsulation = _addr->protocol == "epgm"; + // Both PGM and EPGM transports are using the same infrastructure. + if (_addr->protocol == "pgm" || _addr->protocol == "epgm") { + zmq_assert (options.type == ZMQ_PUB || options.type == ZMQ_XPUB + || options.type == ZMQ_SUB || options.type == ZMQ_XSUB); - // At this point we'll create message pipes to the session straight - // away. There's no point in delaying it as no concept of 'connect' - // exists with PGM anyway. - if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) { - // PGM sender. - pgm_sender_t *pgm_sender = - new (std::nothrow) pgm_sender_t (io_thread_, options); - alloc_assert (pgm_sender); + // For EPGM transport with UDP encapsulation of PGM is used. + bool const udp_encapsulation = _addr->protocol == "epgm"; - int rc = pgm_sender->init (udp_encapsulation, _addr->address.c_str ()); - errno_assert (rc == 0); + // At this point we'll create message pipes to the session straight + // away. There's no point in delaying it as no concept of 'connect' + // exists with PGM anyway. + if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) { + // PGM sender. + pgm_sender_t *pgm_sender = + new (std::nothrow) pgm_sender_t (io_thread, options); + alloc_assert (pgm_sender); - send_attach (this, pgm_sender); - } else { - // PGM receiver. - pgm_receiver_t *pgm_receiver = - new (std::nothrow) pgm_receiver_t (io_thread_, options); - alloc_assert (pgm_receiver); + int rc = + pgm_sender->init (udp_encapsulation, _addr->address.c_str ()); + errno_assert (rc == 0); - int rc = - pgm_receiver->init (udp_encapsulation, _addr->address.c_str ()); - errno_assert (rc == 0); + send_attach (this, pgm_sender); + } else { + // PGM receiver. + pgm_receiver_t *pgm_receiver = + new (std::nothrow) pgm_receiver_t (io_thread, options); + alloc_assert (pgm_receiver); - send_attach (this, pgm_receiver); + int rc = + pgm_receiver->init (udp_encapsulation, _addr->address.c_str ()); + errno_assert (rc == 0); + + send_attach (this, pgm_receiver); + } + + return; } -} #endif #ifdef ZMQ_HAVE_NORM -void zmq::session_base_t::start_connecting_norm (io_thread_t *io_thread_) -{ - // At this point we'll create message pipes to the session straight - // away. There's no point in delaying it as no concept of 'connect' - // exists with NORM anyway. - if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) { - // NORM sender. - norm_engine_t *norm_sender = - new (std::nothrow) norm_engine_t (io_thread_, options); - alloc_assert (norm_sender); + if (_addr->protocol == "norm") { + // At this point we'll create message pipes to the session straight + // away. There's no point in delaying it as no concept of 'connect' + // exists with NORM anyway. + if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) { + // NORM sender. + norm_engine_t *norm_sender = + new (std::nothrow) norm_engine_t (io_thread, options); + alloc_assert (norm_sender); - int rc = norm_sender->init (_addr->address.c_str (), true, false); - errno_assert (rc == 0); + int rc = norm_sender->init (_addr->address.c_str (), true, false); + errno_assert (rc == 0); - send_attach (this, norm_sender); - } else { // ZMQ_SUB or ZMQ_XSUB + send_attach (this, norm_sender); + } else { // ZMQ_SUB or ZMQ_XSUB - // NORM receiver. - norm_engine_t *norm_receiver = - new (std::nothrow) norm_engine_t (io_thread_, options); - alloc_assert (norm_receiver); + // NORM receiver. + norm_engine_t *norm_receiver = + new (std::nothrow) norm_engine_t (io_thread, options); + alloc_assert (norm_receiver); - int rc = norm_receiver->init (_addr->address.c_str (), false, true); - errno_assert (rc == 0); + int rc = norm_receiver->init (_addr->address.c_str (), false, true); + errno_assert (rc == 0); - send_attach (this, norm_receiver); + send_attach (this, norm_receiver); + } + return; } -} -#endif - -void zmq::session_base_t::start_connecting_udp (io_thread_t * /*io_thread_*/) -{ - zmq_assert (options.type == ZMQ_DISH || options.type == ZMQ_RADIO - || options.type == ZMQ_DGRAM); - - udp_engine_t *engine = new (std::nothrow) udp_engine_t (options); - alloc_assert (engine); - - bool recv = false; - bool send = false; - - if (options.type == ZMQ_RADIO) { - send = true; - recv = false; - } else if (options.type == ZMQ_DISH) { - send = false; - recv = true; - } else if (options.type == ZMQ_DGRAM) { - send = true; - recv = true; - } - - const int rc = engine->init (_addr, send, recv); - errno_assert (rc == 0); - - send_attach (this, engine); +#endif // ZMQ_HAVE_NORM + + zmq_assert (false); } diff --git a/src/session_base.hpp b/src/session_base.hpp index 20292a85..74e31810 100644 --- a/src/session_base.hpp +++ b/src/session_base.hpp @@ -105,33 +105,6 @@ class session_base_t : public own_t, public io_object_t, public i_pipe_events private: void start_connecting (bool wait_); - typedef own_t *(session_base_t::*connecter_factory_fun_t) ( - io_thread_t *io_thread, bool wait_); - typedef std::pair - connecter_factory_entry_t; - static connecter_factory_entry_t _connecter_factories[]; - typedef std::map - connecter_factory_map_t; - static connecter_factory_map_t _connecter_factories_map; - - own_t *create_connecter_vmci (io_thread_t *io_thread_, bool wait_); - own_t *create_connecter_tipc (io_thread_t *io_thread_, bool wait_); - own_t *create_connecter_ipc (io_thread_t *io_thread_, bool wait_); - own_t *create_connecter_tcp (io_thread_t *io_thread_, bool wait_); - - typedef void (session_base_t::*start_connecting_fun_t) ( - io_thread_t *io_thread); - typedef std::pair - start_connecting_entry_t; - static start_connecting_entry_t _start_connecting_entries[]; - typedef std::map - start_connecting_map_t; - static start_connecting_map_t _start_connecting_map; - - void start_connecting_pgm (io_thread_t *io_thread_); - void start_connecting_norm (io_thread_t *io_thread_); - void start_connecting_udp (io_thread_t *io_thread_); - void reconnect (); // Handlers for incoming commands. diff --git a/src/zmq.cpp b/src/zmq.cpp index 0ce4f531..1c9ea1cd 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -798,12 +798,16 @@ inline int zmq_poller_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) { -// TODO: the function implementation can just call zmq_pollfd_poll with -// pollfd as NULL, however pollfd is not yet stable. #if defined ZMQ_HAVE_POLLER - // if poller is present, use that. - return zmq_poller_poll (items_, nitems_, timeout_); -#else + // if poller is present, use that if there is at least 1 thread-safe socket, + // otherwise fall back to the previous implementation as it's faster. + for (int i = 0; i != nitems_; i++) { + if (items_[i].socket + && as_socket_base_t (items_[i].socket)->is_thread_safe ()) { + return zmq_poller_poll (items_, nitems_, timeout_); + } + } +#endif // ZMQ_HAVE_POLLER #if defined ZMQ_POLL_BASED_ON_POLL || defined ZMQ_POLL_BASED_ON_SELECT if (unlikely (nitems_ < 0)) { errno = EINVAL; @@ -1086,7 +1090,6 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) errno = ENOTSUP; return -1; #endif -#endif // ZMQ_HAVE_POLLER } // The poller functionality diff --git a/tests/test_filter_ipc.cpp b/tests/test_filter_ipc.cpp index f13c8440..60827a26 100644 --- a/tests/test_filter_ipc.cpp +++ b/tests/test_filter_ipc.cpp @@ -209,7 +209,7 @@ int main (void) RUN_TEST (test_filter_with_current_process_pid); RUN_TEST (test_filter_with_possibly_nonexistent_pid); #else - RUN_TEST (test_filter_with_pid_fails ()); + RUN_TEST (test_filter_with_pid_fails); #endif #else RUN_TEST (test_filter_with_zero_uid_fails); diff --git a/tests/test_hwm_pubsub.cpp b/tests/test_hwm_pubsub.cpp index 90dd75ee..ab8cec41 100644 --- a/tests/test_hwm_pubsub.cpp +++ b/tests/test_hwm_pubsub.cpp @@ -274,7 +274,7 @@ void test_inproc () TEST_ASSERT_EQUAL_INT (6000, test_blocking (2000, 6000, "inproc://c")); } -#ifndef ZMQ_HAVE_WINDOWS +#if !defined(ZMQ_HAVE_WINDOWS) && !defined(ZMQ_HAVE_GNU) void test_ipc () { @@ -295,7 +295,7 @@ int main () RUN_TEST (test_tcp); RUN_TEST (test_inproc); -#ifndef ZMQ_HAVE_WINDOWS +#if !defined(ZMQ_HAVE_WINDOWS) && !defined(ZMQ_HAVE_GNU) RUN_TEST (test_ipc); #endif RUN_TEST (test_reset_hwm); diff --git a/tests/test_reconnect_ivl.cpp b/tests/test_reconnect_ivl.cpp index 80ac4be0..b67b40e5 100644 --- a/tests/test_reconnect_ivl.cpp +++ b/tests/test_reconnect_ivl.cpp @@ -68,7 +68,7 @@ void test_reconnect_ivl_against_pair_socket (const char *my_endpoint_, test_context_socket_close (sc); } -#ifndef ZMQ_HAVE_WINDOWS +#if !defined(ZMQ_HAVE_WINDOWS) && !defined(ZMQ_HAVE_GNU) void test_reconnect_ivl_ipc (void) { const char *ipc_endpoint = "ipc:///tmp/test_reconnect_ivl"; @@ -112,7 +112,7 @@ int main (void) setup_test_environment (); UNITY_BEGIN (); -#ifndef ZMQ_HAVE_WINDOWS +#if !defined(ZMQ_HAVE_WINDOWS) && !defined(ZMQ_HAVE_GNU) RUN_TEST (test_reconnect_ivl_ipc); #endif RUN_TEST (test_reconnect_ivl_tcp_ipv4);