mirror of
https://github.com/zeromq/libzmq.git
synced 2025-01-14 09:47:56 +08:00
Merge pull request #3296 from sigiesec/reapply-session-base-refactoring
Reapply session_base_t refactoring
This commit is contained in:
commit
63abe83388
@ -550,6 +550,54 @@ void zmq::session_base_t::reconnect ()
|
|||||||
_pipe->hiccup ();
|
_pipe->hiccup ();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
zmq::session_base_t::connecter_factory_entry_t
|
||||||
|
zmq::session_base_t::_connecter_factories[] = {
|
||||||
|
connecter_factory_entry_t (protocol_name::tcp,
|
||||||
|
&zmq::session_base_t::create_connecter_tcp),
|
||||||
|
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \
|
||||||
|
&& !defined ZMQ_HAVE_VXWORKS
|
||||||
|
connecter_factory_entry_t (protocol_name::ipc,
|
||||||
|
&zmq::session_base_t::create_connecter_ipc),
|
||||||
|
#endif
|
||||||
|
#if defined ZMQ_HAVE_TIPC
|
||||||
|
connecter_factory_entry_t (protocol_name::tipc,
|
||||||
|
&zmq::session_base_t::create_connecter_tipc),
|
||||||
|
#endif
|
||||||
|
#if defined ZMQ_HAVE_VMCI
|
||||||
|
connecter_factory_entry_t (protocol_name::vmci,
|
||||||
|
&zmq::session_base_t::create_connecter_vmci),
|
||||||
|
#endif
|
||||||
|
};
|
||||||
|
|
||||||
|
zmq::session_base_t::connecter_factory_map_t
|
||||||
|
zmq::session_base_t::_connecter_factories_map (
|
||||||
|
_connecter_factories,
|
||||||
|
_connecter_factories
|
||||||
|
+ sizeof (_connecter_factories) / sizeof (_connecter_factories[0]));
|
||||||
|
|
||||||
|
zmq::session_base_t::start_connecting_entry_t
|
||||||
|
zmq::session_base_t::_start_connecting_entries[] = {
|
||||||
|
start_connecting_entry_t (protocol_name::udp,
|
||||||
|
&zmq::session_base_t::start_connecting_udp),
|
||||||
|
#if defined ZMQ_HAVE_OPENPGM
|
||||||
|
start_connecting_entry_t ("pgm",
|
||||||
|
&zmq::session_base_t::start_connecting_pgm),
|
||||||
|
start_connecting_entry_t ("epgm",
|
||||||
|
&zmq::session_base_t::start_connecting_pgm),
|
||||||
|
#endif
|
||||||
|
#if defined ZMQ_HAVE_NORM
|
||||||
|
start_connecting_entry_t ("norm",
|
||||||
|
&zmq::session_base_t::start_connecting_norm),
|
||||||
|
#endif
|
||||||
|
};
|
||||||
|
|
||||||
|
zmq::session_base_t::start_connecting_map_t
|
||||||
|
zmq::session_base_t::_start_connecting_map (
|
||||||
|
_start_connecting_entries,
|
||||||
|
_start_connecting_entries
|
||||||
|
+ sizeof (_start_connecting_entries)
|
||||||
|
/ sizeof (_start_connecting_entries[0]));
|
||||||
|
|
||||||
void zmq::session_base_t::start_connecting (bool wait_)
|
void zmq::session_base_t::start_connecting (bool wait_)
|
||||||
{
|
{
|
||||||
zmq_assert (_active);
|
zmq_assert (_active);
|
||||||
@ -560,145 +608,160 @@ void zmq::session_base_t::start_connecting (bool wait_)
|
|||||||
zmq_assert (io_thread);
|
zmq_assert (io_thread);
|
||||||
|
|
||||||
// Create the connecter object.
|
// Create the connecter object.
|
||||||
own_t *connecter = NULL;
|
const connecter_factory_map_t::const_iterator connecter_factories_it =
|
||||||
if (_addr->protocol == protocol_name::tcp) {
|
_connecter_factories_map.find (_addr->protocol);
|
||||||
if (!options.socks_proxy_address.empty ()) {
|
if (connecter_factories_it != _connecter_factories_map.end ()) {
|
||||||
address_t *proxy_address = new (std::nothrow)
|
own_t *connecter =
|
||||||
address_t (protocol_name::tcp, options.socks_proxy_address,
|
(this->*connecter_factories_it->second) (io_thread, wait_);
|
||||||
this->get_ctx ());
|
|
||||||
alloc_assert (proxy_address);
|
|
||||||
connecter = new (std::nothrow) socks_connecter_t (
|
|
||||||
io_thread, this, options, _addr, proxy_address, wait_);
|
|
||||||
} else {
|
|
||||||
connecter = new (std::nothrow)
|
|
||||||
tcp_connecter_t (io_thread, this, options, _addr, wait_);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \
|
|
||||||
&& !defined ZMQ_HAVE_VXWORKS
|
|
||||||
else if (_addr->protocol == protocol_name::ipc) {
|
|
||||||
connecter = new (std::nothrow)
|
|
||||||
ipc_connecter_t (io_thread, this, options, _addr, wait_);
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
#if defined ZMQ_HAVE_TIPC
|
|
||||||
else if (_addr->protocol == protocol_name::tipc) {
|
|
||||||
connecter = new (std::nothrow)
|
|
||||||
tipc_connecter_t (io_thread, this, options, _addr, wait_);
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
#if defined ZMQ_HAVE_VMCI
|
|
||||||
else if (_addr->protocol == protocol_name::vmci) {
|
|
||||||
connecter = new (std::nothrow)
|
|
||||||
vmci_connecter_t (io_thread, this, options, _addr, wait_);
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
if (connecter != NULL) {
|
|
||||||
alloc_assert (connecter);
|
alloc_assert (connecter);
|
||||||
launch_child (connecter);
|
launch_child (connecter);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
const start_connecting_map_t::const_iterator start_connecting_it =
|
||||||
if (_addr->protocol == protocol_name::udp) {
|
_start_connecting_map.find (_addr->protocol);
|
||||||
zmq_assert (options.type == ZMQ_DISH || options.type == ZMQ_RADIO
|
if (start_connecting_it != _start_connecting_map.end ()) {
|
||||||
|| options.type == ZMQ_DGRAM);
|
(this->*start_connecting_it->second) (io_thread);
|
||||||
|
|
||||||
udp_engine_t *engine = new (std::nothrow) udp_engine_t (options);
|
|
||||||
alloc_assert (engine);
|
|
||||||
|
|
||||||
bool recv = false;
|
|
||||||
bool send = false;
|
|
||||||
|
|
||||||
if (options.type == ZMQ_RADIO) {
|
|
||||||
send = true;
|
|
||||||
recv = false;
|
|
||||||
} else if (options.type == ZMQ_DISH) {
|
|
||||||
send = false;
|
|
||||||
recv = true;
|
|
||||||
} else if (options.type == ZMQ_DGRAM) {
|
|
||||||
send = true;
|
|
||||||
recv = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
int rc = engine->init (_addr, send, recv);
|
|
||||||
errno_assert (rc == 0);
|
|
||||||
|
|
||||||
send_attach (this, engine);
|
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
#ifdef ZMQ_HAVE_OPENPGM
|
|
||||||
|
|
||||||
// Both PGM and EPGM transports are using the same infrastructure.
|
|
||||||
if (_addr->protocol == "pgm" || _addr->protocol == "epgm") {
|
|
||||||
zmq_assert (options.type == ZMQ_PUB || options.type == ZMQ_XPUB
|
|
||||||
|| options.type == ZMQ_SUB || options.type == ZMQ_XSUB);
|
|
||||||
|
|
||||||
// For EPGM transport with UDP encapsulation of PGM is used.
|
|
||||||
bool const udp_encapsulation = _addr->protocol == "epgm";
|
|
||||||
|
|
||||||
// At this point we'll create message pipes to the session straight
|
|
||||||
// away. There's no point in delaying it as no concept of 'connect'
|
|
||||||
// exists with PGM anyway.
|
|
||||||
if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) {
|
|
||||||
// PGM sender.
|
|
||||||
pgm_sender_t *pgm_sender =
|
|
||||||
new (std::nothrow) pgm_sender_t (io_thread, options);
|
|
||||||
alloc_assert (pgm_sender);
|
|
||||||
|
|
||||||
int rc =
|
|
||||||
pgm_sender->init (udp_encapsulation, _addr->address.c_str ());
|
|
||||||
errno_assert (rc == 0);
|
|
||||||
|
|
||||||
send_attach (this, pgm_sender);
|
|
||||||
} else {
|
|
||||||
// PGM receiver.
|
|
||||||
pgm_receiver_t *pgm_receiver =
|
|
||||||
new (std::nothrow) pgm_receiver_t (io_thread, options);
|
|
||||||
alloc_assert (pgm_receiver);
|
|
||||||
|
|
||||||
int rc =
|
|
||||||
pgm_receiver->init (udp_encapsulation, _addr->address.c_str ());
|
|
||||||
errno_assert (rc == 0);
|
|
||||||
|
|
||||||
send_attach (this, pgm_receiver);
|
|
||||||
}
|
|
||||||
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#ifdef ZMQ_HAVE_NORM
|
|
||||||
if (_addr->protocol == "norm") {
|
|
||||||
// At this point we'll create message pipes to the session straight
|
|
||||||
// away. There's no point in delaying it as no concept of 'connect'
|
|
||||||
// exists with NORM anyway.
|
|
||||||
if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) {
|
|
||||||
// NORM sender.
|
|
||||||
norm_engine_t *norm_sender =
|
|
||||||
new (std::nothrow) norm_engine_t (io_thread, options);
|
|
||||||
alloc_assert (norm_sender);
|
|
||||||
|
|
||||||
int rc = norm_sender->init (_addr->address.c_str (), true, false);
|
|
||||||
errno_assert (rc == 0);
|
|
||||||
|
|
||||||
send_attach (this, norm_sender);
|
|
||||||
} else { // ZMQ_SUB or ZMQ_XSUB
|
|
||||||
|
|
||||||
// NORM receiver.
|
|
||||||
norm_engine_t *norm_receiver =
|
|
||||||
new (std::nothrow) norm_engine_t (io_thread, options);
|
|
||||||
alloc_assert (norm_receiver);
|
|
||||||
|
|
||||||
int rc = norm_receiver->init (_addr->address.c_str (), false, true);
|
|
||||||
errno_assert (rc == 0);
|
|
||||||
|
|
||||||
send_attach (this, norm_receiver);
|
|
||||||
}
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
#endif // ZMQ_HAVE_NORM
|
|
||||||
|
|
||||||
zmq_assert (false);
|
zmq_assert (false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if defined ZMQ_HAVE_VMCI
|
||||||
|
zmq::own_t *zmq::session_base_t::create_connecter_vmci (io_thread_t *io_thread_,
|
||||||
|
bool wait_)
|
||||||
|
{
|
||||||
|
return new (std::nothrow)
|
||||||
|
vmci_connecter_t (io_thread_, this, options, _addr, wait_);
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#if defined ZMQ_HAVE_TIPC
|
||||||
|
zmq::own_t *zmq::session_base_t::create_connecter_tipc (io_thread_t *io_thread_,
|
||||||
|
bool wait_)
|
||||||
|
{
|
||||||
|
return new (std::nothrow)
|
||||||
|
tipc_connecter_t (io_thread_, this, options, _addr, wait_);
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \
|
||||||
|
&& !defined ZMQ_HAVE_VXWORKS
|
||||||
|
zmq::own_t *zmq::session_base_t::create_connecter_ipc (io_thread_t *io_thread_,
|
||||||
|
bool wait_)
|
||||||
|
{
|
||||||
|
return new (std::nothrow)
|
||||||
|
ipc_connecter_t (io_thread_, this, options, _addr, wait_);
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
zmq::own_t *zmq::session_base_t::create_connecter_tcp (io_thread_t *io_thread_,
|
||||||
|
bool wait_)
|
||||||
|
{
|
||||||
|
if (!options.socks_proxy_address.empty ()) {
|
||||||
|
address_t *proxy_address = new (std::nothrow) address_t (
|
||||||
|
protocol_name::tcp, options.socks_proxy_address, this->get_ctx ());
|
||||||
|
alloc_assert (proxy_address);
|
||||||
|
return new (std::nothrow) socks_connecter_t (
|
||||||
|
io_thread_, this, options, _addr, proxy_address, wait_);
|
||||||
|
}
|
||||||
|
return new (std::nothrow)
|
||||||
|
tcp_connecter_t (io_thread_, this, options, _addr, wait_);
|
||||||
|
}
|
||||||
|
|
||||||
|
#ifdef ZMQ_HAVE_OPENPGM
|
||||||
|
void zmq::session_base_t::start_connecting_pgm (io_thread_t *io_thread_)
|
||||||
|
{
|
||||||
|
zmq_assert (options.type == ZMQ_PUB || options.type == ZMQ_XPUB
|
||||||
|
|| options.type == ZMQ_SUB || options.type == ZMQ_XSUB);
|
||||||
|
|
||||||
|
// For EPGM transport with UDP encapsulation of PGM is used.
|
||||||
|
bool const udp_encapsulation = _addr->protocol == "epgm";
|
||||||
|
|
||||||
|
// At this point we'll create message pipes to the session straight
|
||||||
|
// away. There's no point in delaying it as no concept of 'connect'
|
||||||
|
// exists with PGM anyway.
|
||||||
|
if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) {
|
||||||
|
// PGM sender.
|
||||||
|
pgm_sender_t *pgm_sender =
|
||||||
|
new (std::nothrow) pgm_sender_t (io_thread_, options);
|
||||||
|
alloc_assert (pgm_sender);
|
||||||
|
|
||||||
|
int rc = pgm_sender->init (udp_encapsulation, _addr->address.c_str ());
|
||||||
|
errno_assert (rc == 0);
|
||||||
|
|
||||||
|
send_attach (this, pgm_sender);
|
||||||
|
} else {
|
||||||
|
// PGM receiver.
|
||||||
|
pgm_receiver_t *pgm_receiver =
|
||||||
|
new (std::nothrow) pgm_receiver_t (io_thread_, options);
|
||||||
|
alloc_assert (pgm_receiver);
|
||||||
|
|
||||||
|
int rc =
|
||||||
|
pgm_receiver->init (udp_encapsulation, _addr->address.c_str ());
|
||||||
|
errno_assert (rc == 0);
|
||||||
|
|
||||||
|
send_attach (this, pgm_receiver);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#ifdef ZMQ_HAVE_NORM
|
||||||
|
void zmq::session_base_t::start_connecting_norm (io_thread_t *io_thread_)
|
||||||
|
{
|
||||||
|
// At this point we'll create message pipes to the session straight
|
||||||
|
// away. There's no point in delaying it as no concept of 'connect'
|
||||||
|
// exists with NORM anyway.
|
||||||
|
if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) {
|
||||||
|
// NORM sender.
|
||||||
|
norm_engine_t *norm_sender =
|
||||||
|
new (std::nothrow) norm_engine_t (io_thread_, options);
|
||||||
|
alloc_assert (norm_sender);
|
||||||
|
|
||||||
|
int rc = norm_sender->init (_addr->address.c_str (), true, false);
|
||||||
|
errno_assert (rc == 0);
|
||||||
|
|
||||||
|
send_attach (this, norm_sender);
|
||||||
|
} else { // ZMQ_SUB or ZMQ_XSUB
|
||||||
|
|
||||||
|
// NORM receiver.
|
||||||
|
norm_engine_t *norm_receiver =
|
||||||
|
new (std::nothrow) norm_engine_t (io_thread_, options);
|
||||||
|
alloc_assert (norm_receiver);
|
||||||
|
|
||||||
|
int rc = norm_receiver->init (_addr->address.c_str (), false, true);
|
||||||
|
errno_assert (rc == 0);
|
||||||
|
|
||||||
|
send_attach (this, norm_receiver);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
void zmq::session_base_t::start_connecting_udp (io_thread_t * /*io_thread_*/)
|
||||||
|
{
|
||||||
|
zmq_assert (options.type == ZMQ_DISH || options.type == ZMQ_RADIO
|
||||||
|
|| options.type == ZMQ_DGRAM);
|
||||||
|
|
||||||
|
udp_engine_t *engine = new (std::nothrow) udp_engine_t (options);
|
||||||
|
alloc_assert (engine);
|
||||||
|
|
||||||
|
bool recv = false;
|
||||||
|
bool send = false;
|
||||||
|
|
||||||
|
if (options.type == ZMQ_RADIO) {
|
||||||
|
send = true;
|
||||||
|
recv = false;
|
||||||
|
} else if (options.type == ZMQ_DISH) {
|
||||||
|
send = false;
|
||||||
|
recv = true;
|
||||||
|
} else if (options.type == ZMQ_DGRAM) {
|
||||||
|
send = true;
|
||||||
|
recv = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
const int rc = engine->init (_addr, send, recv);
|
||||||
|
errno_assert (rc == 0);
|
||||||
|
|
||||||
|
send_attach (this, engine);
|
||||||
|
}
|
||||||
|
@ -105,6 +105,33 @@ class session_base_t : public own_t, public io_object_t, public i_pipe_events
|
|||||||
private:
|
private:
|
||||||
void start_connecting (bool wait_);
|
void start_connecting (bool wait_);
|
||||||
|
|
||||||
|
typedef own_t *(session_base_t::*connecter_factory_fun_t) (
|
||||||
|
io_thread_t *io_thread, bool wait_);
|
||||||
|
typedef std::pair<const std::string, connecter_factory_fun_t>
|
||||||
|
connecter_factory_entry_t;
|
||||||
|
static connecter_factory_entry_t _connecter_factories[];
|
||||||
|
typedef std::map<std::string, connecter_factory_fun_t>
|
||||||
|
connecter_factory_map_t;
|
||||||
|
static connecter_factory_map_t _connecter_factories_map;
|
||||||
|
|
||||||
|
own_t *create_connecter_vmci (io_thread_t *io_thread_, bool wait_);
|
||||||
|
own_t *create_connecter_tipc (io_thread_t *io_thread_, bool wait_);
|
||||||
|
own_t *create_connecter_ipc (io_thread_t *io_thread_, bool wait_);
|
||||||
|
own_t *create_connecter_tcp (io_thread_t *io_thread_, bool wait_);
|
||||||
|
|
||||||
|
typedef void (session_base_t::*start_connecting_fun_t) (
|
||||||
|
io_thread_t *io_thread);
|
||||||
|
typedef std::pair<const std::string, start_connecting_fun_t>
|
||||||
|
start_connecting_entry_t;
|
||||||
|
static start_connecting_entry_t _start_connecting_entries[];
|
||||||
|
typedef std::map<std::string, start_connecting_fun_t>
|
||||||
|
start_connecting_map_t;
|
||||||
|
static start_connecting_map_t _start_connecting_map;
|
||||||
|
|
||||||
|
void start_connecting_pgm (io_thread_t *io_thread_);
|
||||||
|
void start_connecting_norm (io_thread_t *io_thread_);
|
||||||
|
void start_connecting_udp (io_thread_t *io_thread_);
|
||||||
|
|
||||||
void reconnect ();
|
void reconnect ();
|
||||||
|
|
||||||
// Handlers for incoming commands.
|
// Handlers for incoming commands.
|
||||||
|
Loading…
x
Reference in New Issue
Block a user