You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

345 lines
12 KiB

using AngleSharp;
using AngleSharp.Dom;
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using System.Linq;
using System.Runtime.Serialization;
using System.IO;
using Tools;
using System.Text.RegularExpressions;
using Tools.Crypto;
namespace napdump
{
/// <summary>
/// Immutable bundle of settings handed to a <see cref="Dumper"/> at construction time.
/// </summary>
public readonly struct DumperConfig
{
    /// <summary>Size of the concurrency pool a dumper creates when it does not share another dumper's pool.</summary>
    public readonly int MaxThreads;

    /// <summary>Optional dumper whose concurrency pool is reused instead of creating a new one.</summary>
    public readonly Dumper ShareContextWith;

    /// <summary>Optional (Url, Value) cookie pairs; how they are consumed is up to the concrete dumper.</summary>
    public readonly (string Url, string Value)[] Cookies;

    /// <summary>
    /// Optional AES key.
    /// NOTE(review): presumably used to encrypt deleted content; the consumer is not visible here - confirm.
    /// </summary>
    public readonly AESKey? EncryptDeleted;

    /// <summary>
    /// Creates a configuration.
    /// </summary>
    /// <param name="maxThreads">Value stored in <see cref="MaxThreads"/>.</param>
    /// <param name="ShareContext">Value stored in <see cref="ShareContextWith"/>; may be null.</param>
    /// <param name="Cookies">Value stored in <see cref="Cookies"/>; may be null.</param>
    /// <param name="EncryptDeleted">Value stored in <see cref="EncryptDeleted"/>; may be null.</param>
    public DumperConfig(int maxThreads, Dumper ShareContext=null, (string,string)[] Cookies=null, AESKey? EncryptDeleted=null)
    {
        MaxThreads = maxThreads;
        ShareContextWith = ShareContext;
        this.Cookies = Cookies;
        this.EncryptDeleted = EncryptDeleted;
    }

    /// <summary>
    /// Value equality: same thread count, same shared dumper instance, element-wise equal
    /// cookies (order-sensitive) and equal keys. A null member only equals another null member.
    /// </summary>
    public bool Equals(in DumperConfig other)
    {
        return other.MaxThreads == MaxThreads
            && ReferenceEquals(ShareContextWith, other.ShareContextWith)
            && CookiesEqual(Cookies, other.Cookies)
            // The default comparer handles "no key" on either side without substituting default(AESKey).
            && EqualityComparer<AESKey?>.Default.Equals(EncryptDeleted, other.EncryptDeleted);
    }

    /// <summary>Compares two cookie arrays, tolerating null on either side.</summary>
    private static bool CookiesEqual((string Url, string Value)[] left, (string Url, string Value)[] right)
    {
        if (ReferenceEquals(left, right))
        {
            return true;
        }
        // SequenceEqual throws on a null argument, so rule that case out first.
        if (left == null || right == null)
        {
            return false;
        }
        return left.SequenceEqual(right);
    }

    /// <inheritdoc/>
    public override bool Equals(object obj)
    {
        return obj is DumperConfig conf && this.Equals(conf);
    }

    /// <inheritdoc/>
    public override int GetHashCode()
    {
        // XOR-fold with an implicit seed of 0 so an empty cookie array hashes to 0
        // instead of throwing like a seedless Aggregate would.
        int cookieHash = 0;
        if (Cookies != null)
        {
            foreach (var cookie in Cookies)
            {
                cookieHash ^= cookie.GetHashCode();
            }
        }

        return MaxThreads.GetHashCode() ^
            (ShareContextWith?.GetHashCode() ?? 0) ^
            cookieHash ^
            (EncryptDeleted?.GetHashCode() ?? 0);
    }

    public static bool operator ==(DumperConfig left, DumperConfig right)
    {
        return left.Equals(right);
    }

    public static bool operator !=(DumperConfig left, DumperConfig right)
    {
        return !(left == right);
    }
}
/// <summary>
/// A mutex (or counting semaphore, see <see cref="Semaphore(int, int)"/>) that can be taken
/// synchronously or asynchronously. Every successful acquire returns a handle; disposing the
/// handle releases exactly one slot.
/// </summary>
public sealed class AsyncMutex : IDisposable
{
    private readonly SemaphoreSlim sem;

