/*
Copyright (c) 2020 Erik Rigtorp <erik@rigtorp.se>

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/

#pragma once

#include <atomic>
#include <cassert>
#include <cstddef> // offsetof
#include <limits>
#include <memory>
#include <new> // std::hardware_destructive_interference_size
#include <stdexcept>

#ifndef __cpp_aligned_new
#ifdef _WIN32
#include <malloc.h> // _aligned_malloc
#else
#include <stdlib.h> // posix_memalign
#endif
#endif

namespace rigtorp {
namespace mpmc {
#ifdef __cpp_lib_hardware_interference_size
static constexpr size_t hardwareInterferenceSize =
    std::hardware_destructive_interference_size;
#else
static constexpr size_t hardwareInterferenceSize = 64;
#endif

#if defined(__cpp_aligned_new)
template <typename T> using AlignedAllocator = std::allocator<T>;
#else
// Fallback allocator for compilers without aligned operator new: it honors
// the over-alignment of Slot<T> via _aligned_malloc / posix_memalign.
template <typename T> struct AlignedAllocator {
  using value_type = T;

  T *allocate(std::size_t n) {
    if (n > std::numeric_limits<std::size_t>::max() / sizeof(T)) {
      throw std::bad_array_new_length();
    }
#ifdef _WIN32
    auto *p = static_cast<T *>(_aligned_malloc(sizeof(T) * n, alignof(T)));
    if (p == nullptr) {
      throw std::bad_alloc();
    }
#else
    T *p;
    if (posix_memalign(reinterpret_cast<void **>(&p), alignof(T),
                       sizeof(T) * n) != 0) {
      throw std::bad_alloc();
    }
#endif
    return p;
  }

  void deallocate(T *p, std::size_t) {
#ifdef _WIN32
    _aligned_free(p);
#else
    free(p);
#endif
  }
};
#endif

template <typename T> struct Slot {
  ~Slot() noexcept {
    // An odd turn means the slot still holds a constructed value that was
    // never consumed, so it must be destroyed here.
    if (turn & 1) {
      destroy();
    }
  }

  template <typename... Args> void construct(Args &&... args) noexcept {
    static_assert(std::is_nothrow_constructible<T, Args &&...>::value,
                  "T must be nothrow constructible with Args&&...");
    new (&storage) T(std::forward<Args>(args)...);
  }

  void destroy() noexcept {
    static_assert(std::is_nothrow_destructible<T>::value,
                  "T must be nothrow destructible");
    reinterpret_cast<T *>(&storage)->~T();
  }

  T &&move() noexcept { return reinterpret_cast<T &&>(storage); }

  // Align to avoid false sharing between adjacent slots
  alignas(hardwareInterferenceSize) std::atomic<size_t> turn = {0};
  typename std::aligned_storage<sizeof(T), alignof(T)>::type storage;
};
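
// ---------------------------------------------------------------------------
// Illustrative note on the turn protocol (a walk-through of the code above
// and below, not additional machinery): each slot cycles through turn values
// as the queue wraps around. For round k (k = ticket / capacity), a producer
// may write when turn == 2*k and publishes with 2*k + 1; a consumer waits for
// 2*k + 1 and releases the slot for the next round by storing 2*k + 2. For
// example, with capacity 2, slot 0 carries elements 0, 2, 4, ... and its turn
// advances 0 -> 1 -> 2 -> 3 -> ...:
//
//   element 0: producer waits for turn 0, publishes turn 1;
//              consumer waits for turn 1, releases turn 2
//   element 2: producer waits for turn 2, publishes turn 3; and so on.
//
// This lets head_ and tail_ grow without bound while each slot's counter
// independently sequences its writers and readers.
// ---------------------------------------------------------------------------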

template <typename T, typename Allocator = AlignedAllocator<Slot<T>>>
class Queue {
private:
  static_assert(std::is_nothrow_copy_assignable<T>::value ||
                    std::is_nothrow_move_assignable<T>::value,
                "T must be nothrow copy or move assignable");

  static_assert(std::is_nothrow_destructible<T>::value,
                "T must be nothrow destructible");

public:
  explicit Queue(const size_t capacity,
                 const Allocator &allocator = Allocator())
      : capacity_(capacity), allocator_(allocator), head_(0), tail_(0) {
    if (capacity_ < 1) {
      throw std::invalid_argument("capacity < 1");
    }
    // Allocate one extra slot to prevent false sharing on the last slot
    slots_ = allocator_.allocate(capacity_ + 1);
    // Allocators are not required to honor alignment for over-aligned types
    // (see http://eel.is/c++draft/allocator.requirements#10) so we verify
    // alignment here
    if (reinterpret_cast<size_t>(slots_) % alignof(Slot<T>) != 0) {
      allocator_.deallocate(slots_, capacity_ + 1);
      throw std::bad_alloc();
    }
    for (size_t i = 0; i < capacity_; ++i) {
      new (&slots_[i]) Slot<T>();
    }
    static_assert(alignof(Slot<T>) == hardwareInterferenceSize,
                  "Slot must be aligned to cache line boundary to prevent "
                  "false sharing");
    static_assert(sizeof(Slot<T>) % hardwareInterferenceSize == 0,
                  "Slot size must be a multiple of cache line size to prevent "
                  "false sharing between adjacent slots");
    static_assert(sizeof(Queue) % hardwareInterferenceSize == 0,
                  "Queue size must be a multiple of cache line size to "
                  "prevent false sharing between adjacent queues");
    static_assert(
        offsetof(Queue, tail_) - offsetof(Queue, head_) ==
            static_cast<std::ptrdiff_t>(hardwareInterferenceSize),
        "head and tail must be a cache line apart to prevent false sharing");
  }

  ~Queue() noexcept {
    for (size_t i = 0; i < capacity_; ++i) {
      slots_[i].~Slot();
    }
    allocator_.deallocate(slots_, capacity_ + 1);
  }

  // non-copyable and non-movable
  Queue(const Queue &) = delete;
  Queue &operator=(const Queue &) = delete;

  template <typename... Args> void emplace(Args &&... args) noexcept {
    static_assert(std::is_nothrow_constructible<T, Args &&...>::value,
                  "T must be nothrow constructible with Args&&...");
    // Claim a ticket, then spin until the slot is writable for this round.
    auto const head = head_.fetch_add(1);
    auto &slot = slots_[idx(head)];
    while (turn(head) * 2 != slot.turn.load(std::memory_order_acquire))
      ;
    slot.construct(std::forward<Args>(args)...);
    slot.turn.store(turn(head) * 2 + 1, std::memory_order_release);
  }
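
  // try_emplace distinguishes "queue full" from "lost a race": when the slot
  // for the observed head is not writable, head_ is re-read; if it has not
  // moved, no consumer has freed a slot and the queue is genuinely full, so
  // the call fails. If head_ has moved, another producer advanced it and the
  // loop retries with the fresh value.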
  template <typename... Args> bool try_emplace(Args &&... args) noexcept {
    static_assert(std::is_nothrow_constructible<T, Args &&...>::value,
                  "T must be nothrow constructible with Args&&...");
    auto head = head_.load(std::memory_order_acquire);
    for (;;) {
      auto &slot = slots_[idx(head)];
      if (turn(head) * 2 == slot.turn.load(std::memory_order_acquire)) {
        if (head_.compare_exchange_strong(head, head + 1)) {
          slot.construct(std::forward<Args>(args)...);
          slot.turn.store(turn(head) * 2 + 1, std::memory_order_release);
          return true;
        }
      } else {
        auto const prevHead = head;
        head = head_.load(std::memory_order_acquire);
        if (head == prevHead) {
          return false;
        }
      }
    }
  }

  void push(const T &v) noexcept {
    static_assert(std::is_nothrow_copy_constructible<T>::value,
                  "T must be nothrow copy constructible");
    emplace(v);
  }

  template <typename P,
            typename = typename std::enable_if<
                std::is_nothrow_constructible<T, P &&>::value>::type>
  void push(P &&v) noexcept {
    emplace(std::forward<P>(v));
  }

  bool try_push(const T &v) noexcept {
    static_assert(std::is_nothrow_copy_constructible<T>::value,
                  "T must be nothrow copy constructible");
    return try_emplace(v);
  }

  template <typename P,
            typename = typename std::enable_if<
                std::is_nothrow_constructible<T, P &&>::value>::type>
  bool try_push(P &&v) noexcept {
    return try_emplace(std::forward<P>(v));
  }
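
  // Consumers mirror the producer protocol: pop() claims a ticket from tail_
  // and spins until the matching slot has been published (turn == 2 * round
  // + 1). The acquire load pairs with the producer's release store, so the
  // element's construction happens-before the move out of the slot.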

  void pop(T &v) noexcept {
    auto const tail = tail_.fetch_add(1);
    auto &slot = slots_[idx(tail)];
    while (turn(tail) * 2 + 1 != slot.turn.load(std::memory_order_acquire))
      ;
    v = slot.move();
    slot.destroy();
    slot.turn.store(turn(tail) * 2 + 2, std::memory_order_release);
  }

  bool try_pop(T &v) noexcept {
    auto tail = tail_.load(std::memory_order_acquire);
    for (;;) {
      auto &slot = slots_[idx(tail)];
      if (turn(tail) * 2 + 1 == slot.turn.load(std::memory_order_acquire)) {
        if (tail_.compare_exchange_strong(tail, tail + 1)) {
          v = slot.move();
          slot.destroy();
          slot.turn.store(turn(tail) * 2 + 2, std::memory_order_release);
          return true;
        }
      } else {
        auto const prevTail = tail;
        tail = tail_.load(std::memory_order_acquire);
        if (tail == prevTail) {
          return false;
        }
      }
    }
  }

private:
  constexpr size_t idx(size_t i) const noexcept { return i % capacity_; }

  constexpr size_t turn(size_t i) const noexcept { return i / capacity_; }

private:
  const size_t capacity_;
  Slot<T> *slots_;
#if defined(__has_cpp_attribute) && __has_cpp_attribute(no_unique_address)
  Allocator allocator_ [[no_unique_address]];
#else
  Allocator allocator_;
#endif

  // Align to avoid false sharing between head_ and tail_
  alignas(hardwareInterferenceSize) std::atomic<size_t> head_;
  alignas(hardwareInterferenceSize) std::atomic<size_t> tail_;
};
} // namespace mpmc

template <typename T,
          typename Allocator = mpmc::AlignedAllocator<mpmc::Slot<T>>>
using MPMCQueue = mpmc::Queue<T, Allocator>;

} // namespace rigtorp
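
/*
Usage sketch (illustrative only; assumes this header is saved as
"MPMCQueue.h"): one producer thread and one consumer thread exchanging ints.
push() and pop() block by spinning, so every push is eventually matched by a
pop; use try_push()/try_pop() where blocking is not acceptable.

    #include "MPMCQueue.h"
    #include <iostream>
    #include <thread>

    int main() {
      rigtorp::MPMCQueue<int> q(16); // capacity must be at least 1
      std::thread producer([&q] {
        for (int i = 0; i < 100; ++i) {
          q.push(i); // blocks while the queue is full
        }
      });
      std::thread consumer([&q] {
        for (int i = 0; i < 100; ++i) {
          int v;
          q.pop(v); // blocks while the queue is empty
          std::cout << v << '\n';
        }
      });
      producer.join();
      consumer.join();
    }
*/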