From 2aa0e6fd4d929987e4ee9510da3712b75c186062 Mon Sep 17 00:00:00 2001 From: f18m Date: Wed, 25 Oct 2017 09:55:47 +0200 Subject: [PATCH] Change ZMQ_THREAD_AFFINITY to ZMQ_THREAD_AFFINITY_CPU_ADD/ZMQ_THREAD_AFFINITY_CPU_REMOVE. Avoid prefix thread names when no prefix was set. --- doc/zmq_ctx_set.txt | 24 +++++++++++++++++------- include/zmq.h | 6 +++--- src/ctx.cpp | 24 ++++++++++++++++++------ src/ctx.hpp | 2 +- src/thread.cpp | 16 +++++++--------- src/thread.hpp | 6 +++--- src/zmq_draft.h | 6 +++--- tests/test_ctx_options.cpp | 29 ++++++++++++++++++----------- 8 files changed, 70 insertions(+), 43 deletions(-) diff --git a/doc/zmq_ctx_set.txt b/doc/zmq_ctx_set.txt index 96fa737b..7f9345cd 100644 --- a/doc/zmq_ctx_set.txt +++ b/doc/zmq_ctx_set.txt @@ -76,15 +76,25 @@ This option only applies before creating any sockets on the context. Default value:: -1 -ZMQ_THREAD_AFFINITY: Set CPU affinity for I/O threads -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -The 'ZMQ_THREAD_AFFINITY' argument sets CPU affinity for the internal +ZMQ_THREAD_AFFINITY_CPU_ADD: Add a CPU to list of affinity for I/O threads +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +The 'ZMQ_THREAD_AFFINITY_CPU_ADD' argument adds a specific CPU to the affinity list for the internal context's thread pool. This option is only supported on Linux. -On Linux, each bit of the 'option_value' argument will represent an enabled -CPU in the corresponding cpu_set_t; for example the value 0x1 will set affinity -of all context's thread pool to CPU 0; the value 0x110 will set affinity -of all context's thread pool to CPU 1 and 2. This option only applies before creating any sockets on the context. +The default affinity list is empty and means that no explicit CPU-affinity will be set on +internal context's threads. + +[horizontal] +Default value:: -1 + + +ZMQ_THREAD_AFFINITY_CPU_REMOVE: Remove a CPU to list of affinity for I/O threads +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +The 'ZMQ_THREAD_AFFINITY_CPU_REMOVE' argument removes a specific CPU to the affinity list for the internal +context's thread pool. This option is only supported on Linux. +This option only applies before creating any sockets on the context. +The default affinity list is empty and means that no explicit CPU-affinity will be set on +internal context's threads. [horizontal] Default value:: -1 diff --git a/include/zmq.h b/include/zmq.h index edf47326..12302e1e 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -610,9 +610,9 @@ ZMQ_EXPORT void zmq_threadclose (void* thread); /* DRAFT Context options */ #define ZMQ_MSG_T_SIZE 6 -#define ZMQ_THREAD_AFFINITY 7 -#define ZMQ_THREAD_AFFINITY_DFLT -1 -#define ZMQ_THREAD_NAME_PREFIX 8 +#define ZMQ_THREAD_AFFINITY_CPU_ADD 7 +#define ZMQ_THREAD_AFFINITY_CPU_REMOVE 8 +#define ZMQ_THREAD_NAME_PREFIX 9 /* DRAFT Socket methods. */ ZMQ_EXPORT int zmq_join (void *s, const char *group); diff --git a/src/ctx.cpp b/src/ctx.cpp index 0358e344..59f3be98 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -77,8 +77,7 @@ zmq::ctx_t::ctx_t () : blocky (true), ipv6 (false), thread_priority (ZMQ_THREAD_PRIORITY_DFLT), - thread_sched_policy (ZMQ_THREAD_SCHED_POLICY_DFLT), - thread_affinity (ZMQ_THREAD_AFFINITY_DFLT) + thread_sched_policy (ZMQ_THREAD_SCHED_POLICY_DFLT) { #ifdef HAVE_FORK pid = getpid(); @@ -252,9 +251,20 @@ int zmq::ctx_t::set (int option_, int optval_) thread_sched_policy = optval_; } else - if (option_ == ZMQ_THREAD_AFFINITY && optval_ >= 0) { + if (option_ == ZMQ_THREAD_AFFINITY_CPU_ADD && optval_ >= 0) { scoped_lock_t locker(opt_sync); - thread_affinity = optval_; + thread_affinity_cpus.insert( optval_ ); + } + else + if (option_ == ZMQ_THREAD_AFFINITY_CPU_REMOVE && optval_ >= 0) { + scoped_lock_t locker(opt_sync); + std::set::iterator it = thread_affinity_cpus.find( optval_ ); + if (it != thread_affinity_cpus.end()) { + thread_affinity_cpus.erase( it ); + } else { + errno = EINVAL; + rc = -1; + } } else if (option_ == ZMQ_THREAD_NAME_PREFIX && optval_ >= 0) { @@ -411,11 +421,13 @@ void zmq::ctx_t::start_thread (thread_t &thread_, thread_fn *tfn_, void *arg_) c { static unsigned int nthreads_started = 0; - thread_.setSchedulingParameters(thread_priority, thread_sched_policy, thread_affinity); + thread_.setSchedulingParameters(thread_priority, thread_sched_policy, thread_affinity_cpus); thread_.start(tfn_, arg_); #ifndef ZMQ_HAVE_ANDROID std::ostringstream s; - s << thread_name_prefix << "/ZMQbg/" << nthreads_started; + if (!thread_name_prefix.empty()) + s << thread_name_prefix << "/"; + s << "ZMQbg/" << nthreads_started; thread_.setThreadName (s.str().c_str()); #endif nthreads_started++; diff --git a/src/ctx.hpp b/src/ctx.hpp index 8f4d2e6a..500b75e5 100644 --- a/src/ctx.hpp +++ b/src/ctx.hpp @@ -214,7 +214,7 @@ namespace zmq // Thread parameters. int thread_priority; int thread_sched_policy; - int thread_affinity; + std::set thread_affinity_cpus; std::string thread_name_prefix; // Synchronisation of access to context options. diff --git a/src/thread.cpp b/src/thread.cpp index ba6b9696..4fc59c3e 100644 --- a/src/thread.cpp +++ b/src/thread.cpp @@ -70,12 +70,12 @@ void zmq::thread_t::stop () win_assert (rc2 != 0); } -void zmq::thread_t::setSchedulingParameters(int priority_, int schedulingPolicy_, int affinity_) +void zmq::thread_t::setSchedulingParameters(int priority_, int schedulingPolicy_, const std::set& affinity_cpus_) { // not implemented LIBZMQ_UNUSED (priority_); LIBZMQ_UNUSED (schedulingPolicy_); - LIBZMQ_UNUSED (affinity_); + LIBZMQ_UNUSED (affinity_cpus_); } void zmq::thread_t::setThreadName(const char *name_) @@ -125,11 +125,11 @@ void zmq::thread_t::stop () posix_assert (rc); } -void zmq::thread_t::setSchedulingParameters(int priority_, int schedulingPolicy_, int affinity_) +void zmq::thread_t::setSchedulingParameters(int priority_, int schedulingPolicy_, const std::set& affinity_cpus_) { thread_priority=priority_; thread_sched_policy=schedulingPolicy_; - thread_affinity=affinity_; + thread_affinity_cpus=affinity_cpus_; } void zmq::thread_t::applySchedulingParameters() // to be called in secondary thread context @@ -194,15 +194,13 @@ void zmq::thread_t::applySchedulingParameters() // to be called in secon } #ifdef ZMQ_HAVE_PTHREAD_SET_AFFINITY - if (thread_affinity != ZMQ_THREAD_AFFINITY_DFLT) + if (!thread_affinity_cpus.empty()) { cpu_set_t cpuset; CPU_ZERO(&cpuset); - for (unsigned int cpuidx=0; cpuidx::const_iterator it = thread_affinity_cpus.begin(); it != thread_affinity_cpus.end(); it++) { - int cpubit = (1 << cpuidx); - if ( (thread_affinity & cpubit) != 0 ) - CPU_SET( cpuidx , &cpuset ); + CPU_SET( (int)(*it) , &cpuset ); } rc = pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset); posix_assert (rc); diff --git a/src/thread.hpp b/src/thread.hpp index 8d9be81a..150b31d1 100644 --- a/src/thread.hpp +++ b/src/thread.hpp @@ -33,6 +33,7 @@ #ifndef ZMQ_HAVE_WINDOWS #include #endif +#include namespace zmq { @@ -55,7 +56,6 @@ namespace zmq , arg(NULL) , thread_priority(ZMQ_THREAD_PRIORITY_DFLT) , thread_sched_policy(ZMQ_THREAD_SCHED_POLICY_DFLT) - , thread_affinity(ZMQ_THREAD_AFFINITY_DFLT) { } @@ -68,7 +68,7 @@ namespace zmq // Sets the thread scheduling parameters. Only implemented for // pthread. Has no effect on other platforms. - void setSchedulingParameters(int priority_, int schedulingPolicy_, int affinity_); + void setSchedulingParameters(int priority_, int schedulingPolicy_, const std::set& affinity_cpus_); // Sets the thread name, 16 characters max including terminating NUL. // Only implemented for pthread. Has no effect on other platforms. @@ -91,7 +91,7 @@ namespace zmq // Thread scheduling parameters. int thread_priority; int thread_sched_policy; - int thread_affinity; + std::set thread_affinity_cpus; thread_t (const thread_t&); const thread_t &operator = (const thread_t&); diff --git a/src/zmq_draft.h b/src/zmq_draft.h index 7d947ae0..cbffcf38 100644 --- a/src/zmq_draft.h +++ b/src/zmq_draft.h @@ -91,9 +91,9 @@ /* DRAFT Context options */ #define ZMQ_MSG_T_SIZE 6 -#define ZMQ_THREAD_AFFINITY 7 -#define ZMQ_THREAD_AFFINITY_DFLT -1 -#define ZMQ_THREAD_NAME_PREFIX 8 +#define ZMQ_THREAD_AFFINITY_CPU_ADD 7 +#define ZMQ_THREAD_AFFINITY_CPU_REMOVE 8 +#define ZMQ_THREAD_NAME_PREFIX 9 /* DRAFT Socket methods. */ int zmq_join (void *s, const char *group); diff --git a/tests/test_ctx_options.cpp b/tests/test_ctx_options.cpp index 9a0b19d1..36437648 100644 --- a/tests/test_ctx_options.cpp +++ b/tests/test_ctx_options.cpp @@ -81,10 +81,6 @@ void test_ctx_thread_opts(void* ctx) assert (rc == -1 && errno == EINVAL); rc = zmq_ctx_set(ctx, ZMQ_THREAD_PRIORITY, ZMQ_THREAD_PRIORITY_DFLT); assert (rc == -1 && errno == EINVAL); -#ifdef ZMQ_THREAD_AFFINITY - rc = zmq_ctx_set(ctx, ZMQ_THREAD_AFFINITY, ZMQ_THREAD_AFFINITY_DFLT); - assert (rc == -1 && errno == EINVAL); -#endif // test scheduling policy: @@ -113,16 +109,27 @@ void test_ctx_thread_opts(void* ctx) } -#ifdef ZMQ_THREAD_AFFINITY +#ifdef ZMQ_THREAD_AFFINITY_CPU_ADD // test affinity: - int cpu_affinity_test = (1 << 0); - // this should result in background threads being placed only on the - // first CPU available on this system; try experimenting with other values - // (e.g., 1<<5 to use CPU index 5) and use "top -H" or "taskset -pc" to see the result + // this should result in background threads being placed only on the + // first CPU available on this system; try experimenting with other values + // (e.g., 5 to use CPU index 5) and use "top -H" or "taskset -pc" to see the result - rc = zmq_ctx_set(ctx, ZMQ_THREAD_AFFINITY, cpu_affinity_test); - assert (rc == 0); + int cpus_add[] = { 0, 1 }; + for (unsigned int idx=0; idx < sizeof(cpus_add)/sizeof(cpus_add[0]); idx++) + { + rc = zmq_ctx_set(ctx, ZMQ_THREAD_AFFINITY_CPU_ADD, cpus_add[idx]); + assert (rc == 0); + } + + // you can also remove CPUs from list of affinities: + int cpus_remove[] = { 1 }; + for (unsigned int idx=0; idx < sizeof(cpus_remove)/sizeof(cpus_remove[0]); idx++) + { + rc = zmq_ctx_set(ctx, ZMQ_THREAD_AFFINITY_CPU_REMOVE, cpus_remove[idx]); + assert (rc == 0); + } #endif