create src

This commit is contained in:
awfixer
2026-03-11 02:04:19 -07:00
commit 52f7a22bf2
2595 changed files with 402870 additions and 0 deletions

2418
src-features/CHANGELOG.md Normal file

File diff suppressed because it is too large Load Diff

128
src-features/Cargo.toml Normal file
View File

@@ -0,0 +1,128 @@
# Use the lint settings configured at the workspace root.
lints.workspace = true
[package]
name = "src-features"
description = "A crate to integrate various capabilities using compile-time feature flags"
repository = "https://github.com/GitoxideLabs/gitoxide"
version = "0.46.1"
authors = ["Sebastian Thiel <sebastian.thiel@icloud.com>"]
license = "MIT OR Apache-2.0"
edition = "2021"
rust-version = "1.82"
include = ["src/**/*", "LICENSE-*"]
[lib]
doctest = false
test = false
# Lines starting with `##` below are feature documentation rendered by `document-features`;
# `#!` lines are section headers in the generated docs.
[features]
default = []
## Provide traits and utilities for providing progress information. These can then be rendered
## using facilities of the `prodash` crate.
progress = ["prodash"]
## Provide human-readable numbers as well as easier to read byte units for progress bars.
progress-unit-human-numbers = ["prodash?/unit-human"]
## Provide human-readable byte units for progress bars.
progress-unit-bytes = ["dep:bytesize", "prodash?/unit-bytes"]
## Provide utilities suitable for working with `std::fs::read_dir()`.
fs-read-dir = ["dep:src-utils"]
## Implement `tracing` with `tracing-core`, which provides applications with valuable performance details if they opt-in to it.
##
## Note that this may have overhead as well, thus instrumentations should be used strategically, only providing coarse tracing by default and adding details
## only where needed while marking them with the appropriate level.
tracing = ["src-trace/tracing"]
## If enabled, detailed tracing is also emitted, which can greatly increase insights but at a cost.
tracing-detail = ["src-trace/tracing-detail"]
## Use scoped threads and channels to parallelize common workloads on multiple objects. If enabled, it is used everywhere
## where it makes sense.
## As caches are likely to be used and instantiated per thread, more memory will be used on top of the costs for threads.
## The `threading` module will contain thread-safe primitives for shared ownership and mutation, otherwise these will be their single threaded counterparts.
## This way, single-threaded applications don't have to pay for threaded primitives.
parallel = ["dep:crossbeam-channel", "dep:parking_lot"]
## If enabled, OnceCell will be made available for interior mutability either in sync or unsync forms.
## Note: This still uses once_cell because std::sync::OnceLock::get_or_try_init() is not yet stable.
once_cell = ["dep:once_cell"]
## Makes facilities of the `walkdir` crate partially available.
## In conjunction with the **parallel** feature, directory walking will be parallel instead behind a compatible interface.
walkdir = ["dep:walkdir", "dep:src-path", "dep:src-utils"]
## An in-memory unidirectional pipe using `bytes` as efficient transfer mechanism.
io-pipe = ["dep:bytes"]
## Provide a proven and fast `crc32` implementation.
crc32 = ["dep:crc32fast"]
## Enable the usage of zlib-related utilities to compress or decompress data.
## This enables and uses the high-performance `zlib-rs` backend.
zlib = ["dep:zlib-rs", "dep:thiserror"]
#! ### Other
## Count cache hits and misses and print that debug information on drop.
## Caches implement this by default, which costs nothing unless this feature is enabled
cache-efficiency-debug = []
# Feature-gated integration tests; each only builds when its required features are enabled.
[[test]]
name = "parallel"
path = "tests/parallel_threaded.rs"
required-features = ["parallel"]
[[test]]
name = "multi-threaded"
path = "tests/parallel_shared_threaded.rs"
required-features = ["parallel"]
[[test]]
name = "single-threaded"
path = "tests/parallel_shared.rs"
[[test]]
name = "pipe"
path = "tests/pipe.rs"
required-features = ["io-pipe"]
[dependencies]
src-trace = { version = "^0.1.18", path = "../src-trace" }
# for walkdir
src-path = { version = "^0.11.1", path = "../src-path", optional = true }
src-utils = { version = "^0.3.1", path = "../src-utils", optional = true }
# 'parallel' feature
crossbeam-channel = { version = "0.5.15", optional = true }
parking_lot = { version = "0.12.4", default-features = false, optional = true }
walkdir = { version = "2.3.2", optional = true } # used when parallel is off
# hashing
crc32fast = { version = "1.5.0", optional = true }
# progress
prodash = { version = "31.0.0", optional = true }
bytesize = { version = "2.3.1", optional = true }
# pipe
bytes = { version = "1.11.1", optional = true }
# zlib module
zlib-rs = { version = "0.6.2", optional = true }
thiserror = { version = "2.0.18", optional = true }
# Note: once_cell is kept for OnceCell type because std::sync::OnceLock::get_or_try_init() is not yet stable.
# Once it's stabilized (tracking issue #109737), we can remove this dependency.
once_cell = { version = "1.21.3", optional = true }
document-features = { version = "0.2.0", optional = true }
[target.'cfg(unix)'.dependencies]
libc = { version = "0.2.182" }
[dev-dependencies]
bstr = { version = "1.12.0", default-features = false }
[package.metadata.docs.rs]
all-features = true
features = ["document-features"]

1
src-features/LICENSE-APACHE Symbolic link
View File

@@ -0,0 +1 @@
../LICENSE-APACHE

1
src-features/LICENSE-MIT Symbolic link
View File

@@ -0,0 +1 @@
../LICENSE-MIT

76
src-features/src/cache.rs Normal file
View File

@@ -0,0 +1,76 @@
#[cfg(feature = "cache-efficiency-debug")]
mod impl_ {
    /// Gathers hit/miss/insertion counters for a cache and prints a report on drop.
    pub struct Debug {
        label: String,
        hit_count: usize,
        put_count: usize,
        miss_count: usize,
    }

    impl Debug {
        /// Create a new instance, labelled with `owner` in the final report.
        #[inline]
        pub fn new(owner: String) -> Self {
            Self {
                label: owner,
                hit_count: 0,
                put_count: 0,
                miss_count: 0,
            }
        }
        /// Record a cache insertion.
        #[inline]
        pub fn put(&mut self) {
            self.put_count += 1;
        }
        /// Record a cache hit.
        #[inline]
        pub fn hit(&mut self) {
            self.hit_count += 1;
        }
        /// Record a cache miss.
        #[inline]
        pub fn miss(&mut self) {
            self.miss_count += 1;
        }
    }

    impl Drop for Debug {
        fn drop(&mut self) {
            let (hits, misses) = (self.hit_count, self.miss_count);
            // Note: a zero miss-count yields `inf`/`NaN` here, which is acceptable for debug output.
            let ratio = hits as f32 / misses as f32;
            eprintln!(
                "{}[{:0x}]: {} / {} (hits/misses) = {:.02}%, puts = {}",
                self.label,
                self as *const _ as usize,
                hits,
                misses,
                ratio * 100.0,
                self.put_count
            );
        }
    }
}
#[cfg(not(feature = "cache-efficiency-debug"))]
mod impl_ {
    /// The zero-sized stand-in used when efficiency debugging is disabled.
    pub struct Debug;

    impl Debug {
        /// Create a new instance; `_owner` is ignored in this build.
        #[inline]
        pub fn new(_owner: String) -> Self {
            Self
        }
        /// No-op counterpart of the counting implementation.
        pub fn put(&mut self) {}
        /// No-op counterpart of the counting implementation.
        pub fn hit(&mut self) {}
        /// No-op counterpart of the counting implementation.
        pub fn miss(&mut self) {}
    }
}
pub use impl_::Debug;

View File

@@ -0,0 +1,38 @@
use std::io::Read;
/// Decode a git-style variable-length integer from `r`, a `Read` implementation,
/// returning the value along with the number of bytes consumed.
///
/// Note: currently overflow checks are only done in debug mode.
#[inline]
pub fn leb64_from_read(mut r: impl Read) -> Result<(u64, usize), std::io::Error> {
    let mut byte = [0u8; 1];
    r.read_exact(&mut byte)?;
    let mut consumed = 1;
    let mut value = u64::from(byte[0]) & 0x7f;
    // The high bit marks a continuation byte; each continuation first adds an
    // implicit +1 (offset encoding) before shifting in the next 7 payload bits.
    while byte[0] & 0x80 != 0 {
        r.read_exact(&mut byte)?;
        consumed += 1;
        debug_assert!(consumed <= 10, "Would overflow value at 11th iteration");
        value += 1;
        value = (value << 7) + (u64::from(byte[0]) & 0x7f);
    }
    Ok((value, consumed))
}
/// Decode a git-style variable-length integer from the byte slice `d`,
/// returning the value along with the number of bytes consumed.
///
/// # Panics
/// If `d` ends before the encoded number does.
#[inline]
pub fn leb64(d: &[u8]) -> (u64, usize) {
    let mut byte = d[0];
    let mut consumed = 1;
    let mut value = u64::from(byte) & 0x7f;
    // Continuation bytes use the offset encoding: add one, then shift in 7 more bits.
    while byte & 0x80 != 0 {
        byte = d[consumed];
        consumed += 1;
        debug_assert!(consumed <= 10, "Would overflow value at 11th iteration");
        value += 1;
        value = (value << 7) + (u64::from(byte) & 0x7f);
    }
    (value, consumed)
}

273
src-features/src/fs.rs Normal file
View File

@@ -0,0 +1,273 @@
//! Filesystem utilities
//!
//! These will be parallel if the `parallel` feature is enabled, at the expense of compiling additional dependencies
//! along with runtime costs for maintaining a global [`rayon`](https://docs.rs/rayon) thread pool.
//!
//! For information on how to use the [`WalkDir`] type, have a look at the [`walkdir`] module documentation.
// TODO: Move all this to `src-fs` in a breaking change.
// Types shared between the feature-gated directory-walking implementations.
#[cfg(feature = "walkdir")]
mod shared {
    /// The desired level of parallelism.
    pub enum Parallelism {
        /// Do not parallelize at all by making a serial traversal on the current thread.
        Serial,
        /// Create a new thread pool for each traversal with up to 16 threads or the amount of logical cores of the machine.
        ThreadPoolPerTraversal {
            /// The base name of the threads we create as part of the thread-pool.
            thread_name: &'static str,
        },
    }
}
#[cfg(any(feature = "walkdir", feature = "fs-read-dir"))]
mod walkdir_precompose {
    use std::{borrow::Cow, ffi::OsStr, path::Path};

    /// A directory-entry wrapper which optionally precomposes unicode in the paths and names it yields.
    #[derive(Debug)]
    pub struct DirEntry<T: std::fmt::Debug> {
        inner: T,
        precompose_unicode: bool,
    }

    impl<T: std::fmt::Debug> DirEntry<T> {
        /// Create a new instance.
        pub fn new(inner: T, precompose_unicode: bool) -> Self {
            Self {
                inner,
                precompose_unicode,
            }
        }
    }

    /// The capabilities a backend directory entry must provide to be wrapped by [`DirEntry`].
    pub trait DirEntryApi {
        fn path(&self) -> Cow<'_, Path>;
        fn file_name(&self) -> Cow<'_, OsStr>;
        fn file_type(&self) -> std::io::Result<std::fs::FileType>;
    }

    impl<T: DirEntryApi + std::fmt::Debug> DirEntry<T> {
        /// Obtain the full path of this entry, possibly with precomposed unicode if enabled.
        ///
        /// Note that decomposing filesystem like those made by Apple accept both precomposed and
        /// decomposed names, and consider them equal.
        pub fn path(&self) -> Cow<'_, Path> {
            let path = self.inner.path();
            if self.precompose_unicode {
                // Consistency fix: this crate's utility dependency is declared as `src-utils`
                // in Cargo.toml, so the in-code crate name is `src_utils`, not `gix_utils`.
                src_utils::str::precompose_path(path)
            } else {
                path
            }
        }

