feat: write the evaluation results to disk as a Parquet file

Only a subset of the fields is written for now.

Signed-off-by: Raito Bezarius <masterancpp@gmail.com>
This commit is contained in:
raito 2024-09-04 17:26:43 +02:00
parent 64fee32cc0
commit 71dded6ef3
4 changed files with 188 additions and 3 deletions

View file

@ -38,6 +38,8 @@ in
aiofiles aiofiles
pydantic-settings pydantic-settings
uvicorn uvicorn
polars
dataclass-wizard
]; ];
}; };
} }

View file

@ -0,0 +1,130 @@
import json
from dataclasses import dataclass, field
from typing import Any
from dataclass_wizard import DumpMixin, JSONWizard, LoadMixin
@dataclass
class MaintainerAttribute(JSONWizard):
    """One maintainer entry found in an attribute's metadata.

    Only ``name`` is mandatory; every other field defaults to ``None``.
    """

    # The single required field.
    name: str

    # Optional identifiers / contact handles.
    github: str | None = None
    github_id: int | None = None
    email: str | None = None
    matrix: str | None = None
@dataclass
class LicenseAttribute(JSONWizard):
    """One license entry found in an attribute's metadata.

    Every field is optional: text fields default to ``None`` and the
    boolean flags default to ``False``.
    """

    full_name: str | None = None

    # Boolean flags describing the license.
    deprecated: bool = False
    free: bool = False
    redistributable: bool = False

    # Alternative identifiers and reference link.
    short_name: str | None = None
    spdx_id: str | None = None
    url: str | None = None
@dataclass
class MetadataAttribute(JSONWizard, LoadMixin, DumpMixin):
    """Metadata attached to an evaluated attribute.

    Every field has a default, so an empty metadata mapping loads into a
    valid instance.
    """

    outputs_to_install: list[str] = field(default_factory=list)

    # Status flags.
    available: bool = True
    broken: bool = False
    unfree: bool = False
    unsupported: bool = False
    insecure: bool = False

    # Optional descriptive fields.
    main_program: str | None = None
    position: str | None = None
    homepage: str | None = None
    description: str | None = None
    name: str | None = None

    # List-valued fields; each gets its own fresh list per instance.
    maintainers: list[MaintainerAttribute] = field(default_factory=list)
    license: list[LicenseAttribute] = field(default_factory=list)
    platforms: list[str] = field(default_factory=list)
    known_vulnerabilities: list[str] = field(default_factory=list)

    def __pre_as_dict__(self) -> None:
        """Flatten group-style maintainer entries in place.

        An entry that is a plain mapping with a non-null ``scope`` is
        replaced by the items of its ``members`` list (nothing is added when
        ``members`` is absent). Every other entry is kept unchanged.

        NOTE(review): this relies on dataclass-wizard invoking
        ``__pre_as_dict__`` before dumping the instance — confirm against the
        installed library version.
        """
        linearized_maintainers: list[Any] = []
        for maintainer in self.maintainers:
            # Only raw mappings can describe a group. MaintainerAttribute
            # instances have no `.get`, so they must never reach that call.
            if isinstance(maintainer, dict) and maintainer.get("scope") is not None:
                linearized_maintainers.extend(maintainer.get("members") or [])
            else:
                linearized_maintainers.append(maintainer)
        self.maintainers = linearized_maintainers
@dataclass
class EvaluatedAttribute(JSONWizard):
    """
    An attribute whose evaluation completed in full.

    All fields are required; ``meta`` may explicitly be ``None``.
    """

    attr: str
    attr_path: list[str]
    name: str
    drv_path: str
    # Maps a derivation to the list of its outputs.
    input_drvs: dict[str, list[str]]
    meta: MetadataAttribute | None
    outputs: dict[str, str]
    system: str
@dataclass
class PartialEvaluatedAttribute:
    """
    An attribute whose evaluation may have failed or be incomplete.

    Inspect ``evaluation`` for the full result when there is one,
    otherwise read ``error``.
    """

    attr: str
    attr_path: list[str]

    # Both default to ``None``.
    error: str | None = None
    evaluation: EvaluatedAttribute | None = None
def parse_total_evaluation(raw: dict[str, Any]) -> EvaluatedAttribute:
    """Normalise a raw evaluation record and load it as an EvaluatedAttribute.

    The ``meta`` mapping inside ``raw`` is fixed up **in place** before
    loading:

    * ``license`` is coerced to a list: the string ``"unknown"`` becomes an
      empty list, any other string becomes ``[{"fullName": <string>}]`` and
      any other non-list value is wrapped in a one-element list.
    * ``maintainers`` (when it is a list) is flattened: an entry with a
      non-null ``scope`` is replaced by the items of its ``members``.

    Records whose ``meta`` is missing, ``None`` or not a mapping are passed
    through untouched.

    Args:
        raw: One decoded evaluation record.

    Returns:
        The result of ``EvaluatedAttribute.from_dict(raw)``.
    """
    meta = raw.get("meta")
    if isinstance(meta, dict):
        if "license" in meta and not isinstance(meta["license"], list):
            declared = meta["license"]
            if declared == "unknown":
                meta["license"] = []
            elif isinstance(declared, str):
                meta["license"] = [{"fullName": declared}]
            else:
                meta["license"] = [declared]

        maintainers = meta.get("maintainers")
        if isinstance(maintainers, list):
            flattened = []
            for maintainer in maintainers:
                if maintainer.get("scope") is not None:
                    # A group without members contributes nothing rather
                    # than aborting the whole record.
                    flattened.extend(maintainer.get("members") or [])
                else:
                    flattened.append(maintainer)
            meta["maintainers"] = flattened

    return EvaluatedAttribute.from_dict(raw)
