mirror of
https://github.com/zeromq/libzmq.git
synced 2025-01-01 19:05:18 +08:00
commit
282765cab4
@ -43,8 +43,6 @@ zmq::options_t::options_t () :
|
|||||||
sndtimeo (-1),
|
sndtimeo (-1),
|
||||||
ipv6 (0),
|
ipv6 (0),
|
||||||
immediate (0),
|
immediate (0),
|
||||||
delay_on_close (true),
|
|
||||||
delay_on_disconnect (true),
|
|
||||||
filter (false),
|
filter (false),
|
||||||
recv_identity (false),
|
recv_identity (false),
|
||||||
raw_sock (false),
|
raw_sock (false),
|
||||||
|
@ -97,14 +97,6 @@ namespace zmq
|
|||||||
// on a socket with only connecting pipes would block
|
// on a socket with only connecting pipes would block
|
||||||
int immediate;
|
int immediate;
|
||||||
|
|
||||||
// If true, session reads all the pending messages from the pipe and
|
|
||||||
// sends them to the network when socket is closed.
|
|
||||||
bool delay_on_close;
|
|
||||||
|
|
||||||
// If true, socket reads all the messages from the pipe and delivers
|
|
||||||
// them to the user when the peer terminates.
|
|
||||||
bool delay_on_disconnect;
|
|
||||||
|
|
||||||
// If 1, (X)SUB socket should filter the messages. If 0, it should not.
|
// If 1, (X)SUB socket should filter the messages. If 0, it should not.
|
||||||
bool filter;
|
bool filter;
|
||||||
|
|
||||||
|
15
src/pipe.cpp
15
src/pipe.cpp
@ -27,7 +27,7 @@
|
|||||||
#include "ypipe_conflate.hpp"
|
#include "ypipe_conflate.hpp"
|
||||||
|
|
||||||
int zmq::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2],
|
int zmq::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2],
|
||||||
int hwms_ [2], bool delays_ [2], bool conflate_ [2])
|
int hwms_ [2], bool conflate_ [2])
|
||||||
{
|
{
|
||||||
// Creates two pipe objects. These objects are connected by two ypipes,
|
// Creates two pipe objects. These objects are connected by two ypipes,
|
||||||
// each to pass messages in one direction.
|
// each to pass messages in one direction.
|
||||||
@ -50,10 +50,10 @@ int zmq::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2],
|
|||||||
alloc_assert (upipe2);
|
alloc_assert (upipe2);
|
||||||
|
|
||||||
pipes_ [0] = new (std::nothrow) pipe_t (parents_ [0], upipe1, upipe2,
|
pipes_ [0] = new (std::nothrow) pipe_t (parents_ [0], upipe1, upipe2,
|
||||||
hwms_ [1], hwms_ [0], delays_ [0], conflate_ [0]);
|
hwms_ [1], hwms_ [0], conflate_ [0]);
|
||||||
alloc_assert (pipes_ [0]);
|
alloc_assert (pipes_ [0]);
|
||||||
pipes_ [1] = new (std::nothrow) pipe_t (parents_ [1], upipe2, upipe1,
|
pipes_ [1] = new (std::nothrow) pipe_t (parents_ [1], upipe2, upipe1,
|
||||||
hwms_ [0], hwms_ [1], delays_ [1], conflate_ [1]);
|
hwms_ [0], hwms_ [1], conflate_ [1]);
|
||||||
alloc_assert (pipes_ [1]);
|
alloc_assert (pipes_ [1]);
|
||||||
|
|
||||||
pipes_ [0]->set_peer (pipes_ [1]);
|
pipes_ [0]->set_peer (pipes_ [1]);
|
||||||
@ -63,7 +63,7 @@ int zmq::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2],
|
|||||||
}
|
}
|
||||||
|
|
||||||
zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
|
zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
|
||||||
int inhwm_, int outhwm_, bool delay_, bool conflate_) :
|
int inhwm_, int outhwm_, bool conflate_) :
|
||||||
object_t (parent_),
|
object_t (parent_),
|
||||||
inpipe (inpipe_),
|
inpipe (inpipe_),
|
||||||
outpipe (outpipe_),
|
outpipe (outpipe_),
|
||||||
@ -77,7 +77,7 @@ zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
|
|||||||
peer (NULL),
|
peer (NULL),
|
||||||
sink (NULL),
|
sink (NULL),
|
||||||
state (active),
|
state (active),
|
||||||
delay (delay_),
|
delay (true),
|
||||||
conflate (conflate_)
|
conflate (conflate_)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
@ -334,6 +334,11 @@ void zmq::pipe_t::process_pipe_term_ack ()
|
|||||||
delete this;
|
delete this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void zmq::pipe_t::set_nodelay ()
|
||||||
|
{
|
||||||
|
this->delay = false;
|
||||||
|
}
|
||||||
|
|
||||||
void zmq::pipe_t::terminate (bool delay_)
|
void zmq::pipe_t::terminate (bool delay_)
|
||||||
{
|
{
|
||||||
// Overload the value specified at pipe creation.
|
// Overload the value specified at pipe creation.
|
||||||
|
12
src/pipe.hpp
12
src/pipe.hpp
@ -43,7 +43,7 @@ namespace zmq
|
|||||||
// If conflate is true, only the most recently arrived message could be
|
// If conflate is true, only the most recently arrived message could be
|
||||||
// read (older messages are discarded)
|
// read (older messages are discarded)
|
||||||
int pipepair (zmq::object_t *parents_ [2], zmq::pipe_t* pipes_ [2],
|
int pipepair (zmq::object_t *parents_ [2], zmq::pipe_t* pipes_ [2],
|
||||||
int hwms_ [2], bool delays_ [2], bool conflate_ [2]);
|
int hwms_ [2], bool conflate_ [2]);
|
||||||
|
|
||||||
struct i_pipe_events
|
struct i_pipe_events
|
||||||
{
|
{
|
||||||
@ -66,9 +66,8 @@ namespace zmq
|
|||||||
public array_item_t <3>
|
public array_item_t <3>
|
||||||
{
|
{
|
||||||
// This allows pipepair to create pipe objects.
|
// This allows pipepair to create pipe objects.
|
||||||
friend int pipepair (zmq::object_t *parents_ [2],
|
friend int pipepair (zmq::object_t *parents_ [2], zmq::pipe_t* pipes_ [2],
|
||||||
zmq::pipe_t* pipes_ [2], int hwms_ [2], bool delays_ [2],
|
int hwms_ [2], bool conflate_ [2]);
|
||||||
bool conflate_ [2]);
|
|
||||||
|
|
||||||
public:
|
public:
|
||||||
|
|
||||||
@ -104,6 +103,9 @@ namespace zmq
|
|||||||
// in the peer.
|
// in the peer.
|
||||||
void hiccup ();
|
void hiccup ();
|
||||||
|
|
||||||
|
// Ensure the pipe wont block on receiving pipe_term.
|
||||||
|
void set_nodelay ();
|
||||||
|
|
||||||
// Ask pipe to terminate. The termination will happen asynchronously
|
// Ask pipe to terminate. The termination will happen asynchronously
|
||||||
// and user will be notified about actual deallocation by 'terminated'
|
// and user will be notified about actual deallocation by 'terminated'
|
||||||
// event. If delay is true, the pending messages will be processed
|
// event. If delay is true, the pending messages will be processed
|
||||||
@ -128,7 +130,7 @@ namespace zmq
|
|||||||
// Constructor is private. Pipe can only be created using
|
// Constructor is private. Pipe can only be created using
|
||||||
// pipepair function.
|
// pipepair function.
|
||||||
pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
|
pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
|
||||||
int inhwm_, int outhwm_, bool delay_, bool conflate_);
|
int inhwm_, int outhwm_, bool conflate_);
|
||||||
|
|
||||||
// Pipepair uses this function to let us know about
|
// Pipepair uses this function to let us know about
|
||||||
// the peer pipe object.
|
// the peer pipe object.
|
||||||
|
@ -299,15 +299,16 @@ int zmq::session_base_t::zap_connect ()
|
|||||||
object_t *parents [2] = {this, peer.socket};
|
object_t *parents [2] = {this, peer.socket};
|
||||||
pipe_t *new_pipes [2] = {NULL, NULL};
|
pipe_t *new_pipes [2] = {NULL, NULL};
|
||||||
int hwms [2] = {0, 0};
|
int hwms [2] = {0, 0};
|
||||||
bool delays [2] = {false, false};
|
|
||||||
bool conflates [2] = {false, false};
|
bool conflates [2] = {false, false};
|
||||||
int rc = pipepair (parents, new_pipes, hwms, delays, conflates);
|
int rc = pipepair (parents, new_pipes, hwms, conflates);
|
||||||
errno_assert (rc == 0);
|
errno_assert (rc == 0);
|
||||||
|
|
||||||
// Attach local end of the pipe to this socket object.
|
// Attach local end of the pipe to this socket object.
|
||||||
zap_pipe = new_pipes [0];
|
zap_pipe = new_pipes [0];
|
||||||
|
zap_pipe->set_nodelay ();
|
||||||
zap_pipe->set_event_sink (this);
|
zap_pipe->set_event_sink (this);
|
||||||
|
|
||||||
|
new_pipes [1]->set_nodelay ();
|
||||||
send_bind (peer.socket, new_pipes [1], false);
|
send_bind (peer.socket, new_pipes [1], false);
|
||||||
|
|
||||||
// Send empty identity if required by the peer.
|
// Send empty identity if required by the peer.
|
||||||
@ -342,9 +343,8 @@ void zmq::session_base_t::process_attach (i_engine *engine_)
|
|||||||
|
|
||||||
int hwms [2] = {conflate? -1 : options.rcvhwm,
|
int hwms [2] = {conflate? -1 : options.rcvhwm,
|
||||||
conflate? -1 : options.sndhwm};
|
conflate? -1 : options.sndhwm};
|
||||||
bool delays [2] = {options.delay_on_close, options.delay_on_disconnect};
|
|
||||||
bool conflates [2] = {conflate, conflate};
|
bool conflates [2] = {conflate, conflate};
|
||||||
int rc = pipepair (parents, pipes, hwms, delays, conflates);
|
int rc = pipepair (parents, pipes, hwms, conflates);
|
||||||
errno_assert (rc == 0);
|
errno_assert (rc == 0);
|
||||||
|
|
||||||
// Plug the local end of the pipe.
|
// Plug the local end of the pipe.
|
||||||
@ -389,7 +389,7 @@ void zmq::session_base_t::process_term (int linger_)
|
|||||||
|
|
||||||
// If the termination of the pipe happens before the term command is
|
// If the termination of the pipe happens before the term command is
|
||||||
// delivered there's nothing much to do. We can proceed with the
|
// delivered there's nothing much to do. We can proceed with the
|
||||||
// stadard termination immediately.
|
// standard termination immediately.
|
||||||
if (!pipe && !zap_pipe) {
|
if (!pipe && !zap_pipe) {
|
||||||
proceed_with_term ();
|
proceed_with_term ();
|
||||||
return;
|
return;
|
||||||
|
@ -459,9 +459,8 @@ int zmq::socket_base_t::connect (const char *addr_)
|
|||||||
options.type == ZMQ_SUB);
|
options.type == ZMQ_SUB);
|
||||||
|
|
||||||
int hwms [2] = {conflate? -1 : sndhwm, conflate? -1 : rcvhwm};
|
int hwms [2] = {conflate? -1 : sndhwm, conflate? -1 : rcvhwm};
|
||||||
bool delays [2] = {options.delay_on_disconnect, options.delay_on_close};
|
|
||||||
bool conflates [2] = {conflate, conflate};
|
bool conflates [2] = {conflate, conflate};
|
||||||
int rc = pipepair (parents, new_pipes, hwms, delays, conflates);
|
int rc = pipepair (parents, new_pipes, hwms, conflates);
|
||||||
errno_assert (rc == 0);
|
errno_assert (rc == 0);
|
||||||
|
|
||||||
// Attach local end of the pipe to this socket object.
|
// Attach local end of the pipe to this socket object.
|
||||||
@ -573,9 +572,8 @@ int zmq::socket_base_t::connect (const char *addr_)
|
|||||||
|
|
||||||
int hwms [2] = {conflate? -1 : options.sndhwm,
|
int hwms [2] = {conflate? -1 : options.sndhwm,
|
||||||
conflate? -1 : options.rcvhwm};
|
conflate? -1 : options.rcvhwm};
|
||||||
bool delays [2] = {options.delay_on_disconnect, options.delay_on_close};
|
|
||||||
bool conflates [2] = {conflate, conflate};
|
bool conflates [2] = {conflate, conflate};
|
||||||
rc = pipepair (parents, new_pipes, hwms, delays, conflates);
|
rc = pipepair (parents, new_pipes, hwms, conflates);
|
||||||
errno_assert (rc == 0);
|
errno_assert (rc == 0);
|
||||||
|
|
||||||
// Attach local end of the pipe to the socket object.
|
// Attach local end of the pipe to the socket object.
|
||||||
|
Loading…
x
Reference in New Issue
Block a user