        /// Obtain the file name of this entry, possibly with precomposed unicode if enabled.
        pub fn file_name(&self) -> Cow<'_, OsStr> {
            let name = self.inner.file_name();
            if self.precompose_unicode {
                src_utils::str::precompose_os_string(name)
            } else {
                name
            }
        }

        /// Return the file type for the file that this entry points to.
        ///
        /// If `follow_links` was `true`, this is the file type of the item the link points to.
        pub fn file_type(&self) -> std::io::Result<std::fs::FileType> {
            self.inner.file_type()
        }
    }

    /// A platform over entries in a directory, which may or may not precompose unicode after retrieving
    /// paths from the file system.
    #[cfg(feature = "walkdir")]
    pub struct WalkDir<T> {
        // `Option` so that builder-style methods can `take()` and put back the inner builder.
        pub(crate) inner: Option<T>,
        pub(crate) precompose_unicode: bool,
    }

    /// An iterator adding precompose-unicode support to a backend iterator over directory entries.
    #[cfg(feature = "walkdir")]
    pub struct WalkDirIter<T, I, E>
    where
        T: Iterator<Item = Result<I, E>>,
        I: DirEntryApi,
    {
        pub(crate) inner: T,
        pub(crate) precompose_unicode: bool,
    }

    #[cfg(feature = "walkdir")]
    impl<T, I, E> Iterator for WalkDirIter<T, I, E>
    where
        T: Iterator<Item = Result<I, E>>,
        I: DirEntryApi + std::fmt::Debug,
    {
        type Item = Result<DirEntry<I>, E>;

        fn next(&mut self) -> Option<Self::Item> {
            self.inner
                .next()
                .map(|res| res.map(|entry| DirEntry::new(entry, self.precompose_unicode)))
        }
    }
}
/// Precompose-unicode aware wrappers around entries produced by `std::fs::read_dir()`.
#[cfg(feature = "fs-read-dir")]
pub mod read_dir {
    use std::{borrow::Cow, ffi::OsStr, fs::FileType, path::Path};
    /// A directory entry adding precompose-unicode support to [`std::fs::DirEntry`].
    pub type DirEntry = super::walkdir_precompose::DirEntry<std::fs::DirEntry>;
    // Straight delegation: `std::fs::DirEntry` already offers all required accessors.
    impl super::walkdir_precompose::DirEntryApi for std::fs::DirEntry {
        fn path(&self) -> Cow<'_, Path> {
            self.path().into()
        }
        fn file_name(&self) -> Cow<'_, OsStr> {
            self.file_name().into()
        }
        fn file_type(&self) -> std::io::Result<FileType> {
            self.file_type()
        }
    }
}
/// Directory walking based on the `walkdir` crate, adding precompose-unicode support.
#[cfg(feature = "walkdir")]
pub mod walkdir {
    use std::{borrow::Cow, ffi::OsStr, fs::FileType, path::Path};
    pub use walkdir::Error;
    use walkdir::{DirEntry as DirEntryImpl, WalkDir as WalkDirImpl};
    /// A directory entry returned by [DirEntryIter].
    pub type DirEntry = super::walkdir_precompose::DirEntry<DirEntryImpl>;
    /// A platform to create a [DirEntryIter] from.
    pub type WalkDir = super::walkdir_precompose::WalkDir<WalkDirImpl>;
    pub use super::shared::Parallelism;
    impl super::walkdir_precompose::DirEntryApi for DirEntryImpl {
        fn path(&self) -> Cow<'_, Path> {
            self.path().into()
        }
        fn file_name(&self) -> Cow<'_, OsStr> {
            self.file_name().into()
        }
        fn file_type(&self) -> std::io::Result<FileType> {
            // `walkdir`'s entry exposes the file type directly, hence this is infallible here.
            Ok(self.file_type())
        }
    }
    impl IntoIterator for WalkDir {
        type Item = Result<DirEntry, walkdir::Error>;
        type IntoIter = DirEntryIter;
        fn into_iter(self) -> Self::IntoIter {
            DirEntryIter {
                inner: self.inner.expect("always set (builder fix)").into_iter(),
                precompose_unicode: self.precompose_unicode,
            }
        }
    }
    impl WalkDir {
        /// Set the minimum component depth of paths of entries.
        pub fn min_depth(mut self, min: usize) -> Self {
            self.inner = Some(self.inner.take().expect("always set").min_depth(min));
            self
        }
        /// Set the maximum component depth of paths of entries.
        pub fn max_depth(mut self, max: usize) -> Self {
            self.inner = Some(self.inner.take().expect("always set").max_depth(max));
            self
        }
        /// Follow symbolic links.
        pub fn follow_links(mut self, toggle: bool) -> Self {
            self.inner = Some(self.inner.take().expect("always set").follow_links(toggle));
            self
        }
    }
    /// Instantiate a new directory iterator which will not skip hidden files, with the given level of `parallelism`.
    ///
    /// Use `precompose_unicode` to represent the `core.precomposeUnicode` configuration option.
    pub fn walkdir_new(root: &Path, _: Parallelism, precompose_unicode: bool) -> WalkDir {
        WalkDir {
            inner: WalkDirImpl::new(root).into(),
            precompose_unicode,
        }
    }
    /// Instantiate a new directory iterator which will not skip hidden files and is sorted, with the given level of `parallelism`.
    ///
    /// Use `precompose_unicode` to represent the `core.precomposeUnicode` configuration option.
    /// Use `max_depth` to limit the depth of the recursive walk.
    /// * `0`
    ///   - Returns only the root path with no children
    /// * `1`
    ///   - Root directory and children.
    /// * `1..n`
    ///   - Root directory, children and {n}-grandchildren
    pub fn walkdir_sorted_new(root: &Path, _: Parallelism, max_depth: usize, precompose_unicode: bool) -> WalkDir {
        WalkDir {
            inner: WalkDirImpl::new(root)
                .max_depth(max_depth)
                .sort_by(|a, b| {
                    // Names are compared as bytes; non-UTF-8 names fall back to a lossy
                    // conversion whose storage must outlive the comparison below.
                    let storage_a;
                    let storage_b;
                    // Consistency fix: the path dependency is declared as `src-path` in
                    // Cargo.toml, so its in-code crate name is `src_path`, not `gix_path`.
                    let a_name = match src_path::os_str_into_bstr(a.file_name()) {
                        Ok(f) => f,
                        Err(_) => {
                            storage_a = a.file_name().to_string_lossy();
                            storage_a.as_ref().into()
                        }
                    };
                    let b_name = match src_path::os_str_into_bstr(b.file_name()) {
                        Ok(f) => f,
                        Err(_) => {
                            storage_b = b.file_name().to_string_lossy();
                            storage_b.as_ref().into()
                        }
                    };
                    // "common." < "common/" < "common0"
                    // Compare the shared prefix, then break ties by acting as if directory
                    // names ended in '/' (see example above).
                    let common = a_name.len().min(b_name.len());
                    a_name[..common].cmp(&b_name[..common]).then_with(|| {
                        let a = a_name.get(common).or_else(|| a.file_type().is_dir().then_some(&b'/'));
                        let b = b_name.get(common).or_else(|| b.file_type().is_dir().then_some(&b'/'));
                        a.cmp(&b)
                    })
                })
                .into(),
            precompose_unicode,
        }
    }
    /// The Iterator yielding directory items
    pub type DirEntryIter = super::walkdir_precompose::WalkDirIter<walkdir::IntoIter, DirEntryImpl, walkdir::Error>;
}
// Re-export the feature-gated directory-walking entry points at this module's top level.
#[cfg(feature = "walkdir")]
pub use self::walkdir::{walkdir_new, walkdir_sorted_new, WalkDir};
/// Prepare open options which won't follow symlinks when the file is opened.
///
/// Note: only effective on unix currently, where it sets the `O_NOFOLLOW` flag.
pub fn open_options_no_follow() -> std::fs::OpenOptions {
    // `options` is only mutated on unix; silence the unused-mut lint elsewhere.
    #[cfg_attr(not(unix), allow(unused_mut))]
    let mut options = std::fs::OpenOptions::new();
    #[cfg(unix)]
    {
        /// Make sure that it's impossible to follow through to the target of symlinks.
        /// Note that this will still follow symlinks in the path, which is what we assume
        /// has been checked separately.
        use std::os::unix::fs::OpenOptionsExt;
        options.custom_flags(libc::O_NOFOLLOW);
    }
    options
}

23
src-features/src/hash.rs Normal file
View File

@@ -0,0 +1,23 @@
//! Hash functions and hash utilities
/// Continue a CRC32 computation over `bytes`, returning the updated CRC32 value.
///
/// When calling this function for the first time, `previous_value` should be `0`.
/// Otherwise, it should be the previous return value of this function to provide a hash
/// of multiple sequential chunks of `bytes`.
#[cfg(feature = "crc32")]
pub fn crc32_update(previous_value: u32, bytes: &[u8]) -> u32 {
    let mut hasher = crc32fast::Hasher::new_with_initial(previous_value);
    hasher.update(bytes);
    hasher.finalize()
}
/// Compute the CRC32 value of the given input `bytes` in one shot.
///
/// In case multiple chunks of `bytes` are present, one should use [`crc32_update()`] instead.
#[cfg(feature = "crc32")]
pub fn crc32(bytes: &[u8]) -> u32 {
    let mut hasher = crc32fast::Hasher::new();
    hasher.update(bytes);
    hasher.finalize()
}

View File

@@ -0,0 +1,161 @@
//! Utilities to cause interruptions in common traits, like Read/Write and Iterator.
use std::{
io,
sync::atomic::{AtomicBool, Ordering},
};
/// A wrapper for an inner iterator which will check for interruptions on each iteration, stopping the iteration when
/// that is requested.
pub struct Iter<'a, I> {
    /// The actual iterator to yield elements from.
    pub inner: I,
    should_interrupt: &'a AtomicBool,
}

impl<'a, I> Iter<'a, I>
where
    I: Iterator,
{
    /// Create a new iterator over `inner` which checks for interruptions on each iteration on `should_interrupt`.
    ///
    /// Note that this means the consumer of the iterator data should also be able to access `should_interrupt` and
    /// consider it when producing the final result to avoid claiming success even though the operation is only partially
    /// complete.
    pub fn new(inner: I, should_interrupt: &'a AtomicBool) -> Self {
        Self {
            inner,
            should_interrupt,
        }
    }
}

impl<I> Iterator for Iter<'_, I>
where
    I: Iterator,
{
    type Item = I::Item;

    fn next(&mut self) -> Option<Self::Item> {
        // Stop yielding the moment an interrupt is requested; the inner iterator is left untouched.
        (!self.should_interrupt.load(Ordering::Relaxed))
            .then(|| self.inner.next())
            .flatten()
    }
}
/// A wrapper for an inner iterator which will check for interruptions on each iteration.
pub struct IterWithErr<'a, I, EFN> {
    /// The actual iterator to yield elements from.
    pub inner: I,
    // `Some` until the iterator is exhausted or interrupted; consumed exactly once.
    make_err: Option<EFN>,
    should_interrupt: &'a AtomicBool,
}

impl<'a, I, EFN, E> IterWithErr<'a, I, EFN>
where
    I: Iterator,
    EFN: FnOnce() -> E,
{
    /// Create a new iterator over `inner` which checks for interruptions on each iteration and calls `make_err()` to
    /// signal an interruption happened, causing no further items to be iterated from that point on.
    pub fn new(inner: I, make_err: EFN, should_interrupt: &'a AtomicBool) -> Self {
        Self {
            inner,
            make_err: Some(make_err),
            should_interrupt,
        }
    }
}

impl<I, EFN, E> Iterator for IterWithErr<'_, I, EFN>
where
    I: Iterator,
    EFN: FnOnce() -> E,
{
    type Item = Result<I::Item, E>;

    fn next(&mut self) -> Option<Self::Item> {
        // Once `make_err` is gone the iterator is permanently exhausted.
        self.make_err.as_ref()?;
        if self.should_interrupt.load(Ordering::Relaxed) {
            return self.make_err.take().map(|make_err| Err(make_err()));
        }
        let next_item = self.inner.next();
        if next_item.is_none() {
            // Natural end of iteration: drop the error factory so we never signal after the fact.
            self.make_err = None;
        }
        next_item.map(Ok)
    }
}
/// A wrapper for implementers of [`std::io::Read`] or [`std::io::BufRead`] with interrupt support.
///
/// It fails a [read][std::io::Read::read] while an interrupt was requested.
pub struct Read<'a, R> {
    /// The actual implementor of [`std::io::Read`] to which interrupt support will be added.
    pub inner: R,
    /// The flag to trigger interruption
    pub should_interrupt: &'a AtomicBool,
}

impl<R> io::Read for Read<'_, R>
where
    R: io::Read,
{
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if !self.should_interrupt.load(Ordering::Relaxed) {
            self.inner.read(buf)
        } else {
            Err(std::io::Error::other("Interrupted"))
        }
    }
}

impl<R> io::BufRead for Read<'_, R>
where
    R: io::BufRead,
{
    fn fill_buf(&mut self) -> io::Result<&[u8]> {
        // Note: deliberately not interrupt-checked - only `read()` observes the flag.
        self.inner.fill_buf()
    }

    fn consume(&mut self, amt: usize) {
        self.inner.consume(amt);
    }
}
/// A wrapper for implementers of [`std::io::Write`] with interrupt checks on each write call.
///
/// It fails a [write][std::io::Write::write] while an interrupt was requested.
pub struct Write<'a, W> {
    /// The actual implementor of [`std::io::Write`] to which interrupt support will be added.
    pub inner: W,
    /// The flag to trigger interruption
    pub should_interrupt: &'a AtomicBool,
}

impl<W> io::Write for Write<'_, W>
where
    W: std::io::Write,
{
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        if !self.should_interrupt.load(Ordering::Relaxed) {
            self.inner.write(buf)
        } else {
            Err(std::io::Error::other("Interrupted"))
        }
    }

    fn flush(&mut self) -> io::Result<()> {
        // Don't interrupt here, allow flushes to happen to prefer disk consistency.
        self.inner.flush()
    }
}

impl<W> io::Seek for Write<'_, W>
where
    W: std::io::Seek,
{
    fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
        self.inner.seek(pos)
    }
}

94
src-features/src/io.rs Normal file
View File

@@ -0,0 +1,94 @@
//! A unidirectional pipe for bytes, analogous to a unix pipe. Available with the `io-pipe` feature toggle.
/// A unidirectional pipe for bytes, analogous to a unix pipe. Available with the `io-pipe` feature toggle.
#[cfg(feature = "io-pipe")]
pub mod pipe {
    use std::io;
    use bytes::{Buf, BufMut, BytesMut};
    /// The write-end of the pipe, receiving items to become available in the [`Reader`].
    ///
    /// It's commonly used with the [`std::io::Write`] trait it implements.
    pub struct Writer {
        /// The channel through which bytes are transferred. Useful for sending [`std::io::Error`]s instead.
        pub channel: std::sync::mpsc::SyncSender<io::Result<BytesMut>>,
        // Scratch buffer: filled by `write()` and immediately split off and sent,
        // so it is empty between calls while its allocation is reused.
        buf: BytesMut,
    }
    /// The read-end of the pipe, implementing the [`std::io::Read`] trait.
    pub struct Reader {
        // Receives chunks (or errors) produced by the corresponding `Writer`.
        channel: std::sync::mpsc::Receiver<io::Result<BytesMut>>,
        // The portion of the most recently received chunk not yet consumed.
        buf: BytesMut,
    }
    impl io::BufRead for Reader {
        fn fill_buf(&mut self) -> io::Result<&[u8]> {
            // Only fetch a new chunk once the current one is fully consumed.
            if self.buf.is_empty() {
                match self.channel.recv() {
                    Ok(Ok(buf)) => self.buf = buf,
                    Ok(Err(err)) => return Err(err),
                    // A disconnected writer means end-of-input: leave `buf` empty to signal EOF.
                    Err(_) => {}
                }
            }
            Ok(&self.buf)
        }
        fn consume(&mut self, amt: usize) {
            // Clamp to the available length so over-consumption cannot panic.
            self.buf.advance(amt.min(self.buf.len()));
        }
    }
    impl io::Read for Reader {
        fn read(&mut self, mut out: &mut [u8]) -> io::Result<usize> {
            let mut written = 0;
            // Keep copying chunk data into `out` until it is full or input ends.
            while !out.is_empty() {
                if self.buf.is_empty() {
                    match self.channel.recv() {
                        Ok(Ok(buf)) => self.buf = buf,
                        Ok(Err(err)) => return Err(err),
                        // Writer is gone - report however many bytes were copied so far.
                        Err(_) => break,
                    }
                }
                let bytes_to_write = self.buf.len().min(out.len());
                let (to_write, rest) = out.split_at_mut(bytes_to_write);
                self.buf.split_to(bytes_to_write).copy_to_slice(to_write);
                out = rest;
                written += bytes_to_write;
            }
            Ok(written)
        }
    }
    impl io::Write for Writer {
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            // Stage the bytes, then hand off the whole staged chunk; `split()` leaves
            // `self.buf` empty while retaining its allocation for the next call.
            self.buf.put_slice(buf);
            self.channel
                .send(Ok(self.buf.split()))
                .map_err(|err| io::Error::new(io::ErrorKind::BrokenPipe, err))?;
            Ok(buf.len())
        }
        fn flush(&mut self) -> io::Result<()> {
            // Data is handed off synchronously in `write()`, so there is nothing to flush.
            Ok(())
        }
    }
    /// Returns the _([`write`][Writer], [`read`][Reader])_ ends of a pipe for transferring bytes, analogous to a unix pipe.
    ///
    /// * `in_flight_writes` defines the amount of chunks of bytes to keep in memory until the `write` end will block when writing.
    ///   If `0`, the `write` end will always block until the `read` end consumes the transferred bytes.
    pub fn unidirectional(in_flight_writes: usize) -> (Writer, Reader) {
        let (tx, rx) = std::sync::mpsc::sync_channel(in_flight_writes);
        (
            Writer {
                channel: tx,
                buf: BytesMut::with_capacity(4096),
            },
            Reader {
                channel: rx,
                buf: BytesMut::new(),
            },
        )
    }
}

66
src-features/src/lib.rs Normal file
View File

@@ -0,0 +1,66 @@
//! A crate providing foundational capabilities to other `git-*` crates with trade-offs between compile time, binary size or speed
//! selectable using cargo feature toggles.
//!
//! It's designed to allow the application level crate to configure feature toggles, affecting all other `git-*` crates using
//! this one.
//!
//! Thus all features provided here commonly have a 'cheap' base implementation, with the option to pull in
//! counterparts with higher performance.
//! ## Feature Flags
#![cfg_attr(
all(doc, feature = "document-features"),
doc = ::document_features::document_features!()
)]
#![cfg_attr(all(doc, feature = "document-features"), feature(doc_cfg))]
#![deny(rust_2018_idioms, missing_docs)]
///
pub mod cache;
///
pub mod decode;
pub mod fs;
pub mod hash;
pub mod interrupt;
#[cfg(feature = "io-pipe")]
pub mod io;
pub mod parallel;
#[cfg(feature = "progress")]
pub mod progress;
pub mod threading;
pub use gix_trace as trace;
///
#[cfg(feature = "zlib")]
pub mod zlib;
/// Iterator utilities.
pub mod iter {
    /// An iterator over chunks of input, producing `Vec<Item>` with a size of `size`, with the last chunk being the remainder and thus
    /// potentially smaller than `size`.
    ///
    /// Note that a `size` of `0` yields no chunks at all.
    pub struct Chunks<I> {
        /// The inner iterator to ask for items.
        pub inner: I,
        /// The size of chunks to produce
        pub size: usize,
    }

