diff --git a/src/session_base.cpp b/src/session_base.cpp index 48676921..ef7daf95 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -550,6 +550,54 @@ void zmq::session_base_t::reconnect () _pipe->hiccup (); } +zmq::session_base_t::connecter_factory_entry_t + zmq::session_base_t::_connecter_factories[] = { + connecter_factory_entry_t (protocol_name::tcp, + &zmq::session_base_t::create_connecter_tcp), +#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \ + && !defined ZMQ_HAVE_VXWORKS + connecter_factory_entry_t (protocol_name::ipc, + &zmq::session_base_t::create_connecter_ipc), +#endif +#if defined ZMQ_HAVE_TIPC + connecter_factory_entry_t (protocol_name::tipc, + &zmq::session_base_t::create_connecter_tipc), +#endif +#if defined ZMQ_HAVE_VMCI + connecter_factory_entry_t (protocol_name::vmci, + &zmq::session_base_t::create_connecter_vmci), +#endif +}; + +zmq::session_base_t::connecter_factory_map_t + zmq::session_base_t::_connecter_factories_map ( + _connecter_factories, + _connecter_factories + + sizeof (_connecter_factories) / sizeof (_connecter_factories[0])); + +zmq::session_base_t::start_connecting_entry_t + zmq::session_base_t::_start_connecting_entries[] = { + start_connecting_entry_t (protocol_name::udp, + &zmq::session_base_t::start_connecting_udp), +#if defined ZMQ_HAVE_OPENPGM + start_connecting_entry_t ("pgm", + &zmq::session_base_t::start_connecting_pgm), + start_connecting_entry_t ("epgm", + &zmq::session_base_t::start_connecting_pgm), +#endif +#if defined ZMQ_HAVE_NORM + start_connecting_entry_t ("norm", + &zmq::session_base_t::start_connecting_norm), +#endif +}; + +zmq::session_base_t::start_connecting_map_t + zmq::session_base_t::_start_connecting_map ( + _start_connecting_entries, + _start_connecting_entries + + sizeof (_start_connecting_entries) + / sizeof (_start_connecting_entries[0])); + void zmq::session_base_t::start_connecting (bool wait_) { zmq_assert (_active); @@ -560,145 +608,160 @@ void zmq::session_base_t::start_connecting (bool wait_) zmq_assert (io_thread); // Create the connecter object. - own_t *connecter = NULL; - if (_addr->protocol == protocol_name::tcp) { - if (!options.socks_proxy_address.empty ()) { - address_t *proxy_address = new (std::nothrow) - address_t (protocol_name::tcp, options.socks_proxy_address, - this->get_ctx ()); - alloc_assert (proxy_address); - connecter = new (std::nothrow) socks_connecter_t ( - io_thread, this, options, _addr, proxy_address, wait_); - } else { - connecter = new (std::nothrow) - tcp_connecter_t (io_thread, this, options, _addr, wait_); - } - } -#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \ - && !defined ZMQ_HAVE_VXWORKS - else if (_addr->protocol == protocol_name::ipc) { - connecter = new (std::nothrow) - ipc_connecter_t (io_thread, this, options, _addr, wait_); - } -#endif -#if defined ZMQ_HAVE_TIPC - else if (_addr->protocol == protocol_name::tipc) { - connecter = new (std::nothrow) - tipc_connecter_t (io_thread, this, options, _addr, wait_); - } -#endif -#if defined ZMQ_HAVE_VMCI - else if (_addr->protocol == protocol_name::vmci) { - connecter = new (std::nothrow) - vmci_connecter_t (io_thread, this, options, _addr, wait_); - } -#endif - if (connecter != NULL) { + const connecter_factory_map_t::const_iterator connecter_factories_it = + _connecter_factories_map.find (_addr->protocol); + if (connecter_factories_it != _connecter_factories_map.end ()) { + own_t *connecter = + (this->*connecter_factories_it->second) (io_thread, wait_); + alloc_assert (connecter); launch_child (connecter); return; } - - if (_addr->protocol == protocol_name::udp) { - zmq_assert (options.type == ZMQ_DISH || options.type == ZMQ_RADIO - || options.type == ZMQ_DGRAM); - - udp_engine_t *engine = new (std::nothrow) udp_engine_t (options); - alloc_assert (engine); - - bool recv = false; - bool send = false; - - if (options.type == ZMQ_RADIO) { - send = true; - recv = false; - } else if (options.type == ZMQ_DISH) { - send = false; - recv = true; - } else if (options.type == ZMQ_DGRAM) { - send = true; - recv = true; - } - - int rc = engine->init (_addr, send, recv); - errno_assert (rc == 0); - - send_attach (this, engine); - + const start_connecting_map_t::const_iterator start_connecting_it = + _start_connecting_map.find (_addr->protocol); + if (start_connecting_it != _start_connecting_map.end ()) { + (this->*start_connecting_it->second) (io_thread); return; } -#ifdef ZMQ_HAVE_OPENPGM - - // Both PGM and EPGM transports are using the same infrastructure. - if (_addr->protocol == "pgm" || _addr->protocol == "epgm") { - zmq_assert (options.type == ZMQ_PUB || options.type == ZMQ_XPUB - || options.type == ZMQ_SUB || options.type == ZMQ_XSUB); - - // For EPGM transport with UDP encapsulation of PGM is used. - bool const udp_encapsulation = _addr->protocol == "epgm"; - - // At this point we'll create message pipes to the session straight - // away. There's no point in delaying it as no concept of 'connect' - // exists with PGM anyway. - if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) { - // PGM sender. - pgm_sender_t *pgm_sender = - new (std::nothrow) pgm_sender_t (io_thread, options); - alloc_assert (pgm_sender); - - int rc = - pgm_sender->init (udp_encapsulation, _addr->address.c_str ()); - errno_assert (rc == 0); - - send_attach (this, pgm_sender); - } else { - // PGM receiver. - pgm_receiver_t *pgm_receiver = - new (std::nothrow) pgm_receiver_t (io_thread, options); - alloc_assert (pgm_receiver); - - int rc = - pgm_receiver->init (udp_encapsulation, _addr->address.c_str ()); - errno_assert (rc == 0); - - send_attach (this, pgm_receiver); - } - - return; - } -#endif - -#ifdef ZMQ_HAVE_NORM - if (_addr->protocol == "norm") { - // At this point we'll create message pipes to the session straight - // away. There's no point in delaying it as no concept of 'connect' - // exists with NORM anyway. - if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) { - // NORM sender. - norm_engine_t *norm_sender = - new (std::nothrow) norm_engine_t (io_thread, options); - alloc_assert (norm_sender); - - int rc = norm_sender->init (_addr->address.c_str (), true, false); - errno_assert (rc == 0); - - send_attach (this, norm_sender); - } else { // ZMQ_SUB or ZMQ_XSUB - - // NORM receiver. - norm_engine_t *norm_receiver = - new (std::nothrow) norm_engine_t (io_thread, options); - alloc_assert (norm_receiver); - - int rc = norm_receiver->init (_addr->address.c_str (), false, true); - errno_assert (rc == 0); - - send_attach (this, norm_receiver); - } - return; - } -#endif // ZMQ_HAVE_NORM - zmq_assert (false); } + +#if defined ZMQ_HAVE_VMCI +zmq::own_t *zmq::session_base_t::create_connecter_vmci (io_thread_t *io_thread_, + bool wait_) +{ + return new (std::nothrow) + vmci_connecter_t (io_thread_, this, options, _addr, wait_); +} +#endif + +#if defined ZMQ_HAVE_TIPC +zmq::own_t *zmq::session_base_t::create_connecter_tipc (io_thread_t *io_thread_, + bool wait_) +{ + return new (std::nothrow) + tipc_connecter_t (io_thread_, this, options, _addr, wait_); +} +#endif + +#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \ + && !defined ZMQ_HAVE_VXWORKS +zmq::own_t *zmq::session_base_t::create_connecter_ipc (io_thread_t *io_thread_, + bool wait_) +{ + return new (std::nothrow) + ipc_connecter_t (io_thread_, this, options, _addr, wait_); +} +#endif + +zmq::own_t *zmq::session_base_t::create_connecter_tcp (io_thread_t *io_thread_, + bool wait_) +{ + if (!options.socks_proxy_address.empty ()) { + address_t *proxy_address = new (std::nothrow) address_t ( + protocol_name::tcp, options.socks_proxy_address, this->get_ctx ()); + alloc_assert (proxy_address); + return new (std::nothrow) socks_connecter_t ( + io_thread_, this, options, _addr, proxy_address, wait_); + } + return new (std::nothrow) + tcp_connecter_t (io_thread_, this, options, _addr, wait_); +} + +#ifdef ZMQ_HAVE_OPENPGM +void zmq::session_base_t::start_connecting_pgm (io_thread_t *io_thread_) +{ + zmq_assert (options.type == ZMQ_PUB || options.type == ZMQ_XPUB + || options.type == ZMQ_SUB || options.type == ZMQ_XSUB); + + // For EPGM transport with UDP encapsulation of PGM is used. + bool const udp_encapsulation = _addr->protocol == "epgm"; + + // At this point we'll create message pipes to the session straight + // away. There's no point in delaying it as no concept of 'connect' + // exists with PGM anyway. + if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) { + // PGM sender. + pgm_sender_t *pgm_sender = + new (std::nothrow) pgm_sender_t (io_thread_, options); + alloc_assert (pgm_sender); + + int rc = pgm_sender->init (udp_encapsulation, _addr->address.c_str ()); + errno_assert (rc == 0); + + send_attach (this, pgm_sender); + } else { + // PGM receiver. + pgm_receiver_t *pgm_receiver = + new (std::nothrow) pgm_receiver_t (io_thread_, options); + alloc_assert (pgm_receiver); + + int rc = + pgm_receiver->init (udp_encapsulation, _addr->address.c_str ()); + errno_assert (rc == 0); + + send_attach (this, pgm_receiver); + } +} +#endif + +#ifdef ZMQ_HAVE_NORM +void zmq::session_base_t::start_connecting_norm (io_thread_t *io_thread_) +{ + // At this point we'll create message pipes to the session straight + // away. There's no point in delaying it as no concept of 'connect' + // exists with NORM anyway. + if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) { + // NORM sender. + norm_engine_t *norm_sender = + new (std::nothrow) norm_engine_t (io_thread_, options); + alloc_assert (norm_sender); + + int rc = norm_sender->init (_addr->address.c_str (), true, false); + errno_assert (rc == 0); + + send_attach (this, norm_sender); + } else { // ZMQ_SUB or ZMQ_XSUB + + // NORM receiver. + norm_engine_t *norm_receiver = + new (std::nothrow) norm_engine_t (io_thread_, options); + alloc_assert (norm_receiver); + + int rc = norm_receiver->init (_addr->address.c_str (), false, true); + errno_assert (rc == 0); + + send_attach (this, norm_receiver); + } +} +#endif + +void zmq::session_base_t::start_connecting_udp (io_thread_t * /*io_thread_*/) +{ + zmq_assert (options.type == ZMQ_DISH || options.type == ZMQ_RADIO + || options.type == ZMQ_DGRAM); + + udp_engine_t *engine = new (std::nothrow) udp_engine_t (options); + alloc_assert (engine); + + bool recv = false; + bool send = false; + + if (options.type == ZMQ_RADIO) { + send = true; + recv = false; + } else if (options.type == ZMQ_DISH) { + send = false; + recv = true; + } else if (options.type == ZMQ_DGRAM) { + send = true; + recv = true; + } + + const int rc = engine->init (_addr, send, recv); + errno_assert (rc == 0); + + send_attach (this, engine); +} diff --git a/src/session_base.hpp b/src/session_base.hpp index 74e31810..002cdf87 100644 --- a/src/session_base.hpp +++ b/src/session_base.hpp @@ -105,6 +105,33 @@ class session_base_t : public own_t, public io_object_t, public i_pipe_events private: void start_connecting (bool wait_); + typedef own_t *(session_base_t::*connecter_factory_fun_t) ( + io_thread_t *io_thread, bool wait_); + typedef std::pair + connecter_factory_entry_t; + static connecter_factory_entry_t _connecter_factories[]; + typedef std::map + connecter_factory_map_t; + static connecter_factory_map_t _connecter_factories_map; + + own_t *create_connecter_vmci (io_thread_t *io_thread_, bool wait_); + own_t *create_connecter_tipc (io_thread_t *io_thread_, bool wait_); + own_t *create_connecter_ipc (io_thread_t *io_thread_, bool wait_); + own_t *create_connecter_tcp (io_thread_t *io_thread_, bool wait_); + + typedef void (session_base_t::*start_connecting_fun_t) ( + io_thread_t *io_thread); + typedef std::pair + start_connecting_entry_t; + static start_connecting_entry_t _start_connecting_entries[]; + typedef std::map + start_connecting_map_t; + static start_connecting_map_t _start_connecting_map; + + void start_connecting_pgm (io_thread_t *io_thread_); + void start_connecting_norm (io_thread_t *io_thread_); + void start_connecting_udp (io_thread_t *io_thread_); + void reconnect (); // Handlers for incoming commands.