Compare commits

...

10 Commits

Author SHA1 Message Date
Avril 491867fbbb
sync::oneshot::{Try,}{Send,Recv}Error{<T>,}: Added send + recv + generic error (with consuming variants for re-triable operations.)
3 weeks ago
Avril bea2631fb7
sync: Started sync `oneshot` channel impl
4 weeks ago
Avril bcaa9e0703
buffer: Added `ThreadPool<'scope = 'static>` skeleton. Old design removed, redesign using `mpsc` (`crossbeam::channel`) instead of bare-bones channel impl we were using with `Arc<(Condvar, Mutex<...>)>` (See comment in `buffer::thread_pool` about previous impl's shortcomings and needless complexity.)
4 weeks ago
Avril 0ce496cafd
Added `CachelineBuffer` and `AlignedCachelineBuffer`: Byte buffers that are the size of one cacheline (2nd: and aligned to cacheline boundary.)
1 month ago
Avril 03a3a1bfd0
part: Reworked capacity calculation (static & dynamic,) growth, and bounding to be globally compile-time configurable.
1 month ago
Avril 49e0dd6073
part: Removed unneeded 2nd thread spawn in `SearchPar`'s `search_combined()`: Backwards searching is done on main thread, forward searching is done on background thread.
1 month ago
Avril 9d2ae29e8b
part: `SearchSeq` now uses capacity-extended-switching between forward and backward searching (like `SearchPar` does.)
1 month ago
Avril f9068beca1
part: started `partition_one_with()`: Perform one nearest-needle-to-half partition on a buffer. The behaviour of this partition can be controlled with its `method` parameter. There are currently two: `SearchSeq` - search forward then backward in sequence. `SearchPar` - search forward and backward in parallel (NOTE: `SearchPar` is heavy and not well optimised, it will *always* spawn at least one thread.)
1 month ago
Avril ff898fc9b3
Added feature `mapped-file`: Attempt to `mmap()` input where possible to avoid reads. (NOTE: This should be replaced with a cfg-target-unix check, for now it is a default feature that can be disabled.)
1 month ago
Avril 5144539191
Re-worked cargo features & dependencies: Removed previous config features. Added `threads` parallelisation (+ auto `threads-async` internal io_uring support) feature and `unstable` feature for unstable-api using deps.
1 month ago

948
Cargo.lock generated

@ -2,6 +2,954 @@
# It is not intended for manual editing.
version = 3
[[package]]
name = "addr2line"
version = "0.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a30b2e23b9e17a9f90641c7ab1549cd9b44f296d3ccbf309d2863cfe398a0cb"
dependencies = [
"gimli",
]
[[package]]
name = "adler"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
[[package]]
name = "anstream"
version = "0.6.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d96bd03f33fe50a863e394ee9718a706f988b9079b20c3784fb726e7678b62fb"
dependencies = [
"anstyle",
"anstyle-parse",
"anstyle-query",
"anstyle-wincon",
"colorchoice",
"utf8parse",
]
[[package]]
name = "anstyle"
version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8901269c6307e8d93993578286ac0edf7f195079ffff5ebdeea6a59ffb7e36bc"
[[package]]
name = "anstyle-parse"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c75ac65da39e5fe5ab759307499ddad880d724eed2f6ce5b5e8a26f4f387928c"
dependencies = [
"utf8parse",
]
[[package]]
name = "anstyle-query"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e28923312444cdd728e4738b3f9c9cac739500909bb3d3c94b43551b16517648"
dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "anstyle-wincon"
version = "3.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1cd54b81ec8d6180e24654d0b371ad22fc3dd083b6ff8ba325b72e00c87660a7"
dependencies = [
"anstyle",
"windows-sys 0.52.0",
]
[[package]]
name = "autocfg"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f1fdabc7756949593fe60f30ec81974b613357de856987752631dea1e3394c80"
[[package]]
name = "backtrace"
version = "0.3.71"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26b05800d2e817c8b3b4b54abd461726265fa9789ae34330622f2db9ee696f9d"
dependencies = [
"addr2line",
"cc",
"cfg-if",
"libc",
"miniz_oxide",
"object",
"rustc-demangle",
]
[[package]]
name = "bitflags"
version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "bitflags"
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf4b9d6a944f767f8e5e0db018570623c85f3d925ac718db4e06d0187adb21c1"
[[package]]
name = "bytes"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "514de17de45fdb8dc022b1a7975556c53c86f9f0aa5f534b98977b171857c2c9"
[[package]]
name = "cc"
version = "1.0.90"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8cd6604a82acf3039f1144f54b8eb34e91ffba622051189e71b781822d5ee1f5"
[[package]]
name = "cfg-if"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "clap"
version = "4.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90bc066a67923782aa8515dbaea16946c5bcc5addbd668bb80af688e53e548a0"
dependencies = [
"clap_builder",
"clap_derive",
]
[[package]]
name = "clap_builder"
version = "4.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ae129e2e766ae0ec03484e609954119f123cc1fe650337e155d03b022f24f7b4"
dependencies = [
"anstream",
"anstyle",
"clap_lex",
"strsim",
"terminal_size",
"unicase",
"unicode-width",
]
[[package]]
name = "clap_derive"
version = "4.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "528131438037fd55894f62d6e9f068b8f45ac57ffa77517819645d10aed04f64"
dependencies = [
"heck",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "clap_lex"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "98cc8fbded0c607b7ba9dd60cd98df59af97e84d24e49c8557331cfc26d301ce"
[[package]]
name = "color-eyre"
version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "55146f5e46f237f7423d74111267d4597b59b0dad0ffaf7303bce9945d843ad5"
dependencies = [
"backtrace",
"eyre",
"indenter",
"once_cell",
"owo-colors",
]
[[package]]
name = "colorchoice"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7"
[[package]]
name = "compiler_builtins"
version = "0.1.108"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d68bc55329711cd719c2687bb147bc06211b0521f97ef398280108ccb23227e9"
[[package]]
name = "crossbeam"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1137cd7e7fc0fb5d3c5a8678be38ec56e819125d8d7907411fe24ccb943faca8"
dependencies = [
"crossbeam-channel",
"crossbeam-deque",
"crossbeam-epoch",
"crossbeam-queue",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-channel"
version = "0.5.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab3db02a9c5b5121e1e42fbdb1aeb65f5e02624cc58c43f2884c6ccac0b82f95"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-deque"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "613f8cc01fe9cf1a3eb3d7f488fd2fa8388403e97039e2f73692932e291a770d"
dependencies = [
"crossbeam-epoch",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-epoch"
version = "0.9.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-queue"
version = "0.3.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df0346b5d5e76ac2fe4e327c5fd1118d6be7c51dfb18f9b7922923f287471e35"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "248e3bacc7dc6baa3b21e405ee045c3047101a49145e7e9eca583ab4c2ca5345"
[[package]]
name = "either"
version = "1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "11157ac094ffbdde99aa67b23417ebdd801842852b500e395a45a9c0aac03e4a"
[[package]]
name = "errno"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a258e46cdc063eb8519c00b9fc845fc47bcfca4130e2f08e88665ceda8474245"
dependencies = [
"libc",
"windows-sys 0.52.0",
]
[[package]]
name = "eyre"
version = "0.6.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7cd915d99f24784cdc19fd37ef22b97e3ff0ae756c7e492e9fbfe897d61e2aec"
dependencies = [
"indenter",
"once_cell",
]
[[package]]
name = "futures"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0"
dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-channel"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78"
dependencies = [
"futures-core",
"futures-sink",
]
[[package]]
name = "futures-core"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d"
[[package]]
name = "futures-io"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1"
[[package]]
name = "futures-macro"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "futures-sink"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5"
[[package]]
name = "futures-task"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004"
[[package]]
name = "futures-util"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48"
dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
"futures-sink",
"futures-task",
"memchr",
"pin-project-lite",
"pin-utils",
"slab",
]
[[package]]
name = "gimli"
version = "0.28.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253"
[[package]]
name = "heck"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
[[package]]
name = "hermit-abi"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024"
[[package]]
name = "indenter"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ce23b50ad8242c51a442f3ff322d56b02f08852c77e4c0b4d3fd684abc89c683"
[[package]]
name = "io-uring"
version = "0.5.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd1e1a01cfb924fd8c5c43b6827965db394f5a3a16c599ce03452266e1cf984c"
dependencies = [
"bitflags 1.3.2",
"libc",
]
[[package]]
name = "lazy_static"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
dependencies = [
"spin",
]
[[package]]
name = "libc"
version = "0.2.153"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd"
[[package]]
name = "linux-raw-sys"
version = "0.4.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "01cda141df6706de531b6c46c3a33ecca755538219bd484262fa09410c13539c"
[[package]]
name = "lock_api"
version = "0.4.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c168f8615b12bc01f9c17e2eb0cc07dcae1940121185446edc3744920e8ef45"
dependencies = [
"autocfg",
"owning_ref",
"scopeguard",
]
[[package]]
name = "mapped-file"
version = "0.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9905800218badacc3f721083a97a6841ce4752f16c3c6a6ed3033be3d1c47cc4"
dependencies = [
"lazy_static",
"libc",
"memchr",
]
[[package]]
name = "memchr"
version = "2.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c8640c5d730cb13ebd907d8d04b52f55ac9a2eec55b440c8892f40d56c76c1d"
dependencies = [
"compiler_builtins",
]
[[package]]
name = "miniz_oxide"
version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d811f3e15f28568be3407c8e7fdb6514c1cda3cb30683f15b6a1a1dc4ea14a7"
dependencies = [
"adler",
]
[[package]]
name = "mio"
version = "0.8.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c"
dependencies = [
"libc",
"wasi",
"windows-sys 0.48.0",
]
[[package]]
name = "num_cpus"
version = "1.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43"
dependencies = [
"hermit-abi",
"libc",
]
[[package]]
name = "object"
version = "0.32.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a6a622008b6e321afc04970976f62ee297fdbaa6f95318ca343e3eebb9648441"
dependencies = [
"memchr",
]
[[package]]
name = "once_cell"
version = "1.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92"
[[package]]
name = "owning_ref"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ff55baddef9e4ad00f88b6c743a2a8062d4c6ade126c2a528644b8e444d52ce"
dependencies = [
"stable_deref_trait",
]
[[package]]
name = "owo-colors"
version = "3.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c1b04fb49957986fdce4d6ee7a65027d55d4b6d2265e5848bbb507b58ccfdb6f"
[[package]]
name = "parking_lot"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f"
dependencies = [
"lock_api",
"parking_lot_core",
]
[[package]]
name = "parking_lot_core"
version = "0.9.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c42a9226546d68acdd9c0a280d17ce19bfe27a46bf68784e4066115788d008e"
dependencies = [
"cfg-if",
"libc",
"redox_syscall",
"smallvec",
"windows-targets 0.48.5",
]
[[package]]
name = "pin-project-lite"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bda66fc9667c18cb2758a2ac84d1167245054bcf85d5d1aaa6923f45801bdd02"
[[package]]
name = "pin-utils"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "proc-macro2"
version = "1.0.79"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e835ff2298f5721608eb1a980ecaee1aef2c132bf95ecc026a11b7bf3c01c02e"
dependencies = [
"unicode-ident",
]
[[package]]
name = "quote"
version = "1.0.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "291ec9ab5efd934aaf503a6466c5d5251535d108ee747472c3977cc5acc868ef"
dependencies = [
"proc-macro2",
]
[[package]]
name = "rayon"
version = "1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b418a60154510ca1a002a752ca9714984e21e4241e804d32555251faf8b78ffa"
dependencies = [
"either",
"rayon-core",
]
[[package]]
name = "rayon-core"
version = "1.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2"
dependencies = [
"crossbeam-deque",
"crossbeam-utils",
]
[[package]]
name = "redox_syscall"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4722d768eff46b75989dd134e5c353f0d6296e5aaa3132e776cbdb56be7731aa"
dependencies = [
"bitflags 1.3.2",
]
[[package]]
name = "reverse"
version = "0.5.0"
dependencies = [
"bytes",
"clap",
"color-eyre",
"crossbeam",
"crossbeam-utils",
"futures",
"lazy_static",
"mapped-file",
"memchr",
"num_cpus",
"parking_lot",
"rayon",
"tokio-stream",
"tokio-uring",
]
[[package]]
name = "rustc-demangle"
version = "0.1.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76"
[[package]]
name = "rustix"
version = "0.38.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ea3e1a662af26cd7a3ba09c0297a31af215563ecf42817c98df621387f4e949"
dependencies = [
"bitflags 2.5.0",
"errno",
"libc",
"linux-raw-sys",
"windows-sys 0.52.0",
]
[[package]]
name = "scoped-tls"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294"
[[package]]
name = "scopeguard"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]]
name = "signal-hook-registry"
version = "1.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d8229b473baa5980ac72ef434c4415e70c4b5e71b423043adb4ba059f89c99a1"
dependencies = [
"libc",
]
[[package]]
name = "slab"
version = "0.4.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67"
dependencies = [
"autocfg",
]
[[package]]
name = "smallvec"
version = "1.13.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67"
[[package]]
name = "socket2"
version = "0.4.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9f7916fc008ca5542385b89a3d3ce689953c143e9304a9bf8beec1de48994c0d"
dependencies = [
"libc",
"winapi",
]
[[package]]
name = "socket2"
version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05ffd9c0a93b7543e062e759284fcf5f5e3b098501104bfbdde4d404db792871"
dependencies = [
"libc",
"windows-sys 0.52.0",
]
[[package]]
name = "spin"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]]
name = "stable_deref_trait"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3"
[[package]]
name = "strsim"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5ee073c9e4cd00e28217186dbe12796d692868f432bf2e97ee73bed0c56dfa01"
[[package]]
name = "syn"
version = "2.0.57"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "11a6ae1e52eb25aab8f3fb9fca13be982a373b8f1157ca14b897a825ba4a2d35"
dependencies = [
"proc-macro2",
"quote",
"unicode-ident",
]
[[package]]
name = "terminal_size"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "21bebf2b7c9e0a515f6e0f8c51dc0f8e4696391e6f1ff30379559f8365fb0df7"
dependencies = [
"rustix",
"windows-sys 0.48.0",
]
[[package]]
name = "tokio"
version = "1.37.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1adbebffeca75fcfd058afa480fb6c0b81e165a0323f9c9d39c9697e37c46787"
dependencies = [
"backtrace",
"bytes",
"libc",
"mio",
"pin-project-lite",
"signal-hook-registry",
"socket2 0.5.6",
"windows-sys 0.48.0",
]
[[package]]
name = "tokio-stream"
version = "0.1.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "267ac89e0bec6e691e5813911606935d77c476ff49024f98abcea3e7b15e37af"
dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
"tokio-util",
]
[[package]]
name = "tokio-uring"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d5e02bb137e030b3a547c65a3bd2f1836d66a97369fdcc69034002b10e155ef"
dependencies = [
"bytes",
"io-uring",
"libc",
"scoped-tls",
"slab",
"socket2 0.4.10",
"tokio",
]
[[package]]
name = "tokio-util"
version = "0.7.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5419f34732d9eb6ee4c3578b7989078579b7f039cbbb9ca2c4da015749371e15"
dependencies = [
"bytes",
"futures-core",
"futures-sink",
"pin-project-lite",
"tokio",
]
[[package]]
name = "unicase"
version = "2.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50f37be617794602aabbeee0be4f259dc1778fabe05e2d67ee8f79326d5cb4f6"
dependencies = [
"version_check",
]
[[package]]
name = "unicode-ident"
version = "1.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b"
[[package]]
name = "unicode-width"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c0edd1e5b14653f783770bce4a4dabb4a5108a5370a5f5d8cfe8710c361f6c8b"
[[package]]
name = "utf8parse"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a"
[[package]]
name = "version_check"
version = "0.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
[[package]]
name = "wasi"
version = "0.11.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
[[package]]
name = "winapi"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419"
dependencies = [
"winapi-i686-pc-windows-gnu",
"winapi-x86_64-pc-windows-gnu",
]
[[package]]
name = "winapi-i686-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
[[package]]
name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "windows-sys"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9"
dependencies = [
"windows-targets 0.48.5",
]
[[package]]
name = "windows-sys"
version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d"
dependencies = [
"windows-targets 0.52.4",
]
[[package]]
name = "windows-targets"
version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c"
dependencies = [
"windows_aarch64_gnullvm 0.48.5",
"windows_aarch64_msvc 0.48.5",
"windows_i686_gnu 0.48.5",
"windows_i686_msvc 0.48.5",
"windows_x86_64_gnu 0.48.5",
"windows_x86_64_gnullvm 0.48.5",
"windows_x86_64_msvc 0.48.5",
]
[[package]]
name = "windows-targets"
version = "0.52.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7dd37b7e5ab9018759f893a1952c9420d060016fc19a472b4bb20d1bdd694d1b"
dependencies = [
"windows_aarch64_gnullvm 0.52.4",
"windows_aarch64_msvc 0.52.4",
"windows_i686_gnu 0.52.4",
"windows_i686_msvc 0.52.4",
"windows_x86_64_gnu 0.52.4",
"windows_x86_64_gnullvm 0.52.4",
"windows_x86_64_msvc 0.52.4",
]
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8"
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.52.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bcf46cf4c365c6f2d1cc93ce535f2c8b244591df96ceee75d8e83deb70a9cac9"
[[package]]
name = "windows_aarch64_msvc"
version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc"
[[package]]
name = "windows_aarch64_msvc"
version = "0.52.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da9f259dd3bcf6990b55bffd094c4f7235817ba4ceebde8e6d11cd0c5633b675"
[[package]]
name = "windows_i686_gnu"
version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e"
[[package]]
name = "windows_i686_gnu"
version = "0.52.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b474d8268f99e0995f25b9f095bc7434632601028cf86590aea5c8a5cb7801d3"
[[package]]
name = "windows_i686_msvc"
version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406"
[[package]]
name = "windows_i686_msvc"
version = "0.52.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1515e9a29e5bed743cb4415a9ecf5dfca648ce85ee42e15873c3cd8610ff8e02"
[[package]]
name = "windows_x86_64_gnu"
version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e"
[[package]]
name = "windows_x86_64_gnu"
version = "0.52.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5eee091590e89cc02ad514ffe3ead9eb6b660aedca2183455434b93546371a03"
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc"
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.52.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77ca79f2451b49fa9e2af39f0747fe999fcda4f5e241b2898624dca97a1f2177"
[[package]]
name = "windows_x86_64_msvc"
version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538"
[[package]]
name = "windows_x86_64_msvc"
version = "0.52.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32b752e52a2da0ddfbdbcc6fceadfeede4c939ed16d13e648833a61dfb611ed8"

