aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock2
-rw-r--r--Cargo.toml2
-rw-r--r--src/batch/mod.rs62
-rw-r--r--src/config/args.rs4
-rw-r--r--src/main.rs18
-rw-r--r--src/sync/mod.rs11
-rw-r--r--src/update/mod.rs11
7 files changed, 94 insertions, 16 deletions
diff --git a/Cargo.lock b/Cargo.lock
index df85de7..75c0bc5 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -574,6 +574,8 @@ dependencies = [
"gitlab",
"graphql_client",
"itertools",
+ "num_cpus",
+ "once_cell",
"serde",
"thiserror",
"tokio",
diff --git a/Cargo.toml b/Cargo.toml
index a9f1766..88e490c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -8,6 +8,8 @@ tokio = { version = "1", features = ["rt", "rt-multi-thread", "macros"] }
futures = "0.3"
itertools = "0.10"
+num_cpus = "1"
+once_cell = "1"
gitlab = "0.1408"
graphql_client = "0.10"
diff --git a/src/batch/mod.rs b/src/batch/mod.rs
new file mode 100644
index 0000000..ea231c7
--- /dev/null
+++ b/src/batch/mod.rs
@@ -0,0 +1,62 @@
+use itertools::Itertools;
+use std::{sync::RwLockWriteGuard, thread};
+use tracing::debug;
+
+use crate::repo::{Repo, Repos};
+
+pub fn batch(repos: Repos, f: fn(RwLockWriteGuard<Repo>)) {
+ let cpus = num_cpus::get();
+ let len = repos.len();
+
+ let jobs = crate::GTREE.get().unwrap().args.jobs;
+
+ let batch_size = if jobs != 0 {
+ if len <= jobs {
+ 1
+ } else {
+ len / jobs
+ }
+ } else {
+ if len <= cpus {
+ 1
+ } else {
+ len / cpus
+ }
+ };
+
+ debug!(
+ "got {} repos and {} threads, batch size is {}",
+ len, cpus, batch_size
+ );
+
+ let chunks = repos.into_iter().chunks(batch_size);
+ let mut handles = Vec::with_capacity(cpus);
+
+ // This has to be a for loop because iterators are lazy
+ // and will only start processing the closure in a map when used.
+ // https://stackoverflow.com/q/34765967
+ for (i, chunk) in chunks.into_iter().enumerate() {
+ let repos: Repos = chunk.collect();
+
+ let span = tracing::debug_span!("spawn_batch_thread", number = i);
+ let t_span = tracing::debug_span!("batch_thread", number = i);
+ let _enter = span.enter();
+ let handle = thread::Builder::new()
+ .name(format!("batch_{}", i))
+ .spawn(move || {
+ let _enter = t_span.enter();
+ for (_name, repo) in repos {
+ let repo = repo.write().unwrap();
+
+ f(repo)
+ }
+ })
+ .unwrap();
+
+ handles.push(handle);
+ }
+
+ handles
+ .into_iter()
+ .for_each(|handle| handle.join().unwrap());
+}
diff --git a/src/config/args.rs b/src/config/args.rs
index 446c5ce..f8ad8e8 100644
--- a/src/config/args.rs
+++ b/src/config/args.rs
@@ -10,6 +10,10 @@ pub struct Args {
/// Only operate on this subtree
#[clap(global = true)]
pub scope: Option<String>,
+
+ /// Number of jobs to run in parallel, 0 is automatic
+ #[clap(short = 'j', long = "jobs", default_value = "0", global = true)]
+ pub jobs: usize,
}
#[derive(PartialEq, Clone, Debug, Subcommand)]
diff --git a/src/main.rs b/src/main.rs
index e37427c..8b72a34 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -18,17 +18,23 @@ mod list;
mod sync;
mod update;
+mod batch;
+
#[cfg(test)]
mod tests;
-#[derive(Debug)]
+use once_cell::sync::OnceCell;
+
+static GTREE: OnceCell<GTree> = OnceCell::new();
+static RUNTIME: OnceCell<Runtime> = OnceCell::new();
+
+#[derive(Debug, Clone)]
#[allow(dead_code)]
struct GTree {
figment: figment::Figment,
config: config::Config,
args: config::args::Args,
forge: forge::Forge,
- rt: Runtime,
}
impl GTree {
@@ -44,16 +50,15 @@ impl GTree {
.next()
.context("No Forge configured, please setup a forge")?;
- let rt = Runtime::new()?;
+ RUNTIME.set(Runtime::new()?).unwrap();
- let forge = rt.block_on(forge::Forge::new(forge_config))?;
+ let forge = RUNTIME.get().unwrap().block_on(forge::Forge::new(forge_config))?;
Ok(GTree {
figment,
config,
args,
forge,
- rt
})
}
@@ -74,7 +79,7 @@ impl GTree {
Repos::from_local(forge_t.root(), &scope_t)
});
- let projects = self.rt.block_on(self.forge.projects(&scope))?;
+ let projects = RUNTIME.get().unwrap().block_on(self.forge.projects(&scope))?;
let remote = Repos::from_forge(forge.root(), projects);
let local = handle.join().unwrap();
@@ -108,6 +113,7 @@ fn main() -> Result<()> {
debug!("starting");
let gtree = GTree::new()?;
+ GTREE.set(gtree.clone()).unwrap();
gtree.run()?;
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 6cf1f7f..4bfe322 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -1,17 +1,18 @@
use std::fmt::{Debug, Display};
-use crate::repo::{Repo, RepoError, Repos};
+use crate::{
+ batch::batch,
+ repo::{Repo, RepoError, Repos},
+};
impl crate::GTree {
pub fn sync(&self, repos: Repos) {
- for (_name, repo) in repos {
- let mut repo = repo.write().unwrap();
-
+ batch(repos, |mut repo| {
match repo.sync() {
Ok(u) => println!("{}", u),
Err(u) => println!("{}", u),
};
- }
+ });
}
}
diff --git a/src/update/mod.rs b/src/update/mod.rs
index d2d6494..8f10663 100644
--- a/src/update/mod.rs
+++ b/src/update/mod.rs
@@ -3,20 +3,21 @@ use std::fmt::{Debug, Display};
use git2::BranchType;
use tracing::debug;
-use crate::repo::{Repo, RepoError, Repos};
+use crate::{
+ batch::batch,
+ repo::{Repo, RepoError, Repos},
+};
impl crate::GTree {
pub fn update(&self, repos: Repos) {
- for (_name, repo) in repos {
- let mut repo = repo.write().unwrap();
-
+ batch(repos, |mut repo| {
if repo.repo.is_some() {
match repo.update() {
Ok(u) => println!("{}", u),
Err(u) => println!("{}", u),
};
}
- }
+ });
}
}