// Runtime test driver for the synchronised queue `fx::Queue`; `main()` at the end of the file runs it.
//
// NOTE(review): this copy of the file appears to have lost its original line breaks and every
// angle-bracket span that began with a letter: the `#include` targets, the template parameter and
// argument lists (e.g. `template requires(...)`, `std::atomic total`), and the head of the worker
// loop (`for(int i=0;i bool {`). Restore those from the original before building -- as written,
// each `//` comment also swallows whatever code follows it on the same physical line.
//
// Contents, in order:
//  * `factor(v)`: adds up every value `v` takes while counting down to zero, i.e. (v-1) + ... + 1 + 0.
//    `_so_qrtx` uses `factor(WORKERS)` as the total it expects the collector to accumulate.
//  * `defer`: non-copyable, non-movable holder that invokes its stored callable from its destructor.
//  * `make_wait_for_signals(on_sig, sigs)`: adds `sigs` to a signal set, blocks them with
//    `pthread_sigmask(SIG_BLOCK, ...)`, and returns a lambda that calls `sigwait()` on that set and
//    then invokes `on_sig` with the received signal number. When `sigwait()` fails, the lambda traps
//    if the nothrow-invocable check holds, and otherwise calls `fx::error::throw_last_system_error()`.
//  * `_so_sqrtx()`: unimplemented stub (see its TODO).
//  * `_so_qrtx()`: the test proper.
//      - `rt_signals` handles SIGINT by calling `request_stop()` on `stop_workers`, unless (with
//        `USE_STOP_LIFETIME` == 0) its own stop token is already stop-requested, or the weak
//        reference to `stop_workers` can no longer be locked.
//      - `_stop_workers` forwards a stop of `stop_workers` to every thread in `workers`;
//        `_stop_timeout_on_col_exit` forwards it to `timeout_ctrl`.
//      - `timeout_ctrl` waits on `worker_timeout` for at most `MAX_WAIT`; unless it was cancelled it
//        then clears the shared flag and notifies all waiters.
//      - Each worker ends by calling `qu.push(i)` unless a stop was requested first.
//      - `collector` pops from `qu` until the pop is cancelled, summing the values into `total` and
//        counting attempts in `n` and successful pops in `c`.
//      - The calling thread releases the `start` latch, joins every worker, stops and joins the
//        collector, prints the totals, and throws `std::runtime_error` when the total is wrong and
//        no stop was requested.
//  * `main()`: runs `fx::test::_so_qrtx<20>()`.
#include #include #include #if 1 // FEATURE_FX_TEST #include #include #include #include #include #include #include #include #include #include #include namespace fx::test { //#warning "fx::Queue Runtime test" constexpr static auto factor = [](auto v) noexcept { std::remove_cvref_t total = 0; while(v --> 0) total += v; return total; }; using namespace fx::signal; template requires(std::is_invocable_v) class defer { F m_deferred; public: template U = F> constexpr defer(U&& f) noexcept(std::is_nothrow_convertible_v) : m_deferred(std::forward(f)) {} defer(defer&&) noexcept = delete; defer(defer const&) = delete; constexpr ~defer() noexcept(std::is_nothrow_invocable_v) { std::invoke(std::move(m_deferred)); } }; template defer(F&&) -> defer; /// Blocks the specified signals on the current thread and any newly created ones, and returns a callback which blocks until any of them are received. /// After blocking the signals, this function creates and returns a callback which, when invoked, will block for any of the specified signals to arrive, and then invoke the callback passed `on_sig` with the number of the signal that arrived. /// /// The returned callback captures `on_sig` via forwarding-decay, and the returned callback can be invoked any number of times as long as the captured `F` can be invoked more than once safely through a (non-`const`) reference. template requires(std::is_invocable_v) static auto make_wait_for_signals(F&& on_sig, std::initializer_list sigs) { //auto sigset = std::make_unique_for_overwrite(); // Should we actually need to have this a unique_ptr and move it into return so address remains pinned...? I don't know if that is valuable or needed here tbh. (NOTE: Nope. Not needed.) 
sigset_t sigset = {0}; sigemptyset(&sigset); for(const int sig : sigs) sigaddset(&sigset, sig); pthread_sigmask(SIG_BLOCK, &sigset, nullptr); return [sigset, on_sig = std::forward(on_sig)] () mutable noexcept(std::is_nothrow_invocable_v) -> decltype(auto) { int signum = 0; if(__builtin_expect(sigwait(&sigset, &signum) != 0, false)) { if constexpr(std::is_nothrow_invocable_v) { __builtin_trap(); } else fx::error::throw_last_system_error(); } return std::invoke(on_sig, signum); }; } void _so_sqrtx() { fx::Queue qu; auto stop_workers = std::make_shared(); std::vector workers; constexpr int WORKERS = 10; //TODO: Impl this test. } template void _so_qrtx() { fx::Queue qu; auto stop_workers = std::make_shared(); //auto&& rt_signals_callback #ifndef USE_STOP_LIFETIME /// If to couple rt_signals' lifetime to stop_workers' lifetime. (NOTE: See rest of comment on why this is not preferential.) /// /// If set to 1, we move stop_workers *below* rt_signals, so that it is destroyed *before* the signal thread handle asks it to stop. /// This results in the signal worker seeing stop_workers having been deallocated already, and assumes that is because the workers have already all stopped (which is true, but not the actual reason that happens here.) /// /// If set to 0, we use the internal signal thread handle cancellation mechanism to check if this is a genuine signal or the "stop" control signal the handle sends when it goes out of scope. This results in the signal worker seeing it is being cancelled because *its own* scope has ran out, and not because the workers' scope has, and it produces a different message about being cancelled. /// /// The former (1) ties the cancellation to the lifetime of the workers' cancellation source, which **MUST** be *move()*d to a new handle *below* the signal worker handle (which itself can be moved, so that must be tracked as well,) and is thus not advisable. 
/// /// The latter (0) decouples the cancellation from the workers' cancellation source, and from everything else, making the cancellation check entirely internal to the handle, which can then be freely moved without lifetime concerns. # define USE_STOP_LIFETIME 0 #endif thread_wait_for_signals rt_signals( [stop_workers = std::weak_ptr(stop_workers)] (int sig #if ! USE_STOP_LIFETIME , std::stop_token st #endif ) noexcept { //std::stop_callback _dealloc_on_signals_exit(st, [&] () noexcept { // stop_workers.reset(); //}); // NOTE: We **used to** also wait on `SIGTERM`, which allows us to send a "clean-up resources" signal; but now we reuse SIGINT for those purposes (with access to containing std::jthread's std::stop_token-based cancellation; or alternatively by ensuring that `stop_workers` has died before `rt_signals` does. The former is better for reasons outlined in the comment above.) #define fprintf(...) (sig != SIGTERM && fprintf(__VA_ARGS__), (void)0) if(sig == SIGINT /*|| sig == SIGTERM*/) { fprintf(stderr, " !!! requesting stop on SIGINT ..."); #if ! USE_STOP_LIFETIME if(st.stop_requested()) fprintf(stderr, " not stopping workers, operation cancelled by handle destruction; ignoring !!!\n"); else #endif if(auto s = stop_workers.lock()) { fprintf(stderr, " [[[ stopping workers ]]] !!!\n"); s->request_stop(); } else fprintf(stderr, " workers already exited completely, ignoring & exiting !!!\n"); } else fprintf(stderr, " !!! unknown signal received @ %-2d, ignoring !!!\n", sig); #undef fprintf }, {SIGINT}, SIGINT); #if USE_STOP_LIFETIME // Since shadowing isn't supported in sepples for some reason (neither is gnucc's autotype,) use this stupid hack lol. 
auto $stop_workers = std::move(stop_workers); #define stop_workers $stop_workers #endif std::vector workers; static_assert(N_WORKERS > 0 && N_WORKERS <= INT_MAX, "Invalid requested number of worker threads."); static_assert(factor( static_cast(N_WORKERS) ) <= ((signed __int128)INT_MAX), "Requested number of worker threads too large for result to not overflow."); static_assert(static_cast(N_WORKERS) <= static_cast(std::latch::max()), "More threads than latch can take."); using namespace std::chrono_literals; constexpr static int WORKERS = int(N_WORKERS); constexpr static auto MAX_WAIT = 5s; constexpr static double WAIT_JITTER = 1.5; std::latch start(3); // We wait for collector (below,) timeout control, & for main. //std::latch stop(WORKERS); // We wait for workers (below) to cancel collector. // Set `stop_workers`' token to stop all the workers. std::stop_callback _stop_workers(stop_workers->get_token(), [&] { for(auto& w : workers) w.request_stop(); }); fx::shared::CondVar worker_timeout(true); std::jthread timeout_ctrl([&worker_timeout, &start] (std::stop_token st) noexcept { // XXX: Is taking the unique lock *here* but not releasing it until timeout below bad...? It will stop the other timeouts from being able to start, won't it? Idk, maybe not, cause we want *this* timeout to start first, so... std::unique_lock ulk{ worker_timeout.get_mutex() }; start.arrive_and_wait(); fprintf(stderr, "[worker n/a] timeout max starting...\n"); if(not worker_timeout.get_condvar().wait_for(ulk, st, MAX_WAIT, [waiting = worker_timeout.get_ptr()]() noexcept { return ! *(volatile const bool*)waiting; })) { if(st.stop_requested()) { // NOTE: This behaviour is what we want, because if this thread has been cancelled, the workers will likely be cancelled by the same mechanism. In any case, only stopping the cv update & notify when *this* thread is cancelled is what we want to do. fprintf(stderr, "[worker n/a] max timeout ctrl thread cancelled. 
Will not force end to worker timeouts.\n"); return; } } // Unique lock is now active again, so we can change the value. *worker_timeout.get_ptr_unsafe() = false; fprintf(stderr, "[worker n/a] max timeout reachaed, shephearding all workers to ignore rest of their timeouts...\n"); ulk.unlock(); // Tell *all* the worker timeouts to refresh timeouts now & stop waiting. // It is safe to reuse this CV, because nothing else will notify this thread to stop its timeout other than an explicit st cancel. worker_timeout.notify_all(); }); for(int i=0;i bool { return ! *(volatile const bool*)worker_timeout.get_ptr(); })) { //fprintf(stderr, "[worker %-2d] timeout reached okay.\n", i); if(st.stop_requested()) { return; } } else if(st.stop_requested()) { fprintf(stderr, "[worker %-2d] timeout broken by stop request, exiting before push.\n", i); return; } else fprintf(stderr, "[worker %-2d] timeout broken by global max-timeout-reached.\n", i); qu.push(i); }}; workers.push_back(std::move(worker)); } std::atomic total = 0; std::atomic n=0, c=0; std::jthread collector([&] (std::stop_token st) { //NOTE: Not useful, also UB: we always join all workers (by **move**) before explicitly cancelling this thread. //std::stop_callback cancel_workers(st, [&] { // for(auto& v : workers) v.request_stop(); //}); // Tell workers that collector thread has arrived. start.arrive_and_wait(); while(true) { int cur; printf("Collection %02d: ", cur = n++); try { int i = qu.pop(st); total += i; printf("%-2d (%02d)\n", i, c++); } catch(fx::error::OperationCancelledError const&) { printf("Cancelled (on op %d)!\n", cur); break; } } }); std::stop_callback _stop_timeout_on_col_exit(stop_workers->get_token(), [&timeout_ctrl] () noexcept { // When signal handler is requested to stop, also stop the max timeout control thread. timeout_ctrl.request_stop(); }); printf("-- Starting %d... --\n", WORKERS); start.count_down(); // Tell workers & collector thread to start. 
// Wait for all workers to complete (NOTE: Not moved, so we can still refer to their state while main is joining.) for(auto& w : workers) { w.join(); // If the queue has pressure, check it. if(size_t len = qu.size()) { fprintf(stderr, " [ ..worker finished @ %-2lu.. ] ", len); } } fprintf(stderr, " << join all workers complete >>\n"); // Cancel the collector. collector.request_stop(); // Wait for it to exit. collector.join(); printf("-- Complete --\n"); bool ok; { constexpr auto expected = factor(WORKERS); auto ttl = total.load(); printf("Total: %d (expected %d,) diff: %d.", ttl, expected, expected - ttl); if( (ok = (ttl == expected)) ) printf(" [OK] No missing pops.\n"); else printf(" [Failure] Missing pops!\n"); } printf("Collector processed %d / %d (attempted: %d) pushed values\n", c.load(std::memory_order_relaxed), WORKERS, n.load(std::memory_order_relaxed)); bool stopped = stop_workers->stop_requested(); if(!ok && !stopped) throw std::runtime_error("Invalid number of workers to valid collector `.pop()` calls (non-cancelled case.)"); if(stopped) printf("The workers were cancelled by SIGINT, discrepency expected.\n"); } } int main() { // Test sync-queue `fx::Queue` fx::test::_so_qrtx<20>(); } #endif // #if 1 // FEATURE-FX-TEST