    impl<I, Item> Iterator for Chunks<I>
    where
        I: Iterator<Item = Item>,
    {
        type Item = Vec<Item>;

        fn next(&mut self) -> Option<Self::Item> {
            // `take()` handles `size == 0` gracefully - the previous manual countdown
            // (`items_left -= 1`) would underflow `usize` in that case - and stops
            // early when `inner` is exhausted.
            let chunk: Vec<Item> = self.inner.by_ref().take(self.size).collect();
            (!chunk.is_empty()).then_some(chunk)
        }
    }
}

View File

@@ -0,0 +1,124 @@
/// Evaluate any iterator in their own thread.
///
/// This is particularly useful if the wrapped iterator performs IO and/or heavy computations.
/// Use [`EagerIter::new()`] for instantiation.
pub struct EagerIter<I: Iterator> {
    // Chunks produced by the background thread arrive here.
    receiver: std::sync::mpsc::Receiver<Vec<I::Item>>,
    // The chunk currently being drained, if any.
    chunk: Option<std::vec::IntoIter<I::Item>>,
    // The source iterator's size hint, captured at construction time.
    size_hint: (usize, Option<usize>),
}

impl<I> EagerIter<I>
where
    I: Iterator + Send + 'static,
    <I as Iterator>::Item: Send,
{
    /// Return a new `EagerIter` which evaluates `iter` in its own thread,
    /// with a given `chunk_size` allowing a maximum `chunks_in_flight`.
    ///
    /// * `chunk_size` describes how many items returned by `iter` will be a single item of this `EagerIter`.
    ///   This helps to reduce the overhead imposed by transferring many small items.
    ///   If this number is 1, each item will become a single chunk. 0 is invalid.
    /// * `chunks_in_flight` describes how many chunks can be kept in memory in case the consumer of the `EagerIter`s items
    ///   isn't consuming them fast enough. Setting this number to 0 effectively turns off any caching, but blocks `EagerIter`
    ///   if its items aren't consumed fast enough.
    pub fn new(iter: I, chunk_size: usize, chunks_in_flight: usize) -> Self {
        assert!(chunk_size > 0, "non-zero chunk size is needed");
        // Capture the hint before the iterator moves into the producer thread.
        let size_hint = iter.size_hint();
        let (sender, receiver) = std::sync::mpsc::sync_channel(chunks_in_flight);
        std::thread::spawn(move || {
            let mut pending = Vec::with_capacity(chunk_size);
            for item in iter {
                pending.push(item);
                if pending.len() == chunk_size {
                    // A closed receiver means the consumer is gone - stop producing.
                    if sender.send(pending).is_err() {
                        return;
                    }
                    pending = Vec::with_capacity(chunk_size);
                }
            }
            if !pending.is_empty() {
                sender.send(pending).ok();
            }
        });
        EagerIter {
            receiver,
            chunk: None,
            size_hint,
        }
    }

    fn fill_buf_and_pop(&mut self) -> Option<I::Item> {
        // Fetch the next chunk (None once the producer disconnected) and yield its first item.
        self.chunk = self.receiver.recv().ok().map(|chunk| {
            assert!(!chunk.is_empty());
            chunk.into_iter()
        });
        self.chunk.as_mut().and_then(Iterator::next)
    }
}

impl<I> Iterator for EagerIter<I>
where
    I: Iterator + Send + 'static,
    <I as Iterator>::Item: Send,
{
    type Item = I::Item;

    fn next(&mut self) -> Option<Self::Item> {
        if let Some(active) = self.chunk.as_mut() {
            active.next().or_else(|| self.fill_buf_and_pop())
        } else {
            self.fill_buf_and_pop()
        }
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        // The hint recorded from the source iterator when this instance was created.
        self.size_hint
    }
}
/// A conditional `EagerIter`, which may become a just-in-time iterator running in the main thread depending on a condition.
pub enum EagerIterIf<I: Iterator> {
    /// A separate thread will eagerly evaluate iterator `I`.
    Eager(EagerIter<I>),
    /// The current thread evaluates `I`.
    OnDemand(I),
}

impl<I> EagerIterIf<I>
where
    I: Iterator + Send + 'static,
    <I as Iterator>::Item: Send,
{
    /// Return a new `EagerIterIf` if `condition()` returns true.
    ///
    /// For all other parameters, please see [`EagerIter::new()`].
    pub fn new(condition: impl FnOnce() -> bool, iter: I, chunk_size: usize, chunks_in_flight: usize) -> Self {
        if condition() {
            Self::Eager(EagerIter::new(iter, chunk_size, chunks_in_flight))
        } else {
            Self::OnDemand(iter)
        }
    }
}

impl<I> Iterator for EagerIterIf<I>
where
    I: Iterator + Send + 'static,
    <I as Iterator>::Item: Send,
{
    type Item = I::Item;

    fn next(&mut self) -> Option<Self::Item> {
        // Plain delegation to whichever variant is active.
        match self {
            Self::Eager(inner) => inner.next(),
            Self::OnDemand(inner) => inner.next(),
        }
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        match self {
            Self::Eager(inner) => inner.size_hint(),
            Self::OnDemand(inner) => inner.size_hint(),
        }
    }
}

View File

@@ -0,0 +1,83 @@
use std::{cmp::Ordering, collections::BTreeMap};
/// A counter for items that are in sequence, to be able to put them back into original order later.
pub type SequenceId = usize;

/// An iterator which holds iterated items with a **sequential** ID starting at 0 long enough to dispense them in order.
pub struct InOrderIter<T, I> {
    /// The iterator yielding the out-of-order elements we are to yield in order.
    pub inner: I,
    // Buffers items that arrived ahead of their turn, keyed by sequence id.
    store: BTreeMap<SequenceId, T>,
    // The sequence id we expect to hand out next.
    next_chunk: SequenceId,
    // Set once an error was observed; the iterator is exhausted from then on.
    is_done: bool,
}

impl<T, E, I> From<I> for InOrderIter<T, I>
where
    I: Iterator<Item = Result<(SequenceId, T), E>>,
{
    fn from(iter: I) -> Self {
        Self {
            inner: iter,
            store: BTreeMap::new(),
            next_chunk: 0,
            is_done: false,
        }
    }
}

impl<T, E, I> Iterator for InOrderIter<T, I>
where
    I: Iterator<Item = Result<(SequenceId, T), E>>,
{
    type Item = Result<T, E>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.is_done {
            return None;
        }
        loop {
            let Some(item) = self.inner.next() else {
                // Input exhausted: drain whatever was buffered for the next expected id.
                return match self.store.remove(&self.next_chunk) {
                    Some(buffered) => {
                        self.next_chunk += 1;
                        Some(Ok(buffered))
                    }
                    None => {
                        debug_assert!(
                            self.store.is_empty(),
                            "When iteration is done we should not have stored items left"
                        );
                        None
                    }
                };
            };
            let (id, value) = match item {
                Ok(pair) => pair,
                Err(err) => {
                    // Errors terminate the stream; buffered items are discarded.
                    self.is_done = true;
                    self.store.clear();
                    return Some(Err(err));
                }
            };
            match id.cmp(&self.next_chunk) {
                Ordering::Equal => {
                    self.next_chunk += 1;
                    return Some(Ok(value));
                }
                Ordering::Less => {
                    unreachable!("in a correctly ordered sequence we can never see keys again, got {}", id)
                }
                Ordering::Greater => {
                    // Buffer the early arrival, then see if its insertion unblocked the next id.
                    let previous = self.store.insert(id, value);
                    assert!(
                        previous.is_none(),
                        "Chunks are returned only once, input is an invalid sequence"
                    );
                    if let Some(buffered) = self.store.remove(&self.next_chunk) {
                        self.next_chunk += 1;
                        return Some(Ok(buffered));
                    }
                }
            }
        }
    }
}

View File

@@ -0,0 +1,302 @@
use std::sync::atomic::{AtomicBool, AtomicIsize, AtomicUsize, Ordering};
use crate::parallel::{num_threads, Reduce};
/// A scope to start threads within.
pub type Scope<'scope, 'env> = std::thread::Scope<'scope, 'env>;

/// Runs `left` and `right` in parallel, returning their output when both are done.
pub fn join<O1: Send, O2: Send>(left: impl FnOnce() -> O1 + Send, right: impl FnOnce() -> O2 + Send) -> (O1, O2) {
    std::thread::scope(|scope| {
        let left_handle = std::thread::Builder::new()
            .name("gitoxide.join.left".into())
            .spawn_scoped(scope, left)
            .expect("valid name");
        let right_handle = std::thread::Builder::new()
            .name("gitoxide.join.right".into())
            .spawn_scoped(scope, right)
            .expect("valid name");
        // Join left first so a panic there propagates before one from the right side.
        let first = left_handle.join().unwrap();
        let second = right_handle.join().unwrap();
        (first, second)
    })
}
/// Runs `f` with a scope to be used for spawning threads that will not outlive the function call.
/// That way it's possible to handle threads without needing the 'static lifetime for data they interact with.
///
/// Note that the threads should not rely on actual parallelism as threading might be turned off entirely, hence should not
/// connect each other with channels as deadlock would occur in single-threaded mode.
pub fn threads<'env, F, R>(f: F) -> R
where
    F: for<'scope> FnOnce(&'scope std::thread::Scope<'scope, 'env>) -> R,
{
    // Thin wrapper over std's scoped threads; `f` receives the scope to spawn into.
    std::thread::scope(f)
}
/// Create a builder for threads which allows them to be spawned into a scope and configured prior to spawning.
pub fn build_thread() -> std::thread::Builder {
    std::thread::Builder::new()
}
/// Read items from `input` and `consume` them in multiple threads,
/// whose output is collected by a `reducer`. Its task is to
/// aggregate these outputs into the final result returned by this function, with the benefit of not having to be thread-safe.
///
/// * if `thread_limit` is `Some`, then the given number of threads will be used. If `None`, all logical cores will be used.
/// * `new_thread_state(thread_number) -> State` produces thread-local state once per thread to be passed to `consume`
/// * `consume(Item, &mut State) -> Output` produces an output given an input obtained by `input` along with mutable state initially
///   created by `new_thread_state(…)`.
/// * For `reducer`, see the [`Reduce`] trait
pub fn in_parallel<I, S, O, R>(
    input: impl Iterator<Item = I> + Send,
    thread_limit: Option<usize>,
    new_thread_state: impl FnOnce(usize) -> S + Send + Clone,
    consume: impl FnMut(I, &mut S) -> O + Send + Clone,
    mut reducer: R,
) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
where
    R: Reduce<Input = O>,
    I: Send,
    O: Send,
{
    let num_threads = num_threads(thread_limit);
    std::thread::scope(move |s| {
        // This inner block owns every channel endpoint except `receive_result`. When it ends,
        // the original `send_result` and `receive_input` handles are dropped, so the result
        // channel closes once all workers finish - which lets the reducer loop below terminate.
        let receive_result = {
            let (send_input, receive_input) = crossbeam_channel::bounded::<I>(num_threads);
            let (send_result, receive_result) = crossbeam_channel::bounded::<O>(num_threads);
            for thread_id in 0..num_threads {
                std::thread::Builder::new()
                    .name(format!("gitoxide.in_parallel.produce.{thread_id}"))
                    .spawn_scoped(s, {
                        let send_result = send_result.clone();
                        let receive_input = receive_input.clone();
                        let new_thread_state = new_thread_state.clone();
                        let mut consume = consume.clone();
                        move || {
                            let mut state = new_thread_state(thread_id);
                            // Pull items until the input channel is closed and drained.
                            for item in receive_input {
                                // A closed result channel means the receiver gave up - stop working.
                                if send_result.send(consume(item, &mut state)).is_err() {
                                    break;
                                }
                            }
                        }
                    })
                    .expect("valid name");
            }
            // A dedicated feeder thread moves `input` into the bounded channel so this
            // thread stays free to run the reducer concurrently.
            std::thread::Builder::new()
                .name("gitoxide.in_parallel.feed".into())
                .spawn_scoped(s, move || {
                    for item in input {
                        // All workers hung up - no point in feeding more items.
                        if send_input.send(item).is_err() {
                            break;
                        }
                    }
                })
                .expect("valid name");
            receive_result
        };
        for item in receive_result {
            // `feed()` may yield a per-item product; it is discarded here while errors abort via `?`.
            drop(reducer.feed(item)?);
        }
        reducer.finalize()
    })
}
/// Read items from `input` and `consume` them in multiple threads,
/// whose output is collected by a `reducer`. Its task is to
/// aggregate these outputs into the final result returned by this function with the benefit of not having to be thread-safe.
/// `finalize` is called to finish the computation, once per thread, if there was no error sending results earlier.
///
/// * if `thread_limit` is `Some`, then the given number of threads will be used. If `None`, all logical cores will be used.
/// * `new_thread_state(thread_number) -> State` produces thread-local state once per thread to be passed to `consume`
/// * `consume(Item, &mut State) -> Output` produces an output given an input obtained by `input` along with mutable state initially
///   created by `new_thread_state(…)`.
/// * `finalize(State) -> Output` is called to potentially process remaining work that was placed in `State`.
/// * For `reducer`, see the [`Reduce`] trait
pub fn in_parallel_with_finalize<I, S, O, R>(
    input: impl Iterator<Item = I> + Send,
    thread_limit: Option<usize>,
    new_thread_state: impl FnOnce(usize) -> S + Send + Clone,
    consume: impl FnMut(I, &mut S) -> O + Send + Clone,
    finalize: impl FnOnce(S) -> O + Send + Clone,
    mut reducer: R,
) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
where
    R: Reduce<Input = O>,
    I: Send,
    O: Send,
{
    let num_threads = num_threads(thread_limit);
    std::thread::scope(move |s| {
        let receive_result = {
            // Bounded channels give back-pressure between feeder, workers and the reducer loop below.
            let (send_input, receive_input) = crossbeam_channel::bounded::<I>(num_threads);
            let (send_result, receive_result) = crossbeam_channel::bounded::<O>(num_threads);
            for thread_id in 0..num_threads {
                std::thread::Builder::new()
                    .name(format!("gitoxide.in_parallel.produce.{thread_id}"))
                    .spawn_scoped(s, {
                        let send_result = send_result.clone();
                        let receive_input = receive_input.clone();
                        let new_thread_state = new_thread_state.clone();
                        let mut consume = consume.clone();
                        let finalize = finalize.clone();
                        move || {
                            let mut state = new_thread_state(thread_id);
                            // Track whether the result receiver is still listening - if it already hung up,
                            // sending the finalized state would be pointless.
                            let mut can_send = true;
                            for item in receive_input {
                                if send_result.send(consume(item, &mut state)).is_err() {
                                    can_send = false;
                                    break;
                                }
                            }
                            // Each worker contributes one extra output produced from its final state.
                            if can_send {
                                send_result.send(finalize(state)).ok();
                            }
                        }
                    })
                    .expect("valid name");
            }
            std::thread::Builder::new()
                .name("gitoxide.in_parallel.feed".into())
                .spawn_scoped(s, move || {
                    for item in input {
                        if send_input.send(item).is_err() {
                            break;
                        }
                    }
                })
                .expect("valid name");
            // The block-local `send_result` is dropped here, so the loop below ends once all workers finished.
            receive_result
        };
        for item in receive_result {
            drop(reducer.feed(item)?);
        }
        reducer.finalize()
    })
}
/// An experiment to have fine-grained per-item parallelization with built-in aggregation via thread state.
/// This is only good for operations where near-random access isn't detrimental, so it's not usually great
/// for file-io as it won't make use of sorted inputs well.
/// Note that `periodic` is not guaranteed to be called in case other threads come up first and finish too fast.
/// `consume(&mut item, &mut stat, &Scope, &threads_available, &should_interrupt)` is called for performing the actual computation.
/// Note that `threads_available` should be decremented to start a thread that can steal your own work (as stored in `item`),
/// which allows callees to implement their own work-stealing in case the work is distributed unevenly.
/// Work stealing should only start after having processed at least one item to give all threads naturally operating on the slice
/// some time to start. Starting threads while slice-workers are still starting up would lead to over-allocation of threads,
/// which is why the number of threads left may turn negative. Once threads are started and stopped, be sure to adjust
/// the thread-count accordingly.
// TODO: better docs
pub fn in_parallel_with_slice<I, S, R, E>(
    input: &mut [I],
    thread_limit: Option<usize>,
    new_thread_state: impl FnOnce(usize) -> S + Send + Clone,
    consume: impl FnMut(&mut I, &mut S, &AtomicIsize, &AtomicBool) -> Result<(), E> + Send + Clone,
    mut periodic: impl FnMut() -> Option<std::time::Duration> + Send,
    state_to_rval: impl FnOnce(S) -> R + Send + Clone,
) -> Result<Vec<R>, E>
where
    I: Send,
    E: Send,
    R: Send,
{
    let num_threads = num_threads(thread_limit);
    let mut results = Vec::with_capacity(num_threads);
    // Shared coordination state: a global stop flag, the next slice index to claim,
    // and a (possibly negative, see docs above) count of threads still available.
    let stop_everything = &AtomicBool::default();
    let index = &AtomicUsize::default();
    let threads_left = &AtomicIsize::new(num_threads as isize);
    std::thread::scope({
        move |s| {
            // Watchdog thread: `periodic()` returning `None` signals interruption by raising the stop flag.
            // NOTE(review): if `periodic()` returns a long duration, leaving the scope waits for the sleep to end.
            std::thread::Builder::new()
                .name("gitoxide.in_parallel_with_slice.watch-interrupts".into())
                .spawn_scoped(s, {
                    move || loop {
                        if stop_everything.load(Ordering::Relaxed) {
                            break;
                        }
                        match periodic() {
                            Some(duration) => std::thread::sleep(duration),
                            None => {
                                stop_everything.store(true, Ordering::Relaxed);
                                break;
                            }
                        }
                    }
                })
                .expect("valid name");
            let input_len = input.len();
            // Wrapper to move the raw slice pointer into worker threads.
            struct Input<I>(*mut I)
            where
                I: Send;
            // SAFETY: I is Send, and we only use the pointer for creating new
            // pointers (within the input slice) from the threads.
            #[allow(unsafe_code)]
            unsafe impl<I> Send for Input<I> where I: Send {}
            let threads: Vec<_> = (0..num_threads)
                .map(|thread_id| {
                    std::thread::Builder::new()
                        .name(format!("gitoxide.in_parallel_with_slice.produce.{thread_id}"))
                        .spawn_scoped(s, {
                            let new_thread_state = new_thread_state.clone();
                            let state_to_rval = state_to_rval.clone();
                            let mut consume = consume.clone();
                            let input = Input(input.as_mut_ptr());
                            move || {
                                // Force the `Input` wrapper itself (not just its pointer field) into the closure,
                                // so the `unsafe impl Send` above is what makes this capture legal.
                                let _ = &input;
                                // Mark this thread as busy for would-be work-stealers.
                                threads_left.fetch_sub(1, Ordering::SeqCst);
                                let mut state = new_thread_state(thread_id);
                                let res = (|| {
                                    // Atomically claim the next unprocessed index; stops once the slice is exhausted.
                                    while let Ok(input_index) =
                                        index.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| {
                                            (x < input_len).then_some(x + 1)
                                        })
                                    {
                                        if stop_everything.load(Ordering::Relaxed) {
                                            break;
                                        }
                                        // SAFETY: our atomic counter for `input_index` is only ever incremented, yielding
                                        // each item exactly once.
                                        let item = {
                                            #[allow(unsafe_code)]
                                            unsafe {
                                                &mut *input.0.add(input_index)
                                            }
                                        };
                                        if let Err(err) = consume(item, &mut state, threads_left, stop_everything) {
                                            // First error wins: flag all other threads to stop and surface it.
                                            stop_everything.store(true, Ordering::Relaxed);
                                            return Err(err);
                                        }
                                    }
                                    Ok(state_to_rval(state))
                                })();
                                // This thread is available again, error or not.
                                threads_left.fetch_add(1, Ordering::SeqCst);
                                res
                            }
                        })
                        .expect("valid name")
                })
                .collect();
            for thread in threads {
                match thread.join() {
                    Ok(res) => {
                        results.push(res?);
                    }
                    Err(err) => {
                        // a panic happened, stop the world gracefully (even though we panic later)
                        stop_everything.store(true, Ordering::Relaxed);
                        std::panic::resume_unwind(err);
                    }
                }
            }
            // Also tells the watchdog thread to exit its loop.
            stop_everything.store(true, Ordering::Relaxed);
            Ok(results)
        }
    })
}

