From 306840a5808cae10bf5d91e4b6e8a91cd619386b Mon Sep 17 00:00:00 2001
From: Morph <39850852+Morph1984@users.noreply.github.com>
Date: Sun, 19 Mar 2023 03:19:25 -0400
Subject: [PATCH] bounded_threadsafe_queue: Use simplified impl of bounded
 queue

Provides a simplified SPSC, MPSC, and MPMC bounded queue implementation using mutexes.
---
 src/common/bounded_threadsafe_queue.h | 337 ++++++++++++++++----------
 src/video_core/gpu_thread.cpp         |   7 +-
 2 files changed, 216 insertions(+), 128 deletions(-)

diff --git a/src/common/bounded_threadsafe_queue.h b/src/common/bounded_threadsafe_queue.h
index 14e887c70..e03427539 100644
--- a/src/common/bounded_threadsafe_queue.h
+++ b/src/common/bounded_threadsafe_queue.h
@@ -1,159 +1,246 @@
-// SPDX-FileCopyrightText: Copyright (c) 2020 Erik Rigtorp <erik@rigtorp.se>
-// SPDX-License-Identifier: MIT
+// SPDX-FileCopyrightText: Copyright 2023 yuzu Emulator Project
+// SPDX-License-Identifier: GPL-2.0-or-later
 
 #pragma once
 
 #include <atomic>
-#include <bit>
 #include <condition_variable>
-#include <memory>
+#include <cstddef>
 #include <mutex>
 #include <new>