    /// <summary>Creates a mutex with a single slot.</summary>
    public AsyncMutex()
    {
        sem = new SemaphoreSlim(1, 1);
    }

    private AsyncMutex(SemaphoreSlim from)
    {
        sem = from;
    }

    /// <summary>Handle for one held slot; releases it on the first <see cref="Dispose"/> only.</summary>
    private sealed class Lock : IDisposable
    {
        // 0 = still held, 1 = already released. Guards against releasing the slot twice.
        private int released;

        public AsyncMutex Parent { get; }

        public Lock(AsyncMutex held)
        {
            Parent = held;
        }

        public void Dispose()
        {
            if (Interlocked.Exchange(ref released, 1) == 0)
            {
                Parent.sem.Release();
            }
        }
    }

    /// <summary>Blocks until a slot is free or the timeout elapses.</summary>
    /// <param name="msTimeout">Timeout in milliseconds, as accepted by <see cref="SemaphoreSlim.Wait(int, CancellationToken)"/>.</param>
    /// <param name="token">Token that aborts the wait.</param>
    /// <returns>A handle that releases the slot when disposed.</returns>
    /// <exception cref="TimeoutException">No slot became free within <paramref name="msTimeout"/>.</exception>
    public IDisposable Aquire(int msTimeout, CancellationToken token = default)
    {
        // Wait returns false on timeout; handing out a handle then would release a slot never taken.
        if (!sem.Wait(msTimeout, token))
        {
            throw new TimeoutException($"Timed out after {msTimeout} ms waiting to acquire the AsyncMutex.");
        }
        return new Lock(this);
    }

    /// <summary>Blocks until a slot is free or <paramref name="token"/> is cancelled.</summary>
    /// <returns>A handle that releases the slot when disposed.</returns>
    public IDisposable Aquire(CancellationToken token)
    {
        sem.Wait(token);
        return new Lock(this);
    }

    /// <summary>Blocks until a slot is free.</summary>
    /// <returns>A handle that releases the slot when disposed.</returns>
    public IDisposable Aquire()
    {
        sem.Wait();
        return new Lock(this);
    }

    /// <summary>Asynchronously waits until a slot is free or the timeout elapses.</summary>
    /// <param name="msTimeout">Timeout in milliseconds, as accepted by <see cref="SemaphoreSlim.WaitAsync(int, CancellationToken)"/>.</param>
    /// <param name="token">Token that aborts the wait.</param>
    /// <returns>A handle that releases the slot when disposed.</returns>
    /// <exception cref="TimeoutException">No slot became free within <paramref name="msTimeout"/>.</exception>
    public async ValueTask<IDisposable> AquireAsync(int msTimeout, CancellationToken token = default)
    {
        if (!await sem.WaitAsync(msTimeout, token).ConfigureAwait(false))
        {
            throw new TimeoutException($"Timed out after {msTimeout} ms waiting to acquire the AsyncMutex.");
        }
        return new Lock(this);
    }

    /// <summary>Asynchronously waits until a slot is free or <paramref name="token"/> is cancelled.</summary>
    /// <returns>A handle that releases the slot when disposed.</returns>
    public async ValueTask<IDisposable> AquireAsync(CancellationToken token = default)
    {
        await sem.WaitAsync(token).ConfigureAwait(false);
        return new Lock(this);
    }

    /// <summary>Disposes the underlying semaphore.</summary>
    public void Dispose()
    {
        sem.Dispose();
    }