View File

@@ -0,0 +1,178 @@
//! Run computations in parallel, or not, based on the `parallel` feature toggle.
//!
//! ### `in_parallel`(…)
//!
//! The [`in_parallel(…)`][in_parallel()] is the typical fan-out-fan-in mode of parallelism, with thread local storage
//! made available to a `consume(…)` function to process input. The result is sent to the [`Reduce`] running in the calling
//! thread to aggregate the results into a single output, which is returned by [`in_parallel()`].
//!
//! Interruptions can be achieved by letting the reducer's [`feed(…)`][Reduce::feed()] method fail.
//!
//! It gets a boost in usability as it allows threads to borrow variables from the stack, most commonly the repository itself
//! or the data to work on.
//!
//! This mode of operation doesn't lend itself perfectly to being wrapped for `async` as it appears like a single long-running
//! operation which runs as fast as possible, which is cancellable only by merit of stopping the input or stopping the output
//! aggregation.
//!
//! ### `reduce::Stepwise`
//!
//! The [`Stepwise`][reduce::Stepwise] iterator works exactly as [`in_parallel()`] except that the processing of the output produced by
//! `consume(I, &mut State) -> O` is made accessible by the `Iterator` trait's `next()` method. As produced work is not
//! buffered, the owner of the iterator controls the progress made.
//!
//! Getting the final output of the [`Reduce`] is achieved through the consuming [`Stepwise::finalize()`][reduce::Stepwise::finalize()] method, which
//! is functionally equivalent to calling [`in_parallel()`].
//!
//! In an `async` context this means that progress is only made each time `next()` is called on the iterator, while merely dropping
//! the iterator will wind down the computation without any result.
//!
//! #### Maintaining Safety
//!
//! In order to assure that threads don't outlive the data they borrow because their handles are leaked, we enforce
//! the `'static` lifetime for its inputs, making it less intuitive to use. It is, however, possible to produce
//! suitable input iterators as long as they can hold something on the heap.
#[cfg(feature = "parallel")]
mod in_parallel;
#[cfg(feature = "parallel")]
pub use in_parallel::{
build_thread, in_parallel, in_parallel_with_finalize, in_parallel_with_slice, join, threads, Scope,
};
mod serial;
#[cfg(not(feature = "parallel"))]
pub use serial::{build_thread, in_parallel, in_parallel_with_finalize, in_parallel_with_slice, join, threads, Scope};
mod in_order;
pub use in_order::{InOrderIter, SequenceId};
mod eager_iter;
pub use eager_iter::{EagerIter, EagerIterIf};
/// A no-op passing through the input as _(`desired_chunk_size`, `thread_limit`, `num_threads(thread_limit)`)_,
/// used when the `parallel` feature toggle is not set.
#[cfg(not(feature = "parallel"))]
pub fn optimize_chunk_size_and_thread_limit(
    desired_chunk_size: usize,
    _num_items: Option<usize>,
    thread_limit: Option<usize>,
    _available_threads: Option<usize>,
) -> (usize, Option<usize>, usize) {
    // Nothing to optimize without threads - echo the inputs; `num_threads()` is always 1 here.
    let effective_threads = num_threads(thread_limit);
    (desired_chunk_size, thread_limit, effective_threads)
}
/// Return the 'optimal' _(`size of chunks`, `amount of threads as Option`, `amount of threads`)_ to use in [`in_parallel()`] for the given
/// `desired_chunk_size`, `num_items`, `thread_limit` and `available_threads`.
///
/// * `desired_chunk_size` is the amount of items per chunk you think should be used.
/// * `num_items` is the total amount of items in the iteration, if `Some`.
///   Otherwise this knowledge will not affect the output of this function.
/// * `thread_limit` is the amount of threads to use at most, if `Some`.
///   Otherwise this knowledge will not affect the output of this function.
/// * `available_threads` is the total amount of threads available, if `Some`.
///   Otherwise the actual amount of available threads is determined by querying the system.
///
/// `Note` that this implementation is available only if the `parallel` feature toggle is set.
#[cfg(feature = "parallel")]
pub fn optimize_chunk_size_and_thread_limit(
    desired_chunk_size: usize,
    num_items: Option<usize>,
    thread_limit: Option<usize>,
    available_threads: Option<usize>,
) -> (usize, Option<usize>, usize) {
    // The effective thread budget: an explicit non-zero limit wins over the given (or detected) core count.
    let detected = available_threads.unwrap_or_else(|| std::thread::available_parallelism().map_or(1, Into::into));
    let budget = match thread_limit {
        Some(limit) if limit != 0 => limit,
        _ => detected,
    };
    const LOWER_BOUND: usize = 50;
    const UPPER_BOUND: usize = 1000;
    let (chunk_size, thread_limit) = if let Some(items) = num_items {
        // With a known item count, aim for at least two chunks per thread so work distributes evenly,
        // and never use more threads than there are chunks to go around.
        let chunks_per_thread_at_least = 2;
        let chunk_size = (items / (budget * chunks_per_thread_at_least)).clamp(1, UPPER_BOUND);
        let num_chunks = items / chunk_size;
        let threads = if num_chunks <= budget {
            (num_chunks / chunks_per_thread_at_least).max(1)
        } else {
            budget
        };
        (chunk_size, threads)
    } else {
        // Without a known item count, keep the caller's wish, clamped into a sane range
        // unless we run single-threaded anyway.
        let chunk_size = if budget == 1 {
            desired_chunk_size
        } else {
            desired_chunk_size.clamp(LOWER_BOUND, UPPER_BOUND)
        };
        (chunk_size, budget)
    };
    (chunk_size, Some(thread_limit), thread_limit)
}
/// Always returns 1, available when the `parallel` feature toggle is unset.
#[cfg(not(feature = "parallel"))]
pub fn num_threads(_thread_limit: Option<usize>) -> usize {
    // Without the `parallel` feature all work happens on the calling thread, so the limit is irrelevant.
    1
}
/// Returns the amount of threads the system can effectively use as the amount of its logical cores.
///
/// A `Some` non-zero `thread_limit` takes precedence; `Some(0)` and `None` fall back to the detected core count.
///
/// Only available with the `parallel` feature toggle set.
#[cfg(feature = "parallel")]
pub fn num_threads(thread_limit: Option<usize>) -> usize {
    match thread_limit {
        Some(limit) if limit != 0 => limit,
        // Query the system lazily, and fall back to a single thread if detection fails.
        _ => std::thread::available_parallelism().map_or(1, Into::into),
    }
}
/// Run [`in_parallel()`] only if the given `condition()` returns true when eagerly evaluated.
///
/// For parameters, see the documentation of [`in_parallel()`]
#[cfg(feature = "parallel")]
pub fn in_parallel_if<I, S, O, R>(
    condition: impl FnOnce() -> bool,
    input: impl Iterator<Item = I> + Send,
    thread_limit: Option<usize>,
    new_thread_state: impl FnOnce(usize) -> S + Send + Clone,
    consume: impl FnMut(I, &mut S) -> O + Send + Clone,
    reducer: R,
) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
where
    R: Reduce<Input = O>,
    I: Send,
    O: Send,
{
    // `condition()` is only consulted if more than one thread would be used at all - this preserves
    // the short-circuit evaluation order, so side effects of `condition` are skipped when single-threaded.
    let parallelize = num_threads(thread_limit) > 1 && condition();
    if parallelize {
        in_parallel(input, thread_limit, new_thread_state, consume, reducer)
    } else {
        serial::in_parallel(input, thread_limit, new_thread_state, consume, reducer)
    }
}
/// Run [`in_parallel()`] only if the given `condition()` returns true when eagerly evaluated.
///
/// For parameters, see the documentation of [`in_parallel()`]
///
/// Note that the non-parallel version is equivalent to [`in_parallel()`].
#[cfg(not(feature = "parallel"))]
pub fn in_parallel_if<I, S, O, R>(
    _condition: impl FnOnce() -> bool,
    input: impl Iterator<Item = I>,
    thread_limit: Option<usize>,
    new_thread_state: impl FnOnce(usize) -> S,
    consume: impl FnMut(I, &mut S) -> O,
    reducer: R,
) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
where
    R: Reduce<Input = O>,
    I: Send,
    O: Send,
{
    // `_condition` is ignored - without the `parallel` feature there is nothing to decide between.
    serial::in_parallel(input, thread_limit, new_thread_state, consume, reducer)
}
/// Utilities for aggregating items produced in threads into a single result, see the [`Reduce`] trait.
pub mod reduce;
pub use reduce::Reduce;

View File

