diff options
| author | Max Audron <audron@cocaine.farm> | 2022-06-07 12:28:18 +0200 |
|---|---|---|
| committer | Maximilian Manz <maximilian.manz@de.clara.net> | 2022-06-20 11:33:04 +0200 |
| commit | 7e73ebcad03a5477367763d4a5e9c22f9e9967ca (patch) | |
| tree | 39e543ec482be2e95c512b45f6dbdbe816dc0992 /src/batch | |
| parent | fix clippy lints (diff) | |
process sync and update in parallel
Split the repos up in x batches where x is either decided on based on
number of cpu cores available or given by the --jobs argument
Diffstat (limited to 'src/batch')
| -rw-r--r-- | src/batch/mod.rs | 62 |
1 files changed, 62 insertions, 0 deletions
diff --git a/src/batch/mod.rs b/src/batch/mod.rs new file mode 100644 index 0000000..ea231c7 --- /dev/null +++ b/src/batch/mod.rs @@ -0,0 +1,62 @@ +use itertools::Itertools; +use std::{sync::RwLockWriteGuard, thread}; +use tracing::debug; + +use crate::repo::{Repo, Repos}; + +pub fn batch(repos: Repos, f: fn(RwLockWriteGuard<Repo>)) { + let cpus = num_cpus::get(); + let len = repos.len(); + + let jobs = crate::GTREE.get().unwrap().args.jobs; + + let batch_size = if jobs != 0 { + if len <= jobs { + 1 + } else { + len / jobs + } + } else { + if len <= cpus { + 1 + } else { + len / cpus + } + }; + + debug!( + "got {} repos and {} threads, batch size is {}", + len, cpus, batch_size + ); + + let chunks = repos.into_iter().chunks(batch_size); + let mut handles = Vec::with_capacity(cpus); + + // This has to be a for loop because iterators are lazy + // and will only start processing the closure in a map when used. + // https://stackoverflow.com/q/34765967 + for (i, chunk) in chunks.into_iter().enumerate() { + let repos: Repos = chunk.collect(); + + let span = tracing::debug_span!("spawn_batch_thread", number = i); + let t_span = tracing::debug_span!("batch_thread", number = i); + let _enter = span.enter(); + let handle = thread::Builder::new() + .name(format!("batch_{}", i)) + .spawn(move || { + let _enter = t_span.enter(); + for (_name, repo) in repos { + let repo = repo.write().unwrap(); + + f(repo) + } + }) + .unwrap(); + + handles.push(handle); + } + + handles + .into_iter() + .for_each(|handle| handle.join().unwrap()); +} |