@ -1,34 +1,63 @@
[package]
name = "reverse"
version = "0.4.0"
version = "0.5.0"
authors = ["Avril <flanchan@cumallover.me>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features]
default = ["output-lines", "buffer-output", "ignore-output-errors", "ignore-invalid-args"]
default = ["mapped-file", "threads"]
# Enable nightly features.
unstable = ["parking_lot?/nightly", "memchr/compiler_builtins", "crossbeam?/nightly", "futures?/unstable", "color-eyre/track-caller", "crossbeam-utils/nightly"]
# Enables parallelising the operation where possible.
#
# Cli config options can control this, but in default auto setting the process will decide if parallelisation is worth using on the dimensions of the provided input and system.
threads = ["dep:rayon", "dep:parking_lot", "dep:crossbeam", "dep:num_cpus", "threads-async"]
# Attempt `mmap()`ping the input instead of reading it.
# If the output can be mapped, it may also be.
#
# Useable on unix-family systems. (TODO: Make this dependency target-dependent for unix instead of a manual feature addition like this.)
mapped-file = ["dep:mapped-file"]
# TODO: Make this non-dependent feature of `threads`: use `tokio_uring` tasks for I/O, and pull in `tokio` **instead of** rayon for operations. For now, we'll see if we can get away with single-threaded async for `tokio_uring` usage and `rayon`'s auto parallelisation for the chunk partitioning; if not, change this to not be internal.
#
# We are using `tokio_uring` & `tokio-stream` to allow parallel queued writes. (TODO: If we end up using (or needing) this feature for partitioning (e.g. we need a controlled IO thread that takes the data gathered by the parallelised partition system or we need a task pool (NOTE: the task pool will execute on tokio_uring's thread only; rayon's thread pool is being used for auto parallelisation currently) to spawn reverse-reader tasks per partition.) then: Remove the use of crossbeam (XXX: and maybe rayon?), and use Tokio runtime for manual parallelised partitioning/line-reversing.)
threads-async = ["threads", "dep:futures", "dep:tokio-uring", "dep:tokio-stream"]
#rayon = ["dep:rayon"]
#parking_lot = ["dep:parking_lot"]
#XXX ["output-lines", "buffer-output", "ignore-output-errors", "ignore-invalid-args"]
#TODO: Move these features to Clap runtime arg parse options.
#TODO: Add feature 'mapped-file': Attempt to map input file and process data backwards from end if possible.
#TODO: Add feature 'threads': Use rayon to parallelise line-searching in buffer (or mapped file ^) (partition in half, move partition line to closest {r,}memchr('\n'); repeat N times recursively on each side (parallel) where N is number of logical CPU cores and/or to get the chunks below/in a certain size range.
# Output as lines instead of `["2", "1", "0"]`
output-lines = []
#XXX output-lines = []
# Print each string escaped
output-quoted = []
#XXX output-quoted = []
# Buffer output to conserve syscalls, useful for very large inputs (can cause higher memory usage, but generally speeds output up considerably)
buffer-output = []
#XXX buffer-output = []
# Do not attempt to handle output errors.
# Disable this if you are writing to a faulty device or expect some output operations to stdout to fail.
ignore-output-errors = []
#XXX ignore-output-errors = []
# Ignore invalid arguments instead of removing invalid UTF8 characters if they exist in the argument
ignore-invalid-args = []
#XXX ignore-invalid-args = []
# Operate on raw input byte arrays instead of strings; so non-utf8 characters will be preserved in both input and output
# NOTE: May cause collecting from stdin to be *slightly* slower when enabled; so only enable if you intend to be operating on non-utf8 strings (which is usually unlikely)
# NOTE: `ignore-invalid-args` will do nothing if this is enabled.
byte-strings = []
#XXX byte-strings = []
[profile.release]
opt-level = 3
@ -41,3 +70,18 @@ inherits="release"
strip=false
[dependencies]
bytes = "1.6.0"
clap = { version = "4.5.4", features = ["derive", "string", "unicode", "wrap_help"] }
color-eyre = { version = "0.6.3", default-features = false }
crossbeam = { version = "0.8.4", optional = true, features = ["crossbeam-queue", "crossbeam-channel", "crossbeam-epoch"] }
crossbeam-utils = "0.8.19"
#crossbeam-queue = { version = "0.3.11", optional = true }
futures = { version = "0.3.30", optional = true, default-features = false, features = ["alloc", "std", "async-await"] }
lazy_static = { version = "1.4.0", features = ["spin"] }
mapped-file = { version = "0.0.8", features = ["file"], optional = true }
memchr = "2.7.2"
num_cpus = { version = "1.16.0", optional = true }
parking_lot = { version = "0.12.1", optional = true, features = ["arc_lock", "owning_ref", "send_guard"] }
rayon = { version = "1.10.0", optional = true }
tokio-stream = { version = "0.1.15", optional = true, features = ["sync", "fs", "tokio-util", "io-util", "signal"] }
tokio-uring = { version = "0.4.0", optional = true, features = ["bytes"] }

@ -0,0 +1,358 @@
//! Buffering and resource pooling utilities
//!
//!
use super::*;
pub use bytes::{
Buf, BufMut,
Bytes, BytesMut,
};
use std::{
fmt, error,
sync::Arc,
pin::Pin,
};
// Abandoned draft of a generic "loan" interface between a resource owner (`Loaner`) and the items it lends out (`Loaned`).
// It is only compiled when feature `_loan` is enabled, and even then the trailing `compile_error!` rejects the build,
// so the block is effectively kept as design notes.
#[cfg(feature="_loan")]
const _TODO_REWORK_THIS_INTERFACE_ITS_OVERCOMPLICATED_SHITE: () = {
// A `Result` whose error side carries the consumed value `S` next to the error `E`, so a failed by-value call can hand its receiver back.
type OwnedResult<S, T, E> = Result<T, (E, S)>;
/// Either an owned `T` or a shared borrow of one.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum MaybeRef<'r, T>
{
Owned(T),
Borrowed(&'r T),
}
/// A loaned item together with the (owned or borrowed) owner it was loaned from.
#[derive(Debug)]
struct Loan<'owner, O: Loaner>
{
owner: MaybeRef<'owner, O>,
item: O::Item<'owner>,
}
impl<'o, O> Loaned<'o, O> for Loan<'o, O>
where O: Loaner
{
// Forwards the release to the wrapped item.
fn try_release(self) -> Result<(), <O as Loaner>::Error> {
self.item.try_release()
}
// NOTE(review): the body is empty although a value is declared to be returned; this method was never finished.
fn try_release_owned(self: Arc<Self>) -> OwnedResult<Arc<Self>, (), <O as Loaner>::Error> {
}
}
/// An item that has been loaned out by an `Owner` and can be given back.
pub trait Loaned<'owner, Owner: ?Sized + 'owner>: Sized
where Owner: Loaner
{
//fn owner(&self) -> Option<&Owner>;
/// Give the item back, consuming it.
fn try_release(self) -> Result<(), Owner::Error>;
/// Give back an `Arc`-held item; on failure the `Arc` is returned alongside the error.
fn try_release_owned(self: Arc<Self>) -> OwnedResult<Arc<Self>, (), Owner::Error>
where Self: 'owner; // XXX: What? 'owner or 'static??? I think it's `'owner` == `'static` but how tf do I write that?
}
/// A resource owner that can lend out items.
pub trait Loaner {
/// The loaned item type; it may borrow from the owner for `'owner`.
type Item<'owner>: Loaned<'owner, Self>
where Self: 'owner;
/// Error produced by failed loan / release operations.
type Error: error::Error + Send + 'static;
/// Loan an item that borrows from `self`.
fn try_loan(&self) -> Result<Self::Item<'_>, Self::Error>;
/// Loan a `'static` item from an `Arc`-held owner; on failure the `Arc` is returned alongside the error.
fn try_loan_owned(self: Arc<Self>) -> OwnedResult<Arc<Self>, Self::Item<'static>, Self::Error>
where Self: 'static;
/// Take a loaned item back; on failure the item is returned alongside the error.
fn try_release(&self, item: Self::Item<'_>) -> OwnedResult<Self::Item<'_>, (), Self::Error>;
}
// Deliberately fails the build if this draft is ever enabled.
compile_error!("This `trait Loan` design isn't going to fucking work ofc...")
};
/// Thread pooling
#[cfg(feature="threads")]
pub mod thread_pool {
use super::*;
use std::{
mem::{
self,
ManuallyDrop,
},
thread::{
self,
Thread, ThreadId,
},
ops::Drop,
sync::{Arc, Weak,},
};
use parking_lot::{
Condvar,
Mutex,
};
use crossbeam::{
queue::ArrayQueue,
channel,
};
//TODO: Implement a oneshot channel: `oneshot::{Sender, Receiver}`; works like Tokio's `sync::oneshot`, but is sync, not async: Can use `{Sender,Receiver}slot: {Arc, Weak}<Mutex<MaybeUninit<T>>>, `waiter: {Weak/_PinnedPtrIntoArc<Pin<slot>>(avoids 2nd alloc)/Weak,Arc<_, A = SenderSharedBumpAllocator*>,Arc}<Condvar>` (NOTE: Bump allocator crate name `bumpalo`.)
//TODO: Implement `ThreadPool<'scope = 'static>` like below, but instead use `crossbeam::channel` mpsc to send commands to unparked threads (i.e. loaned threads from the pool that have an active user handle.)
// The design will be far simpler, since the Arc/Weak, send+recv stuff will be handled fine by `channel` on both ends. The only thing we need dtors for is `force_push()`ing threads back into their pool's ringbuffer, and for force-unparking threads from a pool that are removed (same as loan, except sender should be dropped before unpark happens.)
#[cfg(feature="_thread-pool-no-mpsc")]
const _: () = {
/// The different ways a handle can refer to a thread.
#[derive(Debug)]
pub enum ThreadHandleKind<'scope, T>
{
    /// No join handle is held; at most the id of the thread is remembered.
    Detached(Option<ThreadId>),
    /// A join handle to a free-standing thread.
    Attached(thread::JoinHandle<T>),
    /// A join handle to a thread spawned inside a thread scope.
    Scoped(thread::ScopedJoinHandle<'scope, T>),
}

impl<'scope, T> ThreadHandleKind<'scope, T>
{
    /// Borrow the underlying `Thread`, which is only available while a join handle is held.
    #[inline]
    pub fn thread(&self) -> Option<&Thread>
    {
        match self {
            Self::Attached(handle) => Some(handle.thread()),
            Self::Scoped(handle) => Some(handle.thread()),
            Self::Detached(_) => None,
        }
    }

    /// Does this handle hold a join handle (`Attached` or `Scoped`)?
    ///
    /// If it is *attached*, this means the thread *should not* outlive the instance (unless the instance is `'static`.)
    #[inline]
    pub const fn is_attached(&self) -> bool
    {
        !matches!(self, Self::Detached(_))
    }

    /// Does this handle refer to no thread at all (`Detached(None)`)?
    ///
    /// In a *poisoned* state, the handle is useless.
    #[inline]
    pub const fn is_poisoned(&self) -> bool
    {
        matches!(self, Self::Detached(None))
    }

    /// The id of the referred-to thread, if any thread is referred to.
    #[inline]
    pub fn id(&self) -> Option<ThreadId>
    {
        match self {
            // A detached handle only knows what it remembered (possibly nothing).
            Self::Detached(remembered) => *remembered,
            // Any other kind holds a join handle, so ask the thread itself.
            joinable => joinable.thread().map(Thread::id),
        }
    }
}
/// Represents a raw thread handle.
///
/// Dropping the handle joins the thread it still holds a join handle for (see the `Drop` impl.)
#[derive(Debug)]
#[repr(transparent)]
pub struct ThreadHandle<'scope, T>(ThreadHandleKind<'scope, T>);
//TODO: forward `ThreadHandle<'s, T>` methods to `ThreadHandleKind<'s, T>`: `thread(), id(), is_attached()`. Add methods `is_running(), join(), try_join(),` etc...
impl<'s, T> Drop for ThreadHandle<'s, T>
{
    #[inline]
    fn drop(&mut self) {
        // Swap in the poisoned placeholder so the join handle can be consumed by value.
        let previous = mem::replace(&mut self.0, ThreadHandleKind::Detached(None));
        match previous {
            // Nothing to join.
            ThreadHandleKind::Detached(_) => {},
            // Scoped thread: wait for it and carry its panic (if any) back onto this thread.
            ThreadHandleKind::Scoped(scoped) => {
                let _ = scoped.join().unwrap_or_resume();
            },
            // Free-standing thread: wait for it, ignoring a panic inside it.
            ThreadHandleKind::Attached(attached) => {
                let _ = attached.join();
            },
        }
    }
}
/// The message slot shared between a loaned thread and the handle that drives it.
#[derive(Debug)]
enum PooledSendContext<T: Send, F: ?Sized + Send>
{
    /// An execute directive `Err(Some(func))` is taken by the thread and replaced by `Ok(func())`.
    ///
    /// # Direction
    /// Bidirectional.
    /// (Handle -> Thread; Thread -> Handle)
    Execute(Result<T, Option<Box<F>>>),
    /// Pass a caught panic within an `Execute(Err(Some(F)))` payload back from the thread.
    ///
    /// # Direction
    /// Thread -> Handle
    Panic(UnwindPayload),
    /// When the loaned `PooledThread` is moved back into the pool, tell the thread to park itself again.
    ///
    /// # Direction
    /// Handle -> Thread
    Park,
    /// The placeholder left behind once a message has been taken out of the slot.
    ///
    /// # When `Taken`
    /// `take(&mut self)` always leaves `Taken` behind, whatever the slot held before.
    Taken,
    //NOTE: `Close` variant not needed because `handle` will have a `Weak` reference to `callback_passage`; when the upgrade fails, the thread will exit.
}

impl<T: Send, F: ?Sized + Send> PooledSendContext<T, F>
{
    /// Move the current message out of the slot, leaving `Taken` in its place.
    ///
    /// The result is always `Some(..)`: every variant (including `Park` and `Taken` itself) is moved out.
    //XXX: An earlier idea was to leave `Park` in place (cloning it) and to return `None` for an already-`Taken` slot; that special-casing is intentionally not done.
    pub fn take(&mut self) -> Option<Self>
    {
        let previous = mem::replace(self, Self::Taken);
        Some(previous)
    }
}
/// A loanable thread that belongs to a pool.
///
/// # Pooling
/// When taken from a `ThreadPool`, the object is *moved* out of the thread-pool's ring-buffer and then pinned in return (See `PooledThread`.)
///
/// To return the thread to the pool, a `Park` command is written to the thread through `callback_passage`, and on drop the handle is moved back into the ring-buffer of the thread pool `owner` (see `PooledThread`'s `Drop` impl.)
#[derive(Debug)]
struct PooledThreadHandle<'scope, T, F: ?Sized = dyn FnOnce() -> T + Send + 'scope>
where F: FnOnce() -> T + Send + 'scope,
T: Send + 'scope
{
/// Passes `F` to `handle` and retrieves `T`.
///
/// The `Mutex` guards the single message slot; the `Condvar` is signalled when the handle side writes to it.
callback_passage: Arc<(Condvar, Mutex<PooledSendContext<T, F>>)>,
/// The backing thread, which when started parks itself.
/// The `PooledThread` object will unpark the thread when the object is loaned (moved) out of the pool, and the thread is re-parked when the object is moved back in.
///
/// Will wait on `callback_passage?.0` for `Execute` directives, pass them back to the handle, and when a `Park` directive is received (i.e. when moved back into the pool) the thread will park itself and wait to be unparked when moved out of the pool or closed again.
///
/// If `callback_passage` is dropped, the thread will exit when unparked (dropped from pool.)
//TODO: Ensure unpark on drop?
handle: ThreadHandle<'scope, Result<(), ()>>,
}
impl<'scope, T, F:?Sized> PooledThreadHandle<'scope, T, F>
where F: FnOnce() -> T + Send + 'scope,
T: Send + 'scope
{
/// Handle -> thread: overwrite the message slot with `message` and signal one waiter on the condvar.
///
/// The previous slot content is dropped. The notification is issued while the lock is still held.
/// Returns whatever `notify_one()` reports.
fn send_to_thread_raw(&self, message: PooledSendContext<T, F>) -> bool
{
let mut ctx = self.callback_passage.1.lock();
*ctx = message;
self.callback_passage.0.notify_one()
}
/// Thread -> handle: overwrite the message slot with `message`.
///
/// NOTE(review): unlike `send_to_thread_raw()`, the condvar is not signalled here; a handle blocked on it would presumably not be woken by this write -- confirm this is intended.
fn send_from_thread_raw(&self, message: PooledSendContext<T, F>)
{
let mut ctx = self.callback_passage.1.lock();
*ctx = message;
}
/// Thread side: take the current message out of the slot, optionally blocking on the condvar first.
///
/// When `wait_for` is true the condvar is waited on exactly once (there is no predicate loop) before the slot is taken.
/// NOTE(review): a notification issued before the wait begins would presumably be missed -- verify against the intended protocol.
fn read_in_thread_raw(&self, wait_for: bool) -> Option<PooledSendContext<T, F>>
{
let mut ctx = self.callback_passage.1.lock(); // NOTE: Should we fair lock here?
// Wait on condvar if `wait_for` is true.
if wait_for {
self.callback_passage.0.wait(&mut ctx);
}
// Then replace the ctx with `Taken`.
ctx.take()
}
/// Handle side, non-blocking: take the current message if the lock can be acquired immediately, otherwise return `None`.
fn try_read_from_thread_raw(&self) -> Option<PooledSendContext<T, F>>
{
let mut ctx = self.callback_passage.1.try_lock()?;
ctx.take()
}
/// Handle side, blocking on the lock only: take the current message out of the slot (leaving `Taken`.)
fn read_from_thread_raw(&self) -> Option<PooledSendContext<T, F>>
{
let mut ctx = self.callback_passage.1.lock();
ctx.take()
}
}
/// A thread handle that is currently loaned out of a `ThreadPool` (or owned outright when `owner` is `None`.)
///
/// On drop the handle is either pushed back into the owning pool or, without an owner, its thread is released.
/// NOTE(review): both paths of the `Drop` impl still end in `todo!()`, so this type is unfinished.
#[derive(Debug)]
pub struct PooledThread<'scope, 'pool: 'scope, T, F: ?Sized = dyn FnOnce() -> T + Send + 'scope>
where F: FnOnce() -> T + Send + 'scope,
T: Send + 'scope
{
/// The owner to return `thread` to on drop.
owner: Option<&'pool ThreadPool<'scope, T, F>>,
/// The thread handle that is returned on drop; it is only ever moved out (via `ManuallyDrop::take`) inside `drop()`.
thread: ManuallyDrop<PooledThreadHandle<'scope, T, F>>, //XXX: Pin<> needed (or even useful at all) here?
}
impl<'scope, 'pool: 'scope, T, F:?Sized> Drop for PooledThread<'scope, 'pool, T, F>
where F: FnOnce() -> T + Send + 'scope,
T: Send + 'scope
{
fn drop(&mut self) {
let Some(loaned_from) = self.owner else {
// There is no owner.
// Instead, unpark the thread after drop.
// Each path of `drop()` moves the handle out of `self.thread` exactly once, and `self.thread` is not touched again afterwards.
let PooledThreadHandle { callback_passage, handle } = unsafe { ManuallyDrop::take(&mut self.thread) };
// Drop the Arc, so when `thread` wakes from unpark, it will fail to acquire context.
drop(callback_passage);
if let Some(thread) = handle.0.thread() {
thread.unpark();
}
// Unfinished: this path always ends in `todo!()`.
return todo!("How do we know *when* to unpark? How can we unpark *after* dropping the Arc?");
};
// There is an owner. Tell the thread to park itself.
let mut handle = unsafe { ManuallyDrop::take(&mut self.thread) };
handle.send_to_thread_raw(PooledSendContext::Park);
// Read the message object, take it from the thread after it notifies us it is about to park.
// NOTE(review): the taken `ctx` value is not used afterwards.
let ctx = {
let mut ctx = handle.callback_passage.1.lock();
// Before thread parks, it should signal condvar, and callback_passage should not be unique again yet.
handle.callback_passage.0.wait_while(&mut ctx, |_| Arc::strong_count(&handle.callback_passage) > 1);
ctx.take()
};
// Force insert back into thread pool.
if let Some( PooledThreadHandle { callback_passage, handle }) = loaned_from.pool_queue.force_push(handle) {
// If one is removed, drop the context..
drop(callback_passage);
// Then force-unpark the thread.
match handle.0.thread() {
Some(thread) => thread.unpark(),
None => (),
}
}
// Unfinished: this path always ends in `todo!()` as well.
todo!("Should we force `join()` here? Or allow non-scoped handles to be forgotten?")
}
}
/// A pool of loanable threads.
///
/// The idle `PooledThreadHandle`s are kept in a bounded `ArrayQueue`; a dropped `PooledThread` pushes its handle back with `force_push()`.
#[derive(Debug)]
pub struct ThreadPool<'scope, T, F: ?Sized = dyn FnOnce() -> T + Send + 'scope>
where F: FnOnce() -> T + Send + 'scope,
T: Send + 'scope
{
/// Handles of the threads that currently sit in the pool.
pool_queue: ArrayQueue<PooledThreadHandle<'scope, T, F>>,
}
};
//TODO: Build a ThreadPool<'scope = 'static> using crossbeam::ArrayQueue<...> (XXX: See phone for details on how pool should use un/parking for loaning; and CondVar for passing tasks.) Also, add a `Loan` trait with assoc type `Item<'owner>` and methods `try_loan(&self) -> Result<Self::Item<'_>, ...>`, `return(&self, item: Self::Item<'_>) -> Result<(), ...>` and `try_loan_owned(self: Arc<Self>) -> `MovedResult<Arc<Self>, OwnedLoan<'static, Self>, ...>` (struct OwnedLoad<'scope, L>{ item: Option<L::Item<'scope>> } )`, etc.
}

@ -0,0 +1,304 @@
//! Extension traits, global functions + macros.
#![allow(unused)]
use super::*;
use std::convert::Infallible;
pub use std::{
convert::{
TryFrom, TryInto,
},
borrow::*,
};
/// Add forwarding borrow + deref (+ optional `into_inner()`) impls for a type.
///
/// The generated impls name every trait by its absolute `::std` path, so the macro can be invoked from any module without importing `std::borrow::*` first.
///
/// # Usage
/// For a mutable forwarding newtype:
/// ```
/// # use crate::forward_newtype;
/// # use core::borrow::*;
///
/// /// A mutable buffer newtype over an array.
/// struct Buffer([u8; 16]);
/// forward_newtype!(mut Buffer => [u8], 0); // Generates `Borrow<[u8]>`, `BorrowMut<[u8]>`, `Deref<Target=[u8]>`, and `DerefMut` impls for `Buffer` that return `<&[mut] self.>0` (as specified by `0`.)
/// ```
///
/// For an immutable forwarding newtype:
/// ```
/// # use crate::forward_newtype;
/// # use core::borrow::*;
///
/// /// An immutable data newtype over an array.
/// struct Data([u8; 16]);
/// forward_newtype!(ref Data => [u8], 0); // Generates `Borrow<[u8]>` and `Deref<Target=[u8]>` impls for `Data` that return `<& self.>0` (as specified by `0`.) Immutable access only is specified by `ref`.
/// ```
///
/// ## Consuming into inner
/// To generate an `into_inner(self) -> T` inherent impl for the type, the syntax `forward_newtype!(move [const] Type => Inner, accessor)` can be used.
/// If `const` is passed, then the `into_inner()` function will be a `const fn`, if not, then it won't be.
///
/// To combine with ref-forwarding accessors, the syntax `forward_newtype!(move [const] {ref/mut} Type => Inner, accessor)` can be used to generate them all; the `Borrow`, `BorrowMut`, `Deref`, `DerefMut` and `pub [const] fn into_inner()`.
/// This is the most likely to be useful.
///
/// If you need a separate `into_inner()` impl, you can either not use the `move` declarator, or use the `ref`/`mut` accessor generator in a different statement than the `move` one:
/// ```
/// # use crate::forward_newtype;
/// # use core::borrow::*;
///
/// /// A mutable buffer newtype over an array.
/// struct Buffer([u8; 16]);
/// forward_newtype!(mut Buffer => [u8], 0); // Generate a mutable & immutable forwarding ref to a slice of bytes.
/// forward_newtype!(move const Buffer => [u8; 16], 0); // Generate a separately typed `into_inner()` that returns the sized array.
/// ```
macro_rules! forward_newtype {
    // Shared access: `Borrow` + `Deref`.
    (ref $type:ty => $inner:ty, $($expr:tt)+) => {
        impl ::std::borrow::Borrow<$inner> for $type
        {
            #[inline]
            fn borrow(&self) -> &$inner
            {
                &self.$($expr)+
            }
        }
        impl ::std::ops::Deref for $type {
            type Target = $inner;
            #[inline]
            fn deref(&self) -> &Self::Target {
                // Access the field directly instead of going through `borrow()`, so no trait has to be in scope at the call site.
                &self.$($expr)+
            }
        }
    };
    // Shared + exclusive access: everything from `ref`, plus `BorrowMut` + `DerefMut`.
    (mut $type:ty => $inner:ty, $($expr:tt)+) => {
        forward_newtype!(ref $type => $inner, $($expr)+);
        impl ::std::borrow::BorrowMut<$inner> for $type
        {
            #[inline]
            fn borrow_mut(&mut self) -> &mut $inner
            {
                &mut self.$($expr)+
            }
        }
        impl ::std::ops::DerefMut for $type {
            #[inline]
            fn deref_mut(&mut self) -> &mut Self::Target {
                &mut self.$($expr)+
            }
        }
    };
    // Consuming accessor, usable in const contexts.
    (move const $type:ty => $inner:ty, $($expr:tt)+) => {
        impl $type {
            /// Consume into the inner value.
            pub const fn into_inner(self) -> $inner {
                self.$($expr)+
            }
        }
    };
    // Consuming accessor.
    (move $type:ty => $inner:ty, $($expr:tt)+) => {
        impl $type {
            /// Consume into the inner value.
            pub fn into_inner(self) -> $inner {
                self.$($expr)+
            }
        }
    };
    // Combinations of a consuming accessor with `ref` / `mut` forwarding.
    (move const ref $type:ty => $inner:ty, $($expr:tt)+) => {
        forward_newtype!(move const $type => $inner, $($expr)+);
        forward_newtype!(ref $type => $inner, $($expr)+);
    };
    (move ref $type:ty => $inner:ty, $($expr:tt)+) => {
        forward_newtype!(move $type => $inner, $($expr)+);
        forward_newtype!(ref $type => $inner, $($expr)+);
    };
    (move const mut $type:ty => $inner:ty, $($expr:tt)+) => {
        forward_newtype!(move const $type => $inner, $($expr)+);
        forward_newtype!(mut $type => $inner, $($expr)+);
    };
    (move mut $type:ty => $inner:ty, $($expr:tt)+) => {
        forward_newtype!(move $type => $inner, $($expr)+);
        forward_newtype!(mut $type => $inner, $($expr)+);
    };
}
/// The default bottom type.
///
/// To use the `unwrap_infallible()`-like interface, functions that return `-> !` should be changed to `-> Never`.
/// When `unstable` is enabled, this is an alias to `!` and `-> !` is not special cased.
///
/// # As return argument
/// When feature `unstable` is enabled, `into_unreachable()` may not be required to ensure propagation to `!` from a function returning `-> Never`.
#[cfg(feature="unstable")]
pub type Never = !;
/// The default bottom type.
///
/// To use the `unwrap_infallible()`-like interface, functions special cased to `-> !` should be changed to `-> Never`.
///
/// # As return argument
/// When feature `unstable` is not enabled, `into_unreachable()` may be required to be used when dealing with return bottom types other than the special case `-> !`.
/// This is a current limitation of the type system.
#[cfg(not(feature="unstable"))]
pub type Never = Infallible;
/// Marks a type as uninhabited (i.e. a bottom type): no value of it can ever exist.
///
/// # Safety
/// Instances of the impl type **cannot exist**.
/// They must be bottom types (i.e. empty enums, types containing an `Infallible` / `!` object, etc.)
///
/// # Auto-impl
/// This trait is not intended to be implemented on any user-defined type other than empty enums.
///
/// By default it is implemented for the following types:
/// - `core::convert::Infallible`
/// - `!` (**feature**: `unstable`)
/// - `Box<T>` *where* `T: ?Sized + Unreachable`
pub unsafe trait Unreachable {
    /// Convert the (impossible) value into the bottom return type `!`, ending type checking of this control-flow path.
    ///
    /// # Note
    /// This function will never be executed; it only exists to terminate the value's existence in the type system.
    /// If this function ever **can** be called at all, it is undefined behaviour.
    #[inline]
    #[cold]
    fn into_unreachable(self) -> !
    where Self: Sized
    {
        if !cfg!(debug_assertions) {
            // SAFETY: Contractually enforced by the trait impl itself.
            unsafe {
                std::hint::unreachable_unchecked()
            }
        }
        // Debug builds report the broken contract loudly instead.
        unreachable!("Unreachable conversion from {}!", std::any::type_name::<Self>())
    }
}

unsafe impl Unreachable for Infallible {
    #[inline(always)]
    #[cold]
    fn into_unreachable(self) -> ! {
        // An empty enum has no variants to match on.
        match self {}
    }
}

#[cfg(feature="unstable")]
unsafe impl Unreachable for ! {
    #[inline(always)]
    #[cold]
    fn into_unreachable(self) -> ! {
        match self {}
    }
}

// A box of an uninhabited type can never be constructed either; the default method body is used.
unsafe impl<T: ?Sized + Unreachable> Unreachable for Box<T> {}
pub trait UnwrapPanicExt<T, E> {
/// Unwrap the result `Ok` value or panic as described by the non-returning function `F`, with the `Unreachable` bottom type `N`.
/// This will usually be an `-> !` function, (or an `-> Never` function using the `Unreachable` interface.)
///
/// # Panic usage
/// `func` must not return. It should panic, resume a panic, or exit the thread/program, trap, or terminate in an infinite loop.
///
/// It does not *have* to call `panic!()` if it terminates in another way that is not a panic, however.
fn unwrap_or_panic<N: Unreachable, F: FnOnce(E) -> N>(self, func: F) -> T;
/// Unwrap the result `Ok` value or panic as described by the non-returning function `func` with the default bottom type `Never`.
#[inline(always)]
fn unwrap_or_panic_unreachable<F: FnOnce(E) -> Never>(self, func: F) -> T
where Self: Sized {
self.unwrap_or_panic::<Never, _>(func)
}
}
pub trait UnwrapInfallibleExt<T> {
/// Unwrapping is infallible and therefore safe to do so without checking.
fn unwrap_infallible(self) -> T;
}
pub trait UnwrapPanicResumeExt<T> {
/// Unwrap or resume a previous unwind, with the unwind payload in the `Err` variant.
fn unwrap_or_resume(self) -> T;
}
impl<T, E> UnwrapPanicExt<T, E> for Result<T, E>
{
#[inline]
fn unwrap_or_panic<N: Unreachable, F: FnOnce(E) -> N>(self, func: F) -> T {
#[inline(never)]
#[cold]
fn _do_panic<Nn: Unreachable, Ee, Ff: FnOnce(Ee) -> Nn>(error: Ee, func: Ff) -> !
{
func(error).into_unreachable()
}
match self {
Ok(v) => v,
Err(e) => _do_panic(e, func)
}
}
}
impl<T, E: Unreachable> UnwrapInfallibleExt<T> for Result<T, E>
{
#[inline]
fn unwrap_infallible(self) -> T {
match self {
Ok(v) => v,
Err(e) => if cfg!(debug_assertions) {
e.into_unreachable()
} else {
// SAFETY: Contract bound of `E: Unreachable` ensures this path will never be taken.
unsafe {
std::hint::unreachable_unchecked()
}
}
}
}
}
/// The type of a caught unwind payload.
pub type UnwindPayload = Box<dyn std::any::Any + Send>;
#[cold]
#[inline(never)]
fn _resume_unwind<E: Into<UnwindPayload>>(e: E) -> !
{
std::panic::resume_unwind(e.into())
}
impl<T, E: Into<UnwindPayload>> UnwrapPanicResumeExt<T> for Result<T, E>
{
#[inline]
fn unwrap_or_resume(self) -> T {
match self {
Ok(v) => v,
Err(e) => _resume_unwind(e),
}
}
}
/// Uniqueness queries on an `Arc`.
pub trait ArcExt
{
    /// True when this is the only strong reference *and* there are no `Weak` references.
    fn is_fully_unique(self: &std::sync::Arc<Self>) -> bool;
    /// True when this is the only strong reference.
    ///
    /// If there are alive `Weak`s pointing to this object, those are not considered.
    fn is_currently_unique(self: &std::sync::Arc<Self>) -> bool;
}

impl<T: ?Sized> ArcExt for T
{
    #[inline]
    fn is_fully_unique(self: &std::sync::Arc<Self>) -> bool {
        // The weak count is only consulted once the strong count has been found to be 1.
        if std::sync::Arc::strong_count(self) != 1 {
            return false;
        }
        std::sync::Arc::weak_count(self) == 0
    }

    #[inline]
    fn is_currently_unique(self: &std::sync::Arc<Self>) -> bool {
        1 == std::sync::Arc::strong_count(self)
    }
}

@ -1,3 +1,12 @@
#![cfg_attr(feature="unstable", feature(never_type))] // See `Never`.
#[macro_use] mod ext; use ext::*;
#[cfg(any(feature="threads-async", feature="threads"))]
mod sync;
mod buffer;
mod part;
//#[inline]
fn reverse<T>(slice: &mut [T])

@ -0,0 +1,426 @@
//! Partitioning even areas by delimitor byte.
use super::*;
use std::{
num::NonZeroUsize,
};
/// Size of one cache-line.
///
/// NOTE: alignment padded for `u8`.
///
/// TODO: Make this comptime env-var configurable (`option_env!()`) on debug builds. (See `SEARCH_CAP_GROW`.)
const CACHELINE_SIZE: usize = std::mem::size_of::<crossbeam_utils::CachePadded<u8>>();
/// A byte buffer that is exactly one cache-line (`CACHELINE_SIZE` bytes) large.
///
/// The type deliberately does not derive `Copy` so that copies stay explicit: `clone()` is trivial, and `.copied()` can be used to copy in a const context.
///
/// # Alignment
/// Note that the buffer is *not* 1-cacheline aligned itself by default.
/// To ensure its alignment, you should use `crossbeam_utils::CachePadded<CachelineBuffer>` (or the type-alias `AlignedCachelineBuffer`.)
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
#[repr(transparent)]
pub struct CachelineBuffer([u8; CACHELINE_SIZE]);

impl Default for CachelineBuffer
{
    /// Same as `CachelineBuffer::new()`.
    #[inline]
    fn default() -> Self
    {
        CachelineBuffer::new()
    }
}
/// A buffer that takes up exactly one cache-line, which is itself aligned to 1 cacheline.
pub type AlignedCachelineBuffer = crossbeam_utils::CachePadded<CachelineBuffer>;
impl CachelineBuffer {
/// The size of the buffer (1 cacheline of bytes.)
pub const SIZE: usize = CACHELINE_SIZE;
/// Create a new, empty buffer.
#[inline]
pub const fn new() -> Self
{
Self([0; Self::SIZE])
}
/// Clone this value.
///
/// This is a `const fn` explicit trivial copy of the data.
#[inline]
pub const fn copied(&self) -> Self
{
Self(self.0)
}
/// Get a reference to the byte array.
pub const fn as_bytes(&self) -> &[u8; Self::SIZE]
{
&self.0
}
}
forward_newtype!(mut CachelineBuffer => [u8], 0);
forward_newtype!(move const CachelineBuffer => [u8; CACHELINE_SIZE], 0);
// Compile-time sanity check of the cacheline size.
// `assert!` (not `debug_assert!`) is used on purpose: the check is evaluated at compile time and costs nothing at runtime,
// whereas a `debug_assert!` here would silently skip the check on release builds.
const _: () = {
    assert!(CachelineBuffer::SIZE > std::mem::size_of::<u8>(), "Invalid cacheline-padding size (`CACHELINE_SIZE`)");
    //assert!(CACHELINE_SIZE == 128, "Unexpected `CACHELINE_SIZE`");
};
/// Grow capacity exponentially when search fails.
///
/// TODO: Make this comptime env-var settable (`option_env!()`) on debug builds.
const SEARCH_CAP_GROW: bool = true;
/// Settings for a searcher (memory search method configuration.)
///
/// Implementors inherit the crate-wide defaults below (`CACHELINE_SIZE` and `SEARCH_CAP_GROW`) unless they override the associated constants.
/// Making those defaults env-var configurable on debug builds (for benchmarking purposes) is still a TODO.
trait SynchonousSearcher {
/// Initial size (in bytes) of the search window.
const CAP_SIZE: usize = CACHELINE_SIZE;
/// Should the capacity be grown on failed search?
const CAP_GROW: bool = SEARCH_CAP_GROW;
}
// Both searchers use the global compiled-in capacity settings unchanged.
impl SynchonousSearcher for SearchPar {}
impl SynchonousSearcher for SearchSeq {}
/// Midpoint searcher (forwards & backwards)
///
/// Looks for a `needle` element around a pivot index of a haystack; a hit is returned as a reference into the haystack.
trait MidpointFBSearcher<T=u8>: SynchonousSearcher
{
/// Search `haystack` front-to-back; the visible implementations return the first occurrence of `needle`.
fn search_forward<'a>(&self, haystack: &'a [T], needle: T) -> Option<&'a T>;
/// Search `haystack` back-to-front; the visible implementations return the last occurrence of `needle`.
fn search_backward<'a>(&self, haystack: &'a [T], needle: T) -> Option<&'a T>;
/// Search outwards from the pivot index `begin`: forwards in `haystack[begin..]` and backwards in `haystack[..begin]`.
///
/// The visible implementations split with `split_at(begin)`, so `begin` must not exceed `haystack.len()`.
fn search_combined<'a>(&self, haystack: &'a [T], begin: usize, needle: T) -> Option<&'a T>;
}
/// Search the pivot for the needle sequentially.
///
/// Both directions are searched on the calling thread, one after the other: the forward search is issued first, then the backward search.
#[derive(Debug, Clone, Default)]
struct SearchSeq;
/// Upper bound for the search-window capacity when searching an area of `size` bytes.
///
/// Returns `None` (capacity growth is not bounded) when the system page size is unknown or when the area is smaller than one page.
/// Otherwise the bound is one page, or two pages once the area spans at least four pages. A `size` of `0` means "unknown" and is bounded by one page.
#[inline]
fn get_max_cap_for_search_area(size: usize) -> Option<NonZeroUsize>
{
    SYS_PAGE_SIZE.and_then(|page| {
        let page_bytes = page.get();
        match size {
            // Size is unknown, bound by one page.
            0 => Some(page),
            // Size is huge (at least four pages), bound by two pages.
            huge if huge >= (page_bytes << 2) => NonZeroUsize::new(page_bytes << 1),
            // Size is at least one page, bound by one page.
            large if large >= page_bytes => Some(page),
            // If the area size is lower than one page, do not bound the capacity growth.
            _ => None,
        }
    })
}
impl MidpointFBSearcher<u8> for SearchSeq
{
    /// First occurrence of `needle` in `haystack`.
    #[inline(always)]
    fn search_forward<'a>(&self, haystack: &'a [u8], needle: u8) -> Option<&'a u8> {
        memchr::memchr(needle, haystack).map(move |i| &haystack[i])
    }

    /// Last occurrence of `needle` in `haystack`.
    #[inline(always)]
    fn search_backward<'a>(&self, haystack: &'a [u8], needle: u8) -> Option<&'a u8> {
        memchr::memrchr(needle, haystack).map(move |i| &haystack[i])
    }

    /// Search for `needle` outwards from the pivot index `begin`.
    ///
    /// The haystack is split at `begin`. The part after the pivot is searched forwards and the part before it backwards,
    /// in windows that start at `CAP_SIZE` bytes and (when `CAP_GROW` is set) double after every miss, bounded by
    /// `get_max_cap_for_search_area()`. In every round the forward window is tried before the backward one.
    /// `None` is only returned once both parts have been searched completely.
    ///
    /// # Panics
    /// If `begin > haystack.len()` (from `split_at()`.)
    #[inline]
    fn search_combined<'a>(&self, haystack: &'a [u8], begin: usize, needle: u8) -> Option<&'a u8> {
        let max_cap = get_max_cap_for_search_area(haystack.len());

        match haystack.split_at(begin) {
            ([], []) => None,
            ([], ahead) => self.search_forward(ahead, needle),
            (behind, []) => self.search_backward(behind, needle),
            // If both parts fit within `max_cap`, just search each of them in one go: forwards first, and
            // backwards only if that found nothing.
            (behind, ahead) if max_cap.map_or(false, |max| behind.len() <= max.get() && ahead.len() <= max.get()) => {
                self.search_forward(ahead, needle)
                    .or_else(|| self.search_backward(behind, needle))
            },
            (mut behind, mut ahead) => {
                // No bound means the window may grow freely.
                let ceiling = max_cap.map_or(usize::MAX, |max| max.get());
                // The window is never empty, so every round makes progress.
                let mut cap = std::cmp::max(1, std::cmp::min(Self::CAP_SIZE, ceiling));

                while !(ahead.is_empty() && behind.is_empty()) {
                    // Search forwards in the next (at most `cap`) bytes after the pivot, then cut them off the front.
                    let (window, rest) = ahead.split_at(std::cmp::min(cap, ahead.len()));
                    if let Some(found) = self.search_forward(window, needle) {
                        return Some(found);
                    }
                    ahead = rest;

                    // Search backwards in the last (at most `cap`) bytes before the pivot, then cut them off the end.
                    let (rest, window) = behind.split_at(behind.len() - std::cmp::min(cap, behind.len()));
                    if let Some(found) = self.search_backward(window, needle) {
                        return Some(found);
                    }
                    behind = rest;

                    if Self::CAP_GROW {
                        // Double the window (not passing `max_cap` if there is one set.)
                        cap = std::cmp::min(ceiling, cap.saturating_mul(2));
                    }
                }
                None
            }
        }
    }
}
// Placeholder for a future asynchronous searcher; only compiled when feature `async` is enabled.
// NOTE(review): the trait impl below has no methods yet, so this sketch is not expected to build as-is.
#[cfg(feature="async")]
const _TODO_FUTURES_JOIN2_ASYNC_SEARCH: () = {
/// Sketch of a searcher that obtains its search task from `spawn_task` and receives the result through a oneshot channel.
#[derive(Debug, Clone)]
struct SearchAsync<F>
{
// Called to create the search task (`F: Fn() -> Fu` per the impl bounds below.)
spawn_task: F,
// Receiving end for the result -- presumably the index of the hit; TODO confirm.
result: oneshot::Receiver<usize>,
}
// Unfinished: none of the `MidpointFBSearcher` methods are implemented yet.
#[cfg(feature="threads-async")]
impl<F, Fu> MidpointFBSearcher<u8> for SearchAsync<F>
where F: Fn() -> Fu,
Fu: futures::Future + Send + Sync + 'static
{
}
};
/// Search in parallel.
///
/// The forward search runs on a freshly spawned background thread while the backward search runs on the calling thread.
///
/// # Warning
/// This search operation is heavy. `search_combined()` spawns its own 2nd thread whenever there is anything to search after the pivot.
/// This may not be ideal... A lighter, thread-pool (async) or thread-reusing (sync) API would be better. (See below.)
#[derive(Debug, Clone, Default)]
struct SearchPar;
// Lazily initialised system page size, queried once through the C `getpagesize()` function.
lazy_static::lazy_static! {
/// Real system page size (raw.)
///
/// This is whatever `getpagesize()` returned, unvalidated.
static ref REAL_PAGE_SIZE: std::ffi::c_int = {
use std::ffi::c_int;
extern "C" {
fn getpagesize() -> c_int;
}
// NOTE(review): relies on the platform's C library exporting `getpagesize()` -- presumably unix-only; confirm for other targets.
unsafe {
getpagesize()
}
};
/// System page size.
///
/// If the page size returned from `getpagesize()` (`REAL_PAGE_SIZE`) was invalid (below-or-equal to 0,) `None` will be returned.
static ref SYS_PAGE_SIZE: Option<NonZeroUsize> = {
match *REAL_PAGE_SIZE {
// Zero and negative values are rejected.
std::ffi::c_int::MIN..=0 => None,
// SAFETY: We have masked out `0` in the above branch.
rest => unsafe {
// Only strictly positive values reach this arm; the debug assertion additionally checks the value fits into `usize`.
debug_assert!(usize::try_from(rest).is_ok(), "Page size `c_int` out of range of system `usize`??? (Got {})", rest);
Some(NonZeroUsize::new_unchecked(rest as usize))
}
}
};
}
#[cfg(feature="threads")]
impl MidpointFBSearcher<u8> for SearchPar
{
    /// First occurrence of `needle` in `haystack`.
    #[inline(always)]
    fn search_forward<'a>(&self, haystack: &'a [u8], needle: u8) -> Option<&'a u8> {
        memchr::memchr(needle, haystack).map(move |i| &haystack[i])
    }

    /// Last occurrence of `needle` in `haystack`.
    #[inline(always)]
    fn search_backward<'a>(&self, haystack: &'a [u8], needle: u8) -> Option<&'a u8> {
        memchr::memrchr(needle, haystack).map(move |i| &haystack[i])
    }

    /// Search for `needle` outwards from the pivot index `begin`, in both directions at once.
    ///
    /// The part after the pivot is searched forwards on a scoped background thread (`forward-searcher`), the part
    /// before it backwards on the calling thread. Both sides search in windows that start at `CAP_SIZE` bytes and
    /// (when `CAP_GROW` is set) double after every miss, bounded by `get_max_cap_for_search_area()`.
    /// Each side stops as soon as it has a hit, has searched its whole part, or sees that the other side has a hit.
    ///
    /// When both sides find something, the backward hit is returned.
    ///
    /// # Panics
    /// If `begin > haystack.len()` (from `split_at()`), if the background thread cannot be spawned, or if the
    /// background thread panicked and its result is needed.
    fn search_combined<'a>(&self, haystack: &'a [u8], begin: usize, needle: u8) -> Option<&'a u8> {
        // Set by whichever side finds the needle first; polled by the other side between windows.
        let complete = crossbeam::atomic::AtomicCell::new(false);

        std::thread::scope(|s| {
            let (hb, hf) = haystack.split_at(begin);
            let max_cap = get_max_cap_for_search_area(haystack.len());

            // Cap the cap to `max_cap` if there is a max cap; never start with an empty window.
            let cap = std::cmp::max(1, match max_cap {
                Some(max) => std::cmp::min(max.get(), Self::CAP_SIZE),
                None => Self::CAP_SIZE,
            });

            // Window size to use after a miss (not passing `max_cap` if there is one set.)
            let next_cap = move |cap: usize| -> usize {
                if Self::CAP_GROW {
                    let doubled = cap.saturating_mul(2);
                    max_cap.map_or(doubled, |max| std::cmp::min(max.get(), doubled))
                } else {
                    cap
                }
            };

            let forward = if !hf.is_empty() {
                let sf = self;
                let complete = &complete;
                // Background thread: Forward search (`forward-searcher`.)
                Some(std::thread::Builder::new().name("forward-searcher".into()).spawn_scoped(s, move || -> Option<&'a u8> {
                    let mut rest = hf;
                    let mut cap = cap;
                    // Completion is checked before the first window too.
                    while !rest.is_empty() && !complete.load() {
                        // Search forward in the next (at most `cap`) bytes, then cut them off the front.
                        let (window, tail) = rest.split_at(std::cmp::min(cap, rest.len()));
                        if let Some(x) = sf.search_forward(window, needle) {
                            // Tell other operation we have found something.
                            complete.store(true);
                            return Some(x);
                        }
                        rest = tail;
                        cap = next_cap(cap);
                    }
                    None
                }).expect("Failed to spawn forward-searcher thread"))
            } else {
                None
            };

            //NOTE: There is no need to spawn another thread for the 2nd operation, since they are both join()'d at the end regardless and both already communicate completion.
            // Main thread: Backwards search.
            let backward = if !hb.is_empty() && !complete.load() {
                // Allow the forward-searcher thread to run if it is not.
                std::thread::yield_now();

                let mut rest = hb;
                let mut cap = cap;
                let mut found = None::<&'a u8>;
                while !rest.is_empty() && !complete.load() {
                    // Search backwards in the last (at most `cap`) bytes, then cut them off the end.
                    let (head, window) = rest.split_at(rest.len() - std::cmp::min(cap, rest.len()));
                    if let Some(x) = self.search_backward(window, needle) {
                        complete.store(true);
                        found = Some(x);
                        break;
                    }
                    rest = head;
                    cap = next_cap(cap);
                }
                found
            } else {
                None
            };

            if backward.is_some() && forward.as_ref().map(|th| !th.is_finished()).unwrap_or(false) {
                // `backward` found something, `forward` is still running.
                debug_assert_ne!(complete.load(), false, "Complete has not been set! (main thread waiting for forward-searcher thread");
                complete.store(true);
            }

            // Re-raise a panic of the forward-searcher thread on this thread.
            // Debug builds replace the payload with a fixed message; release builds resume the original unwind.
            #[cold]
            #[inline(never)]
            fn _resume_unwind(e: Box<dyn std::any::Any + Send>) -> Never
            {
                if cfg!(debug_assertions) {
                    panic!("forward-searcher thread panic")
                } else {
                    std::panic::resume_unwind(e)
                }
            }

            match (forward, backward) {
                (None, None) => None,
                (None, back @ Some(_)) => back,
                // A backward hit wins; otherwise wait for the forward-searcher thread and use its result.
                (Some(forward), backward) => backward.or_else(move || forward.join().unwrap_or_panic(_resume_unwind)),
            }
        })
    }
}
/// Perform one single partition of `buffer` on `needle`, returning the two resulting parts.
///
/// # Status
/// Not yet implemented: the body is a `todo!()`, so calling this function currently panics.
///
/// # Intended behaviour (as described by the `todo!()` message)
/// Split `buffer` at the occurrence of `needle` nearest to its midpoint, locating that pivot with `method.search_combined()`.
/// A fast-path that skips `search_combined()` (e.g. falling back to `SearchSeq` for small halves) is explicitly permitted.
///
/// # Parameters
/// * `buffer` - The bytes to partition; both returned slices borrow from it (lifetime `'a`).
/// * `needle` - The byte to partition on.
/// * `method` - The `MidpointFBSearcher<u8>` strategy used to find the pivot.
fn partition_once_with<'a, S>(buffer: &'a [u8], needle: u8, method: S) -> (&'a [u8], &'a [u8])
where S: MidpointFBSearcher<u8>
{
todo!("Perform one single buffer partition partition (buffer/2.at_nearest_mpr(needle)) (using `method.search_combined()`) and return its parts. If we can fast-path skip the `search_combined()` then that is okay (e.g. if the buffer/2 is small enough that we should just use `SearchSeq`, we can use `SearchSeq` instead of `S`, and so on.) (XXX: Also see below about thread spawning on parallelised partitions and re-using thread pools (we may be able to do this manually with crossbeam, or we might just have to embrace using `spawn_blocking()` async/a default tokio multithreaded-runtime) since parallel partitions needs at least two threads to search both directions at a time.)")
}
//XXX: Should we add a `SearchAsync`? Or an impl for SearchPar that uses already-spawned threads? TODO: It would be best if we could re-use extant threads instead of spawning two on each partition...
//Parallel (feature="threads") byte area partition-on-nearest-newline-to-halve impl, and non-parallel (default) impl. These impls can differ in their desired depths of partitioning (using parallel impls should balance num of partitions to num of logical cpus & input(/desired chunk) size.)
//TODO: Add tests for `Search{Seq,Par}` partitioning methods.
// Placeholder test-suite for the partition searchers.
// NOTE: Every test below is currently a stub (`todo!()` / `unimplemented!()`), so each one panics (and therefore fails) when it is compiled in and run.
#[cfg(test)]
mod test
{
use super::*;
// Not used by any of the stub tests below yet.
use std::hint::black_box;
//TODO: Add a generic randomised lorem-ipsum-like text data generator & a generic assertion tester that can take a unique `MidpointFBSearcher`.
// Stub: sequential (`SearchSeq`) searcher.
#[test]
fn partition_seq()
{
todo!("Test `SearchSeq` sequential partition searcher")
}
// Stub: parallel, thread-spawning (`SearchPar`) searcher. Only compiled with feature `threads`.
#[cfg(feature="threads")]
#[test]
fn partition_par_heavy()
{
todo!("Test `SearchPar` parallel partition searcher")
}
//TODO: Thread-reusing parallel `MidpointFBSearcher` (`SearchPar` is thread-*spawning*; heavy.) This may require we use async and tasks. If it does, we should also create a `SearchAsync` partitioner (XXX: `MidpointFBSearcher` is currently a synchronous-only interface; a pure-async pivot finder may require a refactor.)
// Stub: thread-*reusing* parallel searcher. Only compiled with both features `threads-async` and `threads`.
#[cfg(all(feature="threads-async", feature = "threads"))]
#[test]
fn partition_par_light()
{
unimplemented!("A light (thread-*reusing*) parallel searcher has not yet been implemented")
}
// Stub: pure async searcher. Only compiled with feature `threads-async`.
// NOTE: This is a plain synchronous `#[test]` for now; the `tokio::` prefix is commented out of the attribute.
#[cfg(feature="threads-async")]
#[/*tokio::*/test]
fn partition_par_async()
{
unimplemented!("A pure async parallel searcher has not yet been implemented")
}
//TODO: Benchmarking the searchers' configuration about capacity size, growth and bounding.
}