@@ -0,0 +1,279 @@
#[cfg(feature = "parallel")]
mod stepped {
    use crate::parallel::num_threads;
    /// An iterator adaptor to allow running computations using [`in_parallel()`][crate::parallel::in_parallel()] in a step-wise manner, see the [module docs][crate::parallel]
    /// for details.
    pub struct Stepwise<Reduce: super::Reduce> {
        /// This field is first to assure it's dropped first and cause threads that are dropped next to stop their loops
        /// as sending results fails when the receiver is dropped.
        receive_result: std::sync::mpsc::Receiver<Reduce::Input>,
        /// `join()` will be called on these guards to assure every thread tries to send through a closed channel. When
        /// that happens, they break out of their loops.
        threads: Vec<std::thread::JoinHandle<()>>,
        /// The reducer is called only in the thread using the iterator, dropping it has no side effects.
        reducer: Option<Reduce>,
    }
    impl<Reduce: super::Reduce> Drop for Stepwise<Reduce> {
        fn drop(&mut self) {
            // Swap the receiver for a dummy so the real one disconnects: workers blocked on
            // `send()` now fail and break out of their loops.
            let (_, sink) = std::sync::mpsc::channel();
            drop(std::mem::replace(&mut self.receive_result, sink));
            // Join all workers, remembering only the last panic payload to re-raise once all have stopped.
            let mut last_err = None;
            for handle in std::mem::take(&mut self.threads) {
                if let Err(err) = handle.join() {
                    last_err = Some(err);
                };
            }
            if let Some(thread_err) = last_err {
                std::panic::resume_unwind(thread_err);
            }
        }
    }
    impl<Reduce: super::Reduce> Stepwise<Reduce> {
        /// Instantiate a new iterator and start working in threads.
        /// For a description of parameters, see [`in_parallel()`][crate::parallel::in_parallel()].
        ///
        /// Note the `'static` bounds: unlike `in_parallel()`, threads here may outlive the current stack frame
        /// (they are only joined on drop), so borrowed inputs are not permitted.
        pub fn new<InputIter, ThreadStateFn, ConsumeFn, I, O, S>(
            input: InputIter,
            thread_limit: Option<usize>,
            new_thread_state: ThreadStateFn,
            consume: ConsumeFn,
            reducer: Reduce,
        ) -> Self
        where
            InputIter: Iterator<Item = I> + Send + 'static,
            ThreadStateFn: Fn(usize) -> S + Send + Clone + 'static,
            ConsumeFn: Fn(I, &mut S) -> O + Send + Clone + 'static,
            Reduce: super::Reduce<Input = O> + 'static,
            I: Send + 'static,
            O: Send + 'static,
        {
            let num_threads = num_threads(thread_limit);
            // One handle per worker, plus one for the feeder thread pushed below.
            let mut threads = Vec::with_capacity(num_threads + 1);
            let receive_result = {
                let (send_input, receive_input) = crossbeam_channel::bounded::<I>(num_threads);
                // A bounded sync channel keeps workers at most `num_threads` results ahead of `next()`.
                let (send_result, receive_result) = std::sync::mpsc::sync_channel::<O>(num_threads);
                for thread_id in 0..num_threads {
                    let handle = std::thread::spawn({
                        let send_result = send_result.clone();
                        let receive_input = receive_input.clone();
                        let new_thread_state = new_thread_state.clone();
                        let consume = consume.clone();
                        move || {
                            let mut state = new_thread_state(thread_id);
                            for item in receive_input {
                                if send_result.send(consume(item, &mut state)).is_err() {
                                    break;
                                }
                            }
                        }
                    });
                    threads.push(handle);
                }
                // Feeder thread: forwards all input to the workers, stopping early if they all hung up.
                threads.push(std::thread::spawn(move || {
                    for item in input {
                        if send_input.send(item).is_err() {
                            break;
                        }
                    }
                }));
                receive_result
            };
            Stepwise {
                threads,
                receive_result,
                reducer: Some(reducer),
            }
        }
        /// Consume the iterator by finishing its iteration and calling [`Reduce::finalize()`][crate::parallel::Reduce::finalize()].
        pub fn finalize(mut self) -> Result<Reduce::Output, Reduce::Error> {
            for value in self.by_ref() {
                drop(value?);
            }
            self.reducer
                .take()
                .expect("this is the last call before consumption")
                .finalize()
        }
    }
    impl<Reduce: super::Reduce> Iterator for Stepwise<Reduce> {
        type Item = Result<Reduce::FeedProduce, Reduce::Error>;
        fn next(&mut self) -> Option<<Self as Iterator>::Item> {
            // Iteration ends when the result channel disconnects, i.e. all workers are done.
            self.receive_result
                .recv()
                .ok()
                .and_then(|input| self.reducer.as_mut().map(|r| r.feed(input)))
        }
    }
    impl<R: super::Reduce> super::Finalize for Stepwise<R> {
        type Reduce = R;
        fn finalize(
            self,
        ) -> Result<
            <<Self as super::Finalize>::Reduce as super::Reduce>::Output,
            <<Self as super::Finalize>::Reduce as super::Reduce>::Error,
        > {
            Stepwise::finalize(self)
        }
    }
}
#[cfg(not(feature = "parallel"))]
mod stepped {
    /// An iterator adaptor to allow running computations using [`in_parallel()`][crate::parallel::in_parallel()] in a step-wise manner, see the [module docs][crate::parallel]
    /// for details.
    ///
    /// This is the single-threaded stand-in: each `next()` call consumes one input item directly.
    pub struct Stepwise<InputIter, ConsumeFn, ThreadState, Reduce> {
        // The remaining input items.
        input: InputIter,
        // Produces one output per input item.
        consume: ConsumeFn,
        // The state that would be thread-local in the parallel version; here there is exactly one.
        thread_state: ThreadState,
        // Aggregates outputs on each `next()` call.
        reducer: Reduce,
    }
    impl<InputIter, ConsumeFn, Reduce, I, O, S> Stepwise<InputIter, ConsumeFn, S, Reduce>
    where
        InputIter: Iterator<Item = I>,
        ConsumeFn: Fn(I, &mut S) -> O,
        Reduce: super::Reduce<Input = O>,
    {
        /// Instantiate a new iterator.
        /// For a description of parameters, see [`in_parallel()`][crate::parallel::in_parallel()].
        pub fn new<ThreadStateFn>(
            input: InputIter,
            _thread_limit: Option<usize>,
            new_thread_state: ThreadStateFn,
            consume: ConsumeFn,
            reducer: Reduce,
        ) -> Self
        where
            ThreadStateFn: Fn(usize) -> S,
        {
            Stepwise {
                input,
                consume,
                // Single-threaded, hence always 'thread' number 0.
                thread_state: new_thread_state(0),
                reducer,
            }
        }
        /// Consume the iterator by finishing its iteration and calling [`Reduce::finalize()`][crate::parallel::Reduce::finalize()].
        pub fn finalize(mut self) -> Result<Reduce::Output, Reduce::Error> {
            for value in self.by_ref() {
                drop(value?);
            }
            self.reducer.finalize()
        }
    }
    impl<InputIter, ConsumeFn, ThreadState, Reduce, I, O> Iterator for Stepwise<InputIter, ConsumeFn, ThreadState, Reduce>
    where
        InputIter: Iterator<Item = I>,
        ConsumeFn: Fn(I, &mut ThreadState) -> O,
        Reduce: super::Reduce<Input = O>,
    {
        type Item = Result<Reduce::FeedProduce, Reduce::Error>;
        fn next(&mut self) -> Option<<Self as Iterator>::Item> {
            // Consume and reduce exactly one item per call - progress is entirely caller-driven.
            self.input
                .next()
                .map(|input| self.reducer.feed((self.consume)(input, &mut self.thread_state)))
        }
    }
    impl<InputIter, ConsumeFn, R, I, O, S> super::Finalize for Stepwise<InputIter, ConsumeFn, S, R>
    where
        InputIter: Iterator<Item = I>,
        ConsumeFn: Fn(I, &mut S) -> O,
        R: super::Reduce<Input = O>,
    {
        type Reduce = R;
        fn finalize(
            self,
        ) -> Result<
            <<Self as super::Finalize>::Reduce as super::Reduce>::Output,
            <<Self as super::Finalize>::Reduce as super::Reduce>::Error,
        > {
            Stepwise::finalize(self)
        }
    }
}
use std::marker::PhantomData;
pub use stepped::Stepwise;
/// A trait for aggregating items commonly produced in threads into a single result, without itself
/// needing to be thread safe.
pub trait Reduce {
    /// The type fed to the reducer in the [`feed()`][Reduce::feed()] method.
    ///
    /// It's produced by a function that may run on multiple threads.
    type Input;
    /// The type produced in Ok(…) by [`feed()`][Reduce::feed()].
    /// Most reducers by nature use `()` here as the value is in the aggregation.
    /// However, some may use it to collect statistics only and return their Input
    /// in some form as a result here for [`Stepwise`] to be useful.
    type FeedProduce;
    /// The type produced once by the [`finalize()`][Reduce::finalize()] method.
    ///
    /// For traditional reducers, this is the value produced by the entire operation.
    /// For those made for step-wise iteration this may be aggregated statistics.
    type Output;
    /// The error type to use for all methods of this trait.
    type Error;
    /// Called each time a new `item` was produced in order to aggregate it into the final result.
    ///
    /// If an `Error` is returned, the entire operation will be stopped.
    fn feed(&mut self, item: Self::Input) -> Result<Self::FeedProduce, Self::Error>;
    /// Called once for all items that were passed to `feed()`, producing the final `Output` of the operation or an `Error`.
    fn finalize(self) -> Result<Self::Output, Self::Error>;
}
/// An identity reducer for those who want to use [`Stepwise`] or [`in_parallel()`][crate::parallel::in_parallel()]
/// without the use of non-threaded reduction of products created in threads.
///
/// It carries no state at all - the type parameters merely pin down the `Input` and `Error` types.
pub struct IdentityWithResult<Input, Error> {
    _input: PhantomData<Input>,
    _error: PhantomData<Error>,
}
impl<Input, Error> Default for IdentityWithResult<Input, Error> {
    fn default() -> Self {
        // A pure marker type - there is no state to initialize beyond the phantom fields.
        IdentityWithResult {
            _input: PhantomData,
            _error: PhantomData,
        }
    }
}
// The identity reduction: each fed `Result` is passed straight through as the feed-product,
// making per-item results observable via [`Stepwise`] while the final output carries no information.
impl<Input, Error> Reduce for IdentityWithResult<Input, Error> {
    type Input = Result<Input, Self::Error>;
    type FeedProduce = Input;
    type Output = ();
    type Error = Error;
    fn feed(&mut self, item: Self::Input) -> Result<Self::FeedProduce, Self::Error> {
        // Pass through unchanged - an `Err` input thus aborts the whole operation.
        item
    }
    fn finalize(self) -> Result<Self::Output, Self::Error> {
        // Nothing was aggregated, so there is nothing to produce.
        Ok(())
    }
}
/// A trait reflecting the `finalize()` method of [`Reduce`] implementations,
/// allowing abstraction over anything that can conclude a reduction (like [`Stepwise`]).
pub trait Finalize {
    /// An implementation of [`Reduce`]
    type Reduce: self::Reduce;
    /// Similar to the [`Reduce::finalize()`] method
    fn finalize(
        self,
    ) -> Result<<<Self as Finalize>::Reduce as self::Reduce>::Output, <<Self as Finalize>::Reduce as self::Reduce>::Error>;
}

View File

@@ -0,0 +1,174 @@
use crate::parallel::Reduce;
#[cfg(not(feature = "parallel"))]
mod not_parallel {
    use std::sync::atomic::{AtomicBool, AtomicIsize};
    /// Runs `left` and then `right`, one after another, returning their output when both are done.
    pub fn join<O1, O2>(left: impl FnOnce() -> O1, right: impl FnOnce() -> O2) -> (O1, O2) {
        (left(), right())
    }
    /// A scope for spawning threads.
    ///
    /// This mirrors `std::thread::Scope`'s lifetime parameters, but holds no state at all -
    /// 'spawned' closures run immediately on the calling thread.
    pub struct Scope<'scope, 'env: 'scope> {
        _scope: std::marker::PhantomData<&'scope mut &'scope ()>,
        _env: std::marker::PhantomData<&'env mut &'env ()>,
    }
    /// A stand-in for `std::thread::Builder` that ignores all configuration.
    pub struct ThreadBuilder;
    /// Create a builder for threads which allows them to be spawned into a scope and configured prior to spawning.
    pub fn build_thread() -> ThreadBuilder {
        ThreadBuilder
    }
    // SAFETY: `Scope` contains only `PhantomData` and carries no state whatsoever,
    // so sharing references to it across threads cannot cause data races.
    #[allow(unsafe_code)]
    unsafe impl Sync for Scope<'_, '_> {}
    impl ThreadBuilder {
        /// Accept (and discard) a thread name, mirroring `std::thread::Builder::name()`.
        pub fn name(self, _new: String) -> Self {
            self
        }
        /// Run `f` immediately on the current thread; always returns `Ok`, mirroring the threaded API.
        pub fn spawn_scoped<'scope, 'env, F, T>(
            &self,
            scope: &'scope Scope<'scope, 'env>,
            f: F,
        ) -> std::io::Result<ScopedJoinHandle<'scope, T>>
        where
            F: FnOnce() -> T + 'scope,
            T: 'scope,
        {
            Ok(scope.spawn(f))
        }
    }
    impl<'scope> Scope<'scope, '_> {
        /// Provided with this scope, let `f` start new threads that live within it.
        ///
        /// Note that `f` runs to completion right here; the returned handle merely stores its result.
        pub fn spawn<F, T>(&'scope self, f: F) -> ScopedJoinHandle<'scope, T>
        where
            F: FnOnce() -> T + 'scope,
            T: 'scope,
        {
            ScopedJoinHandle {
                result: f(),
                _marker: Default::default(),
            }
        }
    }
    /// Runs `f` with a scope to be used for spawning threads that will not outlive the function call.
    /// Note that this implementation will run the spawned functions immediately.
    pub fn threads<'env, F, R>(f: F) -> R
    where
        F: for<'scope> FnOnce(&'scope Scope<'scope, 'env>) -> R,
    {
        f(&Scope {
            _scope: Default::default(),
            _env: Default::default(),
        })
    }
    /// A handle that can be used to join its scoped thread.
    ///
    /// This struct is created by the [`Scope::spawn`] method and the
    /// [`ThreadBuilder::spawn_scoped`] method.
    pub struct ScopedJoinHandle<'scope, T> {
        /// Holds the result of the inner closure.
        result: T,
        _marker: std::marker::PhantomData<&'scope mut &'scope ()>,
    }
    impl<T> ScopedJoinHandle<'_, T> {
        /// Return the already-computed result; this can never observe a panic and always succeeds.
        pub fn join(self) -> std::thread::Result<T> {
            Ok(self.result)
        }
        /// Always true - the closure ran to completion during `spawn()`.
        pub fn is_finished(&self) -> bool {
            true
        }
    }
    /// An experiment to have fine-grained per-item parallelization with built-in aggregation via thread state.
    /// This is only good for operations where near-random access isn't detrimental, so it's not usually great
    /// for file-io as it won't make use of sorted inputs well.
    // TODO: better docs
    pub fn in_parallel_with_slice<I, S, R, E>(
        input: &mut [I],
        _thread_limit: Option<usize>,
        new_thread_state: impl FnOnce(usize) -> S + Clone,
        mut consume: impl FnMut(&mut I, &mut S, &AtomicIsize, &AtomicBool) -> Result<(), E> + Clone,
        mut periodic: impl FnMut() -> Option<std::time::Duration>,
        state_to_rval: impl FnOnce(S) -> R + Clone,
    ) -> Result<Vec<R>, E> {
        let mut state = new_thread_state(0);
        // These are never set in the serial version, but `consume` expects them for API parity.
        let should_interrupt = &AtomicBool::default();
        let threads_left = &AtomicIsize::default();
        for item in input {
            consume(item, &mut state, threads_left, should_interrupt)?;
            // `periodic()` returning `None` requests an interrupt; here it simply ends the loop.
            if periodic().is_none() {
                break;
            }
        }
        // Exactly one 'thread', hence exactly one result.
        Ok(vec![state_to_rval(state)])
    }
}
#[cfg(not(feature = "parallel"))]
pub use not_parallel::{build_thread, in_parallel_with_slice, join, threads, Scope};
/// Read items from `input` and `consume` them in a single thread, producing an output to be collected by a `reducer`,
/// whose task it is to aggregate these outputs into the final result returned by this function.
///
/// * `new_thread_state(thread_number) -> State` produces thread-local state once per thread to be passed to `consume`
/// * `consume(Item, &mut State) -> Output` produces an output given an input along with mutable state.
/// * For `reducer`, see the [`Reduce`] trait
/// * `thread_limit` has no effect as everything is run on the main thread, but is present to keep the signature
///   similar to the parallel version.
///
/// **This serial version performs all calculations on the current thread.**
pub fn in_parallel<I, S, O, R>(
    input: impl Iterator<Item = I>,
    _thread_limit: Option<usize>,
    new_thread_state: impl FnOnce(usize) -> S,
    mut consume: impl FnMut(I, &mut S) -> O,
    mut reducer: R,
) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
where
    R: Reduce<Input = O>,
{
    // Single 'thread', hence 'thread number' 0 for the only state instance.
    let mut state = new_thread_state(0);
    let mut items = input;
    // Feed every consumed item to the reducer, aborting on the first error it reports.
    items.try_for_each(|item| reducer.feed(consume(item, &mut state)).map(drop))?;
    reducer.finalize()
}
/// Read items from `input` and `consume` them on the current thread,
/// whose output is collected by a `reducer`. Its task is to
/// aggregate these outputs into the final result returned by this function with the benefit of not having to be thread-safe.
/// `finalize` is called once at the end (there is only one 'thread' here) if no error occurred earlier.
///
/// * `thread_limit` is ignored - everything runs on the main thread.
/// * `new_thread_state(thread_number) -> State` produces thread-local state once per thread to be passed to `consume`
/// * `consume(Item, &mut State) -> Output` produces an output given an input obtained by `input` along with mutable state initially
///   created by `new_thread_state(…)`.
/// * `finalize(State) -> Output` is called to potentially process remaining work that was placed in `State`.
/// * For `reducer`, see the [`Reduce`] trait
#[cfg(not(feature = "parallel"))]
pub fn in_parallel_with_finalize<I, S, O, R>(
    input: impl Iterator<Item = I>,
    _thread_limit: Option<usize>,
    new_thread_state: impl FnOnce(usize) -> S,
    mut consume: impl FnMut(I, &mut S) -> O,
    finalize: impl FnOnce(S) -> O + Send + Clone,
    mut reducer: R,
) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
where
    R: Reduce<Input = O>,
{
    let mut state = new_thread_state(0);
    let mut items = input;
    items.try_for_each(|item| reducer.feed(consume(item, &mut state)).map(drop))?;
    // Mirror the parallel version: the finalized state is fed as one last output before finalizing the reducer.
    reducer.feed(finalize(state)).map(drop)?;
    reducer.finalize()
}

