diff --git a/src/xreq.cpp b/src/dealer.cpp similarity index 73% rename from src/xreq.cpp rename to src/dealer.cpp index b116610d..85efd746 100644 --- a/src/xreq.cpp +++ b/src/dealer.cpp @@ -19,17 +19,17 @@ along with this program. If not, see . */ -#include "xreq.hpp" +#include "dealer.hpp" #include "err.hpp" #include "msg.hpp" -zmq::xreq_t::xreq_t (class ctx_t *parent_, uint32_t tid_, int sid_) : +zmq::dealer_t::dealer_t (class ctx_t *parent_, uint32_t tid_, int sid_) : socket_base_t (parent_, tid_, sid_), prefetched (false) { - options.type = ZMQ_XREQ; + options.type = ZMQ_DEALER; - // TODO: Uncomment the following line when XREQ will become true XREQ + // TODO: Uncomment the following line when DEALER will become true DEALER // rather than generic dealer socket. // If the socket is closing we can drop all the outbound requests. There'll // be noone to receive the replies anyway. @@ -41,24 +41,24 @@ zmq::xreq_t::xreq_t (class ctx_t *parent_, uint32_t tid_, int sid_) : prefetched_msg.init (); } -zmq::xreq_t::~xreq_t () +zmq::dealer_t::~dealer_t () { prefetched_msg.close (); } -void zmq::xreq_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) +void zmq::dealer_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) { zmq_assert (pipe_); fq.attach (pipe_); lb.attach (pipe_); } -int zmq::xreq_t::xsend (msg_t *msg_, int flags_) +int zmq::dealer_t::xsend (msg_t *msg_, int flags_) { return lb.send (msg_, flags_); } -int zmq::xreq_t::xrecv (msg_t *msg_, int flags_) +int zmq::dealer_t::xrecv (msg_t *msg_, int flags_) { // If there is a prefetched message, return it. if (prefetched) { @@ -68,7 +68,7 @@ int zmq::xreq_t::xrecv (msg_t *msg_, int flags_) return 0; } - // XREQ socket doesn't use identities. We can safely drop it and + // DEALER socket doesn't use identities. We can safely drop it and while (true) { int rc = fq.recv (msg_, flags_); if (rc != 0) @@ -79,14 +79,14 @@ int zmq::xreq_t::xrecv (msg_t *msg_, int flags_) return 0; } -bool zmq::xreq_t::xhas_in () +bool zmq::dealer_t::xhas_in () { // We may already have a message pre-fetched. if (prefetched) return true; // Try to read the next message to the pre-fetch buffer. - int rc = xreq_t::xrecv (&prefetched_msg, ZMQ_DONTWAIT); + int rc = dealer_t::xrecv (&prefetched_msg, ZMQ_DONTWAIT); if (rc != 0 && errno == EAGAIN) return false; zmq_assert (rc == 0); @@ -94,35 +94,35 @@ bool zmq::xreq_t::xhas_in () return true; } -bool zmq::xreq_t::xhas_out () +bool zmq::dealer_t::xhas_out () { return lb.has_out (); } -void zmq::xreq_t::xread_activated (pipe_t *pipe_) +void zmq::dealer_t::xread_activated (pipe_t *pipe_) { fq.activated (pipe_); } -void zmq::xreq_t::xwrite_activated (pipe_t *pipe_) +void zmq::dealer_t::xwrite_activated (pipe_t *pipe_) { lb.activated (pipe_); } -void zmq::xreq_t::xterminated (pipe_t *pipe_) +void zmq::dealer_t::xterminated (pipe_t *pipe_) { fq.terminated (pipe_); lb.terminated (pipe_); } -zmq::xreq_session_t::xreq_session_t (io_thread_t *io_thread_, bool connect_, +zmq::dealer_session_t::dealer_session_t (io_thread_t *io_thread_, bool connect_, socket_base_t *socket_, const options_t &options_, const address_t *addr_) : session_base_t (io_thread_, connect_, socket_, options_, addr_) { } -zmq::xreq_session_t::~xreq_session_t () +zmq::dealer_session_t::~dealer_session_t () { } diff --git a/src/xreq.hpp b/src/dealer.hpp similarity index 78% rename from src/xreq.hpp rename to src/dealer.hpp index d7ddde06..fae3a632 100644 --- a/src/xreq.hpp +++ b/src/dealer.hpp @@ -18,8 +18,8 @@ along with this program. If not, see . */ -#ifndef __ZMQ_XREQ_HPP_INCLUDED__ -#define __ZMQ_XREQ_HPP_INCLUDED__ +#ifndef __ZMQ_DEALER_HPP_INCLUDED__ +#define __ZMQ_DEALER_HPP_INCLUDED__ #include "socket_base.hpp" #include "session_base.hpp" @@ -35,13 +35,13 @@ namespace zmq class io_thread_t; class socket_base_t; - class xreq_t : + class dealer_t : public socket_base_t { public: - xreq_t (zmq::ctx_t *parent_, uint32_t tid_, int sid); - ~xreq_t (); + dealer_t (zmq::ctx_t *parent_, uint32_t tid_, int sid); + ~dealer_t (); protected: @@ -68,23 +68,23 @@ namespace zmq // Holds the prefetched message. msg_t prefetched_msg; - xreq_t (const xreq_t&); - const xreq_t &operator = (const xreq_t&); + dealer_t (const dealer_t&); + const dealer_t &operator = (const dealer_t&); }; - class xreq_session_t : public session_base_t + class dealer_session_t : public session_base_t { public: - xreq_session_t (zmq::io_thread_t *io_thread_, bool connect_, + dealer_session_t (zmq::io_thread_t *io_thread_, bool connect_, zmq::socket_base_t *socket_, const options_t &options_, const address_t *addr_); - ~xreq_session_t (); + ~dealer_session_t (); private: - xreq_session_t (const xreq_session_t&); - const xreq_session_t &operator = (const xreq_session_t&); + dealer_session_t (const dealer_session_t&); + const dealer_session_t &operator = (const dealer_session_t&); }; } diff --git a/src/rep.cpp b/src/rep.cpp index bb3ca039..affa6ab9 100644 --- a/src/rep.cpp +++ b/src/rep.cpp @@ -24,7 +24,7 @@ #include "msg.hpp" zmq::rep_t::rep_t (class ctx_t *parent_, uint32_t tid_, int sid_) : - xrep_t (parent_, tid_, sid_), + router_t (parent_, tid_, sid_), sending_reply (false), request_begins (true) { @@ -46,7 +46,7 @@ int zmq::rep_t::xsend (msg_t *msg_, int flags_) bool more = msg_->flags () & msg_t::more ? true : false; // Push message to the reply pipe. - int rc = xrep_t::xsend (msg_, flags_); + int rc = router_t::xsend (msg_, flags_); if (rc != 0) return rc; @@ -69,12 +69,12 @@ int zmq::rep_t::xrecv (msg_t *msg_, int flags_) // to the reply pipe. if (request_begins) { while (true) { - int rc = xrep_t::xrecv (msg_, flags_); + int rc = router_t::xrecv (msg_, flags_); if (rc != 0) return rc; zmq_assert (msg_->flags () & msg_t::more); bool bottom = (msg_->size () == 0); - rc = xrep_t::xsend (msg_, flags_); + rc = router_t::xsend (msg_, flags_); errno_assert (rc == 0); if (bottom) break; @@ -83,7 +83,7 @@ int zmq::rep_t::xrecv (msg_t *msg_, int flags_) } // Get next message part to return to the user. - int rc = xrep_t::xrecv (msg_, flags_); + int rc = router_t::xrecv (msg_, flags_); if (rc != 0) return rc; @@ -101,7 +101,7 @@ bool zmq::rep_t::xhas_in () if (sending_reply) return false; - return xrep_t::xhas_in (); + return router_t::xhas_in (); } bool zmq::rep_t::xhas_out () @@ -109,13 +109,13 @@ bool zmq::rep_t::xhas_out () if (!sending_reply) return false; - return xrep_t::xhas_out (); + return router_t::xhas_out (); } zmq::rep_session_t::rep_session_t (io_thread_t *io_thread_, bool connect_, socket_base_t *socket_, const options_t &options_, const address_t *addr_) : - xrep_session_t (io_thread_, connect_, socket_, options_, addr_) + router_session_t (io_thread_, connect_, socket_, options_, addr_) { } diff --git a/src/rep.hpp b/src/rep.hpp index a92ee6aa..5c6eca75 100644 --- a/src/rep.hpp +++ b/src/rep.hpp @@ -22,7 +22,7 @@ #ifndef __ZMQ_REP_HPP_INCLUDED__ #define __ZMQ_REP_HPP_INCLUDED__ -#include "xrep.hpp" +#include "router.hpp" namespace zmq { @@ -32,7 +32,7 @@ namespace zmq class io_thread_t; class socket_base_t; - class rep_t : public xrep_t + class rep_t : public router_t { public: @@ -60,7 +60,7 @@ namespace zmq }; - class rep_session_t : public xrep_session_t + class rep_session_t : public router_session_t { public: diff --git a/src/req.cpp b/src/req.cpp index f7bf11fa..2ad9dbf8 100644 --- a/src/req.cpp +++ b/src/req.cpp @@ -28,7 +28,7 @@ #include "likely.hpp" zmq::req_t::req_t (class ctx_t *parent_, uint32_t tid_, int sid_) : - xreq_t (parent_, tid_, sid_), + dealer_t (parent_, tid_, sid_), receiving_reply (false), message_begins (true) { @@ -54,7 +54,7 @@ int zmq::req_t::xsend (msg_t *msg_, int flags_) int rc = bottom.init (); errno_assert (rc == 0); bottom.set_flags (msg_t::more); - rc = xreq_t::xsend (&bottom, 0); + rc = dealer_t::xsend (&bottom, 0); if (rc != 0) return -1; message_begins = false; @@ -62,7 +62,7 @@ int zmq::req_t::xsend (msg_t *msg_, int flags_) bool more = msg_->flags () & msg_t::more ? true : false; - int rc = xreq_t::xsend (msg_, flags_); + int rc = dealer_t::xsend (msg_, flags_); if (rc != 0) return rc; @@ -85,14 +85,14 @@ int zmq::req_t::xrecv (msg_t *msg_, int flags_) // First part of the reply should be the original request ID. if (message_begins) { - int rc = xreq_t::xrecv (msg_, flags_); + int rc = dealer_t::xrecv (msg_, flags_); if (rc != 0) return rc; // TODO: This should also close the connection with the peer! if (unlikely (!(msg_->flags () & msg_t::more) || msg_->size () != 0)) { while (true) { - int rc = xreq_t::xrecv (msg_, flags_); + int rc = dealer_t::xrecv (msg_, flags_); errno_assert (rc == 0); if (!(msg_->flags () & msg_t::more)) break; @@ -106,7 +106,7 @@ int zmq::req_t::xrecv (msg_t *msg_, int flags_) message_begins = false; } - int rc = xreq_t::xrecv (msg_, flags_); + int rc = dealer_t::xrecv (msg_, flags_); if (rc != 0) return rc; @@ -126,7 +126,7 @@ bool zmq::req_t::xhas_in () if (!receiving_reply) return false; - return xreq_t::xhas_in (); + return dealer_t::xhas_in (); } bool zmq::req_t::xhas_out () @@ -134,13 +134,13 @@ bool zmq::req_t::xhas_out () if (receiving_reply) return false; - return xreq_t::xhas_out (); + return dealer_t::xhas_out (); } zmq::req_session_t::req_session_t (io_thread_t *io_thread_, bool connect_, socket_base_t *socket_, const options_t &options_, const address_t *addr_) : - xreq_session_t (io_thread_, connect_, socket_, options_, addr_), + dealer_session_t (io_thread_, connect_, socket_, options_, addr_), state (identity) { } @@ -156,21 +156,21 @@ int zmq::req_session_t::write (msg_t *msg_) case bottom: if (msg_->flags () == msg_t::more && msg_->size () == 0) { state = body; - return xreq_session_t::write (msg_); + return dealer_session_t::write (msg_); } break; case body: if (msg_->flags () == msg_t::more) - return xreq_session_t::write (msg_); + return dealer_session_t::write (msg_); if (msg_->flags () == 0) { state = bottom; - return xreq_session_t::write (msg_); + return dealer_session_t::write (msg_); } break; case identity: if (msg_->flags () == 0) { state = bottom; - return xreq_session_t::write (msg_); + return dealer_session_t::write (msg_); } break; } diff --git a/src/req.hpp b/src/req.hpp index 22c3c089..99dbd171 100644 --- a/src/req.hpp +++ b/src/req.hpp @@ -23,7 +23,7 @@ #ifndef __ZMQ_REQ_HPP_INCLUDED__ #define __ZMQ_REQ_HPP_INCLUDED__ -#include "xreq.hpp" +#include "dealer.hpp" #include "stdint.hpp" namespace zmq @@ -34,7 +34,7 @@ namespace zmq class io_thread_t; class socket_base_t; - class req_t : public xreq_t + class req_t : public dealer_t { public: @@ -61,7 +61,7 @@ namespace zmq const req_t &operator = (const req_t&); }; - class req_session_t : public xreq_session_t + class req_session_t : public dealer_session_t { public: diff --git a/src/xrep.cpp b/src/router.cpp similarity index 90% rename from src/xrep.cpp rename to src/router.cpp index a45c51e7..26ac771a 100644 --- a/src/xrep.cpp +++ b/src/router.cpp @@ -20,14 +20,14 @@ along with this program. If not, see . */ -#include "xrep.hpp" +#include "router.hpp" #include "pipe.hpp" #include "wire.hpp" #include "random.hpp" #include "likely.hpp" #include "err.hpp" -zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_, int sid_) : +zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) : socket_base_t (parent_, tid_, sid_), prefetched (0), more_in (false), @@ -36,9 +36,9 @@ zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_, int sid_) : next_peer_id (generate_random ()), fail_unroutable(false) { - options.type = ZMQ_XREP; + options.type = ZMQ_ROUTER; - // TODO: Uncomment the following line when XREP will become true XREP + // TODO: Uncomment the following line when ROUTER will become true ROUTER // rather than generic router socket. // If peer disconnect there's noone to send reply to anyway. We can drop // all the outstanding requests from that peer. @@ -50,13 +50,13 @@ zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_, int sid_) : prefetched_msg.init (); } -zmq::xrep_t::~xrep_t () +zmq::router_t::~router_t () { zmq_assert (outpipes.empty ()); prefetched_msg.close (); } -void zmq::xrep_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) +void zmq::router_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) { zmq_assert (pipe_); @@ -78,7 +78,7 @@ void zmq::xrep_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) fq.attach (pipe_); } -int zmq::xrep_t::xsetsockopt (int option_, const void *optval_, +int zmq::router_t::xsetsockopt (int option_, const void *optval_, size_t optvallen_) { if (option_ != ZMQ_FAIL_UNROUTABLE) { @@ -93,7 +93,7 @@ int zmq::xrep_t::xsetsockopt (int option_, const void *optval_, return 0; } -void zmq::xrep_t::xterminated (pipe_t *pipe_) +void zmq::router_t::xterminated (pipe_t *pipe_) { fq.terminated (pipe_); @@ -109,12 +109,12 @@ void zmq::xrep_t::xterminated (pipe_t *pipe_) zmq_assert (false); } -void zmq::xrep_t::xread_activated (pipe_t *pipe_) +void zmq::router_t::xread_activated (pipe_t *pipe_) { fq.activated (pipe_); } -void zmq::xrep_t::xwrite_activated (pipe_t *pipe_) +void zmq::router_t::xwrite_activated (pipe_t *pipe_) { for (outpipes_t::iterator it = outpipes.begin (); it != outpipes.end (); ++it) { @@ -127,7 +127,7 @@ void zmq::xrep_t::xwrite_activated (pipe_t *pipe_) zmq_assert (false); } -int zmq::xrep_t::xsend (msg_t *msg_, int flags_) +int zmq::router_t::xsend (msg_t *msg_, int flags_) { // If this is the first part of the message it's the ID of the // peer to send the message to. @@ -200,7 +200,7 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_) return 0; } -int zmq::xrep_t::xrecv (msg_t *msg_, int flags_) +int zmq::router_t::xrecv (msg_t *msg_, int flags_) { // if there is a prefetched identity, return it. if (prefetched == 2) @@ -280,7 +280,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_) return 0; } -int zmq::xrep_t::rollback (void) +int zmq::router_t::rollback (void) { if (current_out) { current_out->rollback (); @@ -290,7 +290,7 @@ int zmq::xrep_t::rollback (void) return 0; } -bool zmq::xrep_t::xhas_in () +bool zmq::router_t::xhas_in () { // If we are in the middle of reading the messages, there are // definitely more parts available. @@ -305,7 +305,7 @@ bool zmq::xrep_t::xhas_in () // it will be identity of the peer sending the message. msg_t id; id.init (); - int rc = xrep_t::xrecv (&id, ZMQ_DONTWAIT); + int rc = router_t::xrecv (&id, ZMQ_DONTWAIT); if (rc != 0 && errno == EAGAIN) { id.close (); return false; @@ -321,22 +321,22 @@ bool zmq::xrep_t::xhas_in () return true; } -bool zmq::xrep_t::xhas_out () +bool zmq::router_t::xhas_out () { - // In theory, XREP socket is always ready for writing. Whether actual + // In theory, ROUTER socket is always ready for writing. Whether actual // attempt to write succeeds depends on whitch pipe the message is going // to be routed to. return true; } -zmq::xrep_session_t::xrep_session_t (io_thread_t *io_thread_, bool connect_, +zmq::router_session_t::router_session_t (io_thread_t *io_thread_, bool connect_, socket_base_t *socket_, const options_t &options_, const address_t *addr_) : session_base_t (io_thread_, connect_, socket_, options_, addr_) { } -zmq::xrep_session_t::~xrep_session_t () +zmq::router_session_t::~router_session_t () { } diff --git a/src/xrep.hpp b/src/router.hpp similarity index 85% rename from src/xrep.hpp rename to src/router.hpp index d9355811..2e18176a 100644 --- a/src/xrep.hpp +++ b/src/router.hpp @@ -20,8 +20,8 @@ along with this program. If not, see . */ -#ifndef __ZMQ_XREP_HPP_INCLUDED__ -#define __ZMQ_XREP_HPP_INCLUDED__ +#ifndef __ZMQ_ROUTER_HPP_INCLUDED__ +#define __ZMQ_ROUTER_HPP_INCLUDED__ #include @@ -39,13 +39,13 @@ namespace zmq class pipe_t; // TODO: This class uses O(n) scheduling. Rewrite it to use O(1) algorithm. - class xrep_t : + class router_t : public socket_base_t { public: - xrep_t (zmq::ctx_t *parent_, uint32_t tid_, int sid); - ~xrep_t (); + router_t (zmq::ctx_t *parent_, uint32_t tid_, int sid); + ~router_t (); // Overloads of functions from socket_base_t. void xattach_pipe (zmq::pipe_t *pipe_, bool icanhasall_); @@ -104,23 +104,23 @@ namespace zmq // If true, fail on unroutable messages instead of silently dropping them. bool fail_unroutable; - xrep_t (const xrep_t&); - const xrep_t &operator = (const xrep_t&); + router_t (const router_t&); + const router_t &operator = (const router_t&); }; - class xrep_session_t : public session_base_t + class router_session_t : public session_base_t { public: - xrep_session_t (zmq::io_thread_t *io_thread_, bool connect_, + router_session_t (zmq::io_thread_t *io_thread_, bool connect_, socket_base_t *socket_, const options_t &options_, const address_t *addr_); - ~xrep_session_t (); + ~router_session_t (); private: - xrep_session_t (const xrep_session_t&); - const xrep_session_t &operator = (const xrep_session_t&); + router_session_t (const router_session_t&); + const router_session_t &operator = (const router_session_t&); }; } diff --git a/src/session_base.cpp b/src/session_base.cpp index 9c0d9e66..c151f73f 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -33,9 +33,9 @@ #include "address.hpp" #include "req.hpp" -#include "xreq.hpp" +#include "dealer.hpp" #include "rep.hpp" -#include "xrep.hpp" +#include "router.hpp" #include "pub.hpp" #include "xpub.hpp" #include "sub.hpp" @@ -54,15 +54,15 @@ zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_, s = new (std::nothrow) req_session_t (io_thread_, connect_, socket_, options_, addr_); break; - case ZMQ_XREQ: - s = new (std::nothrow) xreq_session_t (io_thread_, connect_, + case ZMQ_DEALER: + s = new (std::nothrow) dealer_session_t (io_thread_, connect_, socket_, options_, addr_); case ZMQ_REP: s = new (std::nothrow) rep_session_t (io_thread_, connect_, socket_, options_, addr_); break; - case ZMQ_XREP: - s = new (std::nothrow) xrep_session_t (io_thread_, connect_, + case ZMQ_ROUTER: + s = new (std::nothrow) router_session_t (io_thread_, connect_, socket_, options_, addr_); break; case ZMQ_PUB: diff --git a/src/socket_base.cpp b/src/socket_base.cpp index db0cf5a9..47bd7f3a 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -64,8 +64,8 @@ #include "rep.hpp" #include "pull.hpp" #include "push.hpp" -#include "xreq.hpp" -#include "xrep.hpp" +#include "dealer.hpp" +#include "router.hpp" #include "xpub.hpp" #include "xsub.hpp" @@ -95,11 +95,11 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_, case ZMQ_REP: s = new (std::nothrow) rep_t (parent_, tid_, sid_); break; - case ZMQ_XREQ: - s = new (std::nothrow) xreq_t (parent_, tid_, sid_); + case ZMQ_DEALER: + s = new (std::nothrow) dealer_t (parent_, tid_, sid_); break; - case ZMQ_XREP: - s = new (std::nothrow) xrep_t (parent_, tid_, sid_); + case ZMQ_ROUTER: + s = new (std::nothrow) router_t (parent_, tid_, sid_); break; case ZMQ_PULL: s = new (std::nothrow) pull_t (parent_, tid_, sid_);