@ -0,0 +1,236 @@
//! Sync (inter-thread communication) helpers
use super::*;
use std::{
sync::{
Arc, Weak,
},
mem::{
self,
MaybeUninit,
},
ptr,
fmt, error,
};
use parking_lot::{
Condvar,
Mutex,
};
/// Send a single value across thread boundaries to a receiver.
///
/// This is a sync implementation of `tokio::sync::oneshot`.
///
/// # Status
/// Only the error types and the data layout of [`Sender`] / [`Receiver`] are defined here so far; no constructor, `send` or `recv` operations exist in this module yet.
pub mod oneshot {
    use super::*;

    /// Error when sending
    #[derive(Debug, Clone, PartialEq, Eq, Hash)]
    #[non_exhaustive]
    pub enum SendError
    {
        /// The corresponding `Receiver<T>` channel has been dropped.
        ///
        /// # Note
        /// This operation **cannot** be re-tried
        Closed,
        //TODO: Should we (or *can* we even?) have a `send_wait()` method?
    }

    /// Error when receiving
    #[derive(Debug, Clone, PartialEq, Eq, Hash)]
    #[non_exhaustive]
    pub enum RecvError
    {
        /// The corresponding `Sender<T>` channel has been dropped.
        ///
        /// # Note
        /// This operation **cannot** be re-tried
        Closed,
        /// The `recv()` call timed out before a value was sent.
        ///
        /// This operation can be re-tried
        Timeout,
        /// The `recv()` call was cancelled by a `StopToken` before a value was sent.
        ///
        /// This operation can be re-tried
        //TODO: Maybe merge this and `Timeout`?
        Cancelled,
    }

