From 6a064f904ea1e0ab9c425f25f409b7a5852cae0b Mon Sep 17 00:00:00 2001 From: Zhaofeng Li Date: Tue, 4 Apr 2023 16:25:05 -0600 Subject: [PATCH] client/watch_store: Refactor main loop --- client/src/command/watch_store.rs | 30 +++++++++++++++++------------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/client/src/command/watch_store.rs b/client/src/command/watch_store.rs index b577dd3..949cba2 100644 --- a/client/src/command/watch_store.rs +++ b/client/src/command/watch_store.rs @@ -1,11 +1,11 @@ use std::path::{Path, PathBuf}; use std::sync::Arc; -use std::{thread, time}; use anyhow::{anyhow, Result}; use clap::Parser; use indicatif::MultiProgress; use notify::{EventKind, RecursiveMode, Watcher}; +use tokio::sync::mpsc; use crate::api::ApiClient; use crate::cache::CacheRef; @@ -83,7 +83,21 @@ pub async fn run(opts: Opts) -> Result<()> { ) .into_push_session(push_session_config); + let (tx, mut rx) = mpsc::unbounded_channel(); + let mut watcher = notify::recommended_watcher(move |res: notify::Result| { + tx.send(res).unwrap(); + })?; + + watcher.watch(&store_dir, RecursiveMode::NonRecursive)?; + + eprintln!( + "👀 Pushing new store paths to \"{cache}\" on \"{server}\"", + cache = cache.as_str(), + server = server_name.as_str(), + ); + + while let Some(res) = rx.recv().await { match res { Ok(event) => { // We watch the removals of lock files which signify @@ -105,19 +119,9 @@ pub async fn run(opts: Opts) -> Result<()> { } Err(e) => eprintln!("Error during watch: {:?}", e), } - })?; - - watcher.watch(&store_dir, RecursiveMode::NonRecursive)?; - - eprintln!( - "👀 Pushing new store paths to \"{cache}\" on \"{server}\"", - cache = cache.as_str(), - server = server_name.as_str(), - ); - - loop { - thread::sleep(time::Duration::from_secs(60)); } + + Ok(()) } fn strip_lock_file(p: &Path) -> Option {