You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
153 lines
4.2 KiB
153 lines
4.2 KiB
5 years ago
|
using System;
|
||
|
using System.Threading;
|
||
|
using System.Threading.Tasks;
|
||
|
using System.Threading.Channels;
|
||
|
using System.Collections.Generic;
|
||
|
using System.Linq;
|
||
|
|
||
|
namespace ndview
|
||
|
{
|
||
|
/// <summary>
/// Keeps a set of synchronous and asynchronous hooks and runs all of them every
/// time <see cref="Signal"/> is called.
/// </summary>
public class Dispatcher : IDisposable
{
    /// <summary>Held while the hook collections are read or modified.</summary>
    protected napdump.AsyncMutex mutex = new napdump.AsyncMutex();

    /// <summary>Held by <see cref="Signal"/> for the whole duration of a dispatch.</summary>
    protected napdump.AsyncMutex sigMutex = new napdump.AsyncMutex();

    /// <summary>Multicast delegate of synchronous hooks; starts as a no-op so it is never null.</summary>
    protected Action Hooks = delegate { };

    /// <summary>
    /// Delegate shape for an asynchronous hook that receives itself as first argument.
    /// It is not referenced by any other member visible in this class.
    /// </summary>
    public delegate Task AsyncHookFunction(AsyncHookFunction @this, CancellationToken token);

    /// <summary>Asynchronous hooks in registration order (duplicates allowed).</summary>
    protected List<Func<CancellationToken, Task>> AsyncHooks = new List<Func<CancellationToken, Task>>();

    /// <summary>
    /// Maps each token-less hook passed to <c>HookAsync(Func&lt;Task&gt;, ...)</c> to the
    /// adapter stored in <see cref="AsyncHooks"/>, so it can be found again on removal.
    /// </summary>
    private readonly Dictionary<Func<Task>, Func<CancellationToken, Task>> ahnc = new Dictionary<Func<Task>, Func<CancellationToken, Task>>();

    /// <summary>
    /// When true, <see cref="Signal"/> awaits the asynchronous hooks one after another in
    /// reverse registration order; when false it starts all of them and awaits them together.
    /// </summary>
    public bool InSeries { get; set; } = false;

    /// <summary>Creates a dispatcher with no hooks registered.</summary>
    public Dispatcher()
    {
    }

    /// <summary>
    /// Drops every registered hook and disposes both mutexes.
    /// The hook collections are cleared without taking <see cref="mutex"/>.
    /// </summary>
    public void Dispose()
    {
        Hooks = delegate { };
        AsyncHooks.Clear();
        // The adapter table holds references to the same hooks; release them too.
        ahnc.Clear();
        mutex.Dispose();
        sigMutex.Dispose();
    }

    /// <summary>
    /// A task that completes once <see cref="sigMutex"/> has been acquired and released
    /// again, i.e. after a <see cref="Signal"/> call that currently holds it has finished.
    /// Each read of the property starts a new task.
    /// </summary>
    public Task WaitForCurrentSignal
    {
        get
        {
            return Task.Run(async () =>
            {
                using var _m = await sigMutex.AquireAsync();
            });
        }
    }

    /// <summary>Registers a synchronous hook, blocking until <see cref="mutex"/> is acquired.</summary>
    /// <param name="run">Callback invoked on every signal.</param>
    public void Hook(Action run)
    {
        using (mutex.Aquire())
        {
            Hooks += run;
        }
    }

    /// <summary>Registers a persistent asynchronous hook.</summary>
    /// <param name="run">Callback invoked on every signal with the signal's token.</param>
    /// <param name="token">Cancels waiting for <see cref="mutex"/>.</param>
    public Task HookAsync(Func<CancellationToken, Task> run, CancellationToken token = default)
        => HookAsync(run, false, token);

    /// <summary>Registers an asynchronous hook, optionally for one signal only.</summary>
    /// <param name="run">Callback invoked with the signal's token.</param>
    /// <param name="single">When true, a companion hook is registered that removes
    /// <paramref name="run"/> (and itself) during the next signal.</param>
    /// <param name="token">Cancels waiting for <see cref="mutex"/>.</param>
    public async Task HookAsync(Func<CancellationToken, Task> run, bool single, CancellationToken token = default)
    {
        using (await mutex.AquireAsync(token))
        {
            AsyncHooks.Add(run);
            if (single)
            {
                AddSingleShotRemover(tok => RemoveHookAsync(run, tok));
            }
        }
    }

    /// <summary>Registers a persistent asynchronous hook that takes no token.</summary>
    /// <param name="run">Callback invoked on every signal.</param>
    /// <param name="token">Cancels waiting for <see cref="mutex"/>.</param>
    public Task HookAsync(Func<Task> run, CancellationToken token = default)
        => HookAsync(run, false, token);

    /// <summary>Registers an asynchronous hook that takes no token, optionally for one signal only.</summary>
    /// <param name="run">Callback invoked on a signal; the signal's token is discarded.</param>
    /// <param name="single">When true, a companion hook is registered that removes
    /// <paramref name="run"/> (and itself) during the next signal.</param>
    /// <param name="token">Cancels waiting for <see cref="mutex"/>.</param>
    /// <exception cref="ArgumentException">
    /// <paramref name="run"/> is already registered through this overload
    /// (the adapter table is keyed by the delegate and rejects duplicates).
    /// </exception>
    public async Task HookAsync(Func<Task> run, bool single, CancellationToken token = default)
    {
        Func<CancellationToken, Task> adapter = _ => run();
        using (await mutex.AquireAsync(token))
        {
            ahnc.Add(run, adapter);
            AsyncHooks.Add(adapter);
            if (single)
            {
                AddSingleShotRemover(tok => RemoveHookAsync(run, tok));
            }
        }
    }

    /// <summary>
    /// Appends a hook that, when signalled, removes the target hook via
    /// <paramref name="removeHook"/> and then removes itself.
    /// Must be called while <see cref="mutex"/> is held.
    /// </summary>
    /// <param name="removeHook">Starts removal of the one-shot hook using the given token.</param>
    private void AddSingleShotRemover(Func<CancellationToken, Task> removeHook)
    {
        async Task Remover(CancellationToken tok)
        {
            // Only the signal's token is used here. Tying removal to the token that was
            // passed when the hook was registered meant that, once that token had been
            // cancelled, removal ran with an already-cancelled token and (presumably,
            // depending on AsyncMutex) never succeeded, leaving the hook and this
            // remover registered for every later signal.
            await Task.WhenAll(removeHook(tok), RemoveHookAsync(Remover, tok));
        }

        AsyncHooks.Add(Remover);
    }

    /// <summary>Removes one occurrence of <paramref name="hook"/> from the asynchronous hooks.</summary>
    /// <param name="hook">Hook to remove; nothing happens when it is not registered.</param>
    /// <param name="token">Cancels waiting for <see cref="mutex"/>.</param>
    public async Task RemoveHookAsync(Func<CancellationToken, Task> hook, CancellationToken token = default)
    {
        using (await mutex.AquireAsync(token))
        {
            AsyncHooks.Remove(hook);
        }
    }

    /// <summary>Removes a hook that was registered through a token-less <c>HookAsync</c> overload.</summary>
    /// <param name="hook">Hook to remove; nothing happens when it is not registered.</param>
    /// <param name="token">Cancels waiting for <see cref="mutex"/>.</param>
    public async Task RemoveHookAsync(Func<Task> hook, CancellationToken token = default)
    {
        using (await mutex.AquireAsync(token))
        {
            if (!ahnc.TryGetValue(hook, out var adapter))
            {
                return;
            }

            AsyncHooks.Remove(adapter);
            ahnc.Remove(hook);
        }
    }

    /// <summary>Removes a synchronous hook.</summary>
    /// <param name="remove">Hook to unsubscribe from <see cref="Hooks"/>.</param>
    /// <param name="token">Cancels waiting for <see cref="mutex"/>.</param>
    public async Task RemoveHookAsync(Action remove, CancellationToken token = default)
    {
        using (await mutex.AquireAsync(token))
        {
            Hooks -= remove;
        }
    }

    /// <summary>
    /// Runs every registered hook once. Synchronous hooks are invoked first, then the
    /// asynchronous hooks are run according to <see cref="InSeries"/>.
    /// </summary>
    /// <param name="token">Passed to every asynchronous hook; cancelling it also ends the wait.</param>
    /// <exception cref="OperationCanceledException">
    /// <paramref name="token"/> was cancelled before the asynchronous hooks finished.
    /// </exception>
    public async Task Signal(CancellationToken token = default)
    {
        Action syncSnapshot;
        Func<CancellationToken, Task>[] asyncSnapshot;
        using (await sigMutex.AquireAsync(token))
        {
            // Snapshot under the hook mutex so hooks may add/remove hooks while running.
            using (await mutex.AquireAsync(token))
            {
                syncSnapshot = Hooks;
                asyncSnapshot = AsyncHooks.ToArray();
            }

            syncSnapshot?.Invoke();

            // Enumerable.Reverse is called as a static method so the LINQ overload is
            // bound explicitly for the array.
            Task sig = InSeries
                ? Task.Run(async () =>
                {
                    foreach (var hook in Enumerable.Reverse(asyncSnapshot))
                    {
                        await hook(token);
                    }
                })
                : Task.WhenAll(asyncSnapshot.Select(hook => hook(token)));

            var canceller = new TaskCompletionSource<bool>();
            using var _c = token.Register(() => canceller.TrySetResult(true));

            // NOTE(review): when `sig` wins the race its result is not awaited, so an
            // exception thrown by an asynchronous hook is not rethrown to the caller.
            if ((await Task.WhenAny(sig, canceller.Task)) != sig)
            {
                throw new OperationCanceledException();
            }
        }
    }

    /// <summary>Completes when the next <see cref="Signal"/> reaches this waiter's one-shot hook.</summary>
    /// <param name="token">Cancels the wait.</param>
    /// <exception cref="OperationCanceledException">
    /// <paramref name="token"/> was cancelled, or the signal that arrived was itself cancelled.
    /// </exception>
    public async Task WaitForSignal(CancellationToken token = default)
    {
        TaskCompletionSource<bool> comp = new TaskCompletionSource<bool>();
        await HookAsync((tok) =>
        {
            // TrySetResult: the cancellation callback below may already have completed
            // `comp`; SetResult would then throw InvalidOperationException in the hook.
            comp.TrySetResult(!(tok.IsCancellationRequested || token.IsCancellationRequested));
            return Task.CompletedTask;
        }, true, token);

        using var _c = token.Register(() => comp.TrySetException(new OperationCanceledException()));
        if (!await comp.Task)
        {
            throw new OperationCanceledException();
        }
    }
}
|
||
|
}
|