Compare commits


6 commits

Author  SHA1  Date

Nikodem Rabuliński  f0a8400f3d  2024-08-14 20:33:44 +02:00
    Update dependencies

Nikodem Rabuliński  0334b8ab2a  2024-08-14 19:36:01 +02:00
    Migrate to lix

Zhaofeng Li  26b9417bde  2024-08-13 10:21:27 -04:00
    Merge pull request #158 from zhaofengli/upsert-object-on-conflict
    server: Upsert object row on conflict

Zhaofeng Li  443ceac40f  2024-08-13 07:39:38 -06:00
    server: Upsert object row on conflict
    Upsert instead of doing delete+insert or ignoring the specific error.
    Fixes #132.

Zhaofeng Li  e127acbf9a  2024-07-31 20:41:06 -04:00
    Merge pull request #154 from cole-h/fixup-stream-error-logging
    fixup: stream error logging

Cole Helbling  903fb4e39e  2024-07-26 10:21:52 -07:00
    fixup: stream error logging
    The call to `into_inner()` discards the wrapper type constructed by
    `map_err()`. So instead, `map_err()` the actual stream, and call
    `Body::from_stream` on the wrapped stream.
    (A sketch of this change follows the commit list.)
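
Commit 903fb4e39e above explains why the logging `map_err()` has to stay wrapped around the stream. Below is a minimal, hypothetical sketch of that idea, assuming axum's `Body::from_stream`, `tokio_util::io::ReaderStream`, and `futures::TryStreamExt` (the same pieces touched in the `get_nar` diff further down); it is an illustration under those assumptions, not the project's exact handler code:

```rust
use axum::body::Body;
use futures::TryStreamExt as _; // provides map_err on fallible streams
use tokio::io::AsyncRead;
use tokio_util::io::ReaderStream;

// Hypothetical helper: turn an async reader into a response body whose
// stream errors get logged. The error-logging wrapper itself must be what
// reaches Body::from_stream; calling .into_inner() would discard it again.
fn logged_stream_body(reader: impl AsyncRead + Send + 'static) -> Body {
    let stream = ReaderStream::new(reader).map_err(|e| {
        tracing::error!("Stream error: {e}");
        e
    });
    Body::from_stream(stream)
}
```
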
12 changed files with 199 additions and 125 deletions

View file

@@ -9,36 +9,28 @@ fn main() {
#[cfg(feature = "nix_store")]
fn build_bridge() {
// Temporary workaround for issue in <https://github.com/NixOS/nix/pull/8484>
let hacky_include = {
let dir = tempfile::tempdir().expect("Failed to create temporary directory for workaround");
std::fs::write(dir.path().join("uds-remote-store.md"), "\"\"").unwrap();
dir
};
let libstore = pkg_config::Config::new()
.probe("lix-store")
.unwrap();
let libmain = pkg_config::Config::new()
.probe("lix-main")
.unwrap();
let libutil = pkg_config::Config::new()
.probe("lix-util")
.unwrap();
cxx_build::bridge("src/nix_store/bindings/mod.rs")
.file("src/nix_store/bindings/nix.cpp")
.flag("-std=c++2a")
.flag("-O2")
.flag("-include")
.flag("nix/config.h")
.flag("-idirafter")
.flag(hacky_include.path().to_str().unwrap())
// In Nix 2.19+, nix/args/root.hh depends on being able to #include "args.hh" (which is in its parent directory), for some reason
.flag("-I")
.flag(concat!(env!("NIX_INCLUDE_PATH"), "/nix"))
.flag("lix/config.h")
.includes(&libmain.include_paths)
.includes(&libutil.include_paths)
.includes(&libstore.include_paths)
.compile("nixbinding");
println!("cargo:rerun-if-changed=src/nix_store/bindings");
// the -l flags must be after -lnixbinding
pkg_config::Config::new()
.atleast_version("2.4")
.probe("nix-store")
.unwrap();
pkg_config::Config::new()
.atleast_version("2.4")
.probe("nix-main")
.unwrap();
}

View file

@@ -44,7 +44,7 @@ CPathInfo::CPathInfo(nix::ref<const nix::ValidPathInfo> pi) : pi(pi) {}
RHashSlice CPathInfo::nar_sha256_hash() {
auto &hash = this->pi->narHash;
if (hash.type != nix::htSHA256) {
if (hash.type != nix::HashType::SHA256) {
throw nix::Error("Only SHA-256 hashes are supported at the moment");
}
@@ -140,7 +140,7 @@ void CNixStore::nar_from_path(RVec<unsigned char> base_name, RBox<AsyncWriteSend
nix::StorePath store_path(sv);
// exceptions will be thrown into Rust
this->store->narFromPath(store_path, sink);
sink << this->store->narFromPath(store_path);
sink.eof();
}

View file

@@ -13,14 +13,14 @@
#include <memory>
#include <mutex>
#include <set>
#include <nix/store-api.hh>
#include <nix/local-store.hh>
#include <nix/remote-store.hh>
#include <nix/uds-remote-store.hh>
#include <nix/hash.hh>
#include <nix/path.hh>
#include <nix/serialise.hh>
#include <nix/shared.hh>
#include <lix/libstore/store-api.hh>
#include <lix/libstore/local-store.hh>
#include <lix/libstore/remote-store.hh>
#include <lix/libstore/uds-remote-store.hh>
#include <lix/libutil/hash.hh>
#include <lix/libstore/path.hh>
#include <lix/libutil/serialise.hh>
#include <lix/libmain/shared.hh>
#include <rust/cxx.h>
template<class T> using RVec = rust::Vec<T>;

View file

@@ -62,9 +62,6 @@ let
ATTIC_DISTRIBUTOR = "attic";
# See comment in `attic/build.rs`
NIX_INCLUDE_PATH = "${lib.getDev nix}/include";
# See comment in `attic-tests`
doCheck = false;
@@ -139,9 +136,6 @@ let
checkPhaseCargoCommand = "cargoWithProfile test --no-run --message-format=json >cargo-test.json";
doInstallCargoArtifacts = false;
# See comment in `attic/build.rs`
NIX_INCLUDE_PATH = "${lib.getDev nix}/include";
installPhase = ''
runHook preInstall

View file

@@ -7,11 +7,11 @@
]
},
"locked": {
"lastModified": 1717025063,
"narHash": "sha256-dIubLa56W9sNNz0e8jGxrX3CAkPXsq7snuFA/Ie6dn8=",
"lastModified": 1722960479,
"narHash": "sha256-NhCkJJQhD5GUib8zN9JrmYGMwt4lCRp6ZVNzIiYCl0Y=",
"owner": "ipetkov",
"repo": "crane",
"rev": "480dff0be03dac0e51a8dfc26e882b0d123a450e",
"rev": "4c6c77920b8d44cd6660c1621dea6b3fc4b4c4f4",
"type": "github"
},
"original": {
@@ -23,11 +23,11 @@
"flake-compat": {
"flake": false,
"locked": {
"lastModified": 1673956053,
"narHash": "sha256-4gtG9iQuiKITOjNQQeQIpoIB6b16fm+504Ch3sNKLd8=",
"lastModified": 1696426674,
"narHash": "sha256-kvjfFW7WAETZlt09AgDn1MrtKzP7t90Vf7vypd3OL1U=",
"owner": "edolstra",
"repo": "flake-compat",
"rev": "35bb57c0c8d8b62bbfd284272c928ceb64ddbde9",
"rev": "0f9255e01c2351cc7d116c072cb317785dd33b33",
"type": "github"
},
"original": {
@@ -37,12 +37,15 @@
}
},
"flake-utils": {
"inputs": {
"systems": "systems"
},
"locked": {
"lastModified": 1667395993,
"narHash": "sha256-nuEHfE/LcWyuSWnS8t12N1wc105Qtau+/OdUAjtQ0rA=",
"lastModified": 1710146030,
"narHash": "sha256-SZ5L6eA7HJ/nmkzGG7/ISclqe6oZdOZTNoesiInkXPQ=",
"owner": "numtide",
"repo": "flake-utils",
"rev": "5aed5285a952e0b949eb3ba02c12fa4fcfef535f",
"rev": "b1d9ab70662946ef0850d488da1c9019f3a9752a",
"type": "github"
},
"original": {
@@ -51,13 +54,87 @@
"type": "github"
}
},
"flake-utils_2": {
"inputs": {
"systems": "systems_2"
},
"locked": {
"lastModified": 1710146030,
"narHash": "sha256-SZ5L6eA7HJ/nmkzGG7/ISclqe6oZdOZTNoesiInkXPQ=",
"owner": "numtide",
"repo": "flake-utils",
"rev": "b1d9ab70662946ef0850d488da1c9019f3a9752a",
"type": "github"
},
"original": {
"owner": "numtide",
"repo": "flake-utils",
"type": "github"
}
},
"flakey-profile": {
"locked": {
"lastModified": 1712898590,
"narHash": "sha256-FhGIEU93VHAChKEXx905TSiPZKga69bWl1VB37FK//I=",
"owner": "lf-",
"repo": "flakey-profile",
"rev": "243c903fd8eadc0f63d205665a92d4df91d42d9d",
"type": "github"
},
"original": {
"owner": "lf-",
"repo": "flakey-profile",
"type": "github"
}
},
"lix": {
"flake": false,
"locked": {
"lastModified": 1723577950,
"narHash": "sha256-kOpGI9WPmte1L4QWHviuXsr8jxmGn27zwi82jtzYObM=",
"ref": "refs/heads/main",
"rev": "b016eb0895bb6714a4f6530d9a2bb6577ac6c3cf",
"revCount": 16134,
"type": "git",
"url": "https://git.lix.systems/lix-project/lix"
},
"original": {
"type": "git",
"url": "https://git.lix.systems/lix-project/lix"
}
},
"lix-module": {
"inputs": {
"flake-utils": "flake-utils_2",
"flakey-profile": "flakey-profile",
"lix": [
"lix"
],
"nixpkgs": [
"nixpkgs"
]
},
"locked": {
"lastModified": 1723511483,
"narHash": "sha256-rT/OkVXKkns2YvyF1nFvl+8Gc3sld1c1sXPtGkbqaDY=",
"ref": "refs/heads/main",
"rev": "cecf70b77539c1a593f60ec9d0305b5e537ab6a9",
"revCount": 106,
"type": "git",
"url": "https://git.lix.systems/lix-project/nixos-module"
},
"original": {
"type": "git",
"url": "https://git.lix.systems/lix-project/nixos-module"
}
},
"nixpkgs": {
"locked": {
"lastModified": 1711401922,
"narHash": "sha256-QoQqXoj8ClGo0sqD/qWKFWezgEwUL0SUh37/vY2jNhc=",
"lastModified": 1723603349,
"narHash": "sha256-VMg6N7MryOuvSJ8Sj6YydarnUCkL7cvMdrMcnsJnJCE=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "07262b18b97000d16a4bdb003418bd2fb067a932",
"rev": "daf7bb95821b789db24fc1ac21f613db0c1bf2cb",
"type": "github"
},
"original": {
@@ -67,29 +144,44 @@
"type": "github"
}
},
"nixpkgs-stable": {
"locked": {
"lastModified": 1711460390,
"narHash": "sha256-akSgjDZL6pVHEfSE6sz1DNSXuYX6hq+P/1Z5IoYWs7E=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "44733514b72e732bd49f5511bd0203dea9b9a434",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "nixos-23.11",
"repo": "nixpkgs",
"type": "github"
}
},
"root": {
"inputs": {
"crane": "crane",
"flake-compat": "flake-compat",
"flake-utils": "flake-utils",
"nixpkgs": "nixpkgs",
"nixpkgs-stable": "nixpkgs-stable"
"lix": "lix",
"lix-module": "lix-module",
"nixpkgs": "nixpkgs"
}
},
"systems": {
"locked": {
"lastModified": 1681028828,
"narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=",
"owner": "nix-systems",
"repo": "default",
"rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e",
"type": "github"
},
"original": {
"owner": "nix-systems",
"repo": "default",
"type": "github"
}
},
"systems_2": {
"locked": {
"lastModified": 1681028828,
"narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=",
"owner": "nix-systems",
"repo": "default",
"rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e",
"type": "github"
},
"original": {
"owner": "nix-systems",
"repo": "default",
"type": "github"
}
}
},

View file

@@ -3,7 +3,15 @@
inputs = {
nixpkgs.url = "github:NixOS/nixpkgs/nixpkgs-unstable";
nixpkgs-stable.url = "github:NixOS/nixpkgs/nixos-23.11";
lix = {
url = "git+https://git.lix.systems/lix-project/lix";
flake = false;
};
lix-module = {
url = "git+https://git.lix.systems/lix-project/nixos-module";
inputs.nixpkgs.follows = "nixpkgs";
inputs.lix.follows = "lix";
};
flake-utils.url = "github:numtide/flake-utils";
crane = {
@@ -17,7 +25,7 @@
};
};
outputs = { self, nixpkgs, nixpkgs-stable, flake-utils, crane, ... }: let
outputs = { self, nixpkgs, lix-module, flake-utils, crane, ... }: let
supportedSystems = flake-utils.lib.defaultSystems ++ [ "riscv64-linux" ];
makeCranePkgs = pkgs: let
@@ -26,16 +34,10 @@
in flake-utils.lib.eachSystem supportedSystems (system: let
pkgs = import nixpkgs {
inherit system;
overlays = [];
overlays = [lix-module.overlays.default];
};
cranePkgs = makeCranePkgs pkgs;
pkgsStable = import nixpkgs-stable {
inherit system;
overlays = [];
};
cranePkgsStable = makeCranePkgs pkgsStable;
inherit (pkgs) lib;
in rec {
packages = {
@@ -55,17 +57,7 @@
} // (lib.optionalAttrs (system != "x86_64-darwin") {
# Unfortunately, x86_64-darwin fails to evaluate static builds
# TODO: Make this work with Crane
attic-static = (pkgs.pkgsStatic.callPackage ./package.nix {
nix = pkgs.pkgsStatic.nix.overrideAttrs (old: {
patches = (old.patches or []) ++ [
# To be submitted
(pkgs.fetchpatch {
url = "https://github.com/NixOS/nix/compare/3172c51baff5c81362fcdafa2e28773c2949c660...6b09a02536d5946458b537dfc36b7d268c9ce823.diff";
hash = "sha256-LFLq++J2XitEWQ0o57ihuuUlYk2PgUr11h7mMMAEe3c=";
})
];
});
}).overrideAttrs (old: {
attic-static = (pkgs.pkgsStatic.callPackage ./package.nix { }).overrideAttrs (old: {
nativeBuildInputs = (old.nativeBuildInputs or []) ++ [
pkgs.nukeReferences
];
@@ -127,12 +119,8 @@
linuxPackages.perf
]);
NIX_PATH = "nixpkgs=${pkgs.path}";
RUST_SRC_PATH = "${pkgs.rustPlatform.rustcSrc}/library";
# See comment in `attic/build.rs`
NIX_INCLUDE_PATH = "${lib.getDev pkgs.nix}/include";
ATTIC_DISTRIBUTOR = "dev";
};
@@ -163,8 +151,7 @@
flake = self;
};
unstableTests = makeIntegrationTests pkgs;
stableTests = lib.mapAttrs' (name: lib.nameValuePair "stable-${name}") (makeIntegrationTests pkgsStable);
in lib.optionalAttrs pkgs.stdenv.isLinux (unstableTests // stableTests);
in lib.optionalAttrs pkgs.stdenv.isLinux unstableTests;
}) // {
overlays = {
default = final: prev: let

View file

@@ -33,6 +33,6 @@ let
}
];
};
}) (lib.cartesianProductOfSets matrix));
}) (lib.cartesianProduct matrix));
in {
} // basicTests

View file

@@ -49,9 +49,6 @@ in rustPlatform.buildRustPackage rec {
ATTIC_DISTRIBUTOR = "attic";
# See comment in `attic/build.rs`
NIX_INCLUDE_PATH = "${lib.getDev nix}/include";
# Recursive Nix is not stable yet
doCheck = false;

View file

@@ -18,7 +18,7 @@ use axum::{
Router,
};
use futures::stream::BoxStream;
use http_body_util::BodyExt;
use futures::TryStreamExt as _;
use serde::Serialize;
use tokio_util::io::ReaderStream;
use tracing::instrument;
@@ -217,11 +217,11 @@ async fn get_nar(
match storage.download_file_db(remote_file, false).await? {
Download::Url(url) => Ok(Redirect::temporary(&url).into_response()),
Download::AsyncRead(stream) => {
let stream = ReaderStream::new(stream);
let body = Body::from_stream(stream).map_err(|e| {
let stream = ReaderStream::new(stream).map_err(|e| {
tracing::error!("Stream error: {e}");
e
}).into_inner();
});
let body = Body::from_stream(stream);
Ok(body.into_response())
}
@@ -254,11 +254,11 @@ async fn get_nar(
// TODO: Make num_prefetch configurable
// The ideal size depends on the average chunk size
let merged = merge_chunks(chunks, streamer, storage, 2);
let body = Body::from_stream(merged).map_err(|e| {
let merged = merge_chunks(chunks, streamer, storage, 2).map_err(|e| {
tracing::error!("Stream error: {e}");
e
}).into_inner();
});
let body = Body::from_stream(merged);
Ok(body.into_response())
}

View file

@@ -46,7 +46,7 @@ use crate::database::entity::cache;
use crate::database::entity::chunk::{self, ChunkState, Entity as Chunk};
use crate::database::entity::chunkref::{self, Entity as ChunkRef};
use crate::database::entity::nar::{self, Entity as Nar, NarState};
use crate::database::entity::object::{self, Entity as Object};
use crate::database::entity::object::{self, Entity as Object, InsertExt};
use crate::database::entity::Json as DbJson;
use crate::database::{AtticDatabase, ChunkGuard, NarGuard};
@@ -257,12 +257,6 @@ async fn upload_path_dedup(
.map_err(ServerError::database_error)?;
// Create a mapping granting the local cache access to the NAR
Object::delete_many()
.filter(object::Column::CacheId.eq(cache.id))
.filter(object::Column::StorePathHash.eq(upload_info.store_path_hash.to_string()))
.exec(&txn)
.await
.map_err(ServerError::database_error)?;
Object::insert({
let mut new_object = upload_info.to_active_model();
new_object.cache_id = Set(cache.id);
@@ -271,6 +265,7 @@ async fn upload_path_new_chunked(
new_object.created_by = Set(username);
new_object
})
.on_conflict_do_update()
.exec(&txn)
.await
.map_err(ServerError::database_error)?;
@@ -487,12 +482,6 @@ async fn upload_path_new_chunked(
.map_err(ServerError::database_error)?;
// Create a mapping granting the local cache access to the NAR
Object::delete_many()
.filter(object::Column::CacheId.eq(cache.id))
.filter(object::Column::StorePathHash.eq(upload_info.store_path_hash.to_string()))
.exec(&txn)
.await
.map_err(ServerError::database_error)?;
Object::insert({
let mut new_object = upload_info.to_active_model();
new_object.cache_id = Set(cache.id);
@@ -501,6 +490,7 @@ async fn upload_path_new_unchunked(
new_object.created_by = Set(username);
new_object
})
.on_conflict_do_update()
.exec(&txn)
.await
.map_err(ServerError::database_error)?;
@@ -594,12 +584,6 @@ async fn upload_path_new_unchunked(
.map_err(ServerError::database_error)?;
// Create a mapping granting the local cache access to the NAR
Object::delete_many()
.filter(object::Column::CacheId.eq(cache.id))
.filter(object::Column::StorePathHash.eq(upload_info.store_path_hash.to_string()))
.exec(&txn)
.await
.map_err(ServerError::database_error)?;
Object::insert({
let mut new_object = upload_info.to_active_model();
new_object.cache_id = Set(cache.id);
@@ -608,6 +592,7 @@ async fn upload_path_new_unchunked(
new_object.created_by = Set(username);
new_object
})
.on_conflict_do_update()
.exec(&txn)
.await
.map_err(ServerError::database_error)?;

View file

@@ -6,6 +6,8 @@ use std::path::PathBuf;
use std::str::FromStr;
use sea_orm::entity::prelude::*;
use sea_orm::sea_query::OnConflict;
use sea_orm::Insert;
use super::nar::NarModel;
use super::Json;
@@ -15,6 +17,10 @@ use attic::hash::Hash;
pub type ObjectModel = Model;
pub trait InsertExt {
fn on_conflict_do_update(self) -> Self;
}
/// An object in a binary cache.
#[derive(Debug, Clone, PartialEq, Eq, DeriveEntityModel)]
#[sea_orm(table_name = "object")]
@@ -87,6 +93,27 @@ pub enum Relation {
Nar,
}
impl InsertExt for Insert<ActiveModel> {
fn on_conflict_do_update(self) -> Self {
self.on_conflict(
OnConflict::columns([Column::CacheId, Column::StorePathHash])
.update_columns([
Column::NarId,
Column::StorePath,
Column::References,
Column::System,
Column::Deriver,
Column::Sigs,
Column::Ca,
Column::CreatedAt,
Column::LastAccessedAt,
Column::CreatedBy,
])
.to_owned(),
)
}
}
impl Model {
/// Converts this object to a NarInfo.
pub fn to_nar_info(&self, nar: &NarModel) -> ServerResult<NarInfo> {
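
For completeness, a hedged usage sketch of the `InsertExt::on_conflict_do_update` extension defined above, mirroring the `upload_path` call sites shown earlier; `txn`, `cache`, and `upload_info` are assumed to be the SeaORM transaction, cache row, and upload request already in scope at those sites:

```rust
// Insert the object row, or update the existing row when the
// (cache_id, store_path_hash) conflict target already exists.
Object::insert({
    let mut new_object = upload_info.to_active_model();
    new_object.cache_id = Set(cache.id);
    new_object
})
.on_conflict_do_update()
.exec(&txn)
.await
.map_err(ServerError::database_error)?;
```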