From 63019bb208a568e779ab70488fcb00c651608b53 Mon Sep 17 00:00:00 2001 From: Zhaofeng Li Date: Sun, 5 Mar 2023 11:05:11 -0700 Subject: [PATCH] client: rustfmt --- client/src/command/push.rs | 11 +++-- client/src/command/watch_store.rs | 24 +++++++---- client/src/push.rs | 70 ++++++++++++++++++++++--------- 3 files changed, 74 insertions(+), 31 deletions(-) diff --git a/client/src/command/push.rs b/client/src/command/push.rs index 087c5ad..44d035a 100644 --- a/client/src/command/push.rs +++ b/client/src/command/push.rs @@ -9,7 +9,7 @@ use crate::api::ApiClient; use crate::cache::CacheRef; use crate::cli::Opts; use crate::config::Config; -use crate::push::{Pusher, PushConfig}; +use crate::push::{PushConfig, Pusher}; use attic::nix_store::NixStore; /// Push closures to a binary cache. @@ -77,7 +77,9 @@ pub async fn run(opts: Opts) -> Result<()> { let mp = MultiProgress::new(); let pusher = Pusher::new(store, api, cache.to_owned(), cache_config, mp, push_config); - let plan = pusher.plan(roots, sub.no_closure, sub.ignore_upstream_cache_filter).await?; + let plan = pusher + .plan(roots, sub.no_closure, sub.ignore_upstream_cache_filter) + .await?; if plan.store_path_map.is_empty() { if plan.num_all_paths == 0 { @@ -106,7 +108,10 @@ pub async fn run(opts: Opts) -> Result<()> { } let results = pusher.wait().await; - results.into_iter().map(|(_, result)| result).collect::<Result<Vec<()>>>()?; + results + .into_iter() + .map(|(_, result)| result) + .collect::<Result<Vec<()>>>()?; Ok(()) } diff --git a/client/src/command/watch_store.rs b/client/src/command/watch_store.rs index a6f6bc1..cc07657 100644 --- a/client/src/command/watch_store.rs +++ b/client/src/command/watch_store.rs @@ -4,13 +4,13 @@ use std::sync::Arc; use anyhow::{anyhow, Result}; use clap::Parser; use indicatif::MultiProgress; -use notify::{RecursiveMode, Watcher, EventKind}; +use notify::{EventKind, RecursiveMode, Watcher}; use crate::api::ApiClient; use crate::cache::CacheRef; use crate::cli::Opts; use crate::config::Config; -use 
crate::push::{Pusher, PushConfig, PushSessionConfig}; +use crate::push::{PushConfig, PushSessionConfig, Pusher}; use attic::nix_store::{NixStore, StorePath}; /// Watch the Nix Store for new paths and upload them to a binary cache. @@ -72,8 +72,15 @@ pub async fn run(opts: Opts) -> Result<()> { }; let mp = MultiProgress::new(); - let session = Pusher::new(store.clone(), api, cache.to_owned(), cache_config, mp, push_config) - .into_push_session(push_session_config); + let session = Pusher::new( + store.clone(), + api, + cache.to_owned(), + cache_config, + mp, + push_config, + ) + .into_push_session(push_session_config); let mut watcher = notify::recommended_watcher(move |res: notify::Result<notify::Event>| { match res { @@ -81,7 +88,8 @@ pub async fn run(opts: Opts) -> Result<()> { // We watch the removals of lock files which signify // store paths becoming valid if let EventKind::Remove(_) = event.kind { - let paths = event.paths + let paths = event + .paths .iter() .filter_map(|p| { let base = strip_lock_file(&p)?; @@ -100,13 +108,13 @@ pub async fn run(opts: Opts) -> Result<()> { watcher.watch(&store_dir, RecursiveMode::NonRecursive)?; - eprintln!("👀 Pushing new store paths to \"{cache}\" on \"{server}\"", + eprintln!( + "👀 Pushing new store paths to \"{cache}\" on \"{server}\"", cache = cache.as_str(), server = server_name.as_str(), ); - loop { - } + loop {} } fn strip_lock_file(p: &Path) -> Option<PathBuf> { diff --git a/client/src/push.rs b/client/src/push.rs index aa36199..d8f9a5d 100644 --- a/client/src/push.rs +++ b/client/src/push.rs @@ -25,19 +25,19 @@ use std::time::{Duration, Instant}; use anyhow::{anyhow, Result}; use async_channel as channel; use bytes::Bytes; -use futures::stream::{Stream, TryStreamExt}; use futures::future::join_all; +use futures::stream::{Stream, TryStreamExt}; use indicatif::{HumanBytes, MultiProgress, ProgressBar, ProgressState, ProgressStyle}; -use tokio::task::{JoinHandle, spawn}; -use tokio::time; use tokio::sync::Mutex; +use tokio::task::{spawn, 
JoinHandle}; +use tokio::time; +use crate::api::ApiClient; use attic::api::v1::cache_config::CacheConfig; use attic::api::v1::upload_path::{UploadPathNarInfo, UploadPathResult, UploadPathResultKind}; use attic::cache::CacheName; use attic::error::AtticResult; use attic::nix_store::{NixStore, StorePath, StorePathHash, ValidPathInfo}; -use crate::api::ApiClient; type JobSender = channel::Sender<ValidPathInfo>; type JobReceiver = channel::Receiver<ValidPathInfo>; @@ -131,7 +131,14 @@ struct NarStreamProgress<S: Stream> { } impl Pusher { - pub fn new(store: Arc<NixStore>, api: ApiClient, cache: CacheName, cache_config: CacheConfig, mp: MultiProgress, config: PushConfig) -> Self { + pub fn new( + store: Arc<NixStore>, + api: ApiClient, + cache: CacheName, + cache_config: CacheConfig, + mp: MultiProgress, + config: PushConfig, + ) -> Self { let (sender, receiver) = channel::unbounded(); let mut workers = Vec::new(); @@ -146,13 +153,19 @@ impl Pusher { ))); } - Self { api, store, cache, cache_config, workers, sender } + Self { + api, + store, + cache, + cache_config, + workers, + sender, + } } /// Queues a store path to be pushed. pub async fn queue(&self, path_info: ValidPathInfo) -> Result<()> { - self.sender.send(path_info).await - .map_err(|e| anyhow!(e)) + self.sender.send(path_info).await.map_err(|e| anyhow!(e)) } /// Waits for all workers to terminate, returning all results. @@ -174,7 +187,12 @@ impl Pusher { } /// Creates a push plan. - pub async fn plan(&self, roots: Vec<StorePath>, no_closure: bool, ignore_upstream_filter: bool) -> Result<PushPlan> { + pub async fn plan( + &self, + roots: Vec<StorePath>, + no_closure: bool, + ignore_upstream_filter: bool, + ) -> Result<PushPlan> { PushPlan::plan( self.store.clone(), &self.api, @@ -183,7 +201,8 @@ impl Pusher { roots, no_closure, ignore_upstream_filter, - ).await + ) + .await } /// Converts the pusher into a `PushSession`. 
@@ -223,7 +242,8 @@ impl Pusher { &cache, mp.clone(), config.force_preamble, - ).await; + ) + .await; results.insert(store_path, r); } @@ -247,7 +267,9 @@ impl PushSession { config.clone(), known_paths_mutex.clone(), receiver.clone(), - ).await { + ) + .await + { eprintln!("Worker exited: {:?}", e); } else { break; @@ -255,9 +277,7 @@ impl PushSession { } }); - Self { - sender, - } + Self { sender } } async fn worker( @@ -305,12 +325,19 @@ impl PushSession { // Compute push plan let roots_vec: Vec<StorePath> = { let known_paths = known_paths_mutex.lock().await; - roots.drain() + roots + .drain() .filter(|root| !known_paths.contains(&root.to_hash())) .collect() }; - let mut plan = pusher.plan(roots_vec, config.no_closure, config.ignore_upstream_cache_filter).await?; + let mut plan = pusher + .plan( + roots_vec, + config.no_closure, + config.ignore_upstream_cache_filter, + ) + .await?; let mut known_paths = known_paths_mutex.lock().await; plan.store_path_map @@ -332,7 +359,8 @@ impl PushSession { /// Queues multiple store paths to be pushed. pub fn queue_many(&self, store_paths: Vec<StorePath>) -> Result<()> { - self.sender.send_blocking(store_paths) + self.sender + .send_blocking(store_paths) .map_err(|e| anyhow!(e)) } } @@ -387,8 +415,10 @@ impl PushPlan { if !ignore_upstream_filter { // Filter out paths signed by upstream caches - let upstream_cache_key_names = - cache_config.upstream_cache_key_names.as_ref().map_or([].as_slice(), |v| v.as_slice()); + let upstream_cache_key_names = cache_config + .upstream_cache_key_names + .as_ref() + .map_or([].as_slice(), |v| v.as_slice()); store_path_map.retain(|_, pi| { for sig in &pi.sigs { if let Some((name, _)) = sig.split_once(':') {