    impl SendError
    {
        /// Whether the operation that produced this error may be attempted again.
        ///
        /// Currently always `false`: the only variant, [`SendError::Closed`], is documented as non-retriable.
        #[inline]
        #[must_use]
        pub fn can_retry(&self) -> bool
        {
            // Exhaustive on purpose: adding a variant must force a decision here.
            match self {
                Self::Closed => false,
            }
        }
    }

    impl RecvError
    {
        /// Whether the operation that produced this error may be attempted again.
        ///
        /// Follows the per-variant documentation: `Timeout` and `Cancelled` can be re-tried, `Closed` cannot.
        #[inline]
        #[must_use]
        pub fn can_retry(&self) -> bool
        {
            // Exhaustive on purpose: adding a variant must force a decision here.
            match self {
                Self::Closed => false,
                Self::Timeout | Self::Cancelled => true,
            }
        }
    }

    impl fmt::Display for SendError
    {
        /// Writes the fixed message `send error`, regardless of variant.
        #[inline(always)]
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
        {
            f.write_str("send error")
            // TODO: Should we `match self` for detailed error messages here?
        }
    }

    impl fmt::Display for RecvError
    {
        /// Writes the fixed message `recv error`, regardless of variant.
        #[inline(always)]
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
        {
            f.write_str("recv error")
            // TODO: Should we `match self` for detailed error messages here?
        }
    }

