diff options
| author | Max Audron <audron@cocaine.farm> | 2022-06-07 12:28:18 +0200 |
|---|---|---|
| committer | Maximilian Manz <maximilian.manz@de.clara.net> | 2022-06-20 11:33:04 +0200 |
| commit | 7e73ebcad03a5477367763d4a5e9c22f9e9967ca (patch) | |
| tree | 39e543ec482be2e95c512b45f6dbdbe816dc0992 | |
| parent | fix clippy lints (diff) | |
process sync and update in parallel
Split the repos up into x batches, where x is either derived from the
number of CPU cores available or given by the --jobs argument.
| -rw-r--r-- | Cargo.lock | 2 | ||||
| -rw-r--r-- | Cargo.toml | 2 | ||||
| -rw-r--r-- | src/batch/mod.rs | 62 | ||||
| -rw-r--r-- | src/config/args.rs | 4 | ||||
| -rw-r--r-- | src/main.rs | 18 | ||||
| -rw-r--r-- | src/sync/mod.rs | 11 | ||||
| -rw-r--r-- | src/update/mod.rs | 11 |
7 files changed, 94 insertions, 16 deletions
@@ -574,6 +574,8 @@ dependencies = [ "gitlab", "graphql_client", "itertools", + "num_cpus", + "once_cell", "serde", "thiserror", "tokio", @@ -8,6 +8,8 @@ tokio = { version = "1", features = ["rt", "rt-multi-thread", "macros"] } futures = "0.3" itertools = "0.10" +num_cpus = "1" +once_cell = "1" gitlab = "0.1408" graphql_client = "0.10" diff --git a/src/batch/mod.rs b/src/batch/mod.rs new file mode 100644 index 0000000..ea231c7 --- /dev/null +++ b/src/batch/mod.rs @@ -0,0 +1,62 @@ +use itertools::Itertools; +use std::{sync::RwLockWriteGuard, thread}; +use tracing::debug; + +use crate::repo::{Repo, Repos}; + +pub fn batch(repos: Repos, f: fn(RwLockWriteGuard<Repo>)) { + let cpus = num_cpus::get(); + let len = repos.len(); + + let jobs = crate::GTREE.get().unwrap().args.jobs; + + let batch_size = if jobs != 0 { + if len <= jobs { + 1 + } else { + len / jobs + } + } else { + if len <= cpus { + 1 + } else { + len / cpus + } + }; + + debug!( + "got {} repos and {} threads, batch size is {}", + len, cpus, batch_size + ); + + let chunks = repos.into_iter().chunks(batch_size); + let mut handles = Vec::with_capacity(cpus); + + // This has to be a for loop because iterators are lazy + // and will only start processing the closure in a map when used. 
+ // https://stackoverflow.com/q/34765967 + for (i, chunk) in chunks.into_iter().enumerate() { + let repos: Repos = chunk.collect(); + + let span = tracing::debug_span!("spawn_batch_thread", number = i); + let t_span = tracing::debug_span!("batch_thread", number = i); + let _enter = span.enter(); + let handle = thread::Builder::new() + .name(format!("batch_{}", i)) + .spawn(move || { + let _enter = t_span.enter(); + for (_name, repo) in repos { + let repo = repo.write().unwrap(); + + f(repo) + } + }) + .unwrap(); + + handles.push(handle); + } + + handles + .into_iter() + .for_each(|handle| handle.join().unwrap()); +} diff --git a/src/config/args.rs b/src/config/args.rs index 446c5ce..f8ad8e8 100644 --- a/src/config/args.rs +++ b/src/config/args.rs @@ -10,6 +10,10 @@ pub struct Args { /// Only operate on this subtree #[clap(global = true)] pub scope: Option<String>, + + /// Number of jobs to run in parallel, 0 is automatic + #[clap(short = 'j', long = "jobs", default_value = "0", global = true)] + pub jobs: usize, } #[derive(PartialEq, Clone, Debug, Subcommand)] diff --git a/src/main.rs b/src/main.rs index e37427c..8b72a34 100644 --- a/src/main.rs +++ b/src/main.rs @@ -18,17 +18,23 @@ mod list; mod sync; mod update; +mod batch; + #[cfg(test)] mod tests; -#[derive(Debug)] +use once_cell::sync::OnceCell; + +static GTREE: OnceCell<GTree> = OnceCell::new(); +static RUNTIME: OnceCell<Runtime> = OnceCell::new(); + +#[derive(Debug, Clone)] #[allow(dead_code)] struct GTree { figment: figment::Figment, config: config::Config, args: config::args::Args, forge: forge::Forge, - rt: Runtime, } impl GTree { @@ -44,16 +50,15 @@ impl GTree { .next() .context("No Forge configured, please setup a forge")?; - let rt = Runtime::new()?; + RUNTIME.set(Runtime::new()?).unwrap(); - let forge = rt.block_on(forge::Forge::new(forge_config))?; + let forge = RUNTIME.get().unwrap().block_on(forge::Forge::new(forge_config))?; Ok(GTree { figment, config, args, forge, - rt }) } @@ -74,7 +79,7 @@ 
impl GTree { Repos::from_local(forge_t.root(), &scope_t) }); - let projects = self.rt.block_on(self.forge.projects(&scope))?; + let projects = RUNTIME.get().unwrap().block_on(self.forge.projects(&scope))?; let remote = Repos::from_forge(forge.root(), projects); let local = handle.join().unwrap(); @@ -108,6 +113,7 @@ fn main() -> Result<()> { debug!("starting"); let gtree = GTree::new()?; + GTREE.set(gtree.clone()).unwrap(); gtree.run()?; diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 6cf1f7f..4bfe322 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -1,17 +1,18 @@ use std::fmt::{Debug, Display}; -use crate::repo::{Repo, RepoError, Repos}; +use crate::{ + batch::batch, + repo::{Repo, RepoError, Repos}, +}; impl crate::GTree { pub fn sync(&self, repos: Repos) { - for (_name, repo) in repos { - let mut repo = repo.write().unwrap(); - + batch(repos, |mut repo| { match repo.sync() { Ok(u) => println!("{}", u), Err(u) => println!("{}", u), }; - } + }); } } diff --git a/src/update/mod.rs b/src/update/mod.rs index d2d6494..8f10663 100644 --- a/src/update/mod.rs +++ b/src/update/mod.rs @@ -3,20 +3,21 @@ use std::fmt::{Debug, Display}; use git2::BranchType; use tracing::debug; -use crate::repo::{Repo, RepoError, Repos}; +use crate::{ + batch::batch, + repo::{Repo, RepoError, Repos}, +}; impl crate::GTree { pub fn update(&self, repos: Repos) { - for (_name, repo) in repos { - let mut repo = repo.write().unwrap(); - + batch(repos, |mut repo| { if repo.repo.is_some() { match repo.update() { Ok(u) => println!("{}", u), Err(u) => println!("{}", u), }; } - } + }); } } |
