using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Channels;
using System.Collections.Generic;
using System.Linq;

namespace ndview
{
    /// <summary>
    /// Signal dispatcher: callers register synchronous or asynchronous hooks, and each call to
    /// <see cref="Signal"/> invokes the hooks that are registered at that moment.
    /// </summary>
    /// <remarks>
    /// <see cref="mutex"/> is acquired around every read or modification of the hook collections.
    /// <see cref="sigMutex"/> is acquired for the whole duration of a <see cref="Signal"/> call.
    /// </remarks>
    public class Dispatcher : IDisposable
    {
        /// <summary>Acquired while the hook collections are read or modified.</summary>
        protected napdump.AsyncMutex mutex = new napdump.AsyncMutex();

        /// <summary>Synchronous hooks combined into one multicast delegate; starts as a no-op delegate.</summary>
        protected Action Hooks = delegate { };

        /// <summary>Delegate shape for an asynchronous hook that receives itself as its first argument.</summary>
        public delegate Task AsyncHookFunction(AsyncHookFunction @this, CancellationToken token);

        /// <summary>Asynchronous hooks in registration order.</summary>
        protected List<Func<CancellationToken, Task>> AsyncHooks = new List<Func<CancellationToken, Task>>();

        /// <summary>
        /// When <c>true</c>, <see cref="Signal"/> awaits the asynchronous hooks one at a time in reverse
        /// registration order; when <c>false</c> (the default) they are all started and awaited together.
        /// </summary>
        public bool InSeries { get; set; } = false;

        /// <summary>Creates a dispatcher with no hooks registered.</summary>
        public Dispatcher()
        {
        }

        /// <summary>Drops every registered hook and disposes both mutexes.</summary>
        public void Dispose()
        {
            Hooks = delegate { };
            AsyncHooks.Clear();
            // The adapter map holds references to the same delegates, so release those as well.
            ahnc.Clear();
            mutex.Dispose();
            sigMutex.Dispose();
        }

        /// <summary>
        /// A task that completes once <see cref="sigMutex"/> has been acquired and released again,
        /// i.e. after whatever <see cref="Signal"/> call currently holds it has finished.
        /// </summary>
        public Task WaitForCurrentSignal
        {
            get
            {
                return Task.Run(async () =>
                {
                    using var _m = await sigMutex.AquireAsync();
                });
            }
        }

        /// <summary>Registers a synchronous hook.</summary>
        /// <param name="run">Action invoked on every signal.</param>
        public void Hook(Action run)
        {
            using (mutex.Aquire())
            {
                Hooks += run;
            }
        }

        /// <summary>Registers a persistent asynchronous hook that receives the signal's token.</summary>
        /// <param name="run">Hook to invoke on every signal.</param>
        /// <param name="token">Token passed to the mutex acquisition performed during registration.</param>
        public Task HookAsync(Func<CancellationToken, Task> run, CancellationToken token = default)
            => HookAsync(run, false, token);

        /// <summary>Registers an asynchronous hook that receives the signal's token.</summary>
        /// <param name="run">Hook to invoke when a signal is raised.</param>
        /// <param name="single">
        /// When <c>true</c>, a second hook is registered alongside <paramref name="run"/> that removes
        /// both of them when it is invoked, making the registration single-shot.
        /// </param>
        /// <param name="token">Token passed to the mutex acquisition performed during registration.</param>
        public async Task HookAsync(Func<CancellationToken, Task> run, bool single, CancellationToken token = default)
        {
            using (await mutex.AquireAsync(token))
            {
                AsyncHooks.Add(run);
                if (single)
                {
                    // Removal is bound to the signal's token only. The registration token must not take
                    // part: once it is cancelled, every later removal attempt would run with an
                    // already-cancelled token (presumably aborting the acquire — AsyncMutex is not
                    // visible here) and the single-shot hook would stay registered for good.
                    // Signal works on a snapshot, so `run` still executes even when the remover is
                    // invoked first (as happens in series mode, which iterates in reverse).
                    async Task RemoveBoth(CancellationToken tok)
                    {
                        await Task.WhenAll(RemoveHookAsync(run, tok), RemoveHookAsync(RemoveBoth, tok));
                    }

                    AsyncHooks.Add(RemoveBoth);
                }
            }
        }

        /// <summary>
        /// Maps each parameterless hook to the token-accepting adapter stored in <see cref="AsyncHooks"/>,
        /// so the hook can later be removed using the delegate the caller originally supplied.
        /// </summary>
        private readonly Dictionary<Func<Task>, Func<CancellationToken, Task>> ahnc =
            new Dictionary<Func<Task>, Func<CancellationToken, Task>>();

        /// <summary>Registers a persistent asynchronous hook that ignores the signal's token.</summary>
        /// <param name="run">Hook to invoke on every signal.</param>
        /// <param name="token">Token passed to the mutex acquisition performed during registration.</param>
        public Task HookAsync(Func<Task> run, CancellationToken token = default)
            => HookAsync(run, false, token);

