feat: init project

Signed-off-by: Raito Bezarius <masterancpp@gmail.com>
raito 2024-09-03 14:47:36 +02:00
commit 9c6d4e5f04
16 changed files with 773 additions and 0 deletions

.env Normal file (+5)

@@ -0,0 +1,5 @@
evaluation_gc_roots_dir="./gc-roots"
evaluation_logs_dir="./logs"
max_parallel_evaluation=1
local_nixpkgs_checkout="/home/raito/dev/github.com/NixOS/nixpkgs"
nixpkgs_repo_url="https://cl.forkos.org/nixpkgs"

.envrc Normal file (+1)

@@ -0,0 +1 @@
use nix

.gitignore vendored Normal file (+17)

@@ -0,0 +1,17 @@
result
.direnv
.pre-commit-config.yaml
__pycache__
.credentials
*.jsonl
.data_cache
nixpkgs
result
*.zst
*.dump
*.jsonl
*.log
# A shallow checkout of nixpkgs
nixpkgs/
gc-roots/
logs/

default.nix Normal file (+115)

@@ -0,0 +1,115 @@
{
  sources ? import ./npins,
  overlay ? import ./nix/overlay.nix,
  pkgs ? import sources.nixpkgs { overlays = [ overlay ]; },
}:
let
  self = rec {
    inherit (pkgs) python3;
    package = pkgs.evolive;

    pre-commit-check = pkgs.pre-commit-hooks {
      src = ./.;
      hooks =
        let
          pythonExcludes = [
            "/migrations/" # auto-generated code
          ];
        in
        {
          # Nix setup
          nixfmt = {
            enable = true;
            entry = pkgs.lib.mkForce "${pkgs.lib.getExe pkgs.nixfmt-rfc-style}";
          };
          statix = {
            enable = true;
            settings.ignore = [ "npins" ];
          };

          # Python setup
          ruff = {
            enable = true;
            excludes = pythonExcludes;
          };
          ruff-format = {
            enable = true;
            name = "Format python code with ruff";
            types = [
              "text"
              "python"
            ];
            entry = "${pkgs.lib.getExe pkgs.ruff} format";
            excludes = pythonExcludes;
          };
          pyright =
            let
              pyEnv = pkgs.python3.buildEnv.override {
                extraLibs = pkgs.evolive.propagatedBuildInputs;
                # Necessary for FastAPI CLI.
                ignoreCollisions = true;
              };
              wrappedPyright = pkgs.runCommand "pyright" { nativeBuildInputs = [ pkgs.makeWrapper ]; } ''
                makeWrapper ${pkgs.pyright}/bin/pyright $out \
                  --set PYTHONPATH ${pyEnv}/${pyEnv.sitePackages} \
                  --prefix PATH : ${pyEnv}/bin \
                  --set PYTHONHOME ${pyEnv}
              '';
            in
            {
              enable = true;
              entry = pkgs.lib.mkForce (builtins.toString wrappedPyright);
              excludes = pythonExcludes;
            };

          # Global setup
          prettier = {
            enable = true;
            excludes = [
              "\\.min.css$"
              "\\.html$"
            ];
          };
          commitizen.enable = true;
        };
    };

    shell = pkgs.mkShell {
      DATA_CACHE_DIRECTORY = toString ./. + "/.data_cache";
      REDIS_SOCKET_URL = "unix:///run/redis/redis.sock";
      # `./src/website/tracker/settings.py` by default looks for LOCAL_NIXPKGS_CHECKOUT
      # in the root of the repo. Make it the default here for local development.
      LOCAL_NIXPKGS_CHECKOUT = toString ./. + "/nixpkgs";

      packages = [
        package
        # (pkgs.python3.buildEnv.override {
        #   extraLibs = with pkgs.python3.pkgs; [ fastapi uvicorn ];
        #   ignoreCollisions = true;
        # })
        pkgs.nix-eval-jobs
        pkgs.commitizen
        pkgs.npins
        pkgs.nixfmt-rfc-style
        pkgs.hivemind
      ];

      shellHook = ''
        ${pre-commit-check.shellHook}
        mkdir -p .credentials
        export DATABASE_URL=postgres:///evolive
        export CREDENTIALS_DIRECTORY=${builtins.toString ./.credentials}
      '';
    };

    # tests = import ./nix/tests/vm-basic.nix {
    #   inherit pkgs;
    #   wstModule = module;
    # };
  };
in
self # // self.tests

nix/overlay.nix Normal file (+43)

@@ -0,0 +1,43 @@
final: prev:
let
  inherit (final) python3;
  extraPackages = import ../pkgs {
    pkgs = prev;
    inherit python3;
  };
  extraPython3Packages = extraPackages.python3Packages;
  sources = import ../npins;
in
{
  python3 = prev.lib.attrsets.recursiveUpdate prev.python3 { pkgs = extraPython3Packages; };

  # go through the motions to make a flake-incompat project use the build
  # inputs we want
  pre-commit-hooks = final.callPackage "${sources.pre-commit-hooks}/nix/run.nix" {
    tools = import "${sources.pre-commit-hooks}/nix/call-tools.nix" final;
    # wat
    gitignore-nix-src = {
      lib = import sources.gitignore { inherit (final) lib; };
    };
    isFlakes = false;
  };

  evolive = python3.pkgs.buildPythonPackage {
    pname = "evolive";
    version = "0.1.0";
    format = "pyproject";
    src = final.nix-gitignore.gitignoreSourcePure [ ../.gitignore ] ../src;
    propagatedBuildInputs = with python3.pkgs; [
      setuptools
      setuptools-scm
      fastapi
      ipython
      tqdm
      aiofiles
      pydantic-settings
      uvicorn
    ];
  };
}

npins/default.nix Normal file (+80)

@@ -0,0 +1,80 @@
# Generated by npins. Do not modify; will be overwritten regularly.
let
  data = builtins.fromJSON (builtins.readFile ./sources.json);
  version = data.version;

  mkSource =
    spec:
    assert spec ? type;
    let
      path =
        if spec.type == "Git" then
          mkGitSource spec
        else if spec.type == "GitRelease" then
          mkGitSource spec
        else if spec.type == "PyPi" then
          mkPyPiSource spec
        else if spec.type == "Channel" then
          mkChannelSource spec
        else
          builtins.throw "Unknown source type ${spec.type}";
    in
    spec // { outPath = path; };

  mkGitSource =
    {
      repository,
      revision,
      url ? null,
      hash,
      branch ? null,
      ...
    }:
    assert repository ? type;
    # At the moment, either it is a plain git repository (which has an url),
    # or it is a GitHub/GitLab repository. In the latter case, there will
    # always be a URL to the tarball.
    if url != null then
      (builtins.fetchTarball {
        inherit url;
        sha256 = hash;
      })
    else
      assert repository.type == "Git";
      let
        urlToName =
          url: rev:
          let
            matched = builtins.match "^.*/([^/]*)(\\.git)?$" repository.url;
            short = builtins.substring 0 7 rev;
            appendShort = if (builtins.match "[a-f0-9]*" rev) != null then "-${short}" else "";
          in
          "${if matched == null then "source" else builtins.head matched}${appendShort}";
        name = urlToName repository.url revision;
      in
      builtins.fetchGit {
        url = repository.url;
        rev = revision;
        inherit name;
        narHash = hash;
      };

  mkPyPiSource =
    { url, hash, ... }:
    builtins.fetchurl {
      inherit url;
      sha256 = hash;
    };

  mkChannelSource =
    { url, hash, ... }:
    builtins.fetchTarball {
      inherit url;
      sha256 = hash;
    };
in
if version == 4 then
  builtins.mapAttrs (_: mkSource) data.pins
else
  throw "Unsupported format version ${toString version} in sources.json. Try running `npins upgrade`"

npins/sources.json Normal file (+23)

@@ -0,0 +1,23 @@
{
  "pins": {
    "nixpkgs": {
      "type": "Channel",
      "name": "nixpkgs-unstable",
      "url": "https://releases.nixos.org/nixpkgs/nixpkgs-24.11pre674705.b833ff01a0d6/nixexprs.tar.xz",
      "hash": "12cda9rvpgjcsxykbcg5cxjaayhibjjabv6svacjc5n5kpcbx5sf"
    },
    "pre-commit-hooks": {
      "type": "Git",
      "repository": {
        "type": "GitHub",
        "owner": "cachix",
        "repo": "pre-commit-hooks.nix"
      },
      "branch": "master",
      "revision": "4509ca64f1084e73bc7a721b20c669a8d4c5ebe6",
      "url": "https://github.com/cachix/pre-commit-hooks.nix/archive/4509ca64f1084e73bc7a721b20c669a8d4c5ebe6.tar.gz",
      "hash": "06cjchycvj4wjzwm8ly1d36s9d7lj3mz1zqnkikvck0chi1psaa3"
    }
  },
  "version": 4
}

pkgs/default.nix Normal file (+6)

@@ -0,0 +1,6 @@
{ pkgs, python3 }:
{
  # place more custom packages here
  python3Packages = pkgs.callPackage ./python { inherit python3; };
}

pkgs/python/default.nix Normal file (+19)

@@ -0,0 +1,19 @@
{ pkgs, python3 }:
let
  callPackage = pkgs.lib.callPackageWith (
    pkgs // { inherit python3; } // python3.pkgs // python3Packages
  );
  python3Packages = mkPackages ./.;
  mkPackages =
    dir:
    with builtins;
    listToAttrs (
      map (name: {
        inherit name;
        value = callPackage (dir + "/${name}") { };
      }) (attrNames (readDir dir))
    );
in
python3Packages

pyproject.toml Normal file (+18)

@@ -0,0 +1,18 @@
[tool.ruff]
target-version = "py311"
select = ["E", "F", "I", "N", "UP", "ANN"]
ignore = [
  "F403",
  "E501",   # line too long
  "ANN101", # Missing type annotation for `self` in method
  "ANN401", # Dynamically typed expressions (typing.Any) are disallowed
]
exclude = ["src/website/shared/migrations/*.py"] # auto-generated code

[tool.commitizen]
name = "cz_conventional_commits"
tag_format = "$version"
version_scheme = "pep440"
version_provider = "pep621"
update_changelog_on_bump = true
major_version_zero = true

shell.nix Normal file (+1)

@@ -0,0 +1 @@
(import ./. { }).shell

src/api/config.py Normal file (+16)

@@ -0,0 +1,16 @@
import pathlib

from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    evaluation_gc_roots_dir: pathlib.Path
    evaluation_logs_dir: pathlib.Path
    max_parallel_evaluation: int
    local_nixpkgs_checkout: pathlib.Path
    nixpkgs_repo_url: str

    model_config = SettingsConfigDict(env_file=".env")


settings = Settings()  # pyright: ignore
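
For illustration, a minimal sketch of how these settings resolve — pydantic-settings gives process environment variables precedence over the `.env` file shown above, so a deployment can override any key without editing the file:

import os

# Illustrative only: environment variables take precedence over .env values.
os.environ["MAX_PARALLEL_EVALUATION"] = "4"  # overrides max_parallel_evaluation=1

from api.config import settings  # noqa: E402 (import after env setup, for demonstration)

print(settings.max_parallel_evaluation)  # -> 4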

src/api/evaluation.py Normal file (+165)

@@ -0,0 +1,165 @@
import asyncio
import logging
import pathlib
import tempfile
import time
from collections.abc import AsyncGenerator

import aiofiles

from api.config import settings
from api.git import GitRepo

logger = logging.getLogger(__name__)

# Shell-style exit codes: 128 + signal number.
SIGKILL = 137  # 128 + 9
SIGABRT = 134  # 128 + 6


async def perform_evaluation(
    working_tree: pathlib.Path, evaluation_log_fd: int, limit: int = 16 * 1024 * 1024
) -> asyncio.subprocess.Process:
    """
    This will run `nix-eval-jobs` on the working tree as a nixpkgs
    input and collect all Hydra jobs, including insecure ones.

    `limit` is the stream reader buffer limit.
    By default, we use 16MB, which should give enough time to consume all
    the chunks in time and buffer enough of the evaluation attributes.

    This will return an asynchronous process you can use to control
    the execution runtime.
    """
    # TODO(raitobezarius): bring a Nix code generator, that'd be cuter.
    nixpkgs_config = "{ config = { allowUnfree = true; inHydra = false; allowInsecurePredicate = (_: true); scrubJobs = false; }; };"
    evaluation_wrapper = f"(import <nixpkgs/pkgs/top-level/release.nix> {{ nixpkgsArgs = {nixpkgs_config} }})"
    arguments = [
        "--force-recurse",
        "--meta",
        "--repair",
        "--quiet",
        "--gc-roots-dir",
        settings.evaluation_gc_roots_dir,
        "--expr",
        evaluation_wrapper,
        "--include",
        f"nixpkgs={working_tree}",
    ]
    return await asyncio.create_subprocess_exec(
        "nix-eval-jobs",
        *arguments,
        limit=limit,
        stdout=asyncio.subprocess.PIPE,
        stderr=evaluation_log_fd,
    )


async def drain_lines(
    stream: asyncio.StreamReader, timeout: float = 0.25, max_batch_window: int = 10_000
) -> AsyncGenerator[list[bytes], None]:
    """
    This utility will perform an opportunistic line draining
    operation on a StreamReader; the timeout is reset
    every time we obtain another line.
    """
    lines = []
    eof = False

    class TooManyStuff(BaseException):
        pass

    while not eof:
        try:
            async with asyncio.timeout(timeout) as cm:
                lines.append(await stream.readline())

                old_deadline = cm.when()
                assert (
                    old_deadline is not None
                ), "Timeout context has no deadline!"
                new_deadline = old_deadline + timeout
                cm.reschedule(new_deadline)

                if len(lines) >= max_batch_window:
                    raise TooManyStuff
        except (TimeoutError, TooManyStuff):
            # No line yet; retry.
            if len(lines) == 0:
                continue

            # A trailing empty line means EOF, so there won't be more lines.
            while lines and (
                lines[-1] == b"" or lines[-1].decode("utf8").strip() == ""
            ):
                eof = True
                # Drop the empty line.
                lines = lines[:-1]

            # If we had lines = ["", ...], break immediately; there's nothing left to yield.
            if not lines:
                break

            yield lines
            lines = []

    assert eof, "Reached the end of `drain_lines` without EOF!"


async def evaluation_entrypoint(commit_sha1: str) -> AsyncGenerator[list[str], None]:
    # TODO: relinquish control immediately if there are too many parallel evaluations ongoing.
    # TODO: take a lock.
    repo = GitRepo(str(settings.local_nixpkgs_checkout))
    start = time.time()
    try:
        # Pull our local checkout up to that evaluation revision.
        await repo.update_from_ref(settings.nixpkgs_repo_url, commit_sha1)
        with tempfile.TemporaryDirectory() as working_tree_path:
            # Extract a working tree out of it for our needs.
            evaluation_log_filepath = (
                settings.evaluation_logs_dir / f"evaluation-{commit_sha1}.log"
            )
            async with (
                repo.extract_working_tree(
                    commit_sha1, working_tree_path
                ) as working_tree,
                aiofiles.open(evaluation_log_filepath, "w") as eval_log,
            ):
                # Kickstart the evaluation asynchronously.
                eval_process = await perform_evaluation(
                    working_tree.path, eval_log.fileno()
                )
                assert (
                    eval_process.stdout is not None
                ), "Expected a valid `stdout` pipe for the asynchronous evaluation process"

                # The idea here is that we want to match our evaluation speed
                # as closely as possible. So we read as many lines as possible
                # and then insert them. During the insertion, more lines may
                # accumulate in our internal buffer; on the next read, we
                # drain them again.
                # Adding an item to the database takes around 1s at most,
                # so we don't want to wait more than one second for all the lines we can get.
                count = 0
                async for lines in drain_lines(eval_process.stdout):
                    yield [line.decode("utf8") for line in lines]
                    count += len(lines)

                # Wait for `nix-eval-jobs` to exit; at this point,
                # it should be fairly quick because EOF has been reached.
                rc = await eval_process.wait()
                elapsed = time.time() - start

                if rc in (SIGKILL, SIGABRT):
                    raise RuntimeError("`nix-eval-jobs` crashed!")
                elif rc != 0:
                    logger.error(
                        "`nix-eval-jobs` failed to evaluate (non-zero exit status); check the evaluation logs"
                    )
                else:
                    logger.info(
                        "Processed %d derivations in real-time in %f seconds",
                        count,
                        elapsed,
                    )
    except Exception:
        elapsed = time.time() - start
        logger.exception(
            "Failed to run `nix-eval-jobs` on revision '%s', marking the job as crashed...",
            commit_sha1,
        )
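
A small sketch of how `drain_lines` batches output, using a toy subprocess instead of `nix-eval-jobs`; the `seq` command merely stands in for a fast line producer and is for illustration only:

import asyncio

from api.evaluation import drain_lines


async def main() -> None:
    # `seq` plays the role of `nix-eval-jobs`: a cheap producer of many lines.
    proc = await asyncio.create_subprocess_exec(
        "seq", "1", "100000", stdout=asyncio.subprocess.PIPE
    )
    assert proc.stdout is not None
    batches, total = 0, 0
    async for lines in drain_lines(proc.stdout):
        batches += 1
        total += len(lines)  # at most max_batch_window (10_000) lines per batch
    await proc.wait()
    print(f"{total} lines in {batches} batches")


asyncio.run(main())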

src/api/git.py Normal file (+226)

@@ -0,0 +1,226 @@
# Taken from the security tracker project.
import asyncio
import itertools
import os.path
import pathlib
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
from dataclasses import dataclass
from typing import IO, Any


@dataclass
class Worktree:
    path: pathlib.Path
    revision: str
    detached: bool
    prunable: bool

    @classmethod
    def parse_from_porcelain(
        cls: type["Worktree"], porcelain: list[str]
    ) -> "Worktree | None":
        path = porcelain[0].split(" ")[1]
        if porcelain[1] == "bare":
            return None

        revision = porcelain[1].split(" ")[1]
        detached = porcelain[2] == "detached"
        if len(porcelain) >= 4:
            prunable = porcelain[3].startswith("prunable")
        else:
            prunable = False

        return Worktree(
            path=pathlib.Path(path),
            revision=revision,
            detached=detached,
            prunable=prunable,
        )

    def name(self) -> str:
        """
        By default, `git` uses the basename
        of the target path to name a worktree.
        """
        return os.path.basename(str(self.path))


class RepositoryError(Exception):
    pass


class GitRepo:
    def __init__(
        self,
        repo_path: str,
        stdout: int | IO[Any] | None = None,
        stderr: int | IO[Any] | None = None,
    ) -> None:
        self.stdout = stdout
        self.stderr = stderr
        self.repo_path = repo_path

    async def execute_git_command(
        self,
        cmd: str,
        stdout: int | IO[Any] | None = None,
        stderr: int | IO[Any] | None = None,
    ) -> asyncio.subprocess.Process:
        final_stdout = stdout or self.stdout or asyncio.subprocess.PIPE
        final_stderr = stderr or self.stderr or asyncio.subprocess.PIPE
        return await asyncio.create_subprocess_shell(
            cmd, cwd=self.repo_path, stdout=final_stdout, stderr=final_stderr
        )

    async def clone(
        self, repo_clone_url: str, reference_repo_path: str | None = None
    ) -> asyncio.subprocess.Process:
        """
        Clones the repository.

        If you pass a `reference_repo_path`, the cloning will be much faster.
        Otherwise, it will use a shallow clone, since we can fetch commits manually.
        """
        stdout = self.stdout or asyncio.subprocess.PIPE
        stderr = self.stderr or asyncio.subprocess.PIPE
        if reference_repo_path is not None:
            clone_process = await asyncio.create_subprocess_shell(
                f"git clone --depth=1 --bare --progress --reference={reference_repo_path} {repo_clone_url} {self.repo_path}",
                stdout=stdout,
                stderr=stderr,
            )
        else:
            clone_process = await asyncio.create_subprocess_shell(
                f"git clone --depth=1 --bare --progress {repo_clone_url} {self.repo_path}",
                stdout=stdout,
                stderr=stderr,
            )
        return clone_process

    async def worktrees(self) -> list[Worktree]:
        """
        Returns a list of relevant worktrees,
        e.g. it filters out the `bare` worktree.
        """
        process = await self.execute_git_command(
            "git worktree list -z --porcelain", stdout=asyncio.subprocess.PIPE
        )
        stdout, _ = await process.communicate()
        parts = stdout.split(b"\x00")
        assert parts[-1] == b"", "Worktree list is not terminated by a NUL character"
        parts = [part.decode("utf8") for part in parts]
        return list(
            filter(
                None,
                [
                    Worktree.parse_from_porcelain(list(group))
                    for k, group in itertools.groupby(parts, lambda x: x == "")
                    if not k
                ],
            )
        )

    async def update_from_ref(self, repo_clone_url: str, object_sha1: str) -> bool:
        """
        This checks whether `object_sha1` is already present in the repo.
        If not, it performs a fetch from our remote to obtain it.

        Returns `True` if it was fetched, `False` if it was already present.
        """
        exists = (
            await (
                await self.execute_git_command(
                    f"git cat-file commit {object_sha1}",
                    stderr=asyncio.subprocess.PIPE,
                )
            ).wait()
            == 0
        )
        if exists:
            return False
        else:
            locking_problem = True
            while locking_problem:
                # We need to acquire a shallow lock here.
                # TODO: replace me by an async variant.
                while os.path.exists(os.path.join(self.repo_path, "shallow.lock")):
                    await asyncio.sleep(1)

                process = await self.execute_git_command(
                    f"git fetch --porcelain --depth=1 {repo_clone_url} {object_sha1}",
                    stderr=asyncio.subprocess.PIPE,
                )
                _, stderr = await process.communicate()
                rc = await process.wait()
                locking_problem = "shallow" in stderr.decode("utf8")
                if rc != 0 and not locking_problem:
                    print(stderr)
                    raise RepositoryError(
                        f"failed to fetch {object_sha1} while running `git fetch --depth=1 {repo_clone_url} {object_sha1}`"
                    )
            return True

    async def remove_working_tree(self, name: str) -> bool:
        """
        This deletes the working tree, if it exists.
        Returns `True` if it existed and was removed, otherwise `False`.
        """
        process = await self.execute_git_command(f"git worktree remove {name}")
        deleted = await process.wait() == 0
        return deleted

    async def prune_working_trees(self) -> bool:
        """
        This prunes all the working trees, if possible.
        Returns `True` if they were pruned, otherwise `False`.
        """
        process = await self.execute_git_command("git worktree prune")
        pruned = await process.wait() == 0
        return pruned

    @asynccontextmanager
    async def extract_working_tree(
        self, commit_sha1: str, target_path: str
    ) -> AsyncGenerator[Worktree, None]:
        """
        This will extract the working tree at the reference
        induced by the commit SHA1 into `target_path`.

        This returns it as an asynchronous context manager.
        """
        path = pathlib.Path(target_path)
        worktrees = {wt.path: wt for wt in await self.worktrees()}
        existing_wt = worktrees.get(path)
        if existing_wt is not None and not existing_wt.prunable:
            raise RepositoryError(
                f"failed to perform extraction of the worktree at {target_path} for commit {commit_sha1}, such a worktree already exists!"
            )
        if existing_wt is not None and existing_wt.prunable:
            await self.prune_working_trees()

        process = await self.execute_git_command(
            f"git worktree add {target_path} {commit_sha1}"
        )
        created = await process.wait() == 0
        if not created:
            raise RepositoryError(
                f"failed to perform extraction of the worktree at {target_path} for commit {commit_sha1}, cannot create it!"
            )

        wt = None
        try:
            wt = Worktree(
                path=pathlib.Path(target_path),
                revision=commit_sha1,
                detached=True,
                prunable=False,
            )
            yield wt
        finally:
            if wt is not None:
                await self.remove_working_tree(wt.name())
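
A hedged usage sketch for `GitRepo`: the checkout path and SHA1 below are placeholders, and the worktree is removed automatically when the context manager exits:

import asyncio

from api.git import GitRepo


async def main() -> None:
    repo = GitRepo("/path/to/nixpkgs")  # placeholder: a local bare checkout
    # Fetch the commit if we don't have it yet (placeholder SHA1).
    await repo.update_from_ref("https://cl.forkos.org/nixpkgs", "<commit-sha1>")
    async with repo.extract_working_tree("<commit-sha1>", "/tmp/worktree") as wt:
        print(wt.path, wt.revision)
    # On exit, the `finally` block prunes the worktree again.


asyncio.run(main())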

src/api/main.py Normal file (+28)

@@ -0,0 +1,28 @@
from collections.abc import AsyncGenerator
from typing import Annotated

from fastapi import FastAPI, Path
from fastapi.exceptions import HTTPException
from fastapi.responses import StreamingResponse

from api.evaluation import evaluation_entrypoint

app = FastAPI()


async def stream_evaluation(commit_sha1: str) -> AsyncGenerator[bytes, None]:
    async for lines in evaluation_entrypoint(commit_sha1):
        for line in lines:
            yield line.encode("utf8")


@app.post("/evaluations/{project_slug}/{revision}")
async def submit_evaluation(
    project_slug: Annotated[str, Path(title="The slug of the project, e.g. nixpkgs")],
    revision: Annotated[str, Path(title="The SHA1 revision for this repository")],
) -> StreamingResponse:
    if project_slug != "nixpkgs":
        raise HTTPException(status_code=404, detail="Project not found")
    print("Evaluating nixpkgs at", revision)
    return StreamingResponse(stream_evaluation(revision))
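
A client-side sketch of consuming the streaming endpoint, assuming the app runs locally via `uvicorn api.main:app`; `httpx` is not a dependency of this commit and is used here purely for illustration:

import httpx

# Placeholder revision; any commit reachable from nixpkgs_repo_url works.
url = "http://127.0.0.1:8000/evaluations/nixpkgs/<commit-sha1>"
with httpx.stream("POST", url, timeout=None) as response:
    for line in response.iter_lines():
        print(line)  # one nix-eval-jobs JSON object per line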

src/pyproject.toml Normal file (+10)

@@ -0,0 +1,10 @@
[project]
name = "evolive"
version = "0.1.0"

[tool.setuptools]
packages = [ "api" ]

[build-system]
requires = ["setuptools", "setuptools-scm"]
build-backend = "setuptools.build_meta"
build-backend = "setuptools.build_meta"