Compare commits


6 commits

Author SHA1 Message Date
Nikodem Rabuliński  f0a8400f3d  Update dependencies  2024-08-14 20:33:44 +02:00
Nikodem Rabuliński  0334b8ab2a  Migrate to lix  2024-08-14 19:36:01 +02:00
Zhaofeng Li  26b9417bde  Merge pull request #158 from zhaofengli/upsert-object-on-conflict  2024-08-13 10:21:27 -04:00
    server: Upsert object row on conflict
Zhaofeng Li  443ceac40f  server: Upsert object row on conflict  2024-08-13 07:39:38 -06:00
    Upsert instead of doing delete+insert or ignoring the specific error.
    Fixes #132.
Zhaofeng Li  e127acbf9a  Merge pull request #154 from cole-h/fixup-stream-error-logging  2024-07-31 20:41:06 -04:00
    fixup: stream error logging
Cole Helbling  903fb4e39e  fixup: stream error logging  2024-07-26 10:21:52 -07:00
    The call to `into_inner()` discards the wrapper type constructed by `map_err()`.
    So instead, `map_err()` the actual stream, and call `Body::from_stream` on the
    wrapped stream.
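For context, the shape of that fix is roughly the following sketch (the helper name `logged_body` and its reader parameter are hypothetical; the real change is in the `get_nar` handler diff below). The stream itself is wrapped with `map_err` before the `Body` is constructed, rather than wrapping the `Body` and then discarding the wrapper with `into_inner()`:

use axum::body::Body;
use futures::TryStreamExt as _; // provides map_err on streams of Results
use tokio::io::AsyncRead;
use tokio_util::io::ReaderStream;

// Hypothetical helper mirroring the get_nar change: log every stream error
// before the body is handed to the HTTP layer.
fn logged_body(reader: impl AsyncRead + Send + 'static) -> Body {
    // Wrap the stream itself; the error type stays io::Error, which still
    // satisfies Body::from_stream's error bound.
    let stream = ReaderStream::new(reader).map_err(|e| {
        tracing::error!("Stream error: {e}");
        e
    });
    // Building the Body from the already-wrapped stream keeps the logging;
    // calling map_err on the Body and then into_inner() would drop it.
    Body::from_stream(stream)
}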
7 changed files with 67 additions and 55 deletions


@@ -44,7 +44,7 @@ CPathInfo::CPathInfo(nix::ref<const nix::ValidPathInfo> pi) : pi(pi) {}
 RHashSlice CPathInfo::nar_sha256_hash() {
     auto &hash = this->pi->narHash;
-    if (hash.type != nix::htSHA256) {
+    if (hash.type != nix::HashType::SHA256) {
         throw nix::Error("Only SHA-256 hashes are supported at the moment");
     }


@@ -13,14 +13,14 @@
 #include <memory>
 #include <mutex>
 #include <set>
-#include <libstore/store-api.hh>
-#include <libstore/local-store.hh>
-#include <libstore/remote-store.hh>
-#include <libstore/uds-remote-store.hh>
-#include <libutil/hash.hh>
-#include <libstore/path.hh>
-#include <libutil/serialise.hh>
-#include <libmain/shared.hh>
+#include <lix/libstore/store-api.hh>
+#include <lix/libstore/local-store.hh>
+#include <lix/libstore/remote-store.hh>
+#include <lix/libstore/uds-remote-store.hh>
+#include <lix/libutil/hash.hh>
+#include <lix/libstore/path.hh>
+#include <lix/libutil/serialise.hh>
+#include <lix/libmain/shared.hh>
 #include <rust/cxx.h>
 template<class T> using RVec = rust::Vec<T>;


@@ -7,11 +7,11 @@
         ]
       },
       "locked": {
-        "lastModified": 1721322122,
-        "narHash": "sha256-a0G1NvyXGzdwgu6e1HQpmK5R5yLsfxeBe07nNDyYd+g=",
+        "lastModified": 1722960479,
+        "narHash": "sha256-NhCkJJQhD5GUib8zN9JrmYGMwt4lCRp6ZVNzIiYCl0Y=",
         "owner": "ipetkov",
         "repo": "crane",
-        "rev": "8a68b987c476a33e90f203f0927614a75c3f47ea",
+        "rev": "4c6c77920b8d44cd6660c1621dea6b3fc4b4c4f4",
         "type": "github"
       },
       "original": {

@@ -90,17 +90,17 @@
     "lix": {
       "flake": false,
       "locked": {
-        "lastModified": 1721371213,
-        "narHash": "sha256-7SdrlNe5DBlK5uLBhPPxVRWI50N1PFz3zMBeDYiX0Qs=",
+        "lastModified": 1723577950,
+        "narHash": "sha256-kOpGI9WPmte1L4QWHviuXsr8jxmGn27zwi82jtzYObM=",
         "ref": "refs/heads/main",
-        "rev": "aba5f19680b2f4c29d7ce2ff5e2a89128c1cb26d",
-        "revCount": 15985,
+        "rev": "b016eb0895bb6714a4f6530d9a2bb6577ac6c3cf",
+        "revCount": 16134,
         "type": "git",
-        "url": "ssh://git@git.lix.systems/lix-project/lix"
+        "url": "https://git.lix.systems/lix-project/lix"
       },
       "original": {
         "type": "git",
-        "url": "ssh://git@git.lix.systems/lix-project/lix"
+        "url": "https://git.lix.systems/lix-project/lix"
       }
     },
     "lix-module": {

@@ -115,26 +115,26 @@
         ]
       },
       "locked": {
-        "lastModified": 1720695775,
-        "narHash": "sha256-8Oqzl9QPjEe/n8y0R2tC6+2v/H6xBgABHXOJwxmnBg0=",
+        "lastModified": 1723511483,
+        "narHash": "sha256-rT/OkVXKkns2YvyF1nFvl+8Gc3sld1c1sXPtGkbqaDY=",
         "ref": "refs/heads/main",
-        "rev": "d70318fb946a0e720dfdd1fb10b0645c14e2a02a",
-        "revCount": 94,
+        "rev": "cecf70b77539c1a593f60ec9d0305b5e537ab6a9",
+        "revCount": 106,
         "type": "git",
-        "url": "ssh://git@git.lix.systems/lix-project/nixos-module"
+        "url": "https://git.lix.systems/lix-project/nixos-module"
       },
       "original": {
         "type": "git",
-        "url": "ssh://git@git.lix.systems/lix-project/nixos-module"
+        "url": "https://git.lix.systems/lix-project/nixos-module"
       }
     },
     "nixpkgs": {
       "locked": {
-        "lastModified": 1721373214,
-        "narHash": "sha256-crpGeGQGFlnCsMyCE5eheyjzo3xo03o1FXJ2sAbm7No=",
+        "lastModified": 1723603349,
+        "narHash": "sha256-VMg6N7MryOuvSJ8Sj6YydarnUCkL7cvMdrMcnsJnJCE=",
         "owner": "NixOS",
         "repo": "nixpkgs",
-        "rev": "af9c15bc7a314c226d7d5d85e159f7a73e8d9fae",
+        "rev": "daf7bb95821b789db24fc1ac21f613db0c1bf2cb",
         "type": "github"
       },
       "original": {


@@ -4,11 +4,11 @@
   inputs = {
     nixpkgs.url = "github:NixOS/nixpkgs/nixpkgs-unstable";
     lix = {
-      url = "git+ssh://git@git.lix.systems/lix-project/lix";
+      url = "git+https://git.lix.systems/lix-project/lix";
       flake = false;
     };
     lix-module = {
-      url = "git+ssh://git@git.lix.systems/lix-project/nixos-module";
+      url = "git+https://git.lix.systems/lix-project/nixos-module";
      inputs.nixpkgs.follows = "nixpkgs";
      inputs.lix.follows = "lix";
    };


@@ -18,7 +18,7 @@ use axum::{
     Router,
 };
 use futures::stream::BoxStream;
-use http_body_util::BodyExt;
+use futures::TryStreamExt as _;
 use serde::Serialize;
 use tokio_util::io::ReaderStream;
 use tracing::instrument;

@@ -217,11 +217,11 @@ async fn get_nar(
     match storage.download_file_db(remote_file, false).await? {
         Download::Url(url) => Ok(Redirect::temporary(&url).into_response()),
         Download::AsyncRead(stream) => {
-            let stream = ReaderStream::new(stream);
-            let body = Body::from_stream(stream).map_err(|e| {
+            let stream = ReaderStream::new(stream).map_err(|e| {
                 tracing::error!("Stream error: {e}");
                 e
-            }).into_inner();
+            });
+            let body = Body::from_stream(stream);

             Ok(body.into_response())
         }

@@ -254,11 +254,11 @@ async fn get_nar(
             // TODO: Make num_prefetch configurable
             // The ideal size depends on the average chunk size
-            let merged = merge_chunks(chunks, streamer, storage, 2);
-            let body = Body::from_stream(merged).map_err(|e| {
+            let merged = merge_chunks(chunks, streamer, storage, 2).map_err(|e| {
                 tracing::error!("Stream error: {e}");
                 e
-            }).into_inner();
+            });
+            let body = Body::from_stream(merged);

             Ok(body.into_response())
         }


@@ -46,7 +46,7 @@ use crate::database::entity::cache;
 use crate::database::entity::chunk::{self, ChunkState, Entity as Chunk};
 use crate::database::entity::chunkref::{self, Entity as ChunkRef};
 use crate::database::entity::nar::{self, Entity as Nar, NarState};
-use crate::database::entity::object::{self, Entity as Object};
+use crate::database::entity::object::{self, Entity as Object, InsertExt};
 use crate::database::entity::Json as DbJson;
 use crate::database::{AtticDatabase, ChunkGuard, NarGuard};

@@ -257,12 +257,6 @@ async fn upload_path_dedup(
         .map_err(ServerError::database_error)?;

     // Create a mapping granting the local cache access to the NAR
-    Object::delete_many()
-        .filter(object::Column::CacheId.eq(cache.id))
-        .filter(object::Column::StorePathHash.eq(upload_info.store_path_hash.to_string()))
-        .exec(&txn)
-        .await
-        .map_err(ServerError::database_error)?;
     Object::insert({
         let mut new_object = upload_info.to_active_model();
         new_object.cache_id = Set(cache.id);

@@ -271,6 +265,7 @@
         new_object.created_by = Set(username);
         new_object
     })
+    .on_conflict_do_update()
     .exec(&txn)
     .await
     .map_err(ServerError::database_error)?;

@@ -487,12 +482,6 @@ async fn upload_path_new_chunked(
         .map_err(ServerError::database_error)?;

     // Create a mapping granting the local cache access to the NAR
-    Object::delete_many()
-        .filter(object::Column::CacheId.eq(cache.id))
-        .filter(object::Column::StorePathHash.eq(upload_info.store_path_hash.to_string()))
-        .exec(&txn)
-        .await
-        .map_err(ServerError::database_error)?;
     Object::insert({
         let mut new_object = upload_info.to_active_model();
         new_object.cache_id = Set(cache.id);

@@ -501,6 +490,7 @@
         new_object.created_by = Set(username);
         new_object
     })
+    .on_conflict_do_update()
     .exec(&txn)
     .await
     .map_err(ServerError::database_error)?;

@@ -594,12 +584,6 @@ async fn upload_path_new_unchunked(
         .map_err(ServerError::database_error)?;

     // Create a mapping granting the local cache access to the NAR
-    Object::delete_many()
-        .filter(object::Column::CacheId.eq(cache.id))
-        .filter(object::Column::StorePathHash.eq(upload_info.store_path_hash.to_string()))
-        .exec(&txn)
-        .await
-        .map_err(ServerError::database_error)?;
     Object::insert({
         let mut new_object = upload_info.to_active_model();
         new_object.cache_id = Set(cache.id);

@@ -608,6 +592,7 @@
         new_object.created_by = Set(username);
         new_object
     })
+    .on_conflict_do_update()
     .exec(&txn)
     .await
     .map_err(ServerError::database_error)?;


@@ -6,6 +6,8 @@ use std::path::PathBuf;
 use std::str::FromStr;

 use sea_orm::entity::prelude::*;
+use sea_orm::sea_query::OnConflict;
+use sea_orm::Insert;

 use super::nar::NarModel;
 use super::Json;

@@ -15,6 +17,10 @@ use attic::hash::Hash;
 pub type ObjectModel = Model;

+pub trait InsertExt {
+    fn on_conflict_do_update(self) -> Self;
+}
+
 /// An object in a binary cache.
 #[derive(Debug, Clone, PartialEq, Eq, DeriveEntityModel)]
 #[sea_orm(table_name = "object")]

@@ -87,6 +93,27 @@ pub enum Relation {
     Nar,
 }

+impl InsertExt for Insert<ActiveModel> {
+    fn on_conflict_do_update(self) -> Self {
+        self.on_conflict(
+            OnConflict::columns([Column::CacheId, Column::StorePathHash])
+                .update_columns([
+                    Column::NarId,
+                    Column::StorePath,
+                    Column::References,
+                    Column::System,
+                    Column::Deriver,
+                    Column::Sigs,
+                    Column::Ca,
+                    Column::CreatedAt,
+                    Column::LastAccessedAt,
+                    Column::CreatedBy,
+                ])
+                .to_owned(),
+        )
+    }
+}
+
 impl Model {
     /// Converts this object to a NarInfo.
     pub fn to_nar_info(&self, nar: &NarModel) -> ServerResult<NarInfo> {
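To make the effect of `on_conflict_do_update` concrete, here is a small self-contained sketch using `sea_query` directly rather than the entity above. The enum, column subset, and values are hypothetical, and it assumes sea-query's default features (the `Iden` derive and the Postgres backend); it only illustrates the kind of ON CONFLICT ... DO UPDATE statement the trait implementation builds.

use sea_query::{Iden, OnConflict, PostgresQueryBuilder, Query};

// Pared-down stand-in for the object table (hypothetical subset of columns).
#[derive(Iden)]
enum Object {
    Table,
    CacheId,
    StorePathHash,
    NarId,
}

fn main() {
    let stmt = Query::insert()
        .into_table(Object::Table)
        .columns([Object::CacheId, Object::StorePathHash, Object::NarId])
        .values_panic([1i64.into(), "0123456789abcdef".into(), 42i64.into()])
        .on_conflict(
            // Same conflict target as the entity impl: (cache_id, store_path_hash).
            OnConflict::columns([Object::CacheId, Object::StorePathHash])
                .update_columns([Object::NarId])
                .to_owned(),
        )
        .to_owned();

    // Prints roughly:
    // INSERT INTO "object" ("cache_id", "store_path_hash", "nar_id")
    //   VALUES (1, '0123456789abcdef', 42)
    //   ON CONFLICT ("cache_id", "store_path_hash")
    //   DO UPDATE SET "nar_id" = "excluded"."nar_id"
    println!("{}", stmt.to_string(PostgresQueryBuilder));
}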