feat(operations): add support for git pushing the channel

This time, we have almost a complete mechanism!

Signed-off-by: Raito Bezarius <masterancpp@gmail.com>
This commit is contained in:
raito 2024-12-14 18:10:43 +01:00
parent a9ee947914
commit a08351a9d4
5 changed files with 217 additions and 37 deletions

81
Cargo.lock generated
View file

@ -157,7 +157,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.77",
"syn 2.0.90",
]
[[package]]
@ -168,7 +168,7 @@ checksum = "6e0c28dcc82d7c8ead5cb13beb15405b57b8546e93215673ff8ca0349a028107"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.77",
"syn 2.0.90",
]
[[package]]
@ -399,7 +399,7 @@ dependencies = [
"heck 0.5.0",
"proc-macro2",
"quote",
"syn 2.0.77",
"syn 2.0.90",
]
[[package]]
@ -498,7 +498,7 @@ checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.77",
"syn 2.0.90",
]
[[package]]
@ -578,7 +578,7 @@ checksum = "ba7795da175654fe16979af73f81f26a8ea27638d8d9823d317016888a63dc4c"
dependencies = [
"num-traits",
"quote",
"syn 2.0.77",
"syn 2.0.90",
]
[[package]]
@ -670,7 +670,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.77",
"syn 2.0.90",
]
[[package]]
@ -766,7 +766,7 @@ dependencies = [
"pest_derive",
"serde",
"serde_json",
"thiserror",
"thiserror 1.0.63",
]
[[package]]
@ -1190,6 +1190,7 @@ dependencies = [
"tempdir",
"termtree",
"textwrap",
"thiserror 2.0.7",
"tokio",
"tokio-stream",
"tokio-util",
@ -1222,7 +1223,7 @@ dependencies = [
"serde",
"serde_json",
"sha2",
"thiserror",
"thiserror 1.0.63",
"tokio",
"tracing",
]
@ -1234,7 +1235,7 @@ source = "git+https://github.com/tvlfyi/tvix#10c6435ae9e11ed2fbc2fa96886cb4d17a0
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.77",
"syn 2.0.90",
]
[[package]]
@ -1334,7 +1335,7 @@ dependencies = [
"js-sys",
"once_cell",
"pin-project-lite",
"thiserror",
"thiserror 1.0.63",
]
[[package]]
@ -1366,7 +1367,7 @@ dependencies = [
"prost",
"reqwest",
"serde_json",
"thiserror",
"thiserror 1.0.63",
"tokio",
"tonic",
]
@ -1401,7 +1402,7 @@ dependencies = [
"percent-encoding",
"rand 0.8.5",
"serde_json",
"thiserror",
"thiserror 1.0.63",
"tokio",
"tokio-stream",
]
@ -1448,7 +1449,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cd53dff83f26735fdc1ca837098ccf133605d794cdae66acfc2bfac3ec809d95"
dependencies = [
"memchr",
"thiserror",
"thiserror 1.0.63",
"ucd-trie",
]
@ -1472,7 +1473,7 @@ dependencies = [
"pest_meta",
"proc-macro2",
"quote",
"syn 2.0.77",
"syn 2.0.90",
]
[[package]]
@ -1503,7 +1504,7 @@ checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.77",
"syn 2.0.90",
]
[[package]]
@ -1551,9 +1552,9 @@ dependencies = [
[[package]]
name = "proc-macro2"
version = "1.0.86"
version = "1.0.92"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e719e8df665df0d1c8fbfd238015744736151d4445ec0836b8e628aae103b77"
checksum = "37d3544b3f2748c54e147655edb5025752e2303145b5aefb3c3ea2c78b973bb0"
dependencies = [
"unicode-ident",
]
@ -1578,7 +1579,7 @@ dependencies = [
"itertools",
"proc-macro2",
"quote",
"syn 2.0.77",
"syn 2.0.90",
]
[[package]]
@ -1604,7 +1605,7 @@ dependencies = [
"rustc-hash",
"rustls",
"socket2",
"thiserror",
"thiserror 1.0.63",
"tokio",
"tracing",
]
@ -1621,7 +1622,7 @@ dependencies = [
"rustc-hash",
"rustls",
"slab",
"thiserror",
"thiserror 1.0.63",
"tinyvec",
"tracing",
]
@ -1996,7 +1997,7 @@ checksum = "a5831b979fd7b5439637af1752d535ff49f4860c0f341d1baeb6faf0f4242170"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.77",
"syn 2.0.90",
]
[[package]]
@ -2170,9 +2171,9 @@ dependencies = [
[[package]]
name = "syn"
version = "2.0.77"
version = "2.0.90"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9f35bcdf61fd8e7be6caf75f429fdca8beb3ed76584befb503b1569faee373ed"
checksum = "919d3b74a5dd0ccd15aeb8f93e7006bd9e14c295087c9896a110f490752bcf31"
dependencies = [
"proc-macro2",
"quote",
@ -2227,7 +2228,16 @@ version = "1.0.63"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c0342370b38b6a11b6cc11d6a805569958d54cfa061a29969c3b5ce2ea405724"
dependencies = [
"thiserror-impl",
"thiserror-impl 1.0.63",
]
[[package]]
name = "thiserror"
version = "2.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "93605438cbd668185516ab499d589afb7ee1859ea3d5fc8f6b0755e1c7443767"
dependencies = [
"thiserror-impl 2.0.7",
]
[[package]]
@ -2238,7 +2248,18 @@ checksum = "a4558b58466b9ad7ca0f102865eccc95938dca1a74a856f2b57b6629050da261"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.77",
"syn 2.0.90",
]
[[package]]
name = "thiserror-impl"
version = "2.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1d8749b4531af2117677a5fcd12b1348a3fe2b81e36e61ffeac5c4aa3273e36"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.90",
]
[[package]]
@ -2292,7 +2313,7 @@ checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.77",
"syn 2.0.90",
]
[[package]]
@ -2445,7 +2466,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.77",
"syn 2.0.90",
]
[[package]]
@ -2638,7 +2659,7 @@ dependencies = [
"once_cell",
"proc-macro2",
"quote",
"syn 2.0.77",
"syn 2.0.90",
"wasm-bindgen-shared",
]
@ -2672,7 +2693,7 @@ checksum = "afc340c74d9005395cf9dd098506f7f44e38f2b4a21c6aaacf9a105ea5e1e836"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.77",
"syn 2.0.90",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]
@ -2895,7 +2916,7 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.77",
"syn 2.0.90",
]
[[package]]