View File

@@ -0,0 +1,150 @@
//! Various `prodash` types along with various utilities for comfort.
use std::io;
#[cfg(feature = "progress-unit-bytes")]
pub use bytesize;
pub use prodash::{
self,
messages::MessageLevel,
progress::{
AtomicStep, Discard, DoOrDiscard, Either, Id, Step, StepShared, Task, ThroughputOnDrop, Value, UNKNOWN,
},
unit, BoxedDynNestedProgress, Count, DynNestedProgress, DynNestedProgressToNestedProgress, NestedProgress,
Progress, Unit,
};
/// A stub for the portions of the `bytesize` crate that we use internally in `gitoxide`.
#[cfg(not(feature = "progress-unit-bytes"))]
pub mod bytesize {
    /// A stub for the `ByteSize` wrapper.
    ///
    /// Unlike the real crate, this displays the raw byte count without any unit scaling.
    pub struct ByteSize(pub u64);
    impl std::fmt::Display for ByteSize {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            // Forward to the inner `u64` so formatting flags (width, fill, alignment) keep working.
            std::fmt::Display::fmt(&self.0, f)
        }
    }
}
/// A unit for displaying bytes with throughput and progress percentage.
///
/// Uses the dynamic byte unit so amounts render in human-readable form.
#[cfg(feature = "progress-unit-bytes")]
pub fn bytes() -> Option<Unit> {
    Some(unit::dynamic_and_mode(
        unit::Bytes,
        unit::display::Mode::with_throughput().and_percentage(),
    ))
}
/// A unit for displaying bytes with throughput and progress percentage.
///
/// Fallback without the `progress-unit-bytes` feature: amounts get a plain `B` label without scaling.
#[cfg(not(feature = "progress-unit-bytes"))]
pub fn bytes() -> Option<Unit> {
    Some(unit::label_and_mode(
        "B",
        unit::display::Mode::with_throughput().and_percentage(),
    ))
}
/// A unit for displaying human readable numbers with throughput and progress percentage, and a single decimal place.
pub fn count(name: &'static str) -> Option<Unit> {
    // Convenience wrapper around `count_with_decimals()` with one decimal place.
    count_with_decimals(name, 1)
}
/// A unit for displaying human readable numbers with `name` suffix,
/// with throughput and progress percentage, and `decimals` decimal places.
#[cfg(feature = "progress-unit-human-numbers")]
pub fn count_with_decimals(name: &'static str, decimals: usize) -> Option<Unit> {
    // Configure the human-readable formatter up front instead of in a nested block expression.
    let mut formatter = unit::human::Formatter::new();
    formatter.with_decimals(decimals);
    Some(unit::dynamic_and_mode(
        unit::Human::new(formatter, name),
        unit::display::Mode::with_throughput().and_percentage(),
    ))
}
/// A unit for displaying human readable numbers with `name` suffix,
/// with throughput and progress percentage, and `decimals` decimal places.
///
/// Fallback without the `progress-unit-human-numbers` feature: `_decimals` is ignored and
/// the plain `name` label is used without number formatting.
#[cfg(not(feature = "progress-unit-human-numbers"))]
pub fn count_with_decimals(name: &'static str, _decimals: usize) -> Option<Unit> {
    Some(unit::label_and_mode(
        name,
        unit::display::Mode::with_throughput().and_percentage(),
    ))
}
/// A predefined unit for displaying a multi-step progress, rendered as a range like `step 2 of 4`.
pub fn steps() -> Option<Unit> {
    Some(unit::dynamic(unit::Range::new("steps")))
}
/// A structure passing every [`read`](std::io::Read::read()) call through to the contained Progress instance using [`inc_by(bytes_read)`](Count::inc_by()).
pub struct Read<T, P> {
    /// The implementor of [`std::io::Read`] to which progress is added
    pub inner: T,
    /// The progress instance receiving the amount of read bytes on each invocation of `read()`
    pub progress: P,
}
impl<T, P> io::Read for Read<T, P>
where
    T: io::Read,
    P: Progress,
{
    /// Delegate to the inner reader and, on success, count the read bytes towards progress.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.inner.read(buf).map(|bytes_read| {
            self.progress.inc_by(bytes_read);
            bytes_read
        })
    }
}
impl<T, P> io::BufRead for Read<T, P>
where
    T: io::BufRead,
    P: Progress,
{
    /// Delegate directly to the inner reader - progress is only counted in `read()`,
    /// not when the buffer is filled or consumed.
    fn fill_buf(&mut self) -> io::Result<&[u8]> {
        self.inner.fill_buf()
    }

    fn consume(&mut self, amt: usize) {
        self.inner.consume(amt);
    }
}
/// A structure passing every [`write`][std::io::Write::write()] call through to the contained Progress instance using [`inc_by(bytes_written)`](Count::inc_by()).
///
/// This is particularly useful if the final size of the bytes to write is known or can be estimated precisely enough.
pub struct Write<T, P> {
    /// The implementor of [`std::io::Write`] to which progress is added
    pub inner: T,
    /// The progress instance receiving the amount of written bytes on each invocation of [`write()`](std::io::Write::write())
    pub progress: P,
}
impl<T, P> io::Write for Write<T, P>
where
    T: io::Write,
    P: Progress,
{
    /// Delegate to the inner writer and, on success, count the written bytes towards progress.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.inner.write(buf).map(|written| {
            self.progress.inc_by(written);
            written
        })
    }

    fn flush(&mut self) -> io::Result<()> {
        self.inner.flush()
    }
}
impl<T, P> io::Seek for Write<T, P>
where
    T: io::Seek,
{
    /// Delegate to the inner writer - seeking does not affect the progress count.
    fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
        self.inner.seek(pos)
    }
}

View File

@@ -0,0 +1,116 @@
//! Type definitions for putting shared ownership and synchronized mutation behind the `parallel` feature toggle.
//!
//! That way, single-threaded applications will not have to use thread-safe primitives, and simply do not specify the `parallel` feature.
#[cfg(feature = "parallel")]
mod _impl {
    use std::sync::Arc;

    /// A thread-safe cell which can be written to only once.
    ///
    /// Note: We use `once_cell` here because `std::sync::OnceLock::get_or_try_init()` is not yet stable.
    /// Once it's stabilized, we can switch to `std::sync::OnceLock`.
    #[cfg(feature = "once_cell")]
    pub type OnceCell<T> = once_cell::sync::OnceCell<T>;
    /// A reference counted pointer type for shared ownership.
    pub type OwnShared<T> = Arc<T>;
    /// A synchronization primitive which can start read-only and transition to support mutation.
    pub type MutableOnDemand<T> = parking_lot::RwLock<T>;
    /// A synchronization primitive which provides read-write access right away.
    pub type Mutable<T> = parking_lot::Mutex<T>;
    /// A guarded reference suitable for safekeeping in a struct.
    pub type RefGuard<'a, T> = parking_lot::RwLockReadGuard<'a, T>;
    /// A mapped reference created from a `RefGuard`
    pub type MappedRefGuard<'a, U> = parking_lot::MappedRwLockReadGuard<'a, U>;

    /// Get a shared reference through a [`MutableOnDemand`] for read-only access.
    pub fn get_ref<T>(v: &MutableOnDemand<T>) -> RefGuard<'_, T> {
        v.read()
    }

    /// Get a mutable reference through a [`MutableOnDemand`] for read-write access.
    pub fn get_mut<T>(v: &MutableOnDemand<T>) -> parking_lot::RwLockWriteGuard<'_, T> {
        v.write()
    }

    /// Get a mutable reference to the underlying data, with semantics similar to [Arc::make_mut()].
    pub fn make_mut<T: Clone>(this: &mut OwnShared<T>) -> &mut T {
        OwnShared::make_mut(this)
    }

    /// Get a mutable reference through a [`Mutable`] for read-write access.
    pub fn lock<T>(v: &Mutable<T>) -> parking_lot::MutexGuard<'_, T> {
        v.lock()
    }

    /// Downgrade a handle previously obtained with [`get_mut()`] to drop mutation support.
    ///
    /// `_orig` is unused here, but kept so the signature matches the single-threaded implementation.
    pub fn downgrade_mut_to_ref<'a, T>(
        v: parking_lot::RwLockWriteGuard<'a, T>,
        _orig: &'a MutableOnDemand<T>,
    ) -> RefGuard<'a, T> {
        parking_lot::RwLockWriteGuard::downgrade(v)
    }

    /// Map a read guard into a sub-type it contains.
    pub fn map_ref<T, U: ?Sized>(v: RefGuard<'_, T>, f: impl FnOnce(&T) -> &U) -> MappedRefGuard<'_, U> {
        parking_lot::RwLockReadGuard::map(v, f)
    }
}
#[cfg(not(feature = "parallel"))]
mod _impl {
    use std::{
        cell::{Ref, RefCell, RefMut},
        rc::Rc,
    };

    /// A cell which can be written to only once.
    ///
    /// Note: We use `once_cell` here because `std::cell::OnceCell::get_or_try_init()` is not yet stable.
    /// Once it's stabilized, we can switch to `std::cell::OnceCell`.
    #[cfg(feature = "once_cell")]
    pub type OnceCell<T> = once_cell::unsync::OnceCell<T>;
    /// A reference counted pointer type for shared ownership.
    pub type OwnShared<T> = Rc<T>;
    /// A synchronization primitive which can start read-only and transition to support mutation.
    pub type MutableOnDemand<T> = RefCell<T>;
    /// A synchronization primitive which provides read-write access right away.
    pub type Mutable<T> = RefCell<T>;
    /// A guarded reference suitable for safekeeping in a struct.
    pub type RefGuard<'a, T> = Ref<'a, T>;
    /// A mapped reference created from a RefGuard
    pub type MappedRefGuard<'a, U> = Ref<'a, U>;

    /// Get a mutable reference through a [`MutableOnDemand`] for read-write access.
    pub fn get_mut<T>(v: &RefCell<T>) -> RefMut<'_, T> {
        v.borrow_mut()
    }

    /// Get a mutable reference to the underlying data, with semantics similar to [Rc::make_mut()].
    pub fn make_mut<T: Clone>(this: &mut OwnShared<T>) -> &mut T {
        OwnShared::make_mut(this)
    }

    /// Get a mutable reference through a [`Mutable`] for read-write access.
    pub fn lock<T>(v: &Mutable<T>) -> RefMut<'_, T> {
        v.borrow_mut()
    }

    /// Get a shared reference through a [`MutableOnDemand`] for read-only access.
    pub fn get_ref<T>(v: &RefCell<T>) -> RefGuard<'_, T> {
        v.borrow()
    }

    /// Downgrade a handle previously obtained with [`get_mut()`] to drop mutation support.
    pub fn downgrade_mut_to_ref<'a, T>(v: RefMut<'a, T>, orig: &'a RefCell<T>) -> RefGuard<'a, T> {
        // The write borrow must end before a read borrow of the same cell may begin.
        drop(v);
        orig.borrow()
    }

    /// Map a read guard into a sub-type it contains.
    pub fn map_ref<T, U: ?Sized>(v: RefGuard<'_, T>, f: impl FnOnce(&T) -> &U) -> MappedRefGuard<'_, U> {
        Ref::map(v, f)
    }
}
pub use _impl::*;

View File

@@ -0,0 +1,164 @@
use zlib_rs::InflateError;
/// A type to hold all state needed for decompressing a ZLIB encoded stream.
pub struct Decompress(zlib_rs::Inflate);

impl Default for Decompress {
    /// Equivalent to [`Decompress::new()`].
    fn default() -> Self {
        Self::new()
    }
}
impl Decompress {
/// The amount of bytes consumed from the input so far.
pub fn total_in(&self) -> u64 {
self.0.total_in()
}
/// The amount of decompressed bytes that have been written to the output thus far.
pub fn total_out(&self) -> u64 {
self.0.total_out()
}
/// Create a new instance. Note that it allocates in various ways and thus should be re-used.
pub fn new() -> Self {
let config = zlib_rs::InflateConfig::default();
let header = true;
let inner = zlib_rs::Inflate::new(header, config.window_bits as u8);
Self(inner)
}
/// Reset the state to allow handling a new stream.
pub fn reset(&mut self) {
self.0.reset(true);
}
/// Decompress `input` and write all decompressed bytes into `output`, with `flush` defining some details about this.
pub fn decompress(
&mut self,
input: &[u8],
output: &mut [u8],
flush: FlushDecompress,
) -> Result<Status, DecompressError> {
let inflate_flush = match flush {
FlushDecompress::None => zlib_rs::InflateFlush::NoFlush,
FlushDecompress::Sync => zlib_rs::InflateFlush::SyncFlush,
FlushDecompress::Finish => zlib_rs::InflateFlush::Finish,
};
let status = self.0.decompress(input, output, inflate_flush)?;
match status {
zlib_rs::Status::Ok => Ok(Status::Ok),
zlib_rs::Status::BufError => Ok(Status::BufError),
zlib_rs::Status::StreamEnd => Ok(Status::StreamEnd),
}
}
}
/// The error produced by [`Decompress::decompress()`].
#[derive(Debug, thiserror::Error)]
#[allow(missing_docs)]
pub enum DecompressError {
    #[error("stream error")]
    StreamError,
    #[error("Not enough memory")]
    InsufficientMemory,
    #[error("Invalid input data")]
    DataError,
    #[error("Decompressing this input requires a dictionary")]
    NeedDict,
}

