0
0
mirror of https://github.com/zeromq/libzmq.git synced 2025-01-21 15:12:03 +08:00
libzmq/src/pair.cpp

180 lines
3.8 KiB
C++
Raw Normal View History

2009-09-21 14:39:59 +02:00
/*
Copyright (c) 2007-2010 iMatix Corporation
2009-09-21 14:39:59 +02:00
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
2009-09-21 14:39:59 +02:00
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
2009-09-21 14:39:59 +02:00
You should have received a copy of the GNU Lesser General Public License
2009-09-21 14:39:59 +02:00
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "../include/zmq.h"
2009-09-21 14:39:59 +02:00
2010-04-26 16:51:05 +02:00
#include "pair.hpp"
2009-09-21 14:39:59 +02:00
#include "err.hpp"
2010-01-30 13:40:50 +01:00
#include "pipe.hpp"
2009-09-21 14:39:59 +02:00
zmq::pair_t::pair_t (class ctx_t *parent_, uint32_t tid_) :
socket_base_t (parent_, tid_),
2010-01-30 13:40:50 +01:00
inpipe (NULL),
outpipe (NULL),
inpipe_alive (false),
2010-08-11 14:09:56 +02:00
outpipe_alive (false),
terminating (false)
2009-09-21 14:39:59 +02:00
{
2010-09-28 15:27:45 +02:00
options.type = ZMQ_PAIR;
2009-09-21 17:20:13 +02:00
options.requires_in = true;
options.requires_out = true;
2009-09-21 14:39:59 +02:00
}
2010-04-26 16:51:05 +02:00
zmq::pair_t::~pair_t ()
2009-09-21 14:39:59 +02:00
{
zmq_assert (!inpipe);
zmq_assert (!outpipe);
2009-09-21 14:39:59 +02:00
}
void zmq::pair_t::xattach_pipes (reader_t *inpipe_, writer_t *outpipe_,
const blob_t &peer_identity_)
2009-09-21 14:39:59 +02:00
{
2010-01-30 13:40:50 +01:00
zmq_assert (!inpipe && !outpipe);
2010-01-30 13:40:50 +01:00
inpipe = inpipe_;
inpipe_alive = true;
inpipe->set_event_sink (this);
2010-01-30 13:40:50 +01:00
outpipe = outpipe_;
outpipe_alive = true;
outpipe->set_event_sink (this);
if (terminating) {
register_term_acks (2);
inpipe_->terminate ();
outpipe_->terminate ();
}
2009-09-21 14:39:59 +02:00
}
void zmq::pair_t::terminated (reader_t *pipe_)
2009-09-21 14:39:59 +02:00
{
2010-01-30 13:40:50 +01:00
zmq_assert (pipe_ == inpipe);
inpipe = NULL;
inpipe_alive = false;
2010-08-11 14:09:56 +02:00
if (terminating)
unregister_term_ack ();
2009-09-21 14:39:59 +02:00
}
void zmq::pair_t::terminated (writer_t *pipe_)
2009-09-21 14:39:59 +02:00
{
2010-01-30 13:40:50 +01:00
zmq_assert (pipe_ == outpipe);
outpipe = NULL;
outpipe_alive = false;
2009-09-21 14:39:59 +02:00
2010-08-11 14:09:56 +02:00
if (terminating)
unregister_term_ack ();
2009-09-21 14:39:59 +02:00
}
void zmq::pair_t::delimited (reader_t *pipe_)
{
}
void zmq::pair_t::process_term (int linger_)
2009-09-21 14:39:59 +02:00
{
2010-08-11 14:09:56 +02:00
terminating = true;
if (inpipe) {
register_term_acks (1);
inpipe->terminate ();
}
if (outpipe) {
register_term_acks (1);
outpipe->terminate ();
}
2010-08-11 14:09:56 +02:00
socket_base_t::process_term (linger_);
2009-09-21 14:39:59 +02:00
}
void zmq::pair_t::activated (class reader_t *pipe_)
{
zmq_assert (!inpipe_alive);
inpipe_alive = true;
}
void zmq::pair_t::activated (class writer_t *pipe_)
2009-09-21 14:39:59 +02:00
{
zmq_assert (!outpipe_alive);
outpipe_alive = true;
2009-09-21 14:39:59 +02:00
}
2010-04-26 16:51:05 +02:00
int zmq::pair_t::xsend (zmq_msg_t *msg_, int flags_)
2009-09-21 14:39:59 +02:00
{
if (outpipe == NULL || !outpipe_alive) {
errno = EAGAIN;
return -1;
}
if (!outpipe->write (msg_)) {
outpipe_alive = false;
2010-01-30 13:40:50 +01:00
errno = EAGAIN;
return -1;
}
if (!(flags_ & ZMQ_SNDMORE))
outpipe->flush ();
// Detach the original message from the data buffer.
int rc = zmq_msg_init (msg_);
zmq_assert (rc == 0);
2009-09-22 08:30:15 +02:00
return 0;
2009-09-21 14:39:59 +02:00
}
2010-04-26 16:51:05 +02:00
int zmq::pair_t::xrecv (zmq_msg_t *msg_, int flags_)
2009-09-21 14:39:59 +02:00
{
2010-01-30 13:40:50 +01:00
// Deallocate old content of the message.
zmq_msg_close (msg_);
if (!inpipe_alive || !inpipe || !inpipe->read (msg_)) {
// No message is available.
inpipe_alive = false;
// Initialise the output parameter to be a 0-byte message.
2010-06-19 19:46:35 +02:00
zmq_msg_init (msg_);
2010-01-30 13:40:50 +01:00
errno = EAGAIN;
return -1;
}
2009-09-22 08:30:15 +02:00
return 0;
2009-09-21 14:39:59 +02:00
}
2010-04-26 16:51:05 +02:00
bool zmq::pair_t::xhas_in ()
{
if (!inpipe || !inpipe_alive)
return false;
inpipe_alive = inpipe->check_read ();
return inpipe_alive;
}
2010-04-26 16:51:05 +02:00
bool zmq::pair_t::xhas_out ()
{
if (!outpipe || !outpipe_alive)
return false;
zmq_msg_t msg;
zmq_msg_init (&msg);
outpipe_alive = outpipe->check_write (&msg);
zmq_msg_close (&msg);
return outpipe_alive;
}
2009-09-21 14:39:59 +02:00