View file

@ -37,3 +37,4 @@ opentelemetry = "0.24.0"
opentelemetry_sdk = { version = "0.24.1", features = ["rt-tokio", "trace"] }
tracing-opentelemetry = { version = "0.25.0", features = ["metrics"] }
opentelemetry-otlp = { version = "0.17.0", features = ["http-json", "reqwest-client", "reqwest-rustls"] }
thiserror = "2.0.7"

View file

@ -11,8 +11,8 @@ pub struct MirrorConfig {
/// Base URI to access to a web interface pertaining to a specific Git revision of the nixpkgs
/// input
pub base_git_uri_for_revision: String,
/// A path to a checkout of nixpkgs
nixpkgs_dir: PathBuf,
/// A path to a checkout of the target repository
pub repo_dir: PathBuf,
/// S3 releases bucket name
pub s3_release_bucket_name: String,
/// S3 channels bucket name

View file

@ -1 +1,131 @@
/// Very simple wrappers for Git shelling out.
use std::path::PathBuf;
use std::process::ExitStatus;
use std::io;
use thiserror::Error;
use tokio::process::Command;
use tokio::io::{BufReader, AsyncBufReadExt};
use tokio::sync::mpsc::{channel, Receiver};
use tokio::task::JoinHandle;
#[derive(Error, Debug)]
pub enum GitError {
#[error("Git command failed with status: {0}")]
CommandFailed(ExitStatus),
#[error("IO error occurred: {0}")]
IoError(#[from] io::Error),
#[error("Failed to capture {0}")]
StreamCaptureError(String),
#[error("Failed to wait for process to complete")]
WaitError,
}
/// Encapsulates the result of a Git command execution
pub struct GitCommand {
pub stdout: Receiver<String>,
pub stderr: Receiver<String>,
pub handle: JoinHandle<Result<(), GitError>>,
}
impl GitCommand {
/// Waits for the Git command to complete and returns the result.
pub async fn wait(self) -> Result<(), GitError> {
self.handle.await.unwrap_or_else(|_| Err(GitError::WaitError))
}
}
/// Git operations structure
pub struct Git {
repo: PathBuf,
remote: String,
}
impl Git {
pub fn new(repo: impl Into<PathBuf>, remote: impl Into<String>) -> Self {
Self {
repo: repo.into(),
remote: remote.into(),
}
}
pub fn default(repo: impl Into<PathBuf>) -> Self {
Self::new(repo, "origin")
}
pub async fn push(&self, rev: &str, channel_name: &str) -> Result<GitCommand, GitError> {
// Run "git remote update"
self.run_git_command(&["remote", "update", &self.remote]).await?;
// Prepare the "git push" command
let push_args = ["push", &self.remote, &format!("{rev}:refs/heads/{channel_name}")];
let command = self.run_git_command_with_streams(&push_args).await?;
Ok(command)
}
async fn run_git_command(&self, args: &[&str]) -> Result<(), GitError> {
let status = Command::new("git")
.args(args)
.current_dir(&self.repo)
.stderr(std::process::Stdio::inherit())
.stdout(std::process::Stdio::inherit())
.status()
.await?;
if !status.success() {
return Err(GitError::CommandFailed(status));
}
Ok(())
}
async fn run_git_command_with_streams(
&self,
args: &[&str],
) -> Result<GitCommand, GitError> {
let mut child = Command::new("git")
.args(args)
.current_dir(&self.repo)
.stderr(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.spawn()?;
let stdout = child.stdout.take().ok_or_else(|| GitError::StreamCaptureError("stdout".into()))?;
let stderr = child.stderr.take().ok_or_else(|| GitError::StreamCaptureError("stderr".into()))?;
let stdout_rx = Self::stream_output(stdout);
let stderr_rx = Self::stream_output(stderr);
// Spawn a task to wait for the command's completion
let handle = tokio::spawn(async move {
let status = child.wait().await.map_err(|e| GitError::IoError(e))?;
if !status.success() {
Err(GitError::CommandFailed(status))
} else {
Ok(())
}
});
Ok(GitCommand {
stdout: stdout_rx,
stderr: stderr_rx,
handle,
})
}
fn stream_output(
stream: impl tokio::io::AsyncRead + Unpin + Send + 'static,
) -> Receiver<String> {
let (tx, rx) = channel(100);
let reader = BufReader::new(stream);
tokio::spawn(async move {
let mut lines = reader.lines();
while let Ok(Some(line)) = lines.next_line().await {
if tx.send(line).await.is_err() {
break;
}
}
});
rx
}
}

View file

@ -115,6 +115,7 @@ async fn run_preflight_checks(channel: &Channel, release: &Release, evaluation:
true
}
#[derive(Debug)]
enum PlanResult {
AlreadyExecuted {
remote_path: object_store::path::Path
@ -129,6 +130,12 @@ enum PlanResult {
},
}
#[derive(Debug)]
struct PlanMetadata {
/// Git SHA1 induced by this plan
revision: String
}
impl From<PlanInterpretation> for PlanResult {
fn from(value: PlanInterpretation) -> Self {
match value {
@ -160,7 +167,7 @@ async fn move_directory<OS: object_store::ObjectStore>(client: OS, from: &object
}
#[tracing::instrument]
async fn plan(hydra_client: &HydraClient<'_>, channel_name: &str, job_name: String, global_opts: &GlobalOpts, config: &config::MirrorConfig) -> std::io::Result<PlanResult> {
async fn plan(hydra_client: &HydraClient<'_>, channel_name: &str, job_name: String, global_opts: &GlobalOpts, config: &config::MirrorConfig) -> std::io::Result<(PlanResult, PlanMetadata)> {
let chan = Channel {
name: channel_name.to_string()
};
@ -183,13 +190,16 @@ async fn plan(hydra_client: &HydraClient<'_>, channel_name: &str, job_name: Stri
release.nix_name, release.id, evaluation.id, release.prefix(&chan), revision);
let release_bucket = config.release_bucket();
let metadata = PlanMetadata {
revision: revision.to_owned()
};
// If the release already exists, skip it.
if release_bucket.head(&release.prefix(&chan)).await.is_ok() {
tracing::warn!("Release already exists, skipping");
return Ok(PlanResult::AlreadyExecuted {
return Ok((PlanResult::AlreadyExecuted {
remote_path: release.prefix(&chan)
});
}, metadata));
}
let channel_bucket = config.channel_bucket();
@ -206,7 +216,7 @@ async fn plan(hydra_client: &HydraClient<'_>, channel_name: &str, job_name: Stri
}, plan_options).await;
info!("Plan interpreted: {:?}", interpretation);
Ok(interpretation.into())
Ok((interpretation.into(), metadata))
} else {
tracing::error!("Preflight check failed, cannot continue, pass `--force` if you want to bypass preflight checks");
Err(std::io::Error::other("preflight check failure"))
@ -216,6 +226,18 @@ async fn plan(hydra_client: &HydraClient<'_>, channel_name: &str, job_name: Stri
}
}
#[tracing::instrument]
async fn sync_channel_over_git(repo: &PathBuf, plan_metadata: &PlanMetadata, branch_name: &str) -> Result<(), ()> {
let repo = git::Git::default(repo);
// TODO: better error handling
Ok(repo.push(&plan_metadata.revision, &branch_name).await
.expect("Failed to prepare the push")
.wait()
.await
.expect("Failed to push"))
}
#[tokio::main]
async fn main() -> std::io::Result<()> {
@ -253,7 +275,7 @@ async fn main() -> std::io::Result<()> {
plan(&hydra_client, &channel.channel_name, channel.job_name, &args.global_opts, &config).await?;
},
Commands::Apply(channel) => {
let plan = plan(&hydra_client, &channel.channel_name, channel.job_name, &args.global_opts, &config).await?;
let (plan, metadata) = plan(&hydra_client, &channel.channel_name, channel.job_name, &args.global_opts, &config).await?;
match plan {
PlanResult::AlreadyExecuted { .. } => {
@ -267,6 +289,9 @@ async fn main() -> std::io::Result<()> {
.await.expect("Failed to atomically promote the streamed channel, aborting");
}
}
sync_channel_over_git(&config.repo_dir, &metadata, &channel.channel_name).await.expect("Failed to synchronize the Git repository for this channel");
// TODO: redirects all URIs
},
Commands::ApplyRemote(remote) => {
let release = config.release_bucket();
@ -274,6 +299,9 @@ async fn main() -> std::io::Result<()> {
move_directory(config.release_bucket(), &remote.staging_prefix.try_into().unwrap(), &remote.channel_name.try_into().unwrap())
.await.expect("Failed to atomically rename the staging prefix into the channel name");
todo!("Remote apply does not support synchronizing the Git repository yet. We need to fetch the remote revision and channel name to perform it.");
// TODO: redirects all URIs
},
Commands::CleanupStreamedPrefixes => {
let channel = config.channel_bucket();