-#include <type_traits>
-#include <utility>
 
 #include "common/polyfill_thread.h"
 
 namespace Common {
 
-#if defined(__cpp_lib_hardware_interference_size)
-constexpr size_t hardware_interference_size = std::hardware_destructive_interference_size;
-#else
-constexpr size_t hardware_interference_size = 64;
-#endif
+namespace detail {
+constexpr size_t DefaultCapacity = 0x1000;
+} // namespace detail
+
+template <typename T, size_t Capacity = detail::DefaultCapacity>
+class SPSCQueue {
+    static_assert((Capacity & (Capacity - 1)) == 0, "Capacity must be a power of two.");
 
-template <typename T, size_t capacity = 0x400>
-class MPSCQueue {
 public:
-    explicit MPSCQueue() : allocator{std::allocator<Slot<T>>()} {
-        // Allocate one extra slot to prevent false sharing on the last slot
-        slots = allocator.allocate(capacity + 1);
-        // Allocators are not required to honor alignment for over-aligned types
-        // (see http://eel.is/c++draft/allocator.requirements#10) so we verify
-        // alignment here
-        if (reinterpret_cast<uintptr_t>(slots) % alignof(Slot<T>) != 0) {
-            allocator.deallocate(slots, capacity + 1);
-            throw std::bad_alloc();
-        }
-        for (size_t i = 0; i < capacity; ++i) {
-            std::construct_at(&slots[i]);
-        }
-        static_assert(std::has_single_bit(capacity), "capacity must be an integer power of 2");
-        static_assert(alignof(Slot<T>) == hardware_interference_size,
-                      "Slot must be aligned to cache line boundary to prevent false sharing");
-        static_assert(sizeof(Slot<T>) % hardware_interference_size == 0,
-                      "Slot size must be a multiple of cache line size to prevent "
-                      "false sharing between adjacent slots");
-        static_assert(sizeof(MPSCQueue) % hardware_interference_size == 0,
-                      "Queue size must be a multiple of cache line size to "
-                      "prevent false sharing between adjacent queues");
-    }
+    void Push(T&& t) {
+        const size_t write_index = m_write_index.load();
 
-    ~MPSCQueue() noexcept {
-        for (size_t i = 0; i < capacity; ++i) {
-            std::destroy_at(&slots[i]);
-        }
-        allocator.deallocate(slots, capacity + 1);
-    }
-
-    // The queue must be both non-copyable and non-movable
-    MPSCQueue(const MPSCQueue&) = delete;
-    MPSCQueue& operator=(const MPSCQueue&) = delete;
-
-    MPSCQueue(MPSCQueue&&) = delete;
-    MPSCQueue& operator=(MPSCQueue&&) = delete;
-
-    void Push(const T& v) noexcept {
-        static_assert(std::is_nothrow_copy_constructible_v<T>,
-                      "T must be nothrow copy constructible");
-        emplace(v);
-    }
-
-    template <typename P, typename = std::enable_if_t<std::is_nothrow_constructible_v<T, P&&>>>
-    void Push(P&& v) noexcept {
-        emplace(std::forward<P>(v));
-    }
-
-    void Pop(T& v, std::stop_token stop) noexcept {
-        auto const tail = tail_.fetch_add(1);
-        auto& slot = slots[idx(tail)];
-        if (!slot.turn.test()) {
-            std::unique_lock lock{cv_mutex};
-            Common::CondvarWait(cv, lock, stop, [&slot] { return slot.turn.test(); });
-        }
-        v = slot.move();
-        slot.destroy();
-        slot.turn.clear();
-        slot.turn.notify_one();
-    }
-
-private:
-    template <typename U = T>
-    struct Slot {
-        ~Slot() noexcept {
-            if (turn.test()) {
-                destroy();
-            }
+        // Wait until we have free slots to write to.
+        while ((write_index - m_read_index.load()) == Capacity) {
+            std::this_thread::yield();
         }
 
-        template <typename... Args>
-        void construct(Args&&... args) noexcept {
-            static_assert(std::is_nothrow_constructible_v<U, Args&&...>,
-                          "T must be nothrow constructible with Args&&...");
-            std::construct_at(reinterpret_cast<U*>(&storage), std::forward<Args>(args)...);
-        }
+        // Determine the position to write to.
+        const size_t pos = write_index % Capacity;
 
-        void destroy() noexcept {
-            static_assert(std::is_nothrow_destructible_v<U>, "T must be nothrow destructible");
-            std::destroy_at(reinterpret_cast<U*>(&storage));
-        }
+        // Push into the queue.
+        m_data[pos] = std::move(t);
 
-        U&& move() noexcept {
-            return reinterpret_cast<U&&>(storage);
-        }
+        // Increment the write index.
+        ++m_write_index;
 
-        // Align to avoid false sharing between adjacent slots
-        alignas(hardware_interference_size) std::atomic_flag turn{};
-        struct aligned_store {
-            struct type {
-                alignas(U) unsigned char data[sizeof(U)];
-            };
-        };
-        typename aligned_store::type storage;
-    };
-
-    template <typename... Args>
-    void emplace(Args&&... args) noexcept {
-        static_assert(std::is_nothrow_constructible_v<T, Args&&...>,
-                      "T must be nothrow constructible with Args&&...");
-        auto const head = head_.fetch_add(1);
-        auto& slot = slots[idx(head)];
-        slot.turn.wait(true);
-        slot.construct(std::forward<Args>(args)...);
-        slot.turn.test_and_set();
+        // Notify the consumer that we have pushed into the queue.
+        std::scoped_lock lock{cv_mutex};
         cv.notify_one();
     }
 
-    constexpr size_t idx(size_t i) const noexcept {
-        return i & mask;
+    template <typename... Args>
+    void Push(Args&&... args) {
+        const size_t write_index = m_write_index.load();
+
+        // Wait until we have free slots to write to.
+        while ((write_index - m_read_index.load()) == Capacity) {
+            std::this_thread::yield();
+        }
+
+        // Determine the position to write to.
+        const size_t pos = write_index % Capacity;
+
+        // Emplace into the queue.
+        std::construct_at(std::addressof(m_data[pos]), std::forward<Args>(args)...);
+
+        // Increment the write index.
+        ++m_write_index;
+
+        // Notify the consumer that we have pushed into the queue.
+        std::scoped_lock lock{cv_mutex};
+        cv.notify_one();
     }
 
-    static constexpr size_t mask = capacity - 1;
+    bool TryPop(T& t) {
+        return Pop(t);
+    }
 
-    // Align to avoid false sharing between head_ and tail_
-    alignas(hardware_interference_size) std::atomic<size_t> head_{0};
-    alignas(hardware_interference_size) std::atomic<size_t> tail_{0};
+    void PopWait(T& t, std::stop_token stop_token) {
+        Wait(stop_token);
+        Pop(t);
+    }
+
+    T PopWait(std::stop_token stop_token) {
+        Wait(stop_token);
+        T t;
+        Pop(t);
+        return t;
+    }
+
+    void Clear() {
+        while (!Empty()) {
+            Pop();
+        }
+    }
+
+    bool Empty() const {
+        return m_read_index.load() == m_write_index.load();
+    }
+
+    size_t Size() const {
+        return m_write_index.load() - m_read_index.load();
+    }
+
+private:
+    void Pop() {
+        const size_t read_index = m_read_index.load();
+
+        // Check if the queue is empty.
+        if (read_index == m_write_index.load()) {
+            return;
+        }
+
+        // Determine the position to read from.
+        const size_t pos = read_index % Capacity;
+
+        // Pop the data off the queue, deleting it.
+        std::destroy_at(std::addressof(m_data[pos]));
+
+        // Increment the read index.
+        ++m_read_index;
+    }
+
+    bool Pop(T& t) {
+        const size_t read_index = m_read_index.load();
+
+        // Check if the queue is empty.
+        if (read_index == m_write_index.load()) {
+            return false;
+        }
+
+        // Determine the position to read from.
+        const size_t pos = read_index % Capacity;
+
+        // Pop the data off the queue, moving it.
+        t = std::move(m_data[pos]);
+
+        // Increment the read index.
+        ++m_read_index;
+
+        return true;
+    }
+
+    void Wait(std::stop_token stop_token) {
+        std::unique_lock lock{cv_mutex};
+        Common::CondvarWait(cv, lock, stop_token, [this] { return !Empty(); });
+    }
+
+    alignas(128) std::atomic_size_t m_read_index{0};
+    alignas(128) std::atomic_size_t m_write_index{0};
+
+    std::array<T, Capacity> m_data;
 
-    std::mutex cv_mutex;
     std::condition_variable_any cv;
+    std::mutex cv_mutex;
+};
 
-    Slot<T>* slots;
-    [[no_unique_address]] std::allocator<Slot<T>> allocator;
+template <typename T, size_t Capacity = detail::DefaultCapacity>
+class MPSCQueue {
+public:
+    void Push(T&& t) {
+        std::scoped_lock lock{write_mutex};
+        spsc_queue.Push(std::move(t));
+    }
 
-    static_assert(std::is_nothrow_copy_assignable_v<T> || std::is_nothrow_move_assignable_v<T>,
-                  "T must be nothrow copy or move assignable");
+    template <typename... Args>
+    void Push(Args&&... args) {
+        std::scoped_lock lock{write_mutex};
+        spsc_queue.Push(std::forward<Args>(args)...);
+    }
 
-    static_assert(std::is_nothrow_destructible_v<T>, "T must be nothrow destructible");
+    bool TryPop(T& t) {
+        return spsc_queue.TryPop(t);
+    }
+
+    void PopWait(T& t, std::stop_token stop_token) {
+        spsc_queue.PopWait(t, stop_token);
+    }
+
+    T PopWait(std::stop_token stop_token) {
+        return spsc_queue.PopWait(stop_token);
+    }
+
+    void Clear() {
+        spsc_queue.Clear();
+    }
+
+    bool Empty() {
+        return spsc_queue.Empty();
+    }
+
+    size_t Size() {
+        return spsc_queue.Size();
+    }
+
+private:
+    SPSCQueue<T, Capacity> spsc_queue;
+    std::mutex write_mutex;
+};
+
+template <typename T, size_t Capacity = detail::DefaultCapacity>
+class MPMCQueue {
+public:
+    void Push(T&& t) {
+        std::scoped_lock lock{write_mutex};
+        spsc_queue.Push(std::move(t));
+    }
+
+    template <typename... Args>
+    void Push(Args&&... args) {
+        std::scoped_lock lock{write_mutex};
+        spsc_queue.Push(std::forward<Args>(args)...);
+    }
+
+    bool TryPop(T& t) {
+        std::scoped_lock lock{read_mutex};
+        return spsc_queue.TryPop(t);
+    }
+
+    void PopWait(T& t, std::stop_token stop_token) {
+        std::scoped_lock lock{read_mutex};
+        spsc_queue.PopWait(t, stop_token);
+    }
+
+    T PopWait(std::stop_token stop_token) {
+        std::scoped_lock lock{read_mutex};
+        return spsc_queue.PopWait(stop_token);
+    }
+
+    void Clear() {
+        std::scoped_lock lock{read_mutex};
+        spsc_queue.Clear();
+    }
+
+    bool Empty() {
+        std::scoped_lock lock{read_mutex};
+        return spsc_queue.Empty();
+    }
+
+    size_t Size() {
+        std::scoped_lock lock{read_mutex};
+        return spsc_queue.Size();
+    }
+
+private:
+    SPSCQueue<T, Capacity> spsc_queue;
+    std::mutex write_mutex;
+    std::mutex read_mutex;
 };
 
 } // namespace Common
diff --git a/src/video_core/gpu_thread.cpp b/src/video_core/gpu_thread.cpp
index f52f9e28f..469a59cf9 100644
--- a/src/video_core/gpu_thread.cpp
+++ b/src/video_core/gpu_thread.cpp
@@ -31,9 +31,10 @@ static void RunThread(std::stop_token stop_token, Core::System& system,
     auto current_context = context.Acquire();
     VideoCore::RasterizerInterface* const rasterizer = renderer.ReadRasterizer();
 
+    CommandDataContainer next;
+
     while (!stop_token.stop_requested()) {
-        CommandDataContainer next;
-        state.queue.Pop(next, stop_token);
+        state.queue.PopWait(next, stop_token);
         if (stop_token.stop_requested()) {
             break;
         }
@@ -117,7 +118,7 @@ u64 ThreadManager::PushCommand(CommandData&& command_data, bool block) {
 
     std::unique_lock lk(state.write_lock);
     const u64 fence{++state.last_fence};
-    state.queue.Push(CommandDataContainer(std::move(command_data), fence, block));
+    state.queue.Push(std::move(command_data), fence, block);
 
     if (block) {
         Common::CondvarWait(state.cv, lk, thread.get_stop_token(), [this, fence] {