mirror of
https://github.com/zeromq/libzmq.git
synced 2024-12-31 01:43:02 +08:00
Simplified uses of erase
This commit is contained in:
parent
6357890ff6
commit
fa976f87f9
22
src/ctx.cpp
22
src/ctx.cpp
@ -448,10 +448,7 @@ int zmq::thread_ctx_t::set (int option_, int optval_)
|
|||||||
_thread_affinity_cpus.insert (optval_);
|
_thread_affinity_cpus.insert (optval_);
|
||||||
} else if (option_ == ZMQ_THREAD_AFFINITY_CPU_REMOVE && optval_ >= 0) {
|
} else if (option_ == ZMQ_THREAD_AFFINITY_CPU_REMOVE && optval_ >= 0) {
|
||||||
scoped_lock_t locker (_opt_sync);
|
scoped_lock_t locker (_opt_sync);
|
||||||
std::set<int>::iterator it = _thread_affinity_cpus.find (optval_);
|
if (0 == _thread_affinity_cpus.erase (optval_)) {
|
||||||
if (it != _thread_affinity_cpus.end ()) {
|
|
||||||
_thread_affinity_cpus.erase (it);
|
|
||||||
} else {
|
|
||||||
errno = EINVAL;
|
errno = EINVAL;
|
||||||
rc = -1;
|
rc = -1;
|
||||||
}
|
}
|
||||||
@ -531,15 +528,16 @@ void zmq::ctx_t::unregister_endpoints (socket_base_t *socket_)
|
|||||||
{
|
{
|
||||||
scoped_lock_t locker (_endpoints_sync);
|
scoped_lock_t locker (_endpoints_sync);
|
||||||
|
|
||||||
endpoints_t::iterator it = _endpoints.begin ();
|
for (endpoints_t::iterator it = _endpoints.begin ();
|
||||||
while (it != _endpoints.end ()) {
|
it != _endpoints.end ();) {
|
||||||
if (it->second.socket == socket_) {
|
if (it->second.socket == socket_)
|
||||||
endpoints_t::iterator to_erase = it;
|
#if __cplusplus >= 201103L
|
||||||
|
it = _endpoints.erase (it);
|
||||||
|
#else
|
||||||
|
_endpoints.erase (it++);
|
||||||
|
#endif
|
||||||
|
else
|
||||||
++it;
|
++it;
|
||||||
_endpoints.erase (to_erase);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
++it;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -137,16 +137,11 @@ int zmq::dish_t::xleave (const char *group_)
|
|||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
subscriptions_t::iterator it =
|
if (0 == _subscriptions.erase (group)) {
|
||||||
std::find (_subscriptions.begin (), _subscriptions.end (), group);
|
|
||||||
|
|
||||||
if (it == _subscriptions.end ()) {
|
|
||||||
errno = EINVAL;
|
errno = EINVAL;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
_subscriptions.erase (it);
|
|
||||||
|
|
||||||
msg_t msg;
|
msg_t msg;
|
||||||
int rc = msg.init_leave ();
|
int rc = msg.init_leave ();
|
||||||
errno_assert (rc == 0);
|
errno_assert (rc == 0);
|
||||||
|
@ -32,6 +32,8 @@
|
|||||||
#include "clock.hpp"
|
#include "clock.hpp"
|
||||||
#include "err.hpp"
|
#include "err.hpp"
|
||||||
|
|
||||||
|
#include <algorithm>
|
||||||
|
|
||||||
zmq::mailbox_safe_t::mailbox_safe_t (mutex_t *sync_) : _sync (sync_)
|
zmq::mailbox_safe_t::mailbox_safe_t (mutex_t *sync_) : _sync (sync_)
|
||||||
{
|
{
|
||||||
// Get the pipe into passive state. That way, if the users starts by
|
// Get the pipe into passive state. That way, if the users starts by
|
||||||
@ -58,13 +60,9 @@ void zmq::mailbox_safe_t::add_signaler (signaler_t *signaler_)
|
|||||||
|
|
||||||
void zmq::mailbox_safe_t::remove_signaler (signaler_t *signaler_)
|
void zmq::mailbox_safe_t::remove_signaler (signaler_t *signaler_)
|
||||||
{
|
{
|
||||||
std::vector<signaler_t *>::iterator it = _signalers.begin ();
|
|
||||||
|
|
||||||
// TODO: make a copy of array and signal outside the lock
|
// TODO: make a copy of array and signal outside the lock
|
||||||
for (; it != _signalers.end (); ++it) {
|
std::vector<signaler_t *>::iterator it =
|
||||||
if (*it == signaler_)
|
std::find (_signalers.begin (), _signalers.end (), signaler_);
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (it != _signalers.end ())
|
if (it != _signalers.end ())
|
||||||
_signalers.erase (it);
|
_signalers.erase (it);
|
||||||
|
@ -102,15 +102,12 @@ void zmq::own_t::process_term_req (own_t *object_)
|
|||||||
if (_terminating)
|
if (_terminating)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
// If I/O object is well and alive let's ask it to terminate.
|
|
||||||
owned_t::iterator it = std::find (_owned.begin (), _owned.end (), object_);
|
|
||||||
|
|
||||||
// If not found, we assume that termination request was already sent to
|
// If not found, we assume that termination request was already sent to
|
||||||
// the object so we can safely ignore the request.
|
// the object so we can safely ignore the request.
|
||||||
if (it == _owned.end ())
|
if (0 == _owned.erase (object_))
|
||||||
return;
|
return;
|
||||||
|
|
||||||
_owned.erase (it);
|
// If I/O object is well and alive let's ask it to terminate.
|
||||||
register_term_acks (1);
|
register_term_acks (1);
|
||||||
|
|
||||||
// Note that this object is the root of the (partial shutdown) thus, its
|
// Note that this object is the root of the (partial shutdown) thus, its
|
||||||
|
@ -82,29 +82,32 @@ uint64_t zmq::poller_base_t::execute_timers ()
|
|||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
// Get the current time.
|
// Get the current time.
|
||||||
uint64_t current = _clock.now_ms ();
|
const uint64_t current = _clock.now_ms ();
|
||||||
|
|
||||||
// Execute the timers that are already due.
|
// Execute the timers that are already due.
|
||||||
timers_t::iterator it = _timers.begin ();
|
const timers_t::iterator begin = _timers.begin ();
|
||||||
while (it != _timers.end ()) {
|
const timers_t::iterator end = _timers.end ();
|
||||||
|
uint64_t res = 0;
|
||||||
|
timers_t::iterator it = begin;
|
||||||
|
for (; it != end; ++it) {
|
||||||
// If we have to wait to execute the item, same will be true about
|
// If we have to wait to execute the item, same will be true about
|
||||||
// all the following items (multimap is sorted). Thus we can stop
|
// all the following items (multimap is sorted). Thus we can stop
|
||||||
// checking the subsequent timers and return the time to wait for
|
// checking the subsequent timers.
|
||||||
// the next timer (at least 1ms).
|
if (it->first > current) {
|
||||||
if (it->first > current)
|
res = it->first - current;
|
||||||
return it->first - current;
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
// Trigger the timer.
|
// Trigger the timer.
|
||||||
it->second.sink->timer_event (it->second.id);
|
it->second.sink->timer_event (it->second.id);
|
||||||
|
|
||||||
// Remove it from the list of active timers.
|
|
||||||
timers_t::iterator o = it;
|
|
||||||
++it;
|
|
||||||
_timers.erase (o);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// There are no more timers.
|
// Remove them from the list of active timers.
|
||||||
return 0;
|
_timers.erase (begin, it);
|
||||||
|
|
||||||
|
// Return the time to wait for the next timer (at least 1ms), or 0, if
|
||||||
|
// there are no more timers.
|
||||||
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
zmq::worker_poller_base_t::worker_poller_base_t (const thread_ctx_t &ctx_) :
|
zmq::worker_poller_base_t::worker_poller_base_t (const thread_ctx_t &ctx_) :
|
||||||
|
@ -122,12 +122,14 @@ int zmq::radio_t::xsetsockopt (int option_,
|
|||||||
|
|
||||||
void zmq::radio_t::xpipe_terminated (pipe_t *pipe_)
|
void zmq::radio_t::xpipe_terminated (pipe_t *pipe_)
|
||||||
{
|
{
|
||||||
// NOTE: erase invalidates an iterator, and that's why it's not incrementing in post-loop
|
|
||||||
// read-after-free caught by Valgrind, see https://github.com/zeromq/libzmq/pull/1771
|
|
||||||
for (subscriptions_t::iterator it = _subscriptions.begin ();
|
for (subscriptions_t::iterator it = _subscriptions.begin ();
|
||||||
it != _subscriptions.end ();) {
|
it != _subscriptions.end ();) {
|
||||||
if (it->second == pipe_) {
|
if (it->second == pipe_) {
|
||||||
|
#if __cplusplus >= 201103L
|
||||||
|
it = _subscriptions.erase (it);
|
||||||
|
#else
|
||||||
_subscriptions.erase (it++);
|
_subscriptions.erase (it++);
|
||||||
|
#endif
|
||||||
} else {
|
} else {
|
||||||
++it;
|
++it;
|
||||||
}
|
}
|
||||||
|
@ -150,10 +150,7 @@ int zmq::router_t::xsetsockopt (int option_,
|
|||||||
|
|
||||||
void zmq::router_t::xpipe_terminated (pipe_t *pipe_)
|
void zmq::router_t::xpipe_terminated (pipe_t *pipe_)
|
||||||
{
|
{
|
||||||
std::set<pipe_t *>::iterator it = _anonymous_pipes.find (pipe_);
|
if (0 == _anonymous_pipes.erase (pipe_)) {
|
||||||
if (it != _anonymous_pipes.end ())
|
|
||||||
_anonymous_pipes.erase (it);
|
|
||||||
else {
|
|
||||||
erase_out_pipe (pipe_);
|
erase_out_pipe (pipe_);
|
||||||
_fq.pipe_terminated (pipe_);
|
_fq.pipe_terminated (pipe_);
|
||||||
pipe_->rollback ();
|
pipe_->rollback ();
|
||||||
|
Loading…
x
Reference in New Issue
Block a user