0
0
mirror of https://github.com/zeromq/libzmq.git synced 2024-12-28 16:15:23 +08:00

Merge pull request #2803 from f18m/master

Change ZMQ_THREAD_AFFINITY to ZMQ_THREAD_AFFINITY_CPU_ADD/REMOVE
This commit is contained in:
Luca Boccassi 2017-10-25 09:35:49 +01:00 committed by GitHub
commit dec3af4d69
8 changed files with 70 additions and 43 deletions

View File

@ -76,15 +76,25 @@ This option only applies before creating any sockets on the context.
Default value:: -1 Default value:: -1
ZMQ_THREAD_AFFINITY: Set CPU affinity for I/O threads ZMQ_THREAD_AFFINITY_CPU_ADD: Add a CPU to list of affinity for I/O threads
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_THREAD_AFFINITY' argument sets CPU affinity for the internal The 'ZMQ_THREAD_AFFINITY_CPU_ADD' argument adds a specific CPU to the affinity list for the internal
context's thread pool. This option is only supported on Linux. context's thread pool. This option is only supported on Linux.
On Linux, each bit of the 'option_value' argument will represent an enabled
CPU in the corresponding cpu_set_t; for example the value 0x1 will set affinity
of all context's thread pool to CPU 0; the value 0x110 will set affinity
of all context's thread pool to CPU 1 and 2.
This option only applies before creating any sockets on the context. This option only applies before creating any sockets on the context.
The default affinity list is empty and means that no explicit CPU-affinity will be set on
internal context's threads.
[horizontal]
Default value:: -1
ZMQ_THREAD_AFFINITY_CPU_REMOVE: Remove a CPU to list of affinity for I/O threads
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_THREAD_AFFINITY_CPU_REMOVE' argument removes a specific CPU to the affinity list for the internal
context's thread pool. This option is only supported on Linux.
This option only applies before creating any sockets on the context.
The default affinity list is empty and means that no explicit CPU-affinity will be set on
internal context's threads.
[horizontal] [horizontal]
Default value:: -1 Default value:: -1

View File

@ -610,9 +610,9 @@ ZMQ_EXPORT void zmq_threadclose (void* thread);
/* DRAFT Context options */ /* DRAFT Context options */
#define ZMQ_MSG_T_SIZE 6 #define ZMQ_MSG_T_SIZE 6
#define ZMQ_THREAD_AFFINITY 7 #define ZMQ_THREAD_AFFINITY_CPU_ADD 7
#define ZMQ_THREAD_AFFINITY_DFLT -1 #define ZMQ_THREAD_AFFINITY_CPU_REMOVE 8
#define ZMQ_THREAD_NAME_PREFIX 8 #define ZMQ_THREAD_NAME_PREFIX 9
/* DRAFT Socket methods. */ /* DRAFT Socket methods. */
ZMQ_EXPORT int zmq_join (void *s, const char *group); ZMQ_EXPORT int zmq_join (void *s, const char *group);

View File

