Compare commits
6 commits
f2cfa79c61
...
f0a8400f3d
Author | SHA1 | Date | |
---|---|---|---|
Nikodem Rabuliński | f0a8400f3d | ||
Nikodem Rabuliński | 0334b8ab2a | ||
26b9417bde | |||
443ceac40f | |||
e127acbf9a | |||
903fb4e39e |
|
@ -44,7 +44,7 @@ CPathInfo::CPathInfo(nix::ref<const nix::ValidPathInfo> pi) : pi(pi) {}
|
||||||
RHashSlice CPathInfo::nar_sha256_hash() {
|
RHashSlice CPathInfo::nar_sha256_hash() {
|
||||||
auto &hash = this->pi->narHash;
|
auto &hash = this->pi->narHash;
|
||||||
|
|
||||||
if (hash.type != nix::htSHA256) {
|
if (hash.type != nix::HashType::SHA256) {
|
||||||
throw nix::Error("Only SHA-256 hashes are supported at the moment");
|
throw nix::Error("Only SHA-256 hashes are supported at the moment");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -13,14 +13,14 @@
|
||||||
#include <memory>
|
#include <memory>
|
||||||
#include <mutex>
|
#include <mutex>
|
||||||
#include <set>
|
#include <set>
|
||||||
#include <libstore/store-api.hh>
|
#include <lix/libstore/store-api.hh>
|
||||||
#include <libstore/local-store.hh>
|
#include <lix/libstore/local-store.hh>
|
||||||
#include <libstore/remote-store.hh>
|
#include <lix/libstore/remote-store.hh>
|
||||||
#include <libstore/uds-remote-store.hh>
|
#include <lix/libstore/uds-remote-store.hh>
|
||||||
#include <libutil/hash.hh>
|
#include <lix/libutil/hash.hh>
|
||||||
#include <libstore/path.hh>
|
#include <lix/libstore/path.hh>
|
||||||
#include <libutil/serialise.hh>
|
#include <lix/libutil/serialise.hh>
|
||||||
#include <libmain/shared.hh>
|
#include <lix/libmain/shared.hh>
|
||||||
#include <rust/cxx.h>
|
#include <rust/cxx.h>
|
||||||
|
|
||||||
template<class T> using RVec = rust::Vec<T>;
|
template<class T> using RVec = rust::Vec<T>;
|
||||||
|
|
36
flake.lock
36
flake.lock
|
@ -7,11 +7,11 @@
|
||||||
]
|
]
|
||||||
},
|
},
|
||||||
"locked": {
|
"locked": {
|
||||||
"lastModified": 1721322122,
|
"lastModified": 1722960479,
|
||||||
"narHash": "sha256-a0G1NvyXGzdwgu6e1HQpmK5R5yLsfxeBe07nNDyYd+g=",
|
"narHash": "sha256-NhCkJJQhD5GUib8zN9JrmYGMwt4lCRp6ZVNzIiYCl0Y=",
|
||||||
"owner": "ipetkov",
|
"owner": "ipetkov",
|
||||||
"repo": "crane",
|
"repo": "crane",
|
||||||
"rev": "8a68b987c476a33e90f203f0927614a75c3f47ea",
|
"rev": "4c6c77920b8d44cd6660c1621dea6b3fc4b4c4f4",
|
||||||
"type": "github"
|
"type": "github"
|
||||||
},
|
},
|
||||||
"original": {
|
"original": {
|
||||||
|
@ -90,17 +90,17 @@
|
||||||
"lix": {
|
"lix": {
|
||||||
"flake": false,
|
"flake": false,
|
||||||
"locked": {
|
"locked": {
|
||||||
"lastModified": 1721371213,
|
"lastModified": 1723577950,
|
||||||
"narHash": "sha256-7SdrlNe5DBlK5uLBhPPxVRWI50N1PFz3zMBeDYiX0Qs=",
|
"narHash": "sha256-kOpGI9WPmte1L4QWHviuXsr8jxmGn27zwi82jtzYObM=",
|
||||||
"ref": "refs/heads/main",
|
"ref": "refs/heads/main",
|
||||||
"rev": "aba5f19680b2f4c29d7ce2ff5e2a89128c1cb26d",
|
"rev": "b016eb0895bb6714a4f6530d9a2bb6577ac6c3cf",
|
||||||
"revCount": 15985,
|
"revCount": 16134,
|
||||||
"type": "git",
|
"type": "git",
|
||||||
"url": "ssh://git@git.lix.systems/lix-project/lix"
|
"url": "https://git.lix.systems/lix-project/lix"
|
||||||
},
|
},
|
||||||
"original": {
|
"original": {
|
||||||
"type": "git",
|
"type": "git",
|
||||||
"url": "ssh://git@git.lix.systems/lix-project/lix"
|
"url": "https://git.lix.systems/lix-project/lix"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"lix-module": {
|
"lix-module": {
|
||||||
|
@ -115,26 +115,26 @@
|
||||||
]
|
]
|
||||||
},
|
},
|
||||||
"locked": {
|
"locked": {
|
||||||
"lastModified": 1720695775,
|
"lastModified": 1723511483,
|
||||||
"narHash": "sha256-8Oqzl9QPjEe/n8y0R2tC6+2v/H6xBgABHXOJwxmnBg0=",
|
"narHash": "sha256-rT/OkVXKkns2YvyF1nFvl+8Gc3sld1c1sXPtGkbqaDY=",
|
||||||
"ref": "refs/heads/main",
|
"ref": "refs/heads/main",
|
||||||
"rev": "d70318fb946a0e720dfdd1fb10b0645c14e2a02a",
|
"rev": "cecf70b77539c1a593f60ec9d0305b5e537ab6a9",
|
||||||
"revCount": 94,
|
"revCount": 106,
|
||||||
"type": "git",
|
"type": "git",
|
||||||
"url": "ssh://git@git.lix.systems/lix-project/nixos-module"
|
"url": "https://git.lix.systems/lix-project/nixos-module"
|
||||||
},
|
},
|
||||||
"original": {
|
"original": {
|
||||||
"type": "git",
|
"type": "git",
|
||||||
"url": "ssh://git@git.lix.systems/lix-project/nixos-module"
|
"url": "https://git.lix.systems/lix-project/nixos-module"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"nixpkgs": {
|
"nixpkgs": {
|
||||||
"locked": {
|
"locked": {
|
||||||
"lastModified": 1721373214,
|
"lastModified": 1723603349,
|
||||||
"narHash": "sha256-crpGeGQGFlnCsMyCE5eheyjzo3xo03o1FXJ2sAbm7No=",
|
"narHash": "sha256-VMg6N7MryOuvSJ8Sj6YydarnUCkL7cvMdrMcnsJnJCE=",
|
||||||
"owner": "NixOS",
|
"owner": "NixOS",
|
||||||
"repo": "nixpkgs",
|
"repo": "nixpkgs",
|
||||||
"rev": "af9c15bc7a314c226d7d5d85e159f7a73e8d9fae",
|
"rev": "daf7bb95821b789db24fc1ac21f613db0c1bf2cb",
|
||||||
"type": "github"
|
"type": "github"
|
||||||
},
|
},
|
||||||
"original": {
|
"original": {
|
||||||
|
|
|
@ -4,11 +4,11 @@
|
||||||
inputs = {
|
inputs = {
|
||||||
nixpkgs.url = "github:NixOS/nixpkgs/nixpkgs-unstable";
|
nixpkgs.url = "github:NixOS/nixpkgs/nixpkgs-unstable";
|
||||||
lix = {
|
lix = {
|
||||||
url = "git+ssh://git@git.lix.systems/lix-project/lix";
|
url = "git+https://git.lix.systems/lix-project/lix";
|
||||||
flake = false;
|
flake = false;
|
||||||
};
|
};
|
||||||
lix-module = {
|
lix-module = {
|
||||||
url = "git+ssh://git@git.lix.systems/lix-project/nixos-module";
|
url = "git+https://git.lix.systems/lix-project/nixos-module";
|
||||||
inputs.nixpkgs.follows = "nixpkgs";
|
inputs.nixpkgs.follows = "nixpkgs";
|
||||||
inputs.lix.follows = "lix";
|
inputs.lix.follows = "lix";
|
||||||
};
|
};
|
||||||
|
|
|
@ -18,7 +18,7 @@ use axum::{
|
||||||
Router,
|
Router,
|
||||||
};
|
};
|
||||||
use futures::stream::BoxStream;
|
use futures::stream::BoxStream;
|
||||||
use http_body_util::BodyExt;
|
use futures::TryStreamExt as _;
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
use tokio_util::io::ReaderStream;
|
use tokio_util::io::ReaderStream;
|
||||||
use tracing::instrument;
|
use tracing::instrument;
|
||||||
|
@ -217,11 +217,11 @@ async fn get_nar(
|
||||||
match storage.download_file_db(remote_file, false).await? {
|
match storage.download_file_db(remote_file, false).await? {
|
||||||
Download::Url(url) => Ok(Redirect::temporary(&url).into_response()),
|
Download::Url(url) => Ok(Redirect::temporary(&url).into_response()),
|
||||||
Download::AsyncRead(stream) => {
|
Download::AsyncRead(stream) => {
|
||||||
let stream = ReaderStream::new(stream);
|
let stream = ReaderStream::new(stream).map_err(|e| {
|
||||||
let body = Body::from_stream(stream).map_err(|e| {
|
|
||||||
tracing::error!("Stream error: {e}");
|
tracing::error!("Stream error: {e}");
|
||||||
e
|
e
|
||||||
}).into_inner();
|
});
|
||||||
|
let body = Body::from_stream(stream);
|
||||||
|
|
||||||
Ok(body.into_response())
|
Ok(body.into_response())
|
||||||
}
|
}
|
||||||
|
@ -254,11 +254,11 @@ async fn get_nar(
|
||||||
|
|
||||||
// TODO: Make num_prefetch configurable
|
// TODO: Make num_prefetch configurable
|
||||||
// The ideal size depends on the average chunk size
|
// The ideal size depends on the average chunk size
|
||||||
let merged = merge_chunks(chunks, streamer, storage, 2);
|
let merged = merge_chunks(chunks, streamer, storage, 2).map_err(|e| {
|
||||||
let body = Body::from_stream(merged).map_err(|e| {
|
|
||||||
tracing::error!("Stream error: {e}");
|
tracing::error!("Stream error: {e}");
|
||||||
e
|
e
|
||||||
}).into_inner();
|
});
|
||||||
|
let body = Body::from_stream(merged);
|
||||||
|
|
||||||
Ok(body.into_response())
|
Ok(body.into_response())
|
||||||
}
|
}
|
||||||
|
|
|
@ -46,7 +46,7 @@ use crate::database::entity::cache;
|
||||||
use crate::database::entity::chunk::{self, ChunkState, Entity as Chunk};
|
use crate::database::entity::chunk::{self, ChunkState, Entity as Chunk};
|
||||||
use crate::database::entity::chunkref::{self, Entity as ChunkRef};
|
use crate::database::entity::chunkref::{self, Entity as ChunkRef};
|
||||||
use crate::database::entity::nar::{self, Entity as Nar, NarState};
|
use crate::database::entity::nar::{self, Entity as Nar, NarState};
|
||||||
use crate::database::entity::object::{self, Entity as Object};
|
use crate::database::entity::object::{self, Entity as Object, InsertExt};
|
||||||
use crate::database::entity::Json as DbJson;
|
use crate::database::entity::Json as DbJson;
|
||||||
use crate::database::{AtticDatabase, ChunkGuard, NarGuard};
|
use crate::database::{AtticDatabase, ChunkGuard, NarGuard};
|
||||||
|
|
||||||
|
@ -257,12 +257,6 @@ async fn upload_path_dedup(
|
||||||
.map_err(ServerError::database_error)?;
|
.map_err(ServerError::database_error)?;
|
||||||
|
|
||||||
// Create a mapping granting the local cache access to the NAR
|
// Create a mapping granting the local cache access to the NAR
|
||||||
Object::delete_many()
|
|
||||||
.filter(object::Column::CacheId.eq(cache.id))
|
|
||||||
.filter(object::Column::StorePathHash.eq(upload_info.store_path_hash.to_string()))
|
|
||||||
.exec(&txn)
|
|
||||||
.await
|
|
||||||
.map_err(ServerError::database_error)?;
|
|
||||||
Object::insert({
|
Object::insert({
|
||||||
let mut new_object = upload_info.to_active_model();
|
let mut new_object = upload_info.to_active_model();
|
||||||
new_object.cache_id = Set(cache.id);
|
new_object.cache_id = Set(cache.id);
|
||||||
|
@ -271,6 +265,7 @@ async fn upload_path_dedup(
|
||||||
new_object.created_by = Set(username);
|
new_object.created_by = Set(username);
|
||||||
new_object
|
new_object
|
||||||
})
|
})
|
||||||
|
.on_conflict_do_update()
|
||||||
.exec(&txn)
|
.exec(&txn)
|
||||||
.await
|
.await
|
||||||
.map_err(ServerError::database_error)?;
|
.map_err(ServerError::database_error)?;
|
||||||
|
@ -487,12 +482,6 @@ async fn upload_path_new_chunked(
|
||||||
.map_err(ServerError::database_error)?;
|
.map_err(ServerError::database_error)?;
|
||||||
|
|
||||||
// Create a mapping granting the local cache access to the NAR
|
// Create a mapping granting the local cache access to the NAR
|
||||||
Object::delete_many()
|
|
||||||
.filter(object::Column::CacheId.eq(cache.id))
|
|
||||||
.filter(object::Column::StorePathHash.eq(upload_info.store_path_hash.to_string()))
|
|
||||||
.exec(&txn)
|
|
||||||
.await
|
|
||||||
.map_err(ServerError::database_error)?;
|
|
||||||
Object::insert({
|
Object::insert({
|
||||||
let mut new_object = upload_info.to_active_model();
|
let mut new_object = upload_info.to_active_model();
|
||||||
new_object.cache_id = Set(cache.id);
|
new_object.cache_id = Set(cache.id);
|
||||||
|
@ -501,6 +490,7 @@ async fn upload_path_new_chunked(
|
||||||
new_object.created_by = Set(username);
|
new_object.created_by = Set(username);
|
||||||
new_object
|
new_object
|
||||||
})
|
})
|
||||||
|
.on_conflict_do_update()
|
||||||
.exec(&txn)
|
.exec(&txn)
|
||||||
.await
|
.await
|
||||||
.map_err(ServerError::database_error)?;
|
.map_err(ServerError::database_error)?;
|
||||||
|
@ -594,12 +584,6 @@ async fn upload_path_new_unchunked(
|
||||||
.map_err(ServerError::database_error)?;
|
.map_err(ServerError::database_error)?;
|
||||||
|
|
||||||
// Create a mapping granting the local cache access to the NAR
|
// Create a mapping granting the local cache access to the NAR
|
||||||
Object::delete_many()
|
|
||||||
.filter(object::Column::CacheId.eq(cache.id))
|
|
||||||
.filter(object::Column::StorePathHash.eq(upload_info.store_path_hash.to_string()))
|
|
||||||
.exec(&txn)
|
|
||||||
.await
|
|
||||||
.map_err(ServerError::database_error)?;
|
|
||||||
Object::insert({
|
Object::insert({
|
||||||
let mut new_object = upload_info.to_active_model();
|
let mut new_object = upload_info.to_active_model();
|
||||||
new_object.cache_id = Set(cache.id);
|
new_object.cache_id = Set(cache.id);
|
||||||
|
@ -608,6 +592,7 @@ async fn upload_path_new_unchunked(
|
||||||
new_object.created_by = Set(username);
|
new_object.created_by = Set(username);
|
||||||
new_object
|
new_object
|
||||||
})
|
})
|
||||||
|
.on_conflict_do_update()
|
||||||
.exec(&txn)
|
.exec(&txn)
|
||||||
.await
|
.await
|
||||||
.map_err(ServerError::database_error)?;
|
.map_err(ServerError::database_error)?;
|
||||||
|
|
|
@ -6,6 +6,8 @@ use std::path::PathBuf;
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
|
|
||||||
use sea_orm::entity::prelude::*;
|
use sea_orm::entity::prelude::*;
|
||||||
|
use sea_orm::sea_query::OnConflict;
|
||||||
|
use sea_orm::Insert;
|
||||||
|
|
||||||
use super::nar::NarModel;
|
use super::nar::NarModel;
|
||||||
use super::Json;
|
use super::Json;
|
||||||
|
@ -15,6 +17,10 @@ use attic::hash::Hash;
|
||||||
|
|
||||||
pub type ObjectModel = Model;
|
pub type ObjectModel = Model;
|
||||||
|
|
||||||
|
pub trait InsertExt {
|
||||||
|
fn on_conflict_do_update(self) -> Self;
|
||||||
|
}
|
||||||
|
|
||||||
/// An object in a binary cache.
|
/// An object in a binary cache.
|
||||||
#[derive(Debug, Clone, PartialEq, Eq, DeriveEntityModel)]
|
#[derive(Debug, Clone, PartialEq, Eq, DeriveEntityModel)]
|
||||||
#[sea_orm(table_name = "object")]
|
#[sea_orm(table_name = "object")]
|
||||||
|
@ -87,6 +93,27 @@ pub enum Relation {
|
||||||
Nar,
|
Nar,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl InsertExt for Insert<ActiveModel> {
|
||||||
|
fn on_conflict_do_update(self) -> Self {
|
||||||
|
self.on_conflict(
|
||||||
|
OnConflict::columns([Column::CacheId, Column::StorePathHash])
|
||||||
|
.update_columns([
|
||||||
|
Column::NarId,
|
||||||
|
Column::StorePath,
|
||||||
|
Column::References,
|
||||||
|
Column::System,
|
||||||
|
Column::Deriver,
|
||||||
|
Column::Sigs,
|
||||||
|
Column::Ca,
|
||||||
|
Column::CreatedAt,
|
||||||
|
Column::LastAccessedAt,
|
||||||
|
Column::CreatedBy,
|
||||||
|
])
|
||||||
|
.to_owned(),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl Model {
|
impl Model {
|
||||||
/// Converts this object to a NarInfo.
|
/// Converts this object to a NarInfo.
|
||||||
pub fn to_nar_info(&self, nar: &NarModel) -> ServerResult<NarInfo> {
|
pub fn to_nar_info(&self, nar: &NarModel) -> ServerResult<NarInfo> {
|
||||||
|
|
Loading…
Reference in a new issue