diff --git a/src/main.rs b/src/main.rs index a618d7a..dab925c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -10,6 +10,7 @@ use std::path::PathBuf; use actions::{interpret_plan, Plan, PlanInterpretation, PlanOptions}; use clap::{Subcommand, Parser, Args}; use hydra::{Channel, Evaluation, HydraClient, Release}; +use tokio_stream::StreamExt; use tracing::{info, trace, warn}; use object_store::{aws::AmazonS3, ObjectStore}; use tempdir::TempDir; @@ -140,6 +141,21 @@ impl From for PlanResult { } } +#[tracing::instrument] +async fn move_directory(client: OS, from: &object_store::path::Path, to: &object_store::path::Path) -> std::io::Result<()> { + let prefix = from.to_string(); + while let Some(object) = client.list(Some(from)).try_next().await? { + let source = object.location.to_string(); + let stripped_object_path = source.strip_prefix(&prefix).expect( + "Failed to strip the `from` prefix during move directory" + ); + let target = format!("{}/{}", to.to_string(), stripped_object_path); + client.rename(&object.location, &target.into()).await?; + } + + Ok(()) +} + #[tracing::instrument] async fn plan(hydra_client: &HydraClient<'_>, channel_name: &str, job_name: String, global_opts: &GlobalOpts, config: &config::MirrorConfig) -> std::io::Result { let chan = Channel { @@ -244,7 +260,7 @@ async fn main() -> std::io::Result<()> { PlanResult::StreamedRemotely { remote_path, target_path } => { info!("Plan was streamed remotely successfully to '{}@{}', replacing {}", config.s3_release_bucket_name, remote_path, target_path); - config.release_bucket().rename(&remote_path, &target_path) + move_directory(config.release_bucket(), &remote_path, &target_path) .await.expect("Failed to atomically promote the streamed channel, aborting"); } } @@ -253,7 +269,7 @@ async fn main() -> std::io::Result<()> { let release = config.release_bucket(); // TODO: fetch the release and obtain the right name. - release.rename(&remote.staging_prefix.try_into().unwrap(), &remote.channel_name.try_into().unwrap()) + move_directory(config.release_bucket(), &remote.staging_prefix.try_into().unwrap(), &remote.channel_name.try_into().unwrap()) .await.expect("Failed to atomically rename the staging prefix into the channel name"); } }