    impl error::Error for SendError{}
    impl error::Error for RecvError{}

    /// An error using the `oneshot` channel.
    #[derive(Debug)]
    pub enum Error
    {
        /// An error regarding the sending of a value.
        Send(SendError),
        /// An error regarding the receiving of a value.
        Recv(RecvError),
    }

    impl error::Error for Error
    {
        /// The wrapped [`SendError`] / [`RecvError`]; never `None`.
        #[inline]
        fn source(&self) -> Option<&(dyn error::Error + 'static)> {
            match self {
                Self::Recv(r) => Some(r),
                Self::Send(s) => Some(s),
            }
        }
    }

    impl fmt::Display for Error
    {
        /// Writes the fixed message `oneshot error`; the detail is available through `source()`.
        #[inline(always)]
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
        {
            f.write_str("oneshot error")
        }
    }

    impl From<SendError> for Error
    {
        #[inline]
        fn from(from: SendError) -> Self
        {
            Self::Send(from)
        }
    }

    impl From<RecvError> for Error
    {
        #[inline]
        fn from(from: RecvError) -> Self
        {
            Self::Recv(from)
        }
    }

    // `Display` / `error::Error` impls for the `Try*Error<T>` family.
    // The messages only name the payload type `T`; the inner error is exposed through `source()`.

