From 7e73ebcad03a5477367763d4a5e9c22f9e9967ca Mon Sep 17 00:00:00 2001 From: Max Audron Date: Tue, 7 Jun 2022 12:28:18 +0200 Subject: process sync and update in parallel Split the repos up in x batches where x is either decided on based on number of cpu cores available or given by the --jobs argument --- Cargo.lock | 2 ++ Cargo.toml | 2 ++ src/batch/mod.rs | 62 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/config/args.rs | 4 ++++ src/main.rs | 18 ++++++++++------ src/sync/mod.rs | 11 +++++----- src/update/mod.rs | 11 +++++----- 7 files changed, 94 insertions(+), 16 deletions(-) create mode 100644 src/batch/mod.rs diff --git a/Cargo.lock b/Cargo.lock index df85de7..75c0bc5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -574,6 +574,8 @@ dependencies = [ "gitlab", "graphql_client", "itertools", + "num_cpus", + "once_cell", "serde", "thiserror", "tokio", diff --git a/Cargo.toml b/Cargo.toml index a9f1766..88e490c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,8 @@ tokio = { version = "1", features = ["rt", "rt-multi-thread", "macros"] } futures = "0.3" itertools = "0.10" +num_cpus = "1" +once_cell = "1" gitlab = "0.1408" graphql_client = "0.10" diff --git a/src/batch/mod.rs b/src/batch/mod.rs new file mode 100644 index 0000000..ea231c7 --- /dev/null +++ b/src/batch/mod.rs @@ -0,0 +1,62 @@ +use itertools::Itertools; +use std::{sync::RwLockWriteGuard, thread}; +use tracing::debug; + +use crate::repo::{Repo, Repos}; + +pub fn batch(repos: Repos, f: fn(RwLockWriteGuard)) { + let cpus = num_cpus::get(); + let len = repos.len(); + + let jobs = crate::GTREE.get().unwrap().args.jobs; + + let batch_size = if jobs != 0 { + if len <= jobs { + 1 + } else { + len / jobs + } + } else { + if len <= cpus { + 1 + } else { + len / cpus + } + }; + + debug!( + "got {} repos and {} threads, batch size is {}", + len, cpus, batch_size + ); + + let chunks = repos.into_iter().chunks(batch_size); + let mut handles = Vec::with_capacity(cpus); + + // This has to be a for loop because iterators are lazy + // and will only start processing the closure in a map when used. + // https://stackoverflow.com/q/34765967 + for (i, chunk) in chunks.into_iter().enumerate() { + let repos: Repos = chunk.collect(); + + let span = tracing::debug_span!("spawn_batch_thread", number = i); + let t_span = tracing::debug_span!("batch_thread", number = i); + let _enter = span.enter(); + let handle = thread::Builder::new() + .name(format!("batch_{}", i)) + .spawn(move || { + let _enter = t_span.enter(); + for (_name, repo) in repos { + let repo = repo.write().unwrap(); + + f(repo) + } + }) + .unwrap(); + + handles.push(handle); + } + + handles + .into_iter() + .for_each(|handle| handle.join().unwrap()); +} diff --git a/src/config/args.rs b/src/config/args.rs index 446c5ce..f8ad8e8 100644 --- a/src/config/args.rs +++ b/src/config/args.rs @@ -10,6 +10,10 @@ pub struct Args { /// Only operate on this subtree #[clap(global = true)] pub scope: Option, + + /// Number of jobs to run in parallel, 0 is automatic + #[clap(short = 'j', long = "jobs", default_value = "0", global = true)] + pub jobs: usize, } #[derive(PartialEq, Clone, Debug, Subcommand)] diff --git a/src/main.rs b/src/main.rs index e37427c..8b72a34 100644 --- a/src/main.rs +++ b/src/main.rs @@ -18,17 +18,23 @@ mod list; mod sync; mod update; +mod batch; + #[cfg(test)] mod tests; -#[derive(Debug)] +use once_cell::sync::OnceCell; + +static GTREE: OnceCell = OnceCell::new(); +static RUNTIME: OnceCell = OnceCell::new(); + +#[derive(Debug, Clone)] #[allow(dead_code)] struct GTree { figment: figment::Figment, config: config::Config, args: config::args::Args, forge: forge::Forge, - rt: Runtime, } impl GTree { @@ -44,16 +50,15 @@ impl GTree { .next() .context("No Forge configured, please setup a forge")?; - let rt = Runtime::new()?; + RUNTIME.set(Runtime::new()?).unwrap(); - let forge = rt.block_on(forge::Forge::new(forge_config))?; + let forge = RUNTIME.get().unwrap().block_on(forge::Forge::new(forge_config))?; Ok(GTree { figment, config, args, forge, - rt }) } @@ -74,7 +79,7 @@ impl GTree { Repos::from_local(forge_t.root(), &scope_t) }); - let projects = self.rt.block_on(self.forge.projects(&scope))?; + let projects = RUNTIME.get().unwrap().block_on(self.forge.projects(&scope))?; let remote = Repos::from_forge(forge.root(), projects); let local = handle.join().unwrap(); @@ -108,6 +113,7 @@ fn main() -> Result<()> { debug!("starting"); let gtree = GTree::new()?; + GTREE.set(gtree.clone()).unwrap(); gtree.run()?; diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 6cf1f7f..4bfe322 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -1,17 +1,18 @@ use std::fmt::{Debug, Display}; -use crate::repo::{Repo, RepoError, Repos}; +use crate::{ + batch::batch, + repo::{Repo, RepoError, Repos}, +}; impl crate::GTree { pub fn sync(&self, repos: Repos) { - for (_name, repo) in repos { - let mut repo = repo.write().unwrap(); - + batch(repos, |mut repo| { match repo.sync() { Ok(u) => println!("{}", u), Err(u) => println!("{}", u), }; - } + }); } } diff --git a/src/update/mod.rs b/src/update/mod.rs index d2d6494..8f10663 100644 --- a/src/update/mod.rs +++ b/src/update/mod.rs @@ -3,20 +3,21 @@ use std::fmt::{Debug, Display}; use git2::BranchType; use tracing::debug; -use crate::repo::{Repo, RepoError, Repos}; +use crate::{ + batch::batch, + repo::{Repo, RepoError, Repos}, +}; impl crate::GTree { pub fn update(&self, repos: Repos) { - for (_name, repo) in repos { - let mut repo = repo.write().unwrap(); - + batch(repos, |mut repo| { if repo.repo.is_some() { match repo.update() { Ok(u) => println!("{}", u), Err(u) => println!("{}", u), }; } - } + }); } } -- cgit v1.2.3