impl From<zlib_rs::InflateError> for DecompressError {
    /// Map the lower-level inflate error onto our public error type,
    /// discarding the payload carried by `NeedDict`.
    fn from(value: InflateError) -> Self {
        match value {
            InflateError::NeedDict { .. } => DecompressError::NeedDict,
            InflateError::StreamError => DecompressError::StreamError,
            InflateError::DataError => DecompressError::DataError,
            InflateError::MemError => DecompressError::InsufficientMemory,
        }
    }
}
/// The status returned by [`Decompress::decompress()`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Status {
    /// The decompress operation went well. Not to be confused with `StreamEnd`, so one can continue
    /// the decompression.
    Ok,
    /// No progress was possible - this mirrors the underlying decompressor's `BufError`,
    /// which typically means more input or more output space is needed before continuing.
    BufError,
    /// The stream was fully decompressed.
    StreamEnd,
}
/// Values which indicate the form of flushing to be used when
/// decompressing in-memory data.
#[derive(Copy, Clone, PartialEq, Eq, Debug)]
#[non_exhaustive]
#[allow(clippy::unnecessary_cast)]
pub enum FlushDecompress {
    /// A typical parameter for passing to compression/decompression functions,
    /// this lets the underlying stream decide how much data to
    /// accumulate before producing output in order to maximize compression.
    None = 0,
    /// All pending output is flushed to the output buffer and the output is
    /// aligned on a byte boundary so that the decompressor can get all input
    /// data available so far.
    ///
    /// NOTE(review): the remainder of this paragraph appears copied from the
    /// compression-side documentation - confirm it applies here.
    /// Flushing may degrade compression for some compression algorithms and so
    /// it should only be used when necessary. This will complete the current
    /// deflate block and follow it with an empty stored block.
    Sync = 2,
    /// Pending input is processed and pending output is flushed.
    ///
    /// The return value may indicate that the stream is not yet done and more
    /// data has yet to be processed.
    Finish = 4,
}
/// non-streaming interfaces for decompression
pub mod inflate {
    /// The error returned by various [Inflate methods][super::Inflate]
    #[derive(Debug, thiserror::Error)]
    #[allow(missing_docs)]
    pub enum Error {
        /// Forwarding decompressed bytes to a writer failed.
        #[error("Could not write all bytes when decompressing content")]
        WriteInflated(#[from] std::io::Error),
        /// The decompressor itself reported an error.
        #[error("Could not decode zip stream, status was '{0}'")]
        Inflate(#[from] super::DecompressError),
        /// A status was returned that the caller considers an error.
        #[error("The zlib status indicated an error, status was '{0:?}'")]
        Status(super::Status),
    }
}
/// Decompress a few bytes of a zlib stream without allocation
#[derive(Default)]
pub struct Inflate {
    /// The actual decompressor doing all the work.
    pub state: Decompress,
}
impl Inflate {
    /// Run the decompressor for a single step over `input`, writing into `out`.
    ///
    /// Returns the resulting status along with the amount of input bytes consumed
    /// and output bytes written by this call.
    pub fn once(&mut self, input: &[u8], out: &mut [u8]) -> Result<(Status, usize, usize), inflate::Error> {
        let (in_before, out_before) = (self.state.total_in(), self.state.total_out());
        let status = self.state.decompress(input, out, FlushDecompress::None)?;
        // The per-call amounts are derived from the running totals of the decompressor.
        let consumed = (self.state.total_in() - in_before) as usize;
        let written = (self.state.total_out() - out_before) as usize;
        Ok((status, consumed, written))
    }

    /// Ready this instance for decoding another data stream.
    pub fn reset(&mut self) {
        self.state.reset();
    }
}
///
pub mod stream;

View File

@@ -0,0 +1,230 @@
use crate::zlib::Status;
use zlib_rs::DeflateError;
/// Size of the scratch buffer that collects compressed output before it is forwarded to the inner writer (8 * 4 KiB).
const BUF_SIZE: usize = 4096 * 8;
/// A utility to zlib compress anything that is written via its [Write][std::io::Write] implementation.
///
/// Be sure to call `flush()` when done to finalize the deflate stream.
pub struct Write<W> {
    // Holds the zlib compression state across `write()` calls.
    compressor: Compress,
    // The destination receiving the compressed bytes.
    inner: W,
    // Scratch space the compressor writes into before data is forwarded to `inner`.
    buf: [u8; BUF_SIZE],
}
impl<W> Clone for Write<W>
where
    W: Clone,
{
    /// Clone the inner writer and scratch buffer, but start with a *fresh* compressor.
    ///
    /// NOTE(review): mid-stream compression state is not carried over into the clone -
    /// confirm callers only clone before writing or after `flush()`.
    fn clone(&self) -> Self {
        Write {
            compressor: impls::new_compress(),
            inner: self.inner.clone(),
            buf: self.buf,
        }
    }
}
/// Hold all state needed for compressing data.
pub struct Compress(zlib_rs::Deflate);

impl Default for Compress {
    /// Equivalent to [`Compress::new()`].
    fn default() -> Self {
        Self::new()
    }
}
impl Compress {
/// The number of bytes that were read from the input.
pub fn total_in(&self) -> u64 {
self.0.total_in()
}
/// The number of compressed bytes that were written to the output.
pub fn total_out(&self) -> u64 {
self.0.total_out()
}
/// Create a new instance - this allocates so should be done with care.
pub fn new() -> Self {
let config = zlib_rs::DeflateConfig::best_speed();
let header = true;
let inner = zlib_rs::Deflate::new(config.level, header, config.window_bits as u8);
Self(inner)
}
/// Prepare the instance for a new stream.
pub fn reset(&mut self) {
self.0.reset();
}
/// Compress `input` and write compressed bytes to `output`, with `flush` controlling additional characteristics.
pub fn compress(&mut self, input: &[u8], output: &mut [u8], flush: FlushCompress) -> Result<Status, CompressError> {
let flush = match flush {
FlushCompress::None => zlib_rs::DeflateFlush::NoFlush,
FlushCompress::Partial => zlib_rs::DeflateFlush::PartialFlush,
FlushCompress::Sync => zlib_rs::DeflateFlush::SyncFlush,
FlushCompress::Full => zlib_rs::DeflateFlush::FullFlush,
FlushCompress::Finish => zlib_rs::DeflateFlush::Finish,
};
let status = self.0.compress(input, output, flush)?;
match status {
zlib_rs::Status::Ok => Ok(Status::Ok),
zlib_rs::Status::BufError => Ok(Status::BufError),
zlib_rs::Status::StreamEnd => Ok(Status::StreamEnd),
}
}
}
/// The error produced by [`Compress::compress()`].
#[derive(Debug, thiserror::Error)]
#[allow(missing_docs)]
pub enum CompressError {
    #[error("stream error")]
    StreamError,
    #[error("The input is not a valid deflate stream.")]
    DataError,
    #[error("Not enough memory")]
    InsufficientMemory,
}

impl From<zlib_rs::DeflateError> for CompressError {
    /// Map the lower-level deflate error onto our public error type.
    fn from(value: zlib_rs::DeflateError) -> Self {
        match value {
            DeflateError::StreamError => CompressError::StreamError,
            DeflateError::DataError => CompressError::DataError,
            DeflateError::MemError => CompressError::InsufficientMemory,
        }
    }
}
/// Values which indicate the form of flushing to be used when compressing
/// in-memory data.
#[derive(Copy, Clone, PartialEq, Eq, Debug)]
#[non_exhaustive]
#[allow(clippy::unnecessary_cast)]
pub enum FlushCompress {
    /// A typical parameter for passing to compression/decompression functions,
    /// this lets the underlying stream decide how much data to
    /// accumulate before producing output in order to maximize compression.
    None = 0,
    /// All pending output is flushed to the output buffer, but the output is
    /// not aligned to a byte boundary.
    ///
    /// All input data so far will be available to the decompressor (as with
    /// `Flush::Sync`). This completes the current deflate block and follows it
    /// with an empty fixed codes block that is 10 bits long, and it assures
    /// that enough bytes are output in order for the decompressor to finish the
    /// block before the empty fixed code block.
    Partial = 1,
    /// All pending output is flushed to the output buffer and the output is
    /// aligned on a byte boundary so that the decompressor can get all input
    /// data available so far.
    ///
    /// Flushing may degrade compression for some compression algorithms and so
    /// it should only be used when necessary. This will complete the current
    /// deflate block and follow it with an empty stored block.
    Sync = 2,
    /// All output is flushed as with `Flush::Sync` and the compression state is
    /// reset so decompression can restart from this point if previous
    /// compressed data has been damaged or if random access is desired.
    ///
    /// Using this option too often can seriously degrade compression.
    Full = 3,
    /// Pending input is processed and pending output is flushed.
    ///
    /// The return value may indicate that the stream is not yet done and more
    /// data has yet to be processed.
    Finish = 4,
}
mod impls {
    use std::io;

    use crate::zlib::stream::deflate::{self, Compress, FlushCompress};
    use crate::zlib::Status;

    /// Create a pristine compressor, ready for a new stream.
    pub(crate) fn new_compress() -> Compress {
        Compress::new()
    }

    impl<W> deflate::Write<W>
    where
        W: io::Write,
    {
        /// Create a new instance writing compressed bytes to `inner`.
        pub fn new(inner: W) -> deflate::Write<W> {
            deflate::Write {
                compressor: new_compress(),
                inner,
                buf: [0; deflate::BUF_SIZE],
            }
        }

        /// Reset the compressor, starting a new compression stream.
        ///
        /// That way multiple streams can be written to the same inner writer.
        pub fn reset(&mut self) {
            self.compressor.reset();
        }

        /// Consume `self` and return the inner writer.
        pub fn into_inner(self) -> W {
            self.inner
        }

        /// Feed `buf` to the compressor with the given `flush` mode, forwarding all produced
        /// output to the inner writer, and return the amount of input bytes consumed.
        ///
        /// The loop ends when the stream is finished, or when neither input nor output
        /// make progress anymore, i.e. the compressor needs more input.
        fn write_inner(&mut self, mut buf: &[u8], flush: FlushCompress) -> io::Result<usize> {
            let total_in_when_start = self.compressor.total_in();
            loop {
                let last_total_in = self.compressor.total_in();
                let last_total_out = self.compressor.total_out();
                let status = self
                    .compressor
                    .compress(buf, &mut self.buf, flush)
                    .map_err(io::Error::other)?;
                // Drain whatever the compressor produced into the inner writer.
                let written = self.compressor.total_out() - last_total_out;
                if written > 0 {
                    self.inner.write_all(&self.buf[..written as usize])?;
                }
                match status {
                    Status::StreamEnd => return Ok((self.compressor.total_in() - total_in_when_start) as usize),
                    Status::Ok | Status::BufError => {
                        // Skip past the input that was consumed in this round.
                        let consumed = self.compressor.total_in() - last_total_in;
                        buf = &buf[consumed as usize..];
                        // output buffer still makes progress
                        if self.compressor.total_out() > last_total_out {
                            continue;
                        }
                        // input still makes progress
                        if self.compressor.total_in() > last_total_in {
                            continue;
                        }
                        // input also makes no progress anymore, need more so leave with what we have
                        return Ok((self.compressor.total_in() - total_in_when_start) as usize);
                    }
                }
            }
        }
    }

    impl<W: io::Write> io::Write for deflate::Write<W> {
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            self.write_inner(buf, FlushCompress::None)
        }

        /// Finish the deflate stream and forward all pending output to the inner writer.
        fn flush(&mut self) -> io::Result<()> {
            self.write_inner(&[], FlushCompress::Finish).map(|_| ())
        }
    }
}
#[cfg(test)]
mod tests;

View File

@@ -0,0 +1,102 @@
mod deflate_stream {
use std::{
io,
io::{Read, Write},
};
use bstr::ByteSlice;
use crate::zlib::stream::deflate;
use crate::zlib::Decompress;
/// Provide streaming decompression using the `std::io::Read` trait.
/// If `std::io::BufReader` is used, an allocation for the input buffer will be performed.
struct InflateReader<R> {
inner: R,
decompressor: Decompress,
}
impl<R> InflateReader<R>
where
R: io::BufRead,
{
pub fn from_read(read: R) -> InflateReader<R> {
InflateReader {
decompressor: Decompress::new(),
inner: read,
}
}
}
impl<R> io::Read for InflateReader<R>
where
R: io::BufRead,
{
fn read(&mut self, into: &mut [u8]) -> io::Result<usize> {
crate::zlib::stream::inflate::read(&mut self.inner, &mut self.decompressor, into)
}
}
#[test]
fn small_file_decompress() -> Result<(), Box<dyn std::error::Error>> {
    // Resolve `path` relative to this crate's test fixture directory.
    fn fixture_path(path: &str) -> std::path::PathBuf {
        std::path::PathBuf::from("tests/fixtures").join(path)
    }
    let r = InflateReader::from_read(io::BufReader::new(std::fs::File::open(fixture_path(
        "objects/37/d4e6c5c48ba0d245164c4e10d5f41140cab980",
    ))?));
    #[allow(clippy::unbuffered_bytes)]
    let mut bytes = r.bytes();
    // The loose object inflates to a git blob header followed by its content, 16 bytes total.
    let content = bytes.by_ref().take(16).collect::<Result<Vec<_>, _>>()?;
    assert_eq!(content.as_slice().as_bstr(), b"blob 9\0hi there\n".as_bstr());
    assert!(bytes.next().is_none());
    Ok(())
}
#[test]
fn all_at_once() -> Result<(), Box<dyn std::error::Error>> {
    let mut w = deflate::Write::new(Vec::new());
    assert_eq!(w.write(b"hello")?, 5);
    w.flush()?;
    let out = w.inner;
    // The exact compressed size may vary slightly depending on the zlib implementation.
    assert!(out.len() == 12 || out.len() == 13);
    assert_deflate_buffer(out, b"hello")
}

/// Inflate `out` and assert the decompressed result equals `expected`.
fn assert_deflate_buffer(out: Vec<u8>, expected: &[u8]) -> Result<(), Box<dyn std::error::Error>> {
    let mut actual = Vec::new();
    InflateReader::from_read(out.as_slice()).read_to_end(&mut actual)?;
    assert_eq!(actual, expected);
    Ok(())
}
#[test]
fn big_file_small_writes() -> Result<(), Box<dyn std::error::Error>> {
    let mut w = deflate::Write::new(Vec::new());
    let bytes = include_bytes!(
        "../../../../../src-odb/tests/fixtures/objects/pack/pack-11fdfa9e156ab73caae3b6da867192221f2089c2.pack"
    );
    // Two-byte writes force many incremental compression steps.
    for chunk in bytes.chunks(2) {
        assert_eq!(w.write(chunk)?, chunk.len());
    }
    w.flush()?;
    assert_deflate_buffer(w.inner, bytes)
}

#[test]
fn big_file_a_few_big_writes() -> Result<(), Box<dyn std::error::Error>> {
    let mut w = deflate::Write::new(Vec::new());
    let bytes = include_bytes!(
        "../../../../../src-odb/tests/fixtures/objects/pack/pack-11fdfa9e156ab73caae3b6da867192221f2089c2.pack"
    );
    // Chunks larger than the internal buffer (4096 * 8) exercise the inner write loop.
    for chunk in bytes.chunks(4096 * 9) {
        assert_eq!(w.write(chunk)?, chunk.len());
    }
    w.flush()?;
    assert_deflate_buffer(w.inner, bytes)
}
}

View File

@@ -0,0 +1,40 @@
use std::{io, io::BufRead};
use crate::zlib::{Decompress, FlushDecompress, Status};
/// Read bytes from `rd` and decompress them using `state` into a pre-allocated fitting buffer `dst`, returning the amount of bytes written.
/// Read bytes from `rd` and decompress them using `state` into a pre-allocated fitting buffer `dst`, returning the amount of bytes written.
///
/// Input is pulled in buffered chunks and inflated until the zlib stream ends, `dst` is full,
/// or the input is exhausted.
///
/// # Errors
///
/// Returns any error of the underlying reader, or an error of kind [`io::ErrorKind::InvalidInput`]
/// if the stream is corrupt, carrying the decompressor's error message.
pub fn read(rd: &mut impl BufRead, state: &mut Decompress, mut dst: &mut [u8]) -> io::Result<usize> {
    let mut total_written = 0;
    loop {
        let (written, consumed, ret, eof);
        {
            let input = rd.fill_buf()?;
            eof = input.is_empty();
            let before_out = state.total_out();
            let before_in = state.total_in();
            // At EOF, tell the decompressor that no more input will come so it can finalize.
            let flush = if eof {
                FlushDecompress::Finish
            } else {
                FlushDecompress::None
            };
            ret = state.decompress(input, dst, flush);
            written = (state.total_out() - before_out) as usize;
            total_written += written;
            dst = &mut dst[written..];
            consumed = (state.total_in() - before_in) as usize;
        }
        rd.consume(consumed);
        match ret {
            // The stream has officially ended, nothing more to do here.
            Ok(Status::StreamEnd) => return Ok(total_written),
            // Either input or output are depleted even though the stream is not depleted yet.
            Ok(Status::Ok | Status::BufError) if eof || dst.is_empty() => return Ok(total_written),
            // Some progress was made in both the input and the output, it must continue to reach the end.
            Ok(Status::Ok | Status::BufError) if consumed != 0 || written != 0 => continue,
            // A strange state, where zlib makes no progress but isn't done either. Call it out.
            Ok(Status::Ok | Status::BufError) => unreachable!("Definitely a bug somewhere"),
            // Preserve the cause of the failure instead of discarding it, while keeping the
            // previously observable `InvalidInput` error kind for callers that match on it.
            Err(err) => {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidInput,
                    format!("corrupt deflate stream: {err}"),
                ))
            }
        }
    }
}