    impl<T> fmt::Display for TrySendError<T>
    {
        #[inline]
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
        {
            write!(f, "send error (T = {})", std::any::type_name::<T>())
        }
    }

    impl<T> fmt::Display for TryRecvError<T>
    {
        #[inline]
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
        {
            write!(f, "recv error (T = {})", std::any::type_name::<T>())
        }
    }

    // NOTE: The `T: Debug` bounds below exist only because `error::Error` requires `Debug`, and the derived `Debug` impls require `T: Debug`.
    impl<T> error::Error for TryRecvError<T>
    where T: fmt::Debug
    {
        /// The underlying [`RecvError`].
        #[inline]
        fn source(&self) -> Option<&(dyn error::Error + 'static)> {
            Some(&self.0)
        }
    }

    impl<T> error::Error for TrySendError<T>
    where T: fmt::Debug
    {
        /// The underlying [`SendError`].
        #[inline]
        fn source(&self) -> Option<&(dyn error::Error + 'static)> {
            Some(&self.0)
        }
    }

    impl<T> fmt::Display for TryError<T>
    {
        #[inline]
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
        {
            write!(f, "oneshot error (T = {})", std::any::type_name::<T>())
        }
    }

    impl<T> error::Error for TryError<T>
    where T: fmt::Debug
    {
        /// The underlying [`Error`].
        #[inline]
        fn source(&self) -> Option<&(dyn error::Error + 'static)> {
            Some(&self.0)
        }
    }