def parse_evaluation_result(line: str) -> PartialEvaluatedAttribute:
    """Decode one JSON line of evaluator output.

    Args:
        line: A single JSON document.

    Returns:
        A PartialEvaluatedAttribute. Its ``evaluation`` is populated through
        ``parse_total_evaluation`` only when the record carries no ``error``.

    Raises:
        json.JSONDecodeError: If ``line`` is not valid JSON.
    """
    raw = json.loads(line)
    failure = raw.get("error")

    # Accept the camelCase spelling (the same convention as the "fullName"
    # license key) and fall back to the snake_case one.
    attr_path = raw.get("attrPath")
    if attr_path is None:
        attr_path = raw.get("attr_path")

    return PartialEvaluatedAttribute(
        attr=raw.get("attr"),
        attr_path=attr_path,
        error=failure,
        evaluation=parse_total_evaluation(raw) if failure is None else None,
    )

View file

@ -1,19 +1,72 @@
from collections.abc import AsyncGenerator import time
from typing import Annotated from collections.abc import AsyncGenerator, Generator
from typing import Annotated, Any
from fastapi import FastAPI, Path import polars as pl
from fastapi import FastAPI, Path, WebSocket
from fastapi.exceptions import HTTPException from fastapi.exceptions import HTTPException
from fastapi.responses import StreamingResponse from fastapi.responses import StreamingResponse
from api.config import settings
from api.evaluation import evaluation_entrypoint from api.evaluation import evaluation_entrypoint
from api.evaluation.models import parse_evaluation_result
app = FastAPI() app = FastAPI()
async def stream_evaluation(commit_sha1: str) -> AsyncGenerator[bytes, None]:
    """Stream evaluation output and persist the successful results.

    Every line produced by ``evaluation_entrypoint(commit_sha1)`` is yielded
    UTF-8 encoded and then parsed. For each line that carries a complete
    evaluation, a subset of its fields is collected. Once the stream is
    exhausted the collected rows are written to
    ``/tmp/nixpkgs-{commit_sha1}-success-eval.parquet``.

    Args:
        commit_sha1: Revision forwarded to ``evaluation_entrypoint`` and
            embedded in the output file name.

    Yields:
        Each output line as UTF-8 bytes.
    """
    rows = []
    async for lines in evaluation_entrypoint(commit_sha1):
        for line in lines:
            # Forward the line to the client before parsing it.
            yield line.encode("utf8")
            evaluation = parse_evaluation_result(line).evaluation
            if evaluation is not None:
                rows.append(
                    [
                        evaluation.attr,
                        evaluation.attr_path,
                        evaluation.name,
                        evaluation.drv_path,
                        evaluation.input_drvs,
                        evaluation.system,
                    ]
                )

    # NOTE(review): `pl.List` and `pl.Struct` are given without inner types;
    # confirm polars resolves them as intended for these columns.
    df = pl.DataFrame(
        rows,
        schema={
            "attr": pl.String,
            "attr_path": pl.List,
            "name": pl.String,
            "drv_path": pl.String,
            "input_drvs": pl.Struct,
            "system": pl.String,
        },
        # `rows` holds one record per entry. State that explicitly instead of
        # letting polars infer the orientation from the data's dimensions.
        orient="row",
    )
    df.write_parquet(f"/tmp/nixpkgs-{commit_sha1}-success-eval.parquet")
def follow(thefile: Any) -> Generator[str, None, None]:
    """Yield lines appended to ``thefile``, polling forever.

    The file is first positioned at its end, so only content added
    afterwards is produced. When no line is available the generator sleeps
    for 0.1 seconds and tries again; it never terminates on its own.
    """
    thefile.seek(0, 2)  # whence=2: relative to the end of the file
    while True:
        if line := thefile.readline():
            yield line
        else:
            time.sleep(0.1)
@app.websocket("/logs/{project_slug}/{revision}")
async def stream_evaluation_log(
    websocket: WebSocket,
    project_slug: Annotated[str, Path(title="The slug of the project, e.g. nixpkgs")],
    revision: Annotated[str, Path(title="The SHA1 revision for this repository")],
) -> None:
    """Stream newly appended evaluation log lines over a WebSocket.

    After accepting the connection, the log file
    ``evaluation-{revision}.log`` under ``settings.evaluation_logs_dir`` is
    opened and positioned at its end. Every line appended afterwards is sent
    to the client as a text message. The loop only ends when an exception
    propagates (for example from ``send_text`` or from ``open``).

    Args:
        websocket: The client connection.
        project_slug: Project identifier from the URL (not used here).
        revision: Revision whose log file is tailed.
    """
    # Function-scope import keeps this block self-contained.
    import asyncio

    await websocket.accept()
    with open(settings.evaluation_logs_dir / f"evaluation-{revision}.log") as log:
        log.seek(0, 2)  # start at the end: only new content is streamed
        while True:
            line = log.readline()
            if not line:
                # Yield control to the event loop while waiting for more
                # output; a blocking sleep here would stall every other
                # request served by this process.
                await asyncio.sleep(0.1)
                continue
            await websocket.send_text(line)
@app.post("/evaluations/{project_slug}/{revision}") @app.post("/evaluations/{project_slug}/{revision}")