fix(s3): perform the painful rename manually

god, s3 semantics are.

Signed-off-by: Raito Bezarius <masterancpp@gmail.com>
This commit is contained in:
raito 2024-08-31 19:48:22 +02:00
parent cb5a2a2b07
commit 3934bcbb39

View file

@ -10,6 +10,7 @@ use std::path::PathBuf;
use actions::{interpret_plan, Plan, PlanInterpretation, PlanOptions};
use clap::{Subcommand, Parser, Args};
use hydra::{Channel, Evaluation, HydraClient, Release};
use tokio_stream::StreamExt;
use tracing::{info, trace, warn};
use object_store::{aws::AmazonS3, ObjectStore};
use tempdir::TempDir;
@ -140,6 +141,21 @@ impl From<PlanInterpretation> for PlanResult {
}
}
#[tracing::instrument]
async fn move_directory<OS: object_store::ObjectStore>(client: OS, from: &object_store::path::Path, to: &object_store::path::Path) -> std::io::Result<()> {
let prefix = from.to_string();
while let Some(object) = client.list(Some(from)).try_next().await? {
let source = object.location.to_string();
let stripped_object_path = source.strip_prefix(&prefix).expect(
"Failed to strip the `from` prefix during move directory"
);
let target = format!("{}/{}", to.to_string(), stripped_object_path);
client.rename(&object.location, &target.into()).await?;
}
Ok(())
}
#[tracing::instrument]
async fn plan(hydra_client: &HydraClient<'_>, channel_name: &str, job_name: String, global_opts: &GlobalOpts, config: &config::MirrorConfig) -> std::io::Result<PlanResult> {
let chan = Channel {
@ -244,7 +260,7 @@ async fn main() -> std::io::Result<()> {
PlanResult::StreamedRemotely { remote_path, target_path } => {
info!("Plan was streamed remotely successfully to '{}@{}', replacing {}", config.s3_release_bucket_name, remote_path, target_path);
config.release_bucket().rename(&remote_path, &target_path)
move_directory(config.release_bucket(), &remote_path, &target_path)
.await.expect("Failed to atomically promote the streamed channel, aborting");
}
}
@ -253,7 +269,7 @@ async fn main() -> std::io::Result<()> {
let release = config.release_bucket();
// TODO: fetch the release and obtain the right name.
release.rename(&remote.staging_prefix.try_into().unwrap(), &remote.channel_name.try_into().unwrap())
move_directory(config.release_bucket(), &remote.staging_prefix.try_into().unwrap(), &remote.channel_name.try_into().unwrap())
.await.expect("Failed to atomically rename the staging prefix into the channel name");
}
}