        /// <summary>Registers an asynchronous hook that ignores the signal's token.</summary>
        /// <param name="run">Hook to invoke when a signal is raised.</param>
        /// <param name="single">
        /// When <c>true</c>, a second hook is registered alongside <paramref name="run"/> that removes
        /// both of them when it is invoked, making the registration single-shot.
        /// </param>
        /// <param name="token">Token passed to the mutex acquisition performed during registration.</param>
        /// <exception cref="ArgumentException">
        /// <paramref name="run"/> is already registered through this overload (the adapter map rejects duplicate keys).
        /// </exception>
        public async Task HookAsync(Func<Task> run, bool single, CancellationToken token = default)
        {
            Func<CancellationToken, Task> lam = _ => run();
            using (await mutex.AquireAsync(token))
            {
                ahnc.Add(run, lam);
                AsyncHooks.Add(lam);
                if (single)
                {
                    // Same reasoning as the token-accepting overload: removal honours only the
                    // signal's token, never the (possibly long-cancelled) registration token.
                    async Task RemoveBoth(CancellationToken tok)
                    {
                        await Task.WhenAll(RemoveHookAsync(run, tok), RemoveHookAsync(RemoveBoth, tok));
                    }

                    AsyncHooks.Add(RemoveBoth);
                }
            }
        }

        /// <summary>Removes a token-accepting asynchronous hook; does nothing if it is not registered.</summary>
        /// <param name="hook">The delegate that was passed to <c>HookAsync</c>.</param>
        /// <param name="token">Token passed to the mutex acquisition.</param>
        public async Task RemoveHookAsync(Func<CancellationToken, Task> hook, CancellationToken token = default)
        {
            using (await mutex.AquireAsync(token))
            {
                AsyncHooks.Remove(hook);
            }
        }

        /// <summary>Removes a parameterless asynchronous hook; does nothing if it is not registered.</summary>
        /// <param name="hook">The delegate that was passed to <c>HookAsync</c>.</param>
        /// <param name="token">Token passed to the mutex acquisition.</param>
        public async Task RemoveHookAsync(Func<Task> hook, CancellationToken token = default)
        {
            using (await mutex.AquireAsync(token))
            {
                // Single lookup instead of ContainsKey followed by the indexer.
                if (!ahnc.TryGetValue(hook, out var run))
                {
                    return;
                }

                AsyncHooks.Remove(run);
                ahnc.Remove(hook);
            }
        }

        /// <summary>Removes a synchronous hook.</summary>
        /// <param name="remove">The action that was passed to <see cref="Hook"/>.</param>
        /// <param name="token">Token passed to the mutex acquisition.</param>
        public async Task RemoveHookAsync(Action remove, CancellationToken token = default)
        {
            using (await mutex.AquireAsync(token))
            {
                Hooks -= remove;
            }
        }

        /// <summary>Acquired for the whole duration of a <see cref="Signal"/> call.</summary>
        protected napdump.AsyncMutex sigMutex = new napdump.AsyncMutex();

        /// <summary>
        /// Raises a signal: invokes the synchronous hooks, then runs the asynchronous hooks
        /// (together, or one after another in reverse registration order when <see cref="InSeries"/> is set).
        /// </summary>
        /// <param name="token">
        /// Passed to both mutex acquisitions and to every asynchronous hook. Cancelling it makes this
        /// method stop waiting for the hooks.
        /// </param>
        /// <exception cref="OperationCanceledException">
        /// <paramref name="token"/> was cancelled before the asynchronous hooks finished.
        /// </exception>
        /// <remarks>A failure raised by an asynchronous hook is rethrown to the caller.</remarks>
        public async Task Signal(CancellationToken token = default)
        {
            using (await sigMutex.AquireAsync(token))
            {
                Action h;
                Func<CancellationToken, Task>[] ah;

                // Snapshot under the lock, then release it so hooks may add or remove hooks themselves.
                using (await mutex.AquireAsync(token))
                {
                    h = Hooks;
                    ah = AsyncHooks.ToArray();
                }

                h?.Invoke();

                // Enumerable.Reverse is named explicitly so the non-mutating LINQ overload is always the
                // one chosen for the array.
                Task sig = InSeries
                    ? Task.Run(async () =>
                    {
                        foreach (var x in Enumerable.Reverse(ah))
                        {
                            await x(token);
                        }
                    })
                    : Task.WhenAll(ah.Select(x => x(token)));

                var canceller = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
                using var _c = token.Register(() => canceller.TrySetResult(true));

                if (await Task.WhenAny(sig, canceller.Task) != sig)
                {
                    throw new OperationCanceledException(token);
                }

                // WhenAny never rethrows; await the hook task itself so a hook failure is observed
                // instead of being silently dropped.
                await sig;
            }
        }

        /// <summary>Completes when the next signal is raised.</summary>
        /// <param name="token">Cancels the wait.</param>
        /// <exception cref="OperationCanceledException">
        /// <paramref name="token"/> was cancelled, or the signal that arrived was itself raised with a cancelled token.
        /// </exception>
        public async Task WaitForSignal(CancellationToken token = default)
        {
            var comp = new TaskCompletionSource<bool>();

            await HookAsync(tok =>
            {
                // TrySetResult: the cancellation callback registered below may already have completed
                // the source, in which case SetResult would throw inside the hook.
                comp.TrySetResult(!(tok.IsCancellationRequested || token.IsCancellationRequested));
                return Task.CompletedTask;
            }, true, token);

            using var _c = token.Register(() => comp.TrySetException(new OperationCanceledException(token)));

            if (!await comp.Task)
            {
                throw new OperationCanceledException();
            }
        }
    }
}