mirror of
https://github.com/zeromq/libzmq.git
synced 2025-01-21 15:12:03 +08:00
commit
cecc790ca3
12
src/lb.cpp
12
src/lb.cpp
@ -131,17 +131,9 @@ bool zmq::lb_t::has_out ()
|
|||||||
|
|
||||||
while (active > 0) {
|
while (active > 0) {
|
||||||
|
|
||||||
// Check whether zero-sized message can be written to the pipe.
|
// Check whether a pipe has room for another message.
|
||||||
msg_t msg;
|
if (pipes [current]->check_write ())
|
||||||
int rc = msg.init ();
|
|
||||||
errno_assert (rc == 0);
|
|
||||||
if (pipes [current]->check_write (&msg)) {
|
|
||||||
rc = msg.close ();
|
|
||||||
errno_assert (rc == 0);
|
|
||||||
return true;
|
return true;
|
||||||
}
|
|
||||||
rc = msg.close ();
|
|
||||||
errno_assert (rc == 0);
|
|
||||||
|
|
||||||
// Deactivate the pipe.
|
// Deactivate the pipe.
|
||||||
active--;
|
active--;
|
||||||
|
@ -81,28 +81,24 @@ namespace zmq
|
|||||||
inline mutex_t ()
|
inline mutex_t ()
|
||||||
{
|
{
|
||||||
int rc = pthread_mutex_init (&mutex, NULL);
|
int rc = pthread_mutex_init (&mutex, NULL);
|
||||||
if (rc)
|
|
||||||
posix_assert (rc);
|
posix_assert (rc);
|
||||||
}
|
}
|
||||||
|
|
||||||
inline ~mutex_t ()
|
inline ~mutex_t ()
|
||||||
{
|
{
|
||||||
int rc = pthread_mutex_destroy (&mutex);
|
int rc = pthread_mutex_destroy (&mutex);
|
||||||
if (rc)
|
|
||||||
posix_assert (rc);
|
posix_assert (rc);
|
||||||
}
|
}
|
||||||
|
|
||||||
inline void lock ()
|
inline void lock ()
|
||||||
{
|
{
|
||||||
int rc = pthread_mutex_lock (&mutex);
|
int rc = pthread_mutex_lock (&mutex);
|
||||||
if (rc)
|
|
||||||
posix_assert (rc);
|
posix_assert (rc);
|
||||||
}
|
}
|
||||||
|
|
||||||
inline void unlock ()
|
inline void unlock ()
|
||||||
{
|
{
|
||||||
int rc = pthread_mutex_unlock (&mutex);
|
int rc = pthread_mutex_unlock (&mutex);
|
||||||
if (rc)
|
|
||||||
posix_assert (rc);
|
posix_assert (rc);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
18
src/own.cpp
18
src/own.cpp
@ -80,24 +80,6 @@ void zmq::own_t::launch_child (own_t *object_)
|
|||||||
send_own (this, object_);
|
send_own (this, object_);
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::own_t::launch_sibling (own_t *object_)
|
|
||||||
{
|
|
||||||
// At this point it is important that object is plugged in before its
|
|
||||||
// owner has a chance to terminate it. Thus, 'plug' command is sent before
|
|
||||||
// the 'own' command. Given that the mailbox preserves ordering of
|
|
||||||
// commands, 'term' command from the owner cannot make it to the object
|
|
||||||
// before the already written 'plug' command.
|
|
||||||
|
|
||||||
// Specify the owner of the object.
|
|
||||||
object_->set_owner (owner);
|
|
||||||
|
|
||||||
// Plug the object into its I/O thread.
|
|
||||||
send_plug (object_);
|
|
||||||
|
|
||||||
// Make parent own the object.
|
|
||||||
send_own (owner, object_);
|
|
||||||
}
|
|
||||||
|
|
||||||
void zmq::own_t::process_term_req (own_t *object_)
|
void zmq::own_t::process_term_req (own_t *object_)
|
||||||
{
|
{
|
||||||
// When shutting down we can ignore termination requests from owned
|
// When shutting down we can ignore termination requests from owned
|
||||||
|
@ -70,10 +70,6 @@ namespace zmq
|
|||||||
// Launch the supplied object and become its owner.
|
// Launch the supplied object and become its owner.
|
||||||
void launch_child (own_t *object_);
|
void launch_child (own_t *object_);
|
||||||
|
|
||||||
// Launch the supplied object and make it your sibling (make your
|
|
||||||
// owner become its owner as well).
|
|
||||||
void launch_sibling (own_t *object_);
|
|
||||||
|
|
||||||
// Ask owner object to terminate this object. It may take a while
|
// Ask owner object to terminate this object. It may take a while
|
||||||
// while actual termination is started. This function should not be
|
// while actual termination is started. This function should not be
|
||||||
// called more than once.
|
// called more than once.
|
||||||
|
@ -108,13 +108,7 @@ bool zmq::pair_t::xhas_out ()
|
|||||||
if (!pipe)
|
if (!pipe)
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
msg_t msg;
|
return pipe->check_write ();
|
||||||
int rc = msg.init ();
|
|
||||||
errno_assert (rc == 0);
|
|
||||||
bool result = pipe->check_write (&msg);
|
|
||||||
rc = msg.close ();
|
|
||||||
errno_assert (rc == 0);
|
|
||||||
return result;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
zmq::pair_session_t::pair_session_t (io_thread_t *io_thread_, bool connect_,
|
zmq::pair_session_t::pair_session_t (io_thread_t *io_thread_, bool connect_,
|
||||||
|
@ -146,7 +146,7 @@ bool zmq::pipe_t::read (msg_t *msg_)
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool zmq::pipe_t::check_write (msg_t *msg_)
|
bool zmq::pipe_t::check_write ()
|
||||||
{
|
{
|
||||||
if (unlikely (!out_active || state != active))
|
if (unlikely (!out_active || state != active))
|
||||||
return false;
|
return false;
|
||||||
@ -163,7 +163,7 @@ bool zmq::pipe_t::check_write (msg_t *msg_)
|
|||||||
|
|
||||||
bool zmq::pipe_t::write (msg_t *msg_)
|
bool zmq::pipe_t::write (msg_t *msg_)
|
||||||
{
|
{
|
||||||
if (unlikely (!check_write (msg_)))
|
if (unlikely (!check_write ()))
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
bool more = msg_->flags () & msg_t::more ? true : false;
|
bool more = msg_->flags () & msg_t::more ? true : false;
|
||||||
|
@ -87,7 +87,7 @@ namespace zmq
|
|||||||
|
|
||||||
// Checks whether messages can be written to the pipe. If writing
|
// Checks whether messages can be written to the pipe. If writing
|
||||||
// the message would cause high watermark the function returns false.
|
// the message would cause high watermark the function returns false.
|
||||||
bool check_write (msg_t *msg_);
|
bool check_write ();
|
||||||
|
|
||||||
// Writes a message to the underlying pipe. Returns false if the
|
// Writes a message to the underlying pipe. Returns false if the
|
||||||
// message cannot be written because high watermark was reached.
|
// message cannot be written because high watermark was reached.
|
||||||
|
@ -152,22 +152,16 @@ int zmq::router_t::xsend (msg_t *msg_, int flags_)
|
|||||||
|
|
||||||
if (it != outpipes.end ()) {
|
if (it != outpipes.end ()) {
|
||||||
current_out = it->second.pipe;
|
current_out = it->second.pipe;
|
||||||
msg_t empty;
|
if (!current_out->check_write ()) {
|
||||||
int rc = empty.init ();
|
|
||||||
errno_assert (rc == 0);
|
|
||||||
if (!current_out->check_write (&empty)) {
|
|
||||||
it->second.active = false;
|
it->second.active = false;
|
||||||
more_out = false;
|
more_out = false;
|
||||||
current_out = NULL;
|
current_out = NULL;
|
||||||
}
|
}
|
||||||
rc = empty.close ();
|
|
||||||
errno_assert (rc == 0);
|
|
||||||
} else if(fail_unroutable) {
|
} else if(fail_unroutable) {
|
||||||
more_out = false;
|
more_out = false;
|
||||||
errno = EHOSTUNREACH;
|
errno = EHOSTUNREACH;
|
||||||
retval = -1;
|
retval = -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int rc = msg_->close ();
|
int rc = msg_->close ();
|
||||||
|
Loading…
x
Reference in New Issue
Block a user