1
1
Fork 0
forked from suyu/suyu

bounded_threadsafe_queue: Add TryPush

This commit is contained in:
Morph 2023-03-19 04:01:47 -04:00
parent f28ca5361f
commit 15d573194c

View file

@ -22,6 +22,55 @@ class SPSCQueue {
static_assert((Capacity & (Capacity - 1)) == 0, "Capacity must be a power of two."); static_assert((Capacity & (Capacity - 1)) == 0, "Capacity must be a power of two.");
public: public:
bool TryPush(T&& t) {
const size_t write_index = m_write_index.load();
// Check if we have free slots to write to.
if ((write_index - m_read_index.load()) == Capacity) {
return false;
}
// Determine the position to write to.
const size_t pos = write_index % Capacity;
// Push into the queue.
m_data[pos] = std::move(t);
// Increment the write index.
++m_write_index;
// Notify the consumer that we have pushed into the queue.
std::scoped_lock lock{cv_mutex};
cv.notify_one();
return true;
}
template <typename... Args>
bool TryPush(Args&&... args) {
const size_t write_index = m_write_index.load();
// Check if we have free slots to write to.
if ((write_index - m_read_index.load()) == Capacity) {
return false;
}
// Determine the position to write to.
const size_t pos = write_index % Capacity;
// Emplace into the queue.
std::construct_at(std::addressof(m_data[pos]), std::forward<Args>(args)...);
// Increment the write index.
++m_write_index;
// Notify the consumer that we have pushed into the queue.
std::scoped_lock lock{cv_mutex};
cv.notify_one();
return true;
}
void Push(T&& t) { void Push(T&& t) {
const size_t write_index = m_write_index.load(); const size_t write_index = m_write_index.load();
@ -153,6 +202,17 @@ private:
template <typename T, size_t Capacity = detail::DefaultCapacity> template <typename T, size_t Capacity = detail::DefaultCapacity>
class MPSCQueue { class MPSCQueue {
public: public:
bool TryPush(T&& t) {
std::scoped_lock lock{write_mutex};
return spsc_queue.TryPush(std::move(t));
}
template <typename... Args>
bool TryPush(Args&&... args) {
std::scoped_lock lock{write_mutex};
return spsc_queue.TryPush(std::forward<Args>(args)...);
}
void Push(T&& t) { void Push(T&& t) {
std::scoped_lock lock{write_mutex}; std::scoped_lock lock{write_mutex};
spsc_queue.Push(std::move(t)); spsc_queue.Push(std::move(t));
@ -196,6 +256,17 @@ private:
template <typename T, size_t Capacity = detail::DefaultCapacity> template <typename T, size_t Capacity = detail::DefaultCapacity>
class MPMCQueue { class MPMCQueue {
public: public:
bool TryPush(T&& t) {
std::scoped_lock lock{write_mutex};
return spsc_queue.TryPush(std::move(t));
}
template <typename... Args>
bool TryPush(Args&&... args) {
std::scoped_lock lock{write_mutex};
return spsc_queue.TryPush(std::forward<Args>(args)...);
}
void Push(T&& t) { void Push(T&& t) {
std::scoped_lock lock{write_mutex}; std::scoped_lock lock{write_mutex};
spsc_queue.Push(std::move(t)); spsc_queue.Push(std::move(t));