View File

@@ -0,0 +1,4 @@
///
pub mod deflate;
///
pub mod inflate;

View File

@@ -0,0 +1 @@
mod trace;

View File

@@ -0,0 +1,54 @@
use std::convert::Infallible;
use gix_features::parallel::InOrderIter;
#[test]
fn in_order_stays_in_order() {
    // NOTE(review): tests import `gix_features` while this crate's manifest names it
    // `src-features` - confirm the dependency is renamed accordingly.
    assert_eq!(
        InOrderIter::from(vec![Ok::<_, Infallible>((0usize, 'a')), Ok((1, 'b')), Ok((2, 'c'))].into_iter())
            .collect::<Result<Vec<_>, _>>()
            .expect("infallible"),
        vec!['a', 'b', 'c']
    );
}

#[test]
fn out_of_order_items_are_held_until_the_sequence_is_complete() {
    // Items arrive with shuffled sequence numbers; the iterator must buffer them
    // and only yield once the next expected index is available.
    assert_eq!(
        InOrderIter::from(
            vec![
                Ok::<_, Infallible>((2usize, 'c')),
                Ok((1, 'b')),
                Ok((0, 'a')),
                Ok((3, 'd'))
            ]
            .into_iter()
        )
        .collect::<Result<Vec<_>, _>>()
        .expect("infallible"),
        vec!['a', 'b', 'c', 'd']
    );
}
#[test]
fn in_sequence_errors_immediately_trigger_a_fuse() {
    // After the first error the iterator must not yield anything anymore.
    let mut iter = InOrderIter::from(vec![Ok::<_, &'static str>((0usize, 'a')), Err("err"), Ok((1, 'b'))].into_iter());
    assert_eq!(iter.next(), Some(Ok('a')));
    assert_eq!(iter.next(), Some(Err("err")));
    assert_eq!(
        iter.next(),
        None,
        "fuse should have triggered so we don't see anything else"
    );
}

#[test]
fn out_of_sequence_errors_immediately_trigger_a_fuse() {
    // The error is forwarded even though item 0 was never seen, and fuses the iterator.
    let mut iter = InOrderIter::from(vec![Ok::<_, &'static str>((1usize, 'b')), Err("err"), Ok((0, 'a'))].into_iter());
    assert_eq!(iter.next(), Some(Err("err")));
    assert_eq!(
        iter.next(),
        None,
        "fuse should have triggered so we don't see anything else"
    );
}

View File

@@ -0,0 +1,125 @@
//! Tests that are working similarly in parallel and serial mode
use gix_features::parallel;
mod in_order_iter;
/// A reducer that sums all fed items, used to exercise the `parallel` API in the tests below.
#[derive(Default)]
struct Adder {
    // Running sum of all items fed so far.
    count: usize,
}

impl parallel::Reduce for Adder {
    type Input = usize;
    type FeedProduce = usize;
    type Output = usize;
    type Error = ();

    /// Add `item` to the running sum and pass it through unchanged.
    fn feed(&mut self, item: Self::Input) -> Result<Self::FeedProduce, Self::Error> {
        self.count += item;
        Ok(item)
    }

    /// Return the final sum.
    fn finalize(self) -> Result<Self::Output, Self::Error> {
        Ok(self.count)
    }
}
#[test]
fn in_parallel() {
    // One hundred `1`s fed through the pipeline must sum to 100 via the `Adder` reducer.
    let res = parallel::in_parallel(
        std::iter::from_fn(|| Some(1)).take(100),
        None,
        |_n| (),
        |input, _state| input,
        Adder::default(),
    )
    .expect("successful computation");
    assert_eq!(res, 100);
}

#[test]
fn in_parallel_with_mut_slice_in_chunks() {
    let num_items = 33;
    let mut input: Vec<_> = std::iter::repeat_n(1, num_items).collect();
    let counts = parallel::in_parallel_with_slice(
        &mut input,
        None,
        |_| 0usize,
        |item, acc, _threads_eft, _should_interrupt| {
            // Each worker sums what it sees while also incrementing each slice element in place.
            *acc += *item;
            *item += 1;
            Ok::<_, ()>(())
        },
        || Some(std::time::Duration::from_millis(10)),
        std::convert::identity,
    )
    .unwrap();
    let expected = std::iter::repeat_n(1, num_items).sum::<usize>();
    assert_eq!(counts.iter().sum::<usize>(), expected);
    assert_eq!(input.iter().sum::<usize>(), expected * 2, "we increment each entry");
}
#[test]
fn stepped_reduce_next() {
    // Drive the stepwise reducer manually via its `Iterator` implementation.
    let mut iter = parallel::reduce::Stepwise::new(
        std::iter::from_fn(|| Some(1)).take(100),
        None,
        |_n| (),
        |input, _state| input,
        Adder::default(),
    );

    let mut aggregate = 0;
    for value in iter.by_ref() {
        aggregate += value.expect("success");
    }
    assert_eq!(aggregate, 100);
}

#[test]
fn stepped_reduce_ref_input_and_consume() {
    let seq = std::sync::Arc::new(vec![0usize, 1, 2]);
    // A hand-rolled iterator over shared `Arc` data to prove shared input works stepwise.
    struct ArcIter(std::sync::Arc<Vec<usize>>, usize);
    impl Iterator for ArcIter {
        type Item = usize;
        fn next(&mut self) -> Option<Self::Item> {
            let n = self.0.get(self.1).copied();
            self.1 += 1;
            n
        }
    }

    let mut iter = parallel::reduce::Stepwise::new(
        ArcIter(seq.clone(), 0).enumerate(),
        None,
        {
            let seq = std::sync::Arc::clone(&seq);
            move |_n| seq.len()
        },
        {
            let seq = std::sync::Arc::clone(&seq);
            move |(idx, ref_val): (usize, usize), _state| seq[idx] * ref_val
        },
        Adder::default(),
    );

    let mut aggregate = 0;
    for value in iter.by_ref() {
        aggregate += value.expect("success");
    }
    // 0*0 + 1*1 + 2*2 = 5
    assert_eq!(aggregate, 5);
}

#[test]
fn stepped_reduce_finalize() {
    let iter = parallel::reduce::Stepwise::new(
        std::iter::from_fn(|| Some(1)).take(100),
        None,
        |_n| (),
        |input, _state| input,
        Adder::default(),
    );

    assert_eq!(iter.finalize().expect("success"), 100);
}

View File

@@ -0,0 +1 @@
mod parallel;

View File

@@ -0,0 +1 @@
mod parallel;

View File

@@ -0,0 +1,122 @@
mod optimize_chunk_size_and_thread_limit {
    use gix_features::parallel::optimize_chunk_size_and_thread_limit;

    // NOTE(review): tests import `gix_features` while this crate's manifest names it
    // `src-features` - confirm the dependency is renamed accordingly.
    // Assertions are tuples of `(chunk_size, thread_limit, num_threads)`.

    #[test]
    fn not_enough_chunks_for_threads() {
        assert_eq!(
            optimize_chunk_size_and_thread_limit(1, Some(10), None, Some(10)),
            (1, Some(5), 5)
        );
        assert_eq!(
            optimize_chunk_size_and_thread_limit(1, Some(10), Some(3), Some(10)),
            (1, Some(3), 3),
            "the thread limit is always respected"
        );
    }

    #[test]
    fn some_more_chunks_per_thread() {
        assert_eq!(
            optimize_chunk_size_and_thread_limit(1, Some(30), None, Some(10)),
            (1, Some(10), 10)
        );
        assert_eq!(
            optimize_chunk_size_and_thread_limit(1, Some(30), Some(5), Some(10)),
            (3, Some(5), 5),
            "the thread limit is always respected"
        );
    }

    #[test]
    fn chunk_size_too_small() {
        assert_eq!(
            optimize_chunk_size_and_thread_limit(1, Some(100), None, Some(10)),
            (5, Some(10), 10)
        );
        assert_eq!(
            optimize_chunk_size_and_thread_limit(1, Some(100), Some(5), Some(10)),
            (10, Some(5), 5),
            "the thread limit is always respected"
        );
    }

    #[test]
    fn chunk_size_too_big() {
        assert_eq!(
            optimize_chunk_size_and_thread_limit(50, Some(100), None, Some(10)),
            (5, Some(10), 10)
        );
        assert_eq!(
            optimize_chunk_size_and_thread_limit(50, Some(100), Some(5), Some(10)),
            (10, Some(5), 5),
            "the thread limit is always respected"
        );
    }

    mod unknown_chunk_count {
        use gix_features::parallel::optimize_chunk_size_and_thread_limit;

        #[test]
        fn medium_chunk_size_many_threads() {
            assert_eq!(
                optimize_chunk_size_and_thread_limit(50, None, None, Some(4)),
                (50, Some(4), 4),
                "really, what do we know"
            );
        }

        #[test]
        fn medium_chunk_size_single_thread() {
            assert_eq!(
                optimize_chunk_size_and_thread_limit(50, None, None, Some(1)),
                (50, Some(1), 1),
                "single threaded - we don't touch that"
            );
        }

        #[test]
        fn small_chunk_size_single_thread() {
            assert_eq!(
                optimize_chunk_size_and_thread_limit(1, None, None, Some(1)),
                (1, Some(1), 1),
                "single threaded - we don't touch that"
            );
        }

        #[test]
        fn small_chunk_size_many_threads() {
            assert_eq!(
                optimize_chunk_size_and_thread_limit(1, None, None, Some(4)),
                (50, Some(4), 4),
                "we prefer an arbitrary number, which should really be based on effort, but the caller has to adjust for that"
            );
        }
    }

    mod real_values {
        use gix_features::parallel::optimize_chunk_size_and_thread_limit;

        #[test]
        fn linux_kernel_pack_my_machine_lookup() {
            assert_eq!(
                optimize_chunk_size_and_thread_limit(10000, Some(7_500_000), None, Some(4)),
                (1000, Some(4), 4),
                "the bucket size is capped actually, somewhat arbitrarily"
            );
        }

        #[test]
        fn linux_kernel_pack_my_machine_indexed() {
            assert_eq!(
                optimize_chunk_size_and_thread_limit(1, None, None, Some(4)),
                (50, Some(4), 4),
                "low values are raised to arbitrary value"
            );
            assert_eq!(
                optimize_chunk_size_and_thread_limit(10000, None, None, Some(4)),
                (1000, Some(4), 4),
                "high values are capped"
            );
        }
    }
}

117
src-features/tests/pipe.rs Normal file
View File

@@ -0,0 +1,117 @@
mod io {
    use std::io::{BufRead, ErrorKind, Read, Write};

    use gix_features::io;

    // NOTE(review): tests import `gix_features` while this crate's manifest names it
    // `src-features` - confirm the dependency is renamed accordingly.

    #[test]
    fn threaded_read_to_end() {
        let (mut writer, mut reader) = gix_features::io::pipe::unidirectional(0);

        let message = "Hello, world!";
        // Write from a separate thread; the read end below drains until the writer is dropped.
        std::thread::spawn(move || {
            writer
                .write_all(message.as_bytes())
                .expect("writes to work if reader is present");
        });

        let mut received = String::new();
        reader.read_to_string(&mut received).unwrap();

        assert_eq!(&received, message);
    }

    #[test]
    fn lack_of_reader_fails_with_broken_pipe() {
        // Binding the read end to `_` drops it immediately, so the write must fail.
        let (mut writer, _) = io::pipe::unidirectional(0);
        assert_eq!(
            writer.write_all(b"must fail").unwrap_err().kind(),
            ErrorKind::BrokenPipe
        );
    }

    #[test]
    fn line_reading_one_by_one() {
        let (mut writer, mut reader) = io::pipe::unidirectional(2);
        writer.write_all(b"a\n").expect("success");
        writer.write_all(b"b\nc").expect("success");
        drop(writer);
        let mut buf = String::new();
        // The final line has no trailing newline and is still delivered.
        for expected in &["a\n", "b\n", "c"] {
            buf.clear();
            assert_eq!(reader.read_line(&mut buf).expect("success"), expected.len());
            assert_eq!(buf, *expected);
        }
    }

    #[test]
    fn line_reading() {
        let (mut writer, reader) = io::pipe::unidirectional(2);
        writer.write_all(b"a\n").expect("success");
        writer.write_all(b"b\nc\n").expect("success");
        drop(writer);
        assert_eq!(
            reader.lines().map_while(Result::ok).collect::<Vec<_>>(),
            vec!["a", "b", "c"]
        );
    }

    #[test]
    fn writer_can_inject_errors() {
        let (writer, mut reader) = io::pipe::unidirectional(1);
        writer
            .channel
            .send(Err(std::io::Error::other("the error")))
            .expect("send success");
        let mut buf = [0];
        assert_eq!(
            reader.read(&mut buf).unwrap_err().to_string(),
            "the error",
            "using Read trait, errors are propagated"
        );
        writer
            .channel
            .send(Err(std::io::Error::other("the error")))
            .expect("send success");
        assert_eq!(
            reader.fill_buf().unwrap_err().to_string(),
            "the error",
            "using BufRead trait, errors are propagated"
        );
    }

    #[test]
    fn continue_on_empty_writes() {
        let (mut writer, mut reader) = io::pipe::unidirectional(2);
        writer.write_all(&[]).expect("write successful and non-blocking");
        let input = b"hello";
        writer
            .write_all(input)
            .expect("second write works as well as there is capacity");
        let mut buf = vec![0u8; input.len()];
        assert_eq!(reader.read(&mut buf).expect("read succeeds"), input.len());
        assert_eq!(buf, &input[..]);
    }

    #[test]
    fn small_reads() {
        const BLOCK_SIZE: usize = 20;
        let block_count = 20;
        let (mut writer, mut reader) = io::pipe::unidirectional(4);
        std::thread::spawn(move || {
            for _ in 0..block_count {
                let data = &[0; BLOCK_SIZE];
                writer.write_all(data).unwrap();
            }
        });
        // Read with a buffer half the block size to force split reads.
        let mut small_read_buf = [0; BLOCK_SIZE / 2];
        let mut bytes_read = 0;
        while let Ok(size) = reader.read(&mut small_read_buf) {
            if size == 0 {
                break;
            }
            bytes_read += size;
        }
        assert_eq!(block_count * BLOCK_SIZE, bytes_read);
    }
}

View File

@@ -0,0 +1,15 @@
use gix_features::trace::{coarse, detail, span};
#[test]
fn span() {
    // Exercise every calling form of the `span!` macro and its `coarse!`/`detail!` shortcuts;
    // this only checks the macros compile and can be invoked, return values are not inspected.
    let _x = span!(gix_features::trace::Level::Coarse, "hello");
    span!(gix_features::trace::Level::Coarse, "hello", x = "value", y = 42);
    span!(target: "other", gix_features::trace::Level::Coarse, "hello", x = "value", y = 42);
    let _x = coarse!("hello");
    coarse!("hello", x = "value", y = 42);
    coarse!(target: "other", "hello", x = "value", y = 42);
    let _y = detail!("hello");
    detail!("hello", x = "value", y = 42);
    detail!(target: "other", "hello", x = "value", y = 42);
}