client: rustfmt

This commit is contained in:
Zhaofeng Li 2023-03-05 11:05:11 -07:00
parent f0e9199817
commit 63019bb208
3 changed files with 74 additions and 31 deletions

View file

@ -9,7 +9,7 @@ use crate::api::ApiClient;
use crate::cache::CacheRef;
use crate::cli::Opts;
use crate::config::Config;
use crate::push::{Pusher, PushConfig};
use crate::push::{PushConfig, Pusher};
use attic::nix_store::NixStore;
/// Push closures to a binary cache.
@ -77,7 +77,9 @@ pub async fn run(opts: Opts) -> Result<()> {
let mp = MultiProgress::new();
let pusher = Pusher::new(store, api, cache.to_owned(), cache_config, mp, push_config);
let plan = pusher.plan(roots, sub.no_closure, sub.ignore_upstream_cache_filter).await?;
let plan = pusher
.plan(roots, sub.no_closure, sub.ignore_upstream_cache_filter)
.await?;
if plan.store_path_map.is_empty() {
if plan.num_all_paths == 0 {
@ -106,7 +108,10 @@ pub async fn run(opts: Opts) -> Result<()> {
}
let results = pusher.wait().await;
results.into_iter().map(|(_, result)| result).collect::<Result<Vec<()>>>()?;
results
.into_iter()
.map(|(_, result)| result)
.collect::<Result<Vec<()>>>()?;
Ok(())
}

View file

@ -4,13 +4,13 @@ use std::sync::Arc;
use anyhow::{anyhow, Result};
use clap::Parser;
use indicatif::MultiProgress;
use notify::{RecursiveMode, Watcher, EventKind};
use notify::{EventKind, RecursiveMode, Watcher};
use crate::api::ApiClient;
use crate::cache::CacheRef;
use crate::cli::Opts;
use crate::config::Config;
use crate::push::{Pusher, PushConfig, PushSessionConfig};
use crate::push::{PushConfig, PushSessionConfig, Pusher};
use attic::nix_store::{NixStore, StorePath};
/// Watch the Nix Store for new paths and upload them to a binary cache.
@ -72,8 +72,15 @@ pub async fn run(opts: Opts) -> Result<()> {
};
let mp = MultiProgress::new();
let session = Pusher::new(store.clone(), api, cache.to_owned(), cache_config, mp, push_config)
.into_push_session(push_session_config);
let session = Pusher::new(
store.clone(),
api,
cache.to_owned(),
cache_config,
mp,
push_config,
)
.into_push_session(push_session_config);
let mut watcher = notify::recommended_watcher(move |res: notify::Result<notify::Event>| {
match res {
@ -81,7 +88,8 @@ pub async fn run(opts: Opts) -> Result<()> {
// We watch the removals of lock files which signify
// store paths becoming valid
if let EventKind::Remove(_) = event.kind {
let paths = event.paths
let paths = event
.paths
.iter()
.filter_map(|p| {
let base = strip_lock_file(&p)?;
@ -100,13 +108,13 @@ pub async fn run(opts: Opts) -> Result<()> {
watcher.watch(&store_dir, RecursiveMode::NonRecursive)?;
eprintln!("👀 Pushing new store paths to \"{cache}\" on \"{server}\"",
eprintln!(
"👀 Pushing new store paths to \"{cache}\" on \"{server}\"",
cache = cache.as_str(),
server = server_name.as_str(),
);
loop {
}
loop {}
}
fn strip_lock_file(p: &Path) -> Option<PathBuf> {

View file

@ -25,19 +25,19 @@ use std::time::{Duration, Instant};
use anyhow::{anyhow, Result};
use async_channel as channel;
use bytes::Bytes;
use futures::stream::{Stream, TryStreamExt};
use futures::future::join_all;
use futures::stream::{Stream, TryStreamExt};
use indicatif::{HumanBytes, MultiProgress, ProgressBar, ProgressState, ProgressStyle};
use tokio::task::{JoinHandle, spawn};
use tokio::time;
use tokio::sync::Mutex;
use tokio::task::{spawn, JoinHandle};
use tokio::time;
use crate::api::ApiClient;
use attic::api::v1::cache_config::CacheConfig;
use attic::api::v1::upload_path::{UploadPathNarInfo, UploadPathResult, UploadPathResultKind};
use attic::cache::CacheName;
use attic::error::AtticResult;
use attic::nix_store::{NixStore, StorePath, StorePathHash, ValidPathInfo};
use crate::api::ApiClient;
type JobSender = channel::Sender<ValidPathInfo>;
type JobReceiver = channel::Receiver<ValidPathInfo>;
@ -131,7 +131,14 @@ struct NarStreamProgress<S> {
}
impl Pusher {
pub fn new(store: Arc<NixStore>, api: ApiClient, cache: CacheName, cache_config: CacheConfig, mp: MultiProgress, config: PushConfig) -> Self {
pub fn new(
store: Arc<NixStore>,
api: ApiClient,
cache: CacheName,
cache_config: CacheConfig,
mp: MultiProgress,
config: PushConfig,
) -> Self {
let (sender, receiver) = channel::unbounded();
let mut workers = Vec::new();
@ -146,13 +153,19 @@ impl Pusher {
)));
}
Self { api, store, cache, cache_config, workers, sender }
Self {
api,
store,
cache,
cache_config,
workers,
sender,
}
}
/// Queues a store path to be pushed.
pub async fn queue(&self, path_info: ValidPathInfo) -> Result<()> {
self.sender.send(path_info).await
.map_err(|e| anyhow!(e))
self.sender.send(path_info).await.map_err(|e| anyhow!(e))
}
/// Waits for all workers to terminate, returning all results.
@ -174,7 +187,12 @@ impl Pusher {
}
/// Creates a push plan.
pub async fn plan(&self, roots: Vec<StorePath>, no_closure: bool, ignore_upstream_filter: bool) -> Result<PushPlan> {
pub async fn plan(
&self,
roots: Vec<StorePath>,
no_closure: bool,
ignore_upstream_filter: bool,
) -> Result<PushPlan> {
PushPlan::plan(
self.store.clone(),
&self.api,
@ -183,7 +201,8 @@ impl Pusher {
roots,
no_closure,
ignore_upstream_filter,
).await
)
.await
}
/// Converts the pusher into a `PushSession`.
@ -223,7 +242,8 @@ impl Pusher {
&cache,
mp.clone(),
config.force_preamble,
).await;
)
.await;
results.insert(store_path, r);
}
@ -247,7 +267,9 @@ impl PushSession {
config.clone(),
known_paths_mutex.clone(),
receiver.clone(),
).await {
)
.await
{
eprintln!("Worker exited: {:?}", e);
} else {
break;
@ -255,9 +277,7 @@ impl PushSession {
}
});
Self {
sender,
}
Self { sender }
}
async fn worker(
@ -305,12 +325,19 @@ impl PushSession {
// Compute push plan
let roots_vec: Vec<StorePath> = {
let known_paths = known_paths_mutex.lock().await;
roots.drain()
roots
.drain()
.filter(|root| !known_paths.contains(&root.to_hash()))
.collect()
};
let mut plan = pusher.plan(roots_vec, config.no_closure, config.ignore_upstream_cache_filter).await?;
let mut plan = pusher
.plan(
roots_vec,
config.no_closure,
config.ignore_upstream_cache_filter,
)
.await?;
let mut known_paths = known_paths_mutex.lock().await;
plan.store_path_map
@ -332,7 +359,8 @@ impl PushSession {
/// Queues multiple store paths to be pushed.
pub fn queue_many(&self, store_paths: Vec<StorePath>) -> Result<()> {
self.sender.send_blocking(store_paths)
self.sender
.send_blocking(store_paths)
.map_err(|e| anyhow!(e))
}
}
@ -387,8 +415,10 @@ impl PushPlan {
if !ignore_upstream_filter {
// Filter out paths signed by upstream caches
let upstream_cache_key_names =
cache_config.upstream_cache_key_names.as_ref().map_or([].as_slice(), |v| v.as_slice());
let upstream_cache_key_names = cache_config
.upstream_cache_key_names
.as_ref()
.map_or([].as_slice(), |v| v.as_slice());
store_path_map.retain(|_, pi| {
for sig in &pi.sigs {
if let Some((name, _)) = sig.split_once(':') {