@ -77,8 +77,7 @@ zmq::ctx_t::ctx_t () :
blocky (true), blocky (true),
ipv6 (false), ipv6 (false),
thread_priority (ZMQ_THREAD_PRIORITY_DFLT), thread_priority (ZMQ_THREAD_PRIORITY_DFLT),
thread_sched_policy (ZMQ_THREAD_SCHED_POLICY_DFLT), thread_sched_policy (ZMQ_THREAD_SCHED_POLICY_DFLT)
thread_affinity (ZMQ_THREAD_AFFINITY_DFLT)
{ {
#ifdef HAVE_FORK #ifdef HAVE_FORK
pid = getpid(); pid = getpid();
@ -252,9 +251,20 @@ int zmq::ctx_t::set (int option_, int optval_)
thread_sched_policy = optval_; thread_sched_policy = optval_;
} }
else else
if (option_ == ZMQ_THREAD_AFFINITY && optval_ >= 0) { if (option_ == ZMQ_THREAD_AFFINITY_CPU_ADD && optval_ >= 0) {
scoped_lock_t locker(opt_sync); scoped_lock_t locker(opt_sync);
thread_affinity = optval_; thread_affinity_cpus.insert( optval_ );
}
else
if (option_ == ZMQ_THREAD_AFFINITY_CPU_REMOVE && optval_ >= 0) {
scoped_lock_t locker(opt_sync);
std::set<int>::iterator it = thread_affinity_cpus.find( optval_ );
if (it != thread_affinity_cpus.end()) {
thread_affinity_cpus.erase( it );
} else {
errno = EINVAL;
rc = -1;
}
} }
else else
if (option_ == ZMQ_THREAD_NAME_PREFIX && optval_ >= 0) { if (option_ == ZMQ_THREAD_NAME_PREFIX && optval_ >= 0) {
@ -411,11 +421,13 @@ void zmq::ctx_t::start_thread (thread_t &thread_, thread_fn *tfn_, void *arg_) c
{ {
static unsigned int nthreads_started = 0; static unsigned int nthreads_started = 0;
thread_.setSchedulingParameters(thread_priority, thread_sched_policy, thread_affinity); thread_.setSchedulingParameters(thread_priority, thread_sched_policy, thread_affinity_cpus);
thread_.start(tfn_, arg_); thread_.start(tfn_, arg_);
#ifndef ZMQ_HAVE_ANDROID #ifndef ZMQ_HAVE_ANDROID
std::ostringstream s; std::ostringstream s;
s << thread_name_prefix << "/ZMQbg/" << nthreads_started; if (!thread_name_prefix.empty())
s << thread_name_prefix << "/";
s << "ZMQbg/" << nthreads_started;
thread_.setThreadName (s.str().c_str()); thread_.setThreadName (s.str().c_str());
#endif #endif
nthreads_started++; nthreads_started++;

View File

@ -214,7 +214,7 @@ namespace zmq
// Thread parameters. // Thread parameters.
int thread_priority; int thread_priority;
int thread_sched_policy; int thread_sched_policy;
int thread_affinity; std::set<int> thread_affinity_cpus;
std::string thread_name_prefix; std::string thread_name_prefix;
// Synchronisation of access to context options. // Synchronisation of access to context options.

View File

@ -70,12 +70,12 @@ void zmq::thread_t::stop ()
win_assert (rc2 != 0); win_assert (rc2 != 0);
} }
void zmq::thread_t::setSchedulingParameters(int priority_, int schedulingPolicy_, int affinity_) void zmq::thread_t::setSchedulingParameters(int priority_, int schedulingPolicy_, const std::set<int>& affinity_cpus_)
{ {
// not implemented // not implemented
LIBZMQ_UNUSED (priority_); LIBZMQ_UNUSED (priority_);
LIBZMQ_UNUSED (schedulingPolicy_); LIBZMQ_UNUSED (schedulingPolicy_);
LIBZMQ_UNUSED (affinity_); LIBZMQ_UNUSED (affinity_cpus_);
} }
void zmq::thread_t::setThreadName(const char *name_) void zmq::thread_t::setThreadName(const char *name_)
@ -125,11 +125,11 @@ void zmq::thread_t::stop ()
posix_assert (rc); posix_assert (rc);
} }
void zmq::thread_t::setSchedulingParameters(int priority_, int schedulingPolicy_, int affinity_) void zmq::thread_t::setSchedulingParameters(int priority_, int schedulingPolicy_, const std::set<int>& affinity_cpus_)
{ {
thread_priority=priority_; thread_priority=priority_;
thread_sched_policy=schedulingPolicy_; thread_sched_policy=schedulingPolicy_;
thread_affinity=affinity_; thread_affinity_cpus=affinity_cpus_;
} }
void zmq::thread_t::applySchedulingParameters() // to be called in secondary thread context void zmq::thread_t::applySchedulingParameters() // to be called in secondary thread context
@ -194,15 +194,13 @@ void zmq::thread_t::applySchedulingParameters() // to be called in secon
} }
#ifdef ZMQ_HAVE_PTHREAD_SET_AFFINITY #ifdef ZMQ_HAVE_PTHREAD_SET_AFFINITY
if (thread_affinity != ZMQ_THREAD_AFFINITY_DFLT) if (!thread_affinity_cpus.empty())
{ {
cpu_set_t cpuset; cpu_set_t cpuset;
CPU_ZERO(&cpuset); CPU_ZERO(&cpuset);
for (unsigned int cpuidx=0; cpuidx<sizeof(int)*8; cpuidx++) for (std::set<int>::const_iterator it = thread_affinity_cpus.begin(); it != thread_affinity_cpus.end(); it++)
{ {
int cpubit = (1 << cpuidx); CPU_SET( (int)(*it) , &cpuset );
if ( (thread_affinity & cpubit) != 0 )
CPU_SET( cpuidx , &cpuset );
} }
rc = pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset); rc = pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
posix_assert (rc); posix_assert (rc);

View File

@ -33,6 +33,7 @@
#ifndef ZMQ_HAVE_WINDOWS #ifndef ZMQ_HAVE_WINDOWS
#include <pthread.h> #include <pthread.h>
#endif #endif
#include <set>
namespace zmq namespace zmq
{ {
@ -55,7 +56,6 @@ namespace zmq
, arg(NULL) , arg(NULL)
, thread_priority(ZMQ_THREAD_PRIORITY_DFLT) , thread_priority(ZMQ_THREAD_PRIORITY_DFLT)
, thread_sched_policy(ZMQ_THREAD_SCHED_POLICY_DFLT) , thread_sched_policy(ZMQ_THREAD_SCHED_POLICY_DFLT)
, thread_affinity(ZMQ_THREAD_AFFINITY_DFLT)
{ {
} }
@ -68,7 +68,7 @@ namespace zmq
// Sets the thread scheduling parameters. Only implemented for // Sets the thread scheduling parameters. Only implemented for
// pthread. Has no effect on other platforms. // pthread. Has no effect on other platforms.
void setSchedulingParameters(int priority_, int schedulingPolicy_, int affinity_); void setSchedulingParameters(int priority_, int schedulingPolicy_, const std::set<int>& affinity_cpus_);
// Sets the thread name, 16 characters max including terminating NUL. // Sets the thread name, 16 characters max including terminating NUL.
// Only implemented for pthread. Has no effect on other platforms. // Only implemented for pthread. Has no effect on other platforms.
@ -91,7 +91,7 @@ namespace zmq
// Thread scheduling parameters. // Thread scheduling parameters.
int thread_priority; int thread_priority;
int thread_sched_policy; int thread_sched_policy;
int thread_affinity; std::set<int> thread_affinity_cpus;
thread_t (const thread_t&); thread_t (const thread_t&);
const thread_t &operator = (const thread_t&); const thread_t &operator = (const thread_t&);

View File

@ -91,9 +91,9 @@
/* DRAFT Context options */ /* DRAFT Context options */
#define ZMQ_MSG_T_SIZE 6 #define ZMQ_MSG_T_SIZE 6
#define ZMQ_THREAD_AFFINITY 7 #define ZMQ_THREAD_AFFINITY_CPU_ADD 7
#define ZMQ_THREAD_AFFINITY_DFLT -1 #define ZMQ_THREAD_AFFINITY_CPU_REMOVE 8
#define ZMQ_THREAD_NAME_PREFIX 8 #define ZMQ_THREAD_NAME_PREFIX 9
/* DRAFT Socket methods. */ /* DRAFT Socket methods. */
int zmq_join (void *s, const char *group); int zmq_join (void *s, const char *group);

View File

@ -81,10 +81,6 @@ void test_ctx_thread_opts(void* ctx)
assert (rc == -1 && errno == EINVAL); assert (rc == -1 && errno == EINVAL);
rc = zmq_ctx_set(ctx, ZMQ_THREAD_PRIORITY, ZMQ_THREAD_PRIORITY_DFLT); rc = zmq_ctx_set(ctx, ZMQ_THREAD_PRIORITY, ZMQ_THREAD_PRIORITY_DFLT);
assert (rc == -1 && errno == EINVAL); assert (rc == -1 && errno == EINVAL);
#ifdef ZMQ_THREAD_AFFINITY
rc = zmq_ctx_set(ctx, ZMQ_THREAD_AFFINITY, ZMQ_THREAD_AFFINITY_DFLT);
assert (rc == -1 && errno == EINVAL);
#endif
// test scheduling policy: // test scheduling policy:
@ -113,16 +109,27 @@ void test_ctx_thread_opts(void* ctx)
} }
#ifdef ZMQ_THREAD_AFFINITY #ifdef ZMQ_THREAD_AFFINITY_CPU_ADD
// test affinity: // test affinity:
int cpu_affinity_test = (1 << 0); // this should result in background threads being placed only on the
// this should result in background threads being placed only on the // first CPU available on this system; try experimenting with other values
// first CPU available on this system; try experimenting with other values // (e.g., 5 to use CPU index 5) and use "top -H" or "taskset -pc" to see the result
// (e.g., 1<<5 to use CPU index 5) and use "top -H" or "taskset -pc" to see the result
rc = zmq_ctx_set(ctx, ZMQ_THREAD_AFFINITY, cpu_affinity_test); int cpus_add[] = { 0, 1 };
assert (rc == 0); for (unsigned int idx=0; idx < sizeof(cpus_add)/sizeof(cpus_add[0]); idx++)
{
rc = zmq_ctx_set(ctx, ZMQ_THREAD_AFFINITY_CPU_ADD, cpus_add[idx]);
assert (rc == 0);
}
// you can also remove CPUs from list of affinities:
int cpus_remove[] = { 1 };
for (unsigned int idx=0; idx < sizeof(cpus_remove)/sizeof(cpus_remove[0]); idx++)
{
rc = zmq_ctx_set(ctx, ZMQ_THREAD_AFFINITY_CPU_REMOVE, cpus_remove[idx]);
assert (rc == 0);
}
#endif #endif