    //TODO: XXX: We might also want explicit `Debug` impls for all `Try*Error<T>`s, since `T` is irrelevant to the `Error` part.

    /// Error when attempting to send a value using a `try_` function.
    ///
    /// The `Sender<T>` object that originated this is stored in this object for a re-try of the operation.
    #[derive(Debug)]
    pub struct TrySendError<T>(SendError, Sender<T>);

    /// Error when attempting to receive a value using a `try_` function.
    ///
    /// The `Receiver<T>` object that originated this is stored in this object for a re-try of the operation.
    #[derive(Debug)]
    pub struct TryRecvError<T>(RecvError, Receiver<T>);

    /// An error when attempting a oneshot function using a consuming `try_` function.
    ///
    /// The `Sender<T>`/`Receiver<T>` object(s) that originated this error are stored in this object for a re-try of the operation.
    #[derive(Debug)]
    pub struct TryError<T>(Error, (Option<Sender<T>>, Option<Receiver<T>>));

    //TODO: Make a `feature=unstable` version that is allocator-aware *and* re-uses the same allocation for both `Arc` creations (e.g. C++ polymorphic allocator; `bumpalo` or another simple implementation `Sync` bump-allocator would suffice.)

    /// Oneshot sender.
    ///
    /// Sends one value of `T` to a corresponding `Receiver`, if it is still alive.
    #[derive(Debug)]
    pub struct Sender<T>
    {
        /// The slot to write the value to (the `Receiver` reads from it.)
        ///
        /// Write is bound to `send.notify_one()`, read is bound to `send.wait()`.
        ///
        /// # Ownership
        /// The `Sender` holds this slot *weakly*; the `Receiver` holds the strong `Arc`.
        /// Presumably a failed upgrade means the `Receiver` has been dropped and `send`s should fast-fail (see `SendError::Closed`) -- confirm once `send` is implemented.
        //NOTE(review): The slot is `MaybeUninit<T>`, which never drops its contents by itself; the eventual implementation must track initialisation or a sent-but-never-received value is leaked.
        value: Weak<Mutex<MaybeUninit<T>>>,
        /// Sends a signal to the receiver to read from `value`.
        ///
        /// # Ownership
        /// The `Sender` holds the condvar *strongly*; the `Receiver` holds a `Weak` to it.
        /// Presumably a `Receiver` that cannot upgrade its handle knows the `Sender` is gone (see `RecvError::Closed`) -- confirm once `recv` is implemented.
        //NOTE(review): Earlier revisions of these field comments described the opposite layout (`Sender`: `Arc, Weak`; `Receiver`: `Weak, Arc`). The declared field types are what is documented here; verify the intended layout before implementing the constructor.
        send: Arc<Condvar>,
    }

    /// Oneshot receiver.
    ///
    /// Receive one value of `T` from a corresponding `Sender`, if it is still alive.
    #[derive(Debug)]
    pub struct Receiver<T> {
        /// The slot the value is read from; held strongly (the `Sender` side holds a `Weak` of the same type.)
        value: Arc<Mutex<MaybeUninit<T>>>,
        /// Weak handle to the condvar that `Sender` holds strongly.
        send: Weak<Condvar>,
    }
}
Loading…
Cancel
Save