    /// <summary>Creates an instance with <paramref name="count"/> free slots out of at most <paramref name="max"/>.</summary>
    public static AsyncMutex Semaphore(int count, int max)
    {
        SemaphoreSlim sem = new SemaphoreSlim(count, max);
        return new AsyncMutex(sem);
    }

    /// <summary>Creates an instance with <paramref name="count"/> slots, all initially free.</summary>
    public static AsyncMutex Semaphore(int count)
        => Semaphore(count, count);
}
public abstract class Dumper : IDisposable
{
/// <summary>The configuration this dumper was constructed with.</summary>
public DumperConfig Config { get; }
/// <summary>
/// Limits how many threads have their posts fetched at once. Either borrowed from
/// <c>Config.ShareContextWith</c> or created with <c>Config.MaxThreads</c> slots (see the constructor).
/// </summary>
protected readonly AsyncMutex Pool;
/// <summary>Cancelled by <c>CancelAllOperations</c> and on dispose; linked into every operation's token.</summary>
protected readonly CancellationTokenSource globalCancel = new CancellationTokenSource();
/// <summary>
/// Stores <paramref name="config"/> and picks the concurrency pool: the pool of
/// <c>config.ShareContextWith</c> when one is available, otherwise a new one with
/// <c>config.MaxThreads</c> slots.
/// </summary>
protected Dumper(DumperConfig config)
{
    Config = config;

    Dumper donor = config.ShareContextWith;
    AsyncMutex borrowed = donor is null ? null : donor.Pool;
    if (borrowed is null)
    {
        Pool = AsyncMutex.Semaphore(config.MaxThreads);
    }
    else
    {
        Pool = borrowed;
    }
}
/// <summary>
/// Signals the dumper-wide cancellation source, which is linked into the token of every
/// <c>Parse</c> and post-fetch operation.
/// </summary>
public void CancelAllOperations() => globalCancel.Cancel();
/// <summary>
/// Enumerates the threads of a board and fetches the posts of each one in the background,
/// yielding every thread once its posts have been attached.
/// </summary>
/// <param name="boardUrl">Stored in the board info's <c>BoardURL</c> and used in debug output.</param>
/// <param name="hooks">Optional per-call callbacks, invoked in addition to this dumper's events.</param>
/// <param name="token">Cancels this enumeration; combined with the dumper-wide cancellation.</param>
/// <returns>
/// Threads in the order their post fetch finished. A thread whose fetch failed is reported
/// through <c>OnThreadReadFailed</c> and is not yielded.
/// </returns>
/// <remarks>
/// If the enumeration ends early (the consumer stops, an exception, cancellation), the remaining
/// background fetches are cancelled instead of being left running.
/// </remarks>
public async IAsyncEnumerable<ThreadInfo> Parse(string boardUrl, Hooks hooks = default, [EnumeratorCancellation] CancellationToken token=default)
{
    using var cancel = CancellationTokenSource.CreateLinkedTokenSource(globalCancel.Token, token);
    // Snapshot the token once. Background tasks may outlive this iterator, and reading
    // cancel.Token after the source was disposed throws ObjectDisposedException.
    CancellationToken ct = cancel.Token;

    var boardInfo = NewBoardInfo();
    boardInfo.BoardURL = boardUrl;
    if (hooks.PrintDebug)
    {
        Console.WriteLine($"({boardUrl}) gen PostInfo");
    }

    Channel<Task> threadGetters = Channel.CreateUnbounded<Task>();
    Channel<ThreadInfo> completedThreads = Channel.CreateUnbounded<ThreadInfo>();

    bool finished = false;
    try
    {
        // Collects all fetch tasks, waits for them, then closes the results channel.
        Task completer = Task.Run(async () =>
        {
            try
            {
                List<Task> getters = new List<Task>();
                if (hooks.PrintDebug)
                {
                    Console.WriteLine($"({boardUrl}) gen getter");
                }
                int gi = 0;
                await foreach (var getter in threadGetters.Reader.ReadAllAsync(ct))
                {
                    getters.Add(getter);
                    if (hooks.PrintDebug)
                    {
                        Console.WriteLine($"({boardUrl}) add getter {gi++}");
                    }
                }
                if (hooks.PrintDebug)
                {
                    Console.WriteLine($"({boardUrl}) add getters {getters.Count}");
                }
                await Task.WhenAll(getters);
                if (hooks.PrintDebug)
                {
                    Console.WriteLine($"({boardUrl}) getters complete");
                }
            }
            finally
            {
                // Always close the results channel; otherwise a failure here would leave
                // the reader below waiting forever. The failure itself surfaces at "await completer".
                completedThreads.Writer.TryComplete();
            }
        });
        if (hooks.PrintDebug)
        {
            Console.WriteLine($"({boardUrl}) start completer");
        }

        int ti = 0;
        await foreach (var thread in GetThreads(boardInfo).WithCancellation(ct))
        {
            //Thread got.
            thread.BoardInfo = boardInfo;
            hooks.OnThreadRetrieved?.Invoke(thread);
            ThreadRetrievedHook(thread);
            await threadGetters.Writer.WriteAsync(Task.Run(async () =>
            {
                try
                {
                    await InternalGetPosts(thread, hooks, ct);
                    if (hooks.PrintDebug)
                    {
                        Console.WriteLine($"({boardUrl}) writing to complete");
                    }
                    await completedThreads.Writer.WriteAsync(thread, ct);
                    if (hooks.PrintDebug)
                    {
                        Console.WriteLine($"({thread.BoardInfo.BoardURL}) written");
                    }
                }
                catch (Exception ex)
                {
                    if (hooks.PrintDebug)
                    {
                        Console.WriteLine($"Whoops {thread.PostNumber} failed: {ex.Message}\n{ex.StackTrace}");
                    }
                    hooks.OnThreadReadFailed?.Invoke(thread, ex);
                    ThreadReadFailedHook(thread, ex);
                }
            }), ct);
            if (hooks.PrintDebug)
            {
                Console.WriteLine($"({boardUrl}) thread write ({ti++}) {thread.PostNumber}");
            }
        }
        threadGetters.Writer.Complete();
        if (hooks.PrintDebug)
        {
            Console.WriteLine($"({boardUrl}) stop getter");
        }

        await foreach (var completedThread in completedThreads.Reader.ReadAllAsync(ct))
        {
            boardInfo.AddChildThread(completedThread);
            yield return completedThread;
        }
        if (hooks.PrintDebug)
        {
            Console.WriteLine($"({boardUrl}) completer complete");
        }
        await completer;

        boardInfo.DumpTimestamp = DateTime.Now;
        hooks.OnBoardRetrieved?.Invoke(boardInfo);
        BoardRetrievedHook(boardInfo);
        if (hooks.PrintDebug)
        {
            Console.WriteLine($"({boardUrl}) end");
        }
        finished = true;
    }
    finally
    {
        if (!finished)
        {
            // Early exit: let the completer finish and stop the fetches still in flight
            // before the linked source is disposed.
            threadGetters.Writer.TryComplete();
            cancel.Cancel();
        }
    }
}
/// <summary>
/// Optional per-call callbacks for <c>Parse</c>. Each one is invoked in addition to the
/// dumper's event of the same name; unset (null) callbacks are skipped.
/// </summary>
public struct Hooks
{
/// <summary>Invoked once with the board info after all of its threads have been processed.</summary>
public Action<BoardInfo> OnBoardRetrieved;
/// <summary>Invoked for every thread as it is discovered, before its posts are fetched.</summary>
public Action<ThreadInfo> OnThreadRetrieved;
/// <summary>Invoked for every post as it is read.</summary>
public Action<PostInfo> OnPostRetrieved;
/// <summary>Invoked with the thread and the exception when fetching a thread's posts fails.</summary>
public Action<ThreadInfo, Exception> OnThreadReadFailed;
// When true, progress messages are written to the console.
// The field is public in DEBUG builds and internal otherwise.
#if DEBUG
public
#else
internal
#endif
bool PrintDebug;
}
/// <summary>Raised by <c>BoardRetrievedHook</c> once a board has been fully processed.</summary>
public event Action<BoardInfo> OnBoardRetrieved;
/// <summary>Raised by <c>ThreadRetrievedHook</c> for every thread discovered on a board.</summary>
public event Action<ThreadInfo> OnThreadRetrieved;
/// <summary>Raised by <c>PostRetrievedHook</c> for every post read from a thread.</summary>
public event Action<PostInfo> OnPostRetrieved;
/// <summary>Raised by <c>ThreadReadFailedHook</c> when fetching a thread's posts throws.</summary>
public event Action<ThreadInfo, Exception> OnThreadReadFailed;
/// <summary>Raises <see cref="OnPostRetrieved"/> for <paramref name="post"/> if anyone is subscribed.</summary>
protected virtual void PostRetrievedHook(PostInfo post)
{
    Action<PostInfo> subscribers = OnPostRetrieved;
    if (subscribers != null)
    {
        subscribers.Invoke(post);
    }
}
/// <summary>Raises <see cref="OnThreadRetrieved"/> for <paramref name="thread"/> if anyone is subscribed.</summary>
protected virtual void ThreadRetrievedHook(ThreadInfo thread)
{
    Action<ThreadInfo> subscribers = OnThreadRetrieved;
    if (subscribers != null)
    {
        subscribers.Invoke(thread);
    }
}
/// <summary>Raises <see cref="OnBoardRetrieved"/> for <paramref name="board"/> if anyone is subscribed.</summary>
protected virtual void BoardRetrievedHook(BoardInfo board)
{
    Action<BoardInfo> subscribers = OnBoardRetrieved;
    if (subscribers != null)
    {
        subscribers.Invoke(board);
    }
}
/// <summary>
/// Raises <see cref="OnThreadReadFailed"/> with the failing <paramref name="thread"/> and the
/// exception <paramref name="ex"/> if anyone is subscribed.
/// </summary>
protected virtual void ThreadReadFailedHook(ThreadInfo thread, Exception ex)
{
    Action<ThreadInfo, Exception> subscribers = OnThreadReadFailed;
    if (subscribers != null)
    {
        subscribers.Invoke(thread, ex);
    }
}
/// <summary>
/// Reads every post of <paramref name="thread"/> while holding one slot of <see cref="Pool"/>,
/// then attaches the collected posts to the thread.
/// </summary>
/// <param name="thread">Thread to read; each post gets it as <c>Parent</c> and inherits its <c>BoardInfo</c>.</param>
/// <param name="hooks">Per-call callbacks; <c>OnPostRetrieved</c> runs before the dumper's own event.</param>
/// <param name="token">Cancels the read; combined with the dumper-wide cancellation.</param>
private async Task InternalGetPosts(ThreadInfo thread, Hooks hooks, CancellationToken token = default)
{
    using (CancellationTokenSource linked = CancellationTokenSource.CreateLinkedTokenSource(globalCancel.Token, token))
    {
        var collected = new List<PostInfo>();
        if (hooks.PrintDebug)
        {
            Console.WriteLine($"({thread.BoardInfo.BoardURL}) entering context");
        }

        // Hold a pool slot only for the duration of the read.
        IDisposable slot = await Pool.AquireAsync(linked.Token);
        try
        {
            if (hooks.PrintDebug)
            {
                Console.WriteLine($" ctx_aqu ({thread.BoardInfo.BoardURL}) getting posts");
            }

            await foreach (PostInfo current in GetPosts(thread).WithCancellation(linked.Token))
            {
                //Post got.
                current.Parent = thread;
                current.BoardInfo = thread.BoardInfo;
                linked.Token.ThrowIfCancellationRequested();

                hooks.OnPostRetrieved?.Invoke(current);
                PostRetrievedHook(current);
                collected.Add(current);
            }

            if (hooks.PrintDebug)
            {
                Console.WriteLine($" ctx_aqu ({thread.BoardInfo.BoardURL}) posts got");
            }
        }
        finally
        {
            slot?.Dispose();
        }

        if (hooks.PrintDebug)
        {
            Console.WriteLine($"({thread.BoardInfo.BoardURL}) adding children");
        }
        thread.AddChildPosts(collected);
    }
}
/// <summary>
/// Asynchronously waits for a free slot in this dumper's concurrency pool.
/// </summary>
/// <param name="token">Cancels the wait.</param>
/// <returns>A handle for the acquired slot; dispose it to give the slot back.</returns>
public async Task<IDisposable> EnterContextAsync(CancellationToken token = default)
{
    IDisposable slot = await Pool.AquireAsync(token);
    return slot;
}
/// <summary>
/// Blocks until a slot in this dumper's concurrency pool is free.
/// </summary>
/// <param name="token">Cancels the wait.</param>
/// <returns>A handle for the acquired slot; dispose it to give the slot back.</returns>
public IDisposable EnterContext(CancellationToken token = default) => Pool.Aquire(token);
/// <summary>
/// Creates the board info object that <c>Parse</c> fills in. Overridable so a subclass can
/// supply its own instance.
/// </summary>
protected virtual BoardInfo NewBoardInfo()
{
    return new BoardInfo();
}
/// <summary>
/// Implemented by concrete dumpers to produce the posts of <paramref name="thread"/>.
/// </summary>
/// <remarks>
/// The caller in this class passes only <paramref name="thread"/> and supplies cancellation
/// through <c>WithCancellation</c> on the returned sequence.
/// </remarks>
protected abstract IAsyncEnumerable<PostInfo> GetPosts(ThreadInfo thread, [EnumeratorCancellation] CancellationToken token = default);
/// <summary>
/// Implemented by concrete dumpers to produce the threads of the board described by
/// <paramref name="boardInfo"/>.
/// </summary>
/// <remarks>
/// <c>Parse</c> passes only <paramref name="boardInfo"/> (with <c>BoardURL</c> already set) and
/// supplies cancellation through <c>WithCancellation</c> on the returned sequence.
/// </remarks>
protected abstract IAsyncEnumerable<ThreadInfo> GetThreads(BoardInfo boardInfo, [EnumeratorCancellation] CancellationToken token = default);
#region IDisposable Support
// Set once disposal has run, so repeated calls do nothing.
private bool disposedValue;

/// <summary>
/// Releases this dumper's resources once. When <paramref name="disposing"/> is true, outstanding
/// work is cancelled, the pool is disposed unless it is shared via <c>Config.ShareContextWith</c>,
/// and the cancellation source is disposed.
/// </summary>
/// <param name="disposing">True when called from <c>Dispose()</c>, false when called from the finalizer.</param>
protected virtual void Dispose(bool disposing)
{
    if (disposedValue)
    {
        return;
    }

    if (disposing)
    {
        bool alreadyCancelled = globalCancel.IsCancellationRequested;
        if (!alreadyCancelled)
        {
            globalCancel.Cancel();
        }

        // A pool borrowed from another dumper is left alone.
        bool ownsPool = Config.ShareContextWith == null;
        if (ownsPool)
        {
            Pool.Dispose();
        }

        globalCancel.Dispose();
    }

    disposedValue = true;
}
/// <summary>Finalizer; runs the non-disposing cleanup path.</summary>
~Dumper() => Dispose(false);
/// <summary>
/// Disposes this dumper and tells the garbage collector that the finalizer no longer needs to run.
/// </summary>
public void Dispose()
{
    Dispose(disposing: true);
    GC.SuppressFinalize(this);
}
#endregion
}
}