From c2ddda2f5119a7f9dc44d79f340fd7c63a68af7d Mon Sep 17 00:00:00 2001 From: ameerj <52414509+ameerj@users.noreply.github.com> Date: Wed, 15 Sep 2021 20:18:36 -0400 Subject: [PATCH] threadsafe_queue: Add std::stop_token overload to PopWait Useful for jthreads which make use of the threadsafe queues. --- src/common/threadsafe_queue.h | 27 ++++++++++++++++++++++----- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/src/common/threadsafe_queue.h b/src/common/threadsafe_queue.h index 8430b9778a..2c8c2b90e9 100644 --- a/src/common/threadsafe_queue.h +++ b/src/common/threadsafe_queue.h @@ -14,7 +14,7 @@ #include namespace Common { -template +template class SPSCQueue { public: SPSCQueue() { @@ -84,7 +84,7 @@ public: void Wait() { if (Empty()) { std::unique_lock lock{cv_mutex}; - cv.wait(lock, [this]() { return !Empty(); }); + cv.wait(lock, [this] { return !Empty(); }); } } @@ -95,6 +95,19 @@ public: return t; } + T PopWait(std::stop_token stop_token) { + if (Empty()) { + std::unique_lock lock{cv_mutex}; + cv.wait(lock, stop_token, [this] { return !Empty(); }); + } + if (stop_token.stop_requested()) { + return T{}; + } + T t; + Pop(t); + return t; + } + // not thread-safe void Clear() { size.store(0); @@ -123,13 +136,13 @@ private: ElementPtr* read_ptr; std::atomic_size_t size{0}; std::mutex cv_mutex; - std::condition_variable cv; + std::conditional_t cv; }; // a simple thread-safe, // single reader, multiple writer queue -template +template class MPSCQueue { public: [[nodiscard]] std::size_t Size() const { @@ -166,13 +179,17 @@ public: return spsc_queue.PopWait(); } + T PopWait(std::stop_token stop_token) { + return spsc_queue.PopWait(stop_token); + } + // not thread-safe void Clear() { spsc_queue.Clear(); } private: - SPSCQueue spsc_queue; + SPSCQueue spsc_queue; std::mutex write_lock; }; } // namespace Common