aboutsummaryrefslogtreecommitdiff
path: root/src/batch
diff options
context:
space:
mode:
authorMax Audron <audron@cocaine.farm>2022-06-07 12:28:18 +0200
committerMaximilian Manz <maximilian.manz@de.clara.net>2022-06-20 11:33:04 +0200
commit7e73ebcad03a5477367763d4a5e9c22f9e9967ca (patch)
tree39e543ec482be2e95c512b45f6dbdbe816dc0992 /src/batch
parentfix clippy lints (diff)
process sync and update in parallel
Split the repos up in x batches where x is either decided on based on number of cpu cores available or given by the --jobs argument
Diffstat (limited to 'src/batch')
-rw-r--r--src/batch/mod.rs62
1 files changed, 62 insertions, 0 deletions
diff --git a/src/batch/mod.rs b/src/batch/mod.rs
new file mode 100644
index 0000000..ea231c7
--- /dev/null
+++ b/src/batch/mod.rs
@@ -0,0 +1,62 @@
+use itertools::Itertools;
+use std::{sync::RwLockWriteGuard, thread};
+use tracing::debug;
+
+use crate::repo::{Repo, Repos};
+
+pub fn batch(repos: Repos, f: fn(RwLockWriteGuard<Repo>)) {
+ let cpus = num_cpus::get();
+ let len = repos.len();
+
+ let jobs = crate::GTREE.get().unwrap().args.jobs;
+
+ let batch_size = if jobs != 0 {
+ if len <= jobs {
+ 1
+ } else {
+ len / jobs
+ }
+ } else {
+ if len <= cpus {
+ 1
+ } else {
+ len / cpus
+ }
+ };
+
+ debug!(
+ "got {} repos and {} threads, batch size is {}",
+ len, cpus, batch_size
+ );
+
+ let chunks = repos.into_iter().chunks(batch_size);
+ let mut handles = Vec::with_capacity(cpus);
+
+ // This has to be a for loop because iterators are lazy
+ // and will only start processing the closure in a map when used.
+ // https://stackoverflow.com/q/34765967
+ for (i, chunk) in chunks.into_iter().enumerate() {
+ let repos: Repos = chunk.collect();
+
+ let span = tracing::debug_span!("spawn_batch_thread", number = i);
+ let t_span = tracing::debug_span!("batch_thread", number = i);
+ let _enter = span.enter();
+ let handle = thread::Builder::new()
+ .name(format!("batch_{}", i))
+ .spawn(move || {
+ let _enter = t_span.enter();
+ for (_name, repo) in repos {
+ let repo = repo.write().unwrap();
+
+ f(repo)
+ }
+ })
+ .unwrap();
+
+ handles.push(handle);
+ }
+
+ handles
+ .into_iter()
+ .for_each(|handle| handle.join().unwrap());
+}