0
0
mirror of https://github.com/zeromq/libzmq.git synced 2025-01-14 09:47:56 +08:00

zmq_queue implementation added

This commit is contained in:
Martin Sustrik 2010-03-13 08:59:46 +01:00
parent 22db38bf3d
commit dcb983699e

View File

@ -20,6 +20,113 @@
#include "../../include/zmq.hpp"
#include "../../foreign/xmlParser/xmlParser.cpp"
class queue
{
public:
queue (zmq::socket_t& reply, zmq::socket_t& request) :
xrep (reply),
xreq (request)
{
items [0].socket = reply;
items [0].fd = 0;
items [0].events = ZMQ_POLLIN;
items [0].revents = 0;
items [1].socket = request;
items [1].fd = 0;
items [1].events = ZMQ_POLLIN;
items [1].revents = 0;
m_next_request_method = &queue::get_request;
m_next_response_method = &queue::get_response;
}
void run()
{
while (true) {
int rc = zmq::poll (&items [0], 2, -1);
if (rc < 0)
break;
next_request();
next_response();
}
}
private:
void next_request()
{
(this->*m_next_request_method) ();
}
void next_response()
{
(this->*m_next_response_method) ();
}
void get_request()
{
if (items [0].revents & ZMQ_POLLIN ) {
int rc = xrep.recv (&request_msg, ZMQ_NOBLOCK);
if (!rc)
return;
items [0].events &= ~ZMQ_POLLIN;
items [1].events |= ZMQ_POLLOUT;
m_next_request_method = &queue::send_request;
}
}
void send_request()
{
if (items [1].revents & ZMQ_POLLOUT) {
int rc = xreq.send (request_msg, ZMQ_NOBLOCK);
if (!rc) return;
items [1].events &= ~ZMQ_POLLOUT;
items [0].events |= ZMQ_POLLIN;
m_next_request_method = &queue::get_request;
}
}
void get_response()
{
if ( items [1].revents & ZMQ_POLLIN ) {
int rc = xreq.recv (&response_msg, ZMQ_NOBLOCK);
if (!rc)
return;
items [1].events &= ~ZMQ_POLLIN;
items [0].events |= ZMQ_POLLOUT;
m_next_response_method = &queue::send_response;
}
}
void send_response()
{
if (items [0].revents & ZMQ_POLLOUT) {
int rc = xrep.send (response_msg, ZMQ_NOBLOCK);
if (!rc)
return;
items [0].events &= ~ZMQ_POLLOUT;
items [1].events |= ZMQ_POLLIN;
m_next_response_method = &queue::get_response;
}
}
zmq::socket_t & xrep;
zmq::socket_t & xreq;
zmq_pollitem_t items [2];
zmq::message_t request_msg;
zmq::message_t response_msg;
typedef void (queue::*next_method) ();
next_method m_next_request_method;
next_method m_next_response_method;
queue (queue const &);
void operator = (queue const &);
};
int main (int argc, char *argv [])
{
if (argc != 2) {
@ -112,11 +219,8 @@ int main (int argc, char *argv [])
n++;
}
zmq::message_t msg;
while (true) {
in_socket.recv (&msg);
out_socket.send (msg);
}
queue q(in_socket, out_socket);
q.run();
return 0;
}