commit 9c6d4e5f04e8c592909dd07c14be124e062ea214
Author: Raito Bezarius
Date:   Tue Sep 3 14:47:36 2024 +0200

    feat: init project

    Signed-off-by: Raito Bezarius

diff --git a/.env b/.env
new file mode 100644
index 0000000..37c8a62
--- /dev/null
+++ b/.env
@@ -0,0 +1,5 @@
+evaluation_gc_roots_dir="./gc-roots"
+evaluation_logs_dir="./logs"
+max_parallel_evaluation=1
+local_nixpkgs_checkout="/home/raito/dev/github.com/NixOS/nixpkgs"
+nixpkgs_repo_url="https://cl.forkos.org/nixpkgs"
diff --git a/.envrc b/.envrc
new file mode 100644
index 0000000..1d953f4
--- /dev/null
+++ b/.envrc
@@ -0,0 +1 @@
+use nix
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..bd3bffe
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,15 @@
+result
+.direnv
+.pre-commit-config.yaml
+__pycache__
+.credentials
+*.jsonl
+.data_cache
+nixpkgs
+*.zst
+*.dump
+*.log
+# A shallow checkout of nixpkgs
+nixpkgs/
+gc-roots/
+logs/
diff --git a/default.nix b/default.nix
new file mode 100644
index 0000000..b3bbeed
--- /dev/null
+++ b/default.nix
@@ -0,0 +1,115 @@
+{
+  sources ? import ./npins,
+  overlay ? import ./nix/overlay.nix,
+  pkgs ? import sources.nixpkgs { overlays = [ overlay ]; },
+}:
+let
+  self = rec {
+    inherit (pkgs) python3;
+
+    package = pkgs.evolive;
+
+    pre-commit-check = pkgs.pre-commit-hooks {
+      src = ./.;
+
+      hooks =
+        let
+          pythonExcludes = [
+            "/migrations/" # auto-generated code
+          ];
+        in
+        {
+          # Nix setup
+          nixfmt = {
+            enable = true;
+            entry = pkgs.lib.mkForce "${pkgs.lib.getExe pkgs.nixfmt-rfc-style}";
+          };
+          statix = {
+            enable = true;
+            settings.ignore = [ "npins" ];
+          };
+
+          # Python setup
+          ruff = {
+            enable = true;
+            excludes = pythonExcludes;
+          };
+          ruff-format = {
+            enable = true;
+            name = "Format python code with ruff";
+            types = [
+              "text"
+              "python"
+            ];
+            entry = "${pkgs.lib.getExe pkgs.ruff} format";
+            excludes = pythonExcludes;
+          };
+
+          pyright =
+            let
+              pyEnv = pkgs.python3.buildEnv.override {
+                extraLibs = pkgs.evolive.propagatedBuildInputs;
+                # Necessary for FastAPI CLI.
+                ignoreCollisions = true;
+              };
+              wrappedPyright = pkgs.runCommand "pyright" { nativeBuildInputs = [ pkgs.makeWrapper ]; } ''
+                makeWrapper ${pkgs.pyright}/bin/pyright $out \
+                  --set PYTHONPATH ${pyEnv}/${pyEnv.sitePackages} \
+                  --prefix PATH : ${pyEnv}/bin \
+                  --set PYTHONHOME ${pyEnv}
+              '';
+            in
+            {
+              enable = true;
+              entry = pkgs.lib.mkForce (builtins.toString wrappedPyright);
+              excludes = pythonExcludes;
+            };
+
+          # Global setup
+          prettier = {
+            enable = true;
+            excludes = [
+              "\\.min.css$"
+              "\\.html$"
+            ];
+          };
+          commitizen.enable = true;
+        };
+    };
+
+    shell = pkgs.mkShell {
+      DATA_CACHE_DIRECTORY = toString ./. + "/.data_cache";
+      REDIS_SOCKET_URL = "unix:///run/redis/redis.sock";
+      # `./src/website/tracker/settings.py` by default looks for LOCAL_NIXPKGS_CHECKOUT
+      # in the root of the repo. Make it the default here for local development.
+      LOCAL_NIXPKGS_CHECKOUT = toString ./. + "/nixpkgs";
+ "/nixpkgs"; + + packages = [ + package + # (pkgs.python3.buildEnv.override { + # extraLibs = with pkgs.python3.pkgs; [ fastapi uvicorn ]; + # ignoreCollisions = true; + # }) + pkgs.nix-eval-jobs + pkgs.commitizen + pkgs.npins + pkgs.nixfmt-rfc-style + pkgs.hivemind + ]; + + shellHook = '' + ${pre-commit-check.shellHook} + + mkdir -p .credentials + export DATABASE_URL=postgres:///evolive + export CREDENTIALS_DIRECTORY=${builtins.toString ./.credentials} + ''; + }; + + # tests = import ./nix/tests/vm-basic.nix { + # inherit pkgs; + # wstModule = module; + # }; + }; +in +self # // self.tests diff --git a/nix/overlay.nix b/nix/overlay.nix new file mode 100644 index 0000000..3873e8e --- /dev/null +++ b/nix/overlay.nix @@ -0,0 +1,43 @@ +final: prev: +let + inherit (final) python3; + extraPackages = import ../pkgs { + pkgs = prev; + inherit python3; + }; + extraPython3Packages = extraPackages.python3Packages; + sources = import ../npins; +in +{ + python3 = prev.lib.attrsets.recursiveUpdate prev.python3 { pkgs = extraPython3Packages; }; + + # go through the motions to make a flake-incompat project use the build + # inputs we want + pre-commit-hooks = final.callPackage "${sources.pre-commit-hooks}/nix/run.nix" { + tools = import "${sources.pre-commit-hooks}/nix/call-tools.nix" final; + # wat + gitignore-nix-src = { + lib = import sources.gitignore { inherit (final) lib; }; + }; + isFlakes = false; + }; + + evolive = python3.pkgs.buildPythonPackage { + pname = "evolive"; + version = "0.1.0"; + format = "pyproject"; + + src = final.nix-gitignore.gitignoreSourcePure [ ../.gitignore ] ../src; + + propagatedBuildInputs = with python3.pkgs; [ + setuptools + setuptools-scm + fastapi + ipython + tqdm + aiofiles + pydantic-settings + uvicorn + ]; + }; +} diff --git a/npins/default.nix b/npins/default.nix new file mode 100644 index 0000000..fb04b70 --- /dev/null +++ b/npins/default.nix @@ -0,0 +1,80 @@ +# Generated by npins. Do not modify; will be overwritten regularly +let + data = builtins.fromJSON (builtins.readFile ./sources.json); + version = data.version; + + mkSource = + spec: + assert spec ? type; + let + path = + if spec.type == "Git" then + mkGitSource spec + else if spec.type == "GitRelease" then + mkGitSource spec + else if spec.type == "PyPi" then + mkPyPiSource spec + else if spec.type == "Channel" then + mkChannelSource spec + else + builtins.throw "Unknown source type ${spec.type}"; + in + spec // { outPath = path; }; + + mkGitSource = + { + repository, + revision, + url ? null, + hash, + branch ? null, + ... + }: + assert repository ? type; + # At the moment, either it is a plain git repository (which has an url), or it is a GitHub/GitLab repository + # In the latter case, there we will always be an url to the tarball + if url != null then + (builtins.fetchTarball { + inherit url; + sha256 = hash; + }) + else + assert repository.type == "Git"; + let + urlToName = + url: rev: + let + matched = builtins.match "^.*/([^/]*)(\\.git)?$" repository.url; + + short = builtins.substring 0 7 rev; + + appendShort = if (builtins.match "[a-f0-9]*" rev) != null then "-${short}" else ""; + in + "${if matched == null then "source" else builtins.head matched}${appendShort}"; + name = urlToName repository.url revision; + in + builtins.fetchGit { + url = repository.url; + rev = revision; + inherit name; + narHash = hash; + }; + + mkPyPiSource = + { url, hash, ... }: + builtins.fetchurl { + inherit url; + sha256 = hash; + }; + + mkChannelSource = + { url, hash, ... 
+    builtins.fetchTarball {
+      inherit url;
+      sha256 = hash;
+    };
+in
+if version == 4 then
+  builtins.mapAttrs (_: mkSource) data.pins
+else
+  throw "Unsupported format version ${toString version} in sources.json. Try running `npins upgrade`"
diff --git a/npins/sources.json b/npins/sources.json
new file mode 100644
index 0000000..7541010
--- /dev/null
+++ b/npins/sources.json
@@ -0,0 +1,23 @@
+{
+  "pins": {
+    "nixpkgs": {
+      "type": "Channel",
+      "name": "nixpkgs-unstable",
+      "url": "https://releases.nixos.org/nixpkgs/nixpkgs-24.11pre674705.b833ff01a0d6/nixexprs.tar.xz",
+      "hash": "12cda9rvpgjcsxykbcg5cxjaayhibjjabv6svacjc5n5kpcbx5sf"
+    },
+    "pre-commit-hooks": {
+      "type": "Git",
+      "repository": {
+        "type": "GitHub",
+        "owner": "cachix",
+        "repo": "pre-commit-hooks.nix"
+      },
+      "branch": "master",
+      "revision": "4509ca64f1084e73bc7a721b20c669a8d4c5ebe6",
+      "url": "https://github.com/cachix/pre-commit-hooks.nix/archive/4509ca64f1084e73bc7a721b20c669a8d4c5ebe6.tar.gz",
+      "hash": "06cjchycvj4wjzwm8ly1d36s9d7lj3mz1zqnkikvck0chi1psaa3"
+    }
+  },
+  "version": 4
+}
diff --git a/pkgs/default.nix b/pkgs/default.nix
new file mode 100644
index 0000000..f26fa40
--- /dev/null
+++ b/pkgs/default.nix
@@ -0,0 +1,6 @@
+{ pkgs, python3 }:
+
+{
+  # place more custom packages here
+  python3Packages = pkgs.callPackage ./python { inherit python3; };
+}
diff --git a/pkgs/python/default.nix b/pkgs/python/default.nix
new file mode 100644
index 0000000..6d05b4c
--- /dev/null
+++ b/pkgs/python/default.nix
@@ -0,0 +1,19 @@
+{ pkgs, python3 }:
+
+let
+  callPackage = pkgs.lib.callPackageWith (
+    pkgs // { inherit python3; } // python3.pkgs // python3Packages
+  );
+
+  python3Packages = mkPackages ./.;
+  mkPackages =
+    dir:
+    with builtins;
+    listToAttrs (
+      map (name: {
+        inherit name;
+        value = callPackage (dir + "/${name}") { };
+      }) (attrNames (readDir dir))
+    );
+in
+python3Packages
diff --git a/pyproject.toml b/pyproject.toml
new file mode 100644
index 0000000..fd584dc
--- /dev/null
+++ b/pyproject.toml
@@ -0,0 +1,18 @@
+[tool.ruff]
+target-version = "py311"
+select = ["E", "F", "I", "N", "UP", "ANN"]
+ignore = [
+    "F403",
+    "E501",   # line too long
+    "ANN101", # Missing type annotation for `self` in method
+    "ANN401"  # Dynamically typed expressions (typing.Any) are disallowed
+]
+exclude = ["src/website/shared/migrations/*.py"] # auto-generated code
+
+[tool.commitizen]
+name = "cz_conventional_commits"
+tag_format = "$version"
+version_scheme = "pep440"
+version_provider = "pep621"
+update_changelog_on_bump = true
+major_version_zero = true
diff --git a/shell.nix b/shell.nix
new file mode 100644
index 0000000..a6bdf20
--- /dev/null
+++ b/shell.nix
@@ -0,0 +1 @@
+(import ./. { }).shell
diff --git a/src/api/config.py b/src/api/config.py
new file mode 100644
index 0000000..938bd28
--- /dev/null
+++ b/src/api/config.py
@@ -0,0 +1,16 @@
+import pathlib
+
+from pydantic_settings import BaseSettings, SettingsConfigDict
+
+
+class Settings(BaseSettings):
+    evaluation_gc_roots_dir: pathlib.Path
+    evaluation_logs_dir: pathlib.Path
+    max_parallel_evaluation: int
+    local_nixpkgs_checkout: pathlib.Path
+    nixpkgs_repo_url: str
+
+    model_config = SettingsConfigDict(env_file=".env")
+
+
+settings = Settings()  # pyright: ignore
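A quick sketch of how these settings behave at runtime: pydantic-settings resolves each field from the process environment first and only falls back to the `.env` file declared in `model_config`. The override value below is hypothetical:

    import os

    # Real environment variables take precedence over the `.env` file.
    os.environ["MAX_PARALLEL_EVALUATION"] = "4"

    from api.config import settings  # Settings() is instantiated on import

    print(settings.max_parallel_evaluation)  # -> 4, parsed as int
    print(settings.local_nixpkgs_checkout)   # -> pathlib.Path from .env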
diff --git a/src/api/evaluation.py b/src/api/evaluation.py
new file mode 100644
index 0000000..cc5c119
--- /dev/null
+++ b/src/api/evaluation.py
@@ -0,0 +1,167 @@
+import asyncio
+import logging
+import pathlib
+import tempfile
+import time
+from collections.abc import AsyncGenerator
+
+import aiofiles
+
+from api.config import settings
+from api.git import GitRepo
+
+logger = logging.getLogger(__name__)
+
+# Exit statuses are 128 + signal number.
+SIGKILL = 137  # e.g. killed by the OOM killer
+SIGSEGV = 139
+SIGABRT = 134
+
+
+async def perform_evaluation(
+    working_tree: pathlib.Path, evaluation_log_fd: int, limit: int = 16 * 1024 * 1024
+) -> asyncio.subprocess.Process:
+    """
+    This will run `nix-eval-jobs` on the working tree as a nixpkgs
+    input and collect all Hydra jobs, including insecure ones.
+
+    `limit` is the stream reader buffer limit.
+
+    By default, we use 16MB, which should give us enough headroom to
+    consume all the chunks in time and buffer enough of the evaluation attributes.
+
+    This will return an asynchronous process you can use to control
+    the execution runtime.
+    """
+    # TODO(raitobezarius): bring a Nix code generator, that'd be cuter.
+    nixpkgs_config = "{ config = { allowUnfree = true; inHydra = false; allowInsecurePredicate = (_: true); scrubJobs = false; }; };"
+    evaluation_wrapper = f"(import <nixpkgs/pkgs/top-level/release.nix> {{ nixpkgsArgs = {nixpkgs_config} }})"
+    arguments = [
+        "--force-recurse",
+        "--meta",
+        "--repair",
+        "--quiet",
+        "--gc-roots-dir",
+        settings.evaluation_gc_roots_dir,
+        "--expr",
+        evaluation_wrapper,
+        "--include",
+        f"nixpkgs={working_tree}",
+    ]
+    return await asyncio.create_subprocess_exec(
+        "nix-eval-jobs",
+        *arguments,
+        limit=limit,
+        stdout=asyncio.subprocess.PIPE,
+        stderr=evaluation_log_fd,
+    )
+
+
+async def drain_lines(
+    stream: asyncio.StreamReader, timeout: float = 0.25, max_batch_window: int = 10_000
+) -> AsyncGenerator[list[bytes], None]:
+    """
+    This utility will perform an opportunistic line-draining
+    operation on a StreamReader; the timeout is reset
+    every time we obtain another line.
+    """
+    lines = []
+    eof = False
+
+    class TooManyStuff(BaseException):
+        pass
+
+    while not eof:
+        try:
+            async with asyncio.timeout(timeout) as cm:
+                lines.append(await stream.readline())
+                old_deadline = cm.when()
+                assert (
+                    old_deadline is not None
+                ), "Timeout context does not have timeout!"
+                new_deadline = old_deadline + timeout
+                cm.reschedule(new_deadline)
+
+                if len(lines) >= max_batch_window:
+                    raise TooManyStuff
+        except (TimeoutError, TooManyStuff):
+            # No line, we retry.
+            if len(lines) == 0:
+                continue
+            # An empty read means EOF: drop it and any trailing blank lines,
+            # there won't be any more lines.
+            while lines and (
+                lines[-1] == b"" or lines[-1].decode("utf8").strip() == ""
+            ):
+                eof = True
+                # Drop the last line.
+                lines = lines[:-1]
+            # If we had lines = ["", ...], we break immediately; there's nothing left to yield.
+            if not lines:
+                break
+
+            yield lines
+            lines = []
+
+    assert eof, "Reached the end of `drain_lines` without EOF!"
+
+
+async def evaluation_entrypoint(commit_sha1: str) -> AsyncGenerator[list[str], None]:
+    # TODO: relinquish control immediately if there are too many parallel evaluations ongoing.
+    # TODO: take a lock.
+    repo = GitRepo(str(settings.local_nixpkgs_checkout))
+    start = time.time()
+    try:
+        # Pull our local checkout up to that evaluation revision.
+        await repo.update_from_ref(settings.nixpkgs_repo_url, commit_sha1)
+        with tempfile.TemporaryDirectory() as working_tree_path:
+            # Extract a working tree out of it for our needs.
+            evaluation_log_filepath = (
+                settings.evaluation_logs_dir / f"evaluation-{commit_sha1}.log"
+            )
+            async with (
+                repo.extract_working_tree(
+                    commit_sha1, working_tree_path
+                ) as working_tree,
+                aiofiles.open(evaluation_log_filepath, "w") as eval_log,
+            ):
+                # Kickstart the evaluation asynchronously.
+                eval_process = await perform_evaluation(
+                    working_tree.path, eval_log.fileno()
+                )
+                assert (
+                    eval_process.stdout is not None
+                ), "Expected a valid `stdout` pipe for the asynchronous evaluation process"
+
+                # The idea here is that we want to match our evaluation
+                # speed as closely as possible. So, we read as many lines
+                # as possible and then insert them. During the insertion,
+                # more lines may accumulate in our internal buffer. On the
+                # next read, we will drain them again.
+                # Adding an item to the database takes around 1s at most,
+                # so we don't want to wait more than one second for all the lines we can get.
+                count = 0
+                async for lines in drain_lines(eval_process.stdout):
+                    yield [line.decode("utf8") for line in lines]
+                    count += len(lines)
+                # Wait for `nix-eval-jobs` to exit; at this point,
+                # it should be fairly quick because EOF has been reached.
+                rc = await eval_process.wait()
+                elapsed = time.time() - start
+                if rc in (SIGKILL, SIGSEGV, SIGABRT):
+                    raise RuntimeError("`nix-eval-jobs` crashed!")
+                elif rc != 0:
+                    logger.error(
+                        "`nix-eval-jobs` failed to evaluate (non-zero exit status), check the evaluation logs"
+                    )
+                else:
+                    logger.info(
+                        "Processed %d derivations in real-time in %f seconds",
+                        count,
+                        elapsed,
+                    )
+    except Exception:
+        elapsed = time.time() - start
+        logger.exception(
+            "Failed to run `nix-eval-jobs` on revision '%s', marking job as crashed...",
+            commit_sha1,
+        )
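Since `drain_lines` only depends on the `StreamReader` interface, its batching behaviour can be exercised against an in-memory reader; a minimal sketch with made-up payloads:

    import asyncio

    from api.evaluation import drain_lines


    async def demo() -> None:
        reader = asyncio.StreamReader()
        for i in range(5):
            reader.feed_data(f'{{"attr": "package-{i}"}}\n'.encode("utf8"))
        reader.feed_eof()

        # All five lines are already buffered, so they are drained
        # together and yielded as a single batch before EOF is hit.
        async for batch in drain_lines(reader):
            print(len(batch), "lines in this batch")


    asyncio.run(demo())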
+ """ + return os.path.basename(str(self.path)) + + +class RepositoryError(Exception): + pass + + +class GitRepo: + def __init__( + self, + repo_path: str, + stdout: int | IO[Any] | None = None, + stderr: int | IO[Any] | None = None, + ) -> None: + self.stdout = stdout + self.stderr = stderr + self.repo_path = repo_path + + async def execute_git_command( + self, + cmd: str, + stdout: int | IO[Any] | None = None, + stderr: int | IO[Any] | None = None, + ) -> asyncio.subprocess.Process: + final_stdout = stdout or self.stdout or asyncio.subprocess.PIPE + final_stderr = stderr or self.stderr or asyncio.subprocess.PIPE + return await asyncio.create_subprocess_shell( + cmd, cwd=self.repo_path, stdout=final_stdout, stderr=final_stderr + ) + + async def clone( + self, repo_clone_url: str, reference_repo_path: str | None = None + ) -> asyncio.subprocess.Process: + """ + Clones the repository. + If you pass a `reference_repo_path`, the cloning will be much faster. + + Otherwise, it will use a shallow clone, since we can fetch commits manually. + """ + stdout = self.stdout or asyncio.subprocess.PIPE + stderr = self.stderr or asyncio.subprocess.PIPE + if reference_repo_path is not None: + clone_process = await asyncio.create_subprocess_shell( + f"git clone --depth=1 --bare --progress --reference={reference_repo_path} {repo_clone_url} {self.repo_path}", + stdout=stdout, + stderr=stderr, + ) + else: + clone_process = await asyncio.create_subprocess_shell( + f"git clone --depth=1 --bare --progress {repo_clone_url} {self.repo_path}", + stdout=stdout, + stderr=stderr, + ) + + return clone_process + + async def worktrees(self) -> list[Worktree]: + """ + Returns a list of relevant worktrees, + e.g. filter out the `bare` worktree. + """ + process = await self.execute_git_command( + "git worktree list -z --porcelain", stdout=asyncio.subprocess.PIPE + ) + stdout, _ = await process.communicate() + parts = stdout.split(b"\x00") + assert parts[-1] == b"", "Worktrees list are not terminated by a NUL character" + parts = [part.decode("utf8") for part in parts] + return list( + filter( + None, + [ + Worktree.parse_from_porcelain(list(group)) + for k, group in itertools.groupby(parts, lambda x: x == "") + if not k + ], + ) + ) + + async def update_from_ref(self, repo_clone_url: str, object_sha1: str) -> bool: + """ + This checks if `object_sha1` is already present in the repo or not. + If not, perform a fetch to our remote to obtain it. + + Returns whether this was fetched or already present. + """ + exists = ( + await ( + await self.execute_git_command( + f"git cat-file commit {object_sha1}", + stderr=asyncio.subprocess.PIPE, + ) + ).wait() + == 0 + ) + if exists: + return False + else: + locking_problem = True + while locking_problem: + # We need to acquire a shallow lock here. + # TODO: replace me by an async variant. + while os.path.exists(os.path.join(self.repo_path, "shallow.lock")): + await asyncio.sleep(1) + process = await self.execute_git_command( + f"git fetch --porcelain --depth=1 {repo_clone_url} {object_sha1}", + stderr=asyncio.subprocess.PIPE, + ) + _, stderr = await process.communicate() + rc = await process.wait() + locking_problem = "shallow" in stderr.decode("utf8") + if rc != 0 and not locking_problem: + print(stderr) + raise RepositoryError( + f"failed to fetch {object_sha1} while running `git fetch --depth=1 {repo_clone_url} {object_sha1}`" + ) + return True + + async def remove_working_tree(self, name: str) -> bool: + """ + This deletes the working tree, if it exists. 
diff --git a/src/api/main.py b/src/api/main.py
new file mode 100644
index 0000000..5a181e3
--- /dev/null
+++ b/src/api/main.py
@@ -0,0 +1,28 @@
+from collections.abc import AsyncGenerator
+from typing import Annotated
+
+from fastapi import FastAPI, Path
+from fastapi.exceptions import HTTPException
+from fastapi.responses import StreamingResponse
+
+from api.evaluation import evaluation_entrypoint
+
+app = FastAPI()
+
+
+async def stream_evaluation(commit_sha1: str) -> AsyncGenerator[bytes, None]:
+    async for lines in evaluation_entrypoint(commit_sha1):
+        for line in lines:
+            yield line.encode("utf8")
+
+
+@app.post("/evaluations/{project_slug}/{revision}")
+async def submit_evaluation(
+    project_slug: Annotated[str, Path(title="The slug of the project, e.g. nixpkgs")],
+    revision: Annotated[str, Path(title="The SHA1 revision for this repository")],
+) -> StreamingResponse:
+    if project_slug != "nixpkgs":
+        raise HTTPException(status_code=404, detail="Project not found")
+
+    print("Evaluating nixpkgs at", revision)
+    return StreamingResponse(stream_evaluation(revision))
diff --git a/src/pyproject.toml b/src/pyproject.toml
new file mode 100644
index 0000000..ad9f8eb
--- /dev/null
+++ b/src/pyproject.toml
@@ -0,0 +1,10 @@
+[project]
+name = "evolive"
+version = "0.1.0"
+
+[tool.setuptools]
+packages = [ "api" ]
+
+[build-system]
+requires = ["setuptools", "setuptools-scm"]
+build-backend = "setuptools.build_meta"
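Once the app is served (for instance with `uvicorn api.main:app`), the endpoint streams back one `nix-eval-jobs` JSON object per line as they are produced. A client-side sketch, assuming `httpx` is installed and the host/port are the uvicorn defaults:

    import httpx

    with httpx.stream(
        "POST",
        "http://127.0.0.1:8000/evaluations/nixpkgs/<commit-sha1>",
        timeout=None,  # evaluations run for a long time
    ) as response:
        for line in response.iter_lines():
            print(line)  # one evaluated attribute per line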