You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

252 lines
8.9 KiB

//! Sync cond-var queue
#pragma once
#include <span>
#include <queue>
#include <optional>
#include "error.hh"
#include "condvar.hh"
namespace fx {
namespace details [[gnu::visibility("hidden")]] {
// std::queue::pop() is VOID?????
inline auto pop_queue(auto& q) {
auto p = std::move(q.front());
q.pop();
return p;
};
}
template<typename T, size_t N = std::dynamic_extent>
struct Queue;
//TODO: Fixed-sized queue: template<typename T, size_t N> Queue<T, N> { ... };
// TODO: In non-`dynamic_extent` Queue<T, N>, we can use `.wait_then(has_less_than_max_elements, st)` in `.push(T&&[, st])` to wait if queue has >= N elements already in (on optional stop-token `st`.).
template<typename T>
class Queue<T, std::dynamic_extent> {
template<typename, size_t N> //XXX: Uhh... Does this like, properly work to decl `Queue<T, any(N)>` as friend....?
friend class Queue;
constexpr static inline auto has_element = [](auto const& q) noexcept { return !q.empty(); };
public:
using value_type = std::decay_t<T>;
constexpr static inline size_t& extent = std::dynamic_extent;
constexpr static inline bool push_may_wait = false;
template<std::convertible_to<T> U = T>
inline void push(U&& value)
{
m_queue.with_unique_lock([&](auto& queue) { queue.push(std::forward<U>(value)); });
m_queue.notify_one();
}
inline value_type pop()
{
return this->m_queue.wait_then([] (auto& queue) { return details::pop_queue( queue ); }, has_element);
}
#if 1
inline std::optional<value_type> try_pop(std::stop_token st) noexcept
{
std::optional<value_type> v{ std::nullopt };
m_queue.try_wait_then([&v](auto& queue) {
v.emplace( std::move(queue.front()) );
queue.pop();
}, has_element, st);
return v;
}
[[gnu::nonnull]]
inline value_type* try_pop(value_type* __restrict__ v, std::stop_token st) noexcept
{
if(not st.stop_possible()) {
std::exchange(*v, this->pop());
return true;
}
return m_queue.try_wait_then([v](auto& queue) {
std::exchange(*v, queue.front());
queue.pop();
}, has_element, st)
? v
: nullptr;
}
#endif
inline value_type pop(std::stop_token st)
{
#if 1
return m_queue.wait_then([](auto& queue) { return details::pop_queue( queue ); }, has_element, st);
#else
// This is *really* ugly but XXX: we don't have a throwing `wait_then()`, so... TODO: Go add a `wait_then_or()` to SharedCondVar, which throws `OperationCancelledError` if wait() returns `false`, and then that can be a `decltype(auto)`, `return `fwd(f)(mut_value())` function like the rest.
std::aligned_storage_t<sizeof(value_type), alignof(value_type)> store;
value_type* v = static_cast<value_type*>(store.data);
if(! m_queue.try_wait_then([v](auto& queue) {
std::construct_at(v, queue.pop());
}, has_element, st))
error::throw_cancelled();
// Destroy `*v` after return.
struct defer_destroy {
value_type* v;
constexpr ~defer_destroy() {
if constexpr(not std::is_trivially_destructible_v<value_type>) {
std::destroy_at(v);
}
}
} _ddest{v};
return std::move(*v);
#endif
}
inline size_t size() const noexcept { return m_queue.with_shared_lock([] (auto const& q) noexcept { return q.size(); }); }
inline bool empty() const noexcept { return m_queue.with_shared_lock([] (auto const& q) noexcept { return q.empty(); }); }
[[gnu::const, gnu::always_inline, gnu::artificial]]
constexpr virtual bool can_push() const noexcept { return true; } // Always true for dynamic_extent
inline bool can_pop() const noexcept { return not empty(); } // If the queue is not empty, a pop can be requested by someone.
// Not-appliccable on non-max-sized (`N = std::dynamic_extent`) queues
size_t left() const noexcept = delete;
size_t capacity() const noexcept = delete;
#if 0
protected:
#warning "XXX: `wait()` functions are **NOT SAFE**, they all consume a `push()`'s `notify_one()` **without** reducing queue pressure; they should not be used except as an internal interface that also pops the queue via some other method after the wait completes."
// Wait for an element to be pushed without popping it.
//
// # Returns
// If there was an element and `st` was not stopped.
// # Throws
// `error::OperationCancelledError` - If `st` is fired while waiting.
inline bool wait(std::stop_token st) const
{
return m_queue.wait_then_shared([](auto const& q) noexcept { return ! q.empty(); }, has_element, st);
}
/// Same as `wait(st)` but returns `false` on cancellation instead of throwing.
inline bool try_wait(std::stop_token st) const noexcept
{
bool elem = false;
if(! m_queue.try_wait_then_shared([&](auto const& q) noexcept { elem = !q.empty(); }, has_element, st))
return false;
return elem;
}
/// Waits for an element to be available without popping it.
inline bool wait() const
{
return m_queue.wait_then_shared([](auto const& q) noexcept { return ! q.empty(); }, has_element);
}
#endif /* unsafe interface removed (see above) */
private:
shared::CondVar< std::queue<value_type> > m_queue{};
};
template<typename T, size_t N>
class Queue : private Queue<T> {
constexpr static inline auto& has_element = Queue<T>::has_element;
constexpr static inline auto has_less_than_max = [](auto const& q) noexcept { return q.size() <= N; };
public:
constexpr static inline size_t extent = N;
constexpr static inline bool push_may_wait = true;
using typename Queue<T>::value_type;
using Queue<T>::Queue;
template<std::convertible_to<T> U = T>
inline bool try_push(U&& value, std::stop_token st)
{
std::unique_lock plk { this->m_queue.get_mutex() }; //XXX: Is this okay? To keep the lock going...?
while(! has_less_than_max(*this->m_queue.get_ptr())) {
if(! m_send.wait(plk, st, [this] () noexcept { return has_less_than_max(this->m_queue.get_ptr()); }))
return false;
}
// We have a unique_lock, we have waited to be signalled if queue pressure not has_less_than_max(), it is okay to push and notify poppers now.
this->m_queue.get_ptr_unsafe()->push(std::forward<U>(value));
this->m_queue.notify_one();
return true;
}
template<std::convertible_to<T> U = T>
inline void push(U&& value)
{
std::unique_lock plk { this->m_queue.get_mutex() }; //XXX: Is this okay? To keep the lock going...?
while(! has_less_than_max(*this->m_queue.get_ptr()))
m_send.wait(plk, [this] () noexcept { return has_less_than_max(this->m_queue.get_ptr()); });
// We have a unique_lock, we have waited to be signalled if queue pressure not has_less_than_max(), it is okay to push and notify poppers now.
this->m_queue.get_ptr_unsafe()->push(std::forward<U>(value));
this->m_queue.notify_one();
}
template<std::convertible_to<T> U = T>
inline void push(U&& value, std::stop_token st)
{
std::unique_lock plk { this->m_queue.get_mutex() }; //XXX: Is this okay? To keep the lock going...?
while(! has_less_than_max(*this->m_queue.get_ptr())) {
if(! m_send.wait(plk, st, [this] () noexcept { return has_less_than_max(this->m_queue.get_ptr()); }))
error::throw_cancelled();
}
// We have a unique_lock, we have waited to be signalled if queue pressure not has_less_than_max(), it is okay to push and notify poppers now.
this->m_queue.get_ptr_unsafe()->push(std::forward<U>(value));
this->m_queue.notify_one();
}
inline value_type pop()
{
bool was_full = false;
value_type v = this->m_queue.wait_then([&] (auto& queue) {
was_full = not has_less_than_max(queue);
return details::pop_queue( queue );
}, has_element);
// Notify a waiting pusher to re-check their condition.
//XXX: Do we only want to do this if `not has_less_than_max()`?
(void)was_full; //For now: The cond might be outdated, so notify regardless
m_send.notify_one();
return v;
}
inline value_type pop(std::stop_token st)
{
bool was_full = false;
value_type v = this->m_queue.wait_then([&] (auto& queue) {
was_full = not has_less_than_max(queue);
return details::pop_queue( queue );
}, has_element, st);
// Notify a waiting pusher to re-check their condition.
//XXX: Do we only want to do this if `not has_less_than_max()`?
(void)was_full; //For now: The cond might be outdated, so notify regardless
m_send.notify_one();
return v;
}
//TODO: try_pop(std::stop_token st)
// Accessors (same as `Queue<T, std::dynamic_extent>`.)
using Queue<T>::size;
using Queue<T>::empty;
// May be `false` in this queue.
inline virtual bool can_push() const noexcept override final { return has_space(); }
using Queue<T>::can_pop;
// Only appliccable on non-max-sized (`N = std::dynamic_extent`) queues
inline size_t left() const noexcept { return extent - this->size(); }
[[gnu::const, gnu::always_inline, gnu::artificial]]
constexpr size_t capacity() const noexcept { return extent; }
protected:
inline bool has_space() const
{
return this->m_queue.with_shared_lock(has_less_than_max);
}
private:
mutable std::condition_variable_any m_send{};
};
}