From a1d55d0506215b0390ab316a40ac1cad4dffc308 Mon Sep 17 00:00:00 2001 From: Simon Giesecke Date: Thu, 8 Feb 2018 23:10:45 +0100 Subject: [PATCH] Problem: race conditions for options.linger (#2910) * Problem: race conditions for options.linger Solution: make options.linger atomic --- src/atomic_ptr.hpp | 249 ++++++++++++++++++++++++++++++-------------- src/dish.cpp | 2 +- src/options.cpp | 4 +- src/options.hpp | 3 +- src/own.cpp | 4 +- src/socket_base.cpp | 2 +- src/xsub.cpp | 2 +- 7 files changed, 178 insertions(+), 88 deletions(-) diff --git a/src/atomic_ptr.hpp b/src/atomic_ptr.hpp index 875a4176..df1537f2 100644 --- a/src/atomic_ptr.hpp +++ b/src/atomic_ptr.hpp @@ -34,7 +34,8 @@ #define ZMQ_ATOMIC_PTR_MUTEX #elif defined ZMQ_HAVE_ATOMIC_INTRINSICS #define ZMQ_ATOMIC_PTR_INTRINSIC -#elif (defined __cplusplus && __cplusplus >= 201103L) +#elif ((defined __cplusplus && __cplusplus >= 201103L) \ + || (defined _MSC_VER && _MSC_VER >= 1700)) #define ZMQ_ATOMIC_PTR_CXX11 #elif (defined __i386__ || defined __x86_64__) && defined __GNUC__ #define ZMQ_ATOMIC_PTR_X86 @@ -65,6 +66,110 @@ namespace zmq { +#if !defined ZMQ_ATOMIC_PTR_CXX11 +inline void *atomic_xchg_ptr (void **ptr, + void *const val_ +#if defined ZMQ_ATOMIC_PTR_MUTEX + , + mutex_t &sync +#endif +) +{ +#if defined ZMQ_ATOMIC_PTR_WINDOWS + return InterlockedExchangePointer ((PVOID *) ptr, val_); +#elif defined ZMQ_ATOMIC_PTR_INTRINSIC + return __atomic_exchange_n (ptr, val_, __ATOMIC_ACQ_REL); +#elif defined ZMQ_ATOMIC_PTR_ATOMIC_H + return atomic_swap_ptr (ptr, val_); +#elif defined ZMQ_ATOMIC_PTR_TILE + return arch_atomic_exchange (ptr, val_); +#elif defined ZMQ_ATOMIC_PTR_X86 + void *old; + __asm__ volatile("lock; xchg %0, %2" + : "=r"(old), "=m"(*ptr) + : "m"(*ptr), "0"(val_)); + return old; +#elif defined ZMQ_ATOMIC_PTR_ARM + void *old; + unsigned int flag; + __asm__ volatile(" dmb sy\n\t" + "1: ldrex %1, [%3]\n\t" + " strex %0, %4, [%3]\n\t" + " teq %0, #0\n\t" + " bne 1b\n\t" + " dmb sy\n\t" + : "=&r"(flag), "=&r"(old), "+Qo"(*ptr) + : "r"(ptr), "r"(val_) + : "cc"); + return old; +#elif defined ZMQ_ATOMIC_PTR_MUTEX + sync.lock (); + void *old = *ptr; + *ptr = val_; + sync.unlock (); + return old; +#else +#error atomic_ptr is not implemented for this platform +#endif +} + +inline void *atomic_cas (void *volatile *ptr_, + void *cmp_, + void *val_ +#if defined ZMQ_ATOMIC_PTR_MUTEX + , + mutex_t &sync +#endif +) +{ +#if defined ZMQ_ATOMIC_PTR_WINDOWS + return InterlockedCompareExchangePointer ((volatile PVOID *) ptr_, val_, + cmp_); +#elif defined ZMQ_ATOMIC_PTR_INTRINSIC + void *old = cmp_; + __atomic_compare_exchange_n (ptr_, &old, val_, false, __ATOMIC_RELEASE, + __ATOMIC_ACQUIRE); + return old; +#elif defined ZMQ_ATOMIC_PTR_ATOMIC_H + return atomic_cas_ptr (ptr_, cmp_, val_); +#elif defined ZMQ_ATOMIC_PTR_TILE + return arch_atomic_val_compare_and_exchange (ptr_, cmp_, val_); +#elif defined ZMQ_ATOMIC_PTR_X86 + void *old; + __asm__ volatile("lock; cmpxchg %2, %3" + : "=a"(old), "=m"(*ptr_) + : "r"(val_), "m"(*ptr_), "0"(cmp_) + : "cc"); + return old; +#elif defined ZMQ_ATOMIC_PTR_ARM + void *old; + unsigned int flag; + __asm__ volatile(" dmb sy\n\t" + "1: ldrex %1, [%3]\n\t" + " mov %0, #0\n\t" + " teq %1, %4\n\t" + " it eq\n\t" + " strexeq %0, %5, [%3]\n\t" + " teq %0, #0\n\t" + " bne 1b\n\t" + " dmb sy\n\t" + : "=&r"(flag), "=&r"(old), "+Qo"(*ptr_) + : "r"(ptr_), "r"(cmp_), "r"(val_) + : "cc"); + return old; +#elif defined ZMQ_ATOMIC_PTR_MUTEX + sync.lock (); + void *old = *ptr_; + if (*ptr_ == cmp_) + *ptr_ = val_; + sync.unlock (); + return old; +#else +#error atomic_ptr is not implemented for this platform +#endif +} +#endif + // This class encapsulates several atomic operations on pointers. template class atomic_ptr_t @@ -85,43 +190,15 @@ template class atomic_ptr_t // to the 'val' value. Old value is returned. inline T *xchg (T *val_) { -#if defined ZMQ_ATOMIC_PTR_WINDOWS - return (T *) InterlockedExchangePointer ((PVOID *) &ptr, val_); -#elif defined ZMQ_ATOMIC_PTR_INTRINSIC - return (T *) __atomic_exchange_n (&ptr, val_, __ATOMIC_ACQ_REL); -#elif defined ZMQ_ATOMIC_PTR_CXX11 +#if defined ZMQ_ATOMIC_PTR_CXX11 return ptr.exchange (val_, std::memory_order_acq_rel); -#elif defined ZMQ_ATOMIC_PTR_ATOMIC_H - return (T *) atomic_swap_ptr (&ptr, val_); -#elif defined ZMQ_ATOMIC_PTR_TILE - return (T *) arch_atomic_exchange (&ptr, val_); -#elif defined ZMQ_ATOMIC_PTR_X86 - T *old; - __asm__ volatile("lock; xchg %0, %2" - : "=r"(old), "=m"(ptr) - : "m"(ptr), "0"(val_)); - return old; -#elif defined ZMQ_ATOMIC_PTR_ARM - T *old; - unsigned int flag; - __asm__ volatile(" dmb sy\n\t" - "1: ldrex %1, [%3]\n\t" - " strex %0, %4, [%3]\n\t" - " teq %0, #0\n\t" - " bne 1b\n\t" - " dmb sy\n\t" - : "=&r"(flag), "=&r"(old), "+Qo"(ptr) - : "r"(&ptr), "r"(val_) - : "cc"); - return old; -#elif defined ZMQ_ATOMIC_PTR_MUTEX - sync.lock (); - T *old = (T *) ptr; - ptr = val_; - sync.unlock (); - return old; #else -#error atomic_ptr is not implemented for this platform + return (T *) atomic_xchg_ptr ((void **) &ptr, val_ +#if defined ZMQ_ATOMIC_PTR_MUTEX + , + sync +#endif + ); #endif } @@ -131,53 +208,16 @@ template class atomic_ptr_t // is returned. inline T *cas (T *cmp_, T *val_) { -#if defined ZMQ_ATOMIC_PTR_WINDOWS - return (T *) InterlockedCompareExchangePointer ((volatile PVOID *) &ptr, - val_, cmp_); -#elif defined ZMQ_ATOMIC_PTR_INTRINSIC - T *old = cmp_; - __atomic_compare_exchange_n (&ptr, (volatile T **) &old, val_, false, - __ATOMIC_RELEASE, __ATOMIC_ACQUIRE); - return old; -#elif defined ZMQ_ATOMIC_PTR_CXX11 +#if defined ZMQ_ATOMIC_PTR_CXX11 ptr.compare_exchange_strong (cmp_, val_, std::memory_order_acq_rel); return cmp_; -#elif defined ZMQ_ATOMIC_PTR_ATOMIC_H - return (T *) atomic_cas_ptr (&ptr, cmp_, val_); -#elif defined ZMQ_ATOMIC_PTR_TILE - return (T *) arch_atomic_val_compare_and_exchange (&ptr, cmp_, val_); -#elif defined ZMQ_ATOMIC_PTR_X86 - T *old; - __asm__ volatile("lock; cmpxchg %2, %3" - : "=a"(old), "=m"(ptr) - : "r"(val_), "m"(ptr), "0"(cmp_) - : "cc"); - return old; -#elif defined ZMQ_ATOMIC_PTR_ARM - T *old; - unsigned int flag; - __asm__ volatile(" dmb sy\n\t" - "1: ldrex %1, [%3]\n\t" - " mov %0, #0\n\t" - " teq %1, %4\n\t" - " it eq\n\t" - " strexeq %0, %5, [%3]\n\t" - " teq %0, #0\n\t" - " bne 1b\n\t" - " dmb sy\n\t" - : "=&r"(flag), "=&r"(old), "+Qo"(ptr) - : "r"(&ptr), "r"(cmp_), "r"(val_) - : "cc"); - return old; -#elif defined ZMQ_ATOMIC_PTR_MUTEX - sync.lock (); - T *old = (T *) ptr; - if (ptr == cmp_) - ptr = val_; - sync.unlock (); - return old; #else -#error atomic_ptr is not implemented for this platform + return (T *) atomic_cas ((void **) &ptr, cmp_, val_ +#if defined ZMQ_ATOMIC_PTR_MUTEX + , + sync +#endif + ); #endif } @@ -197,6 +237,55 @@ template class atomic_ptr_t const atomic_ptr_t &operator= (const atomic_ptr_t &); #endif }; + +struct atomic_value_t +{ + atomic_value_t (const int value_) : value (value_) {} + + atomic_value_t (const atomic_value_t &src) : value (src.load ()) {} + + void store (const int value_) + { +#if defined ZMQ_ATOMIC_PTR_CXX11 + value.store (value_, std::memory_order_release); +#else + atomic_xchg_ptr ((void **) &value, (void *) (ptrdiff_t) value_ +#if defined ZMQ_ATOMIC_PTR_MUTEX + , + sync +#endif + ); +#endif + } + + int load () const + { +#if defined ZMQ_ATOMIC_PTR_CXX11 + return value.load (std::memory_order_acquire); +#else + return (int) (ptrdiff_t) atomic_cas ((void **) &value, 0, 0 +#if defined ZMQ_ATOMIC_PTR_MUTEX + , + sync +#endif + ); +#endif + } + + private: +#if defined ZMQ_ATOMIC_PTR_CXX11 + std::atomic value; +#else + volatile ptrdiff_t value; +#endif + +#if defined ZMQ_ATOMIC_PTR_MUTEX + mutex_t sync; +#endif + + private: + atomic_value_t &operator= (const atomic_value_t &src); +}; } // Remove macros local to this file. diff --git a/src/dish.cpp b/src/dish.cpp index c4e51a58..b45c24e2 100644 --- a/src/dish.cpp +++ b/src/dish.cpp @@ -42,7 +42,7 @@ zmq::dish_t::dish_t (class ctx_t *parent_, uint32_t tid_, int sid_) : // When socket is being closed down we don't want to wait till pending // subscription commands are sent to the wire. - options.linger = 0; + options.linger.store (0); int rc = message.init (); errno_assert (rc == 0); diff --git a/src/options.cpp b/src/options.cpp index 05eec97b..e42fcea7 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -216,7 +216,7 @@ int zmq::options_t::setsockopt (int option_, case ZMQ_LINGER: if (is_int && value >= -1) { - linger = value; + linger.store (value); return 0; } break; @@ -733,7 +733,7 @@ int zmq::options_t::getsockopt (int option_, case ZMQ_LINGER: if (is_int) { - *value = linger; + *value = linger.load (); return 0; } break; diff --git a/src/options.hpp b/src/options.hpp index d5104f72..43f43416 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -34,6 +34,7 @@ #include #include +#include "atomic_ptr.hpp" #include "stddef.h" #include "stdint.hpp" #include "tcp_address.hpp" @@ -100,7 +101,7 @@ struct options_t int type; // Linger time, in milliseconds. - int linger; + atomic_value_t linger; // Maximum interval in milliseconds beyond which userspace will // timeout connect(). diff --git a/src/own.cpp b/src/own.cpp index 3fc4fabf..b5933bb8 100644 --- a/src/own.cpp +++ b/src/own.cpp @@ -115,7 +115,7 @@ void zmq::own_t::process_term_req (own_t *object_) // Note that this object is the root of the (partial shutdown) thus, its // value of linger is used, rather than the value stored by the children. - send_term (object_, options.linger); + send_term (object_, options.linger.load ()); } void zmq::own_t::process_own (own_t *object_) @@ -142,7 +142,7 @@ void zmq::own_t::terminate () // As for the root of the ownership tree, there's no one to terminate it, // so it has to terminate itself. if (!owner) { - process_term (options.linger); + process_term (options.linger.load ()); return; } diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 09cc441d..11962091 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -205,7 +205,7 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, { options.socket_id = sid_; options.ipv6 = (parent_->get (ZMQ_IPV6) != 0); - options.linger = parent_->get (ZMQ_BLOCKY) ? -1 : 0; + options.linger.store (parent_->get (ZMQ_BLOCKY) ? -1 : 0); if (thread_safe) { mailbox = new (std::nothrow) mailbox_safe_t (&sync); diff --git a/src/xsub.cpp b/src/xsub.cpp index 6efb968b..86df5bf7 100644 --- a/src/xsub.cpp +++ b/src/xsub.cpp @@ -43,7 +43,7 @@ zmq::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_, int sid_) : // When socket is being closed down we don't want to wait till pending // subscription commands are sent to the wire. - options.linger = 0; + options.linger.store (0); int rc = message.init (); errno_assert (rc == 0);