bounded_threadsafe_queue: Add producer cv to avoid busy waiting

This commit is contained in:
Morph 2023-03-19 14:48:01 -04:00
parent 407dc917f1
commit 8c56481249
1 changed files with 29 additions and 17 deletions

View File

@ -45,12 +45,12 @@ public:
} }
void PopWait(T& t, std::stop_token stop_token) { void PopWait(T& t, std::stop_token stop_token) {
Wait(stop_token); ConsumerWait(stop_token);
Pop(t); Pop(t);
} }
T PopWait(std::stop_token stop_token) { T PopWait(std::stop_token stop_token) {
Wait(stop_token); ConsumerWait(stop_token);
T t; T t;
Pop(t); Pop(t);
return t; return t;
@ -88,9 +88,10 @@ private:
} }
} else if constexpr (Mode == PushMode::Wait) { } else if constexpr (Mode == PushMode::Wait) {
// Wait until we have free slots to write to. // Wait until we have free slots to write to.
while ((write_index - m_read_index.load()) == Capacity) { std::unique_lock lock{producer_cv_mutex};
std::this_thread::yield(); producer_cv.wait(lock, [this, write_index] {
} return (write_index - m_read_index.load()) < Capacity;
});
} else { } else {
static_assert(Mode < PushMode::Count, "Invalid PushMode."); static_assert(Mode < PushMode::Count, "Invalid PushMode.");
} }
@ -105,8 +106,8 @@ private:
++m_write_index; ++m_write_index;
// Notify the consumer that we have pushed into the queue. // Notify the consumer that we have pushed into the queue.
std::scoped_lock lock{cv_mutex}; std::scoped_lock lock{consumer_cv_mutex};
cv.notify_one(); consumer_cv.notify_one();
return true; return true;
} }
@ -122,9 +123,10 @@ private:
} }
} else if constexpr (Mode == PushMode::Wait) { } else if constexpr (Mode == PushMode::Wait) {
// Wait until we have free slots to write to. // Wait until we have free slots to write to.
while ((write_index - m_read_index.load()) == Capacity) { std::unique_lock lock{producer_cv_mutex};
std::this_thread::yield(); producer_cv.wait(lock, [this, write_index] {
} return (write_index - m_read_index.load()) < Capacity;
});
} else { } else {
static_assert(Mode < PushMode::Count, "Invalid PushMode."); static_assert(Mode < PushMode::Count, "Invalid PushMode.");
} }
@ -139,8 +141,8 @@ private:
++m_write_index; ++m_write_index;
// Notify the consumer that we have pushed into the queue. // Notify the consumer that we have pushed into the queue.
std::scoped_lock lock{cv_mutex}; std::scoped_lock lock{consumer_cv_mutex};
cv.notify_one(); consumer_cv.notify_one();
return true; return true;
} }
@ -161,6 +163,10 @@ private:
// Increment the read index. // Increment the read index.
++m_read_index; ++m_read_index;
// Notify the producer that we have popped off the queue.
std::unique_lock lock{producer_cv_mutex};
producer_cv.notify_one();
} }
bool Pop(T& t) { bool Pop(T& t) {
@ -180,12 +186,16 @@ private:
// Increment the read index. // Increment the read index.
++m_read_index; ++m_read_index;
// Notify the producer that we have popped off the queue.
std::scoped_lock lock{producer_cv_mutex};
producer_cv.notify_one();
return true; return true;
} }
void Wait(std::stop_token stop_token) { void ConsumerWait(std::stop_token stop_token) {
std::unique_lock lock{cv_mutex}; std::unique_lock lock{consumer_cv_mutex};
Common::CondvarWait(cv, lock, stop_token, [this] { return !Empty(); }); Common::CondvarWait(consumer_cv, lock, stop_token, [this] { return !Empty(); });
} }
alignas(128) std::atomic_size_t m_read_index{0}; alignas(128) std::atomic_size_t m_read_index{0};
@ -193,8 +203,10 @@ private:
std::array<T, Capacity> m_data; std::array<T, Capacity> m_data;
std::condition_variable_any cv; std::condition_variable_any producer_cv;
std::mutex cv_mutex; std::mutex producer_cv_mutex;
std::condition_variable_any consumer_cv;
std::mutex consumer_cv_mutex;
}; };
template <typename T, size_t Capacity = detail::DefaultCapacity> template <typename T, size_t Capacity = detail::DefaultCapacity>