WIP: Non-flakes entrypoint #21

Closed
raito wants to merge 24 commits from non-flakes into main
3 changed files with 217 additions and 75 deletions

View file

@ -4,6 +4,7 @@ import os
import sys
import graphlib
import base64
import random
from collections.abc import Generator
from dataclasses import dataclass, field
from pathlib import Path
@ -24,6 +25,7 @@ from buildbot.reporters.generators.build import BuildStatusGenerator
from buildbot.reporters.message import MessageFormatterFunction
from buildbot.process.buildstep import EXCEPTION
from buildbot.process.buildstep import SUCCESS
from buildbot.process.buildstep import BuildStepFailed
from buildbot.process.results import worst_status
import requests
@ -39,6 +41,14 @@ log = Logger()
FLAKE_TARGET_ATTRIBUTE_FOR_JOBS = "buildbotJobs"
@dataclass
class EvaluatorSettings:
    """Settings shared by the `nix-eval-jobs` evaluation steps.

    `make_job_evaluator` reads every field of this record when it builds a
    `NixEvalCommand`.
    """

    # Forwarded to the evaluation step as `supported_systems`.
    supported_systems: list[str]
    # Rendered as the value of `nix-eval-jobs --workers`.
    worker_count: int
    # Rendered as the value of `nix-eval-jobs --max-memory-size`.
    max_memory_size: int
    # Passed as the value of `nix-eval-jobs --gc-roots-dir`.
    gc_roots_dir: str
    # Lock the evaluation step takes with "exclusive" access.
    lock: util.MasterLock
@dataclass
class NixBuilder:
protocol: str
@ -49,14 +59,24 @@ class NixBuilder:
publicHostKey: str | None = None
sshUser: str | None = None
sshKey: str | None = None
systems: list[str] = field(default_factory=lambda: ["-"])
supportedFeatures: list[str] = field(default_factory=lambda: ["-"])
mandatoryFeatures: list[str] = field(default_factory=lambda: ["-"])
systems: list[str] = field(default_factory=lambda: [])
supportedFeatures: list[str] = field(default_factory=lambda: [])
mandatoryFeatures: list[str] = field(default_factory=lambda: [])
def to_nix_line(self) -> str:
    """Render this builder as one space-separated Nix builders entry.

    The columns are, in order: connection, systems, SSH key, max jobs,
    speed factor, supported features, mandatory features and the
    base64-encoded public host key. A column without a value is written as
    ``-`` so that the following columns keep their position.

    Returns:
        The single-line entry, without a trailing separator.
    """

    def listed(values: list[str]) -> str:
        # An empty list would otherwise render as an empty string and make
        # the whitespace-separated record lose a column.
        return ",".join(values) or "-"

    if self.publicHostKey is not None:
        encoded_public_key = base64.b64encode(
            self.publicHostKey.encode("ascii")
        ).decode("ascii")
    else:
        encoded_public_key = "-"

    if self.sshUser is not None:
        connection = f"{self.protocol}://{self.sshUser}@{self.hostName}"
    else:
        # Without a user only the bare host name is emitted (no protocol).
        connection = self.hostName

    columns = [
        connection,
        listed(self.systems),
        self.sshKey or "-",
        self.maxJobs,
        self.speedFactor,
        listed(self.supportedFeatures),
        listed(self.mandatoryFeatures),
        encoded_public_key,
    ]
    return " ".join(f"{column}" for column in columns)
def to_nix_store(self) -> str:
    """Return the store URI string for this builder.

    The result has the shape ``<protocol>://[<sshUser>@]<hostName>``. When
    `sshKey` and/or `publicHostKey` are set, a query string is appended with
    ``ssh-key=<sshKey>`` and ``base64-ssh-public-host-key=<base64 of the
    host key>``, joined by ``&``.

    Returns:
        The store URI as a plain string.
    """
    if self.sshUser is not None:
        authority = f"{self.sshUser}@{self.hostName}"
    else:
        authority = self.hostName
    store_uri = f"{self.protocol}://{authority}"

    params: list[str] = []
    if self.sshKey is not None:
        params.append(f"ssh-key={self.sshKey}")
    if self.publicHostKey is not None:
        encoded_public_key = base64.b64encode(
            self.publicHostKey.encode("ascii")
        ).decode("ascii")
        params.append(f"base64-ssh-public-host-key={encoded_public_key}")

    # Only add the `?` separator when at least one parameter is present.
    if params:
        store_uri += "?" + "&".join(params)
    return store_uri
@dataclass
@ -130,7 +150,7 @@ class GerritConfig:
"""
Returns the prefix to build a repourl using that gerrit configuration.
"""
return 'ssh://{self.username}@{self.domain}:{self.port}/'
return f'ssh://{self.username}@{self.domain}:{self.port}/'
class BuildTrigger(steps.BuildStep):
def __init__(
@ -148,7 +168,7 @@ class BuildTrigger(steps.BuildStep):
self.ended = False
self.waitForFinishDeferred = None
self.brids = []
self.description = f"building {len(jobs)} hydra jobs"
self.description = f"building {len(jobs)} jobs"
super().__init__(**kwargs)
def interrupt(self, reason):
@ -177,15 +197,14 @@ class BuildTrigger(steps.BuildStep):
return sch
def schedule_one(self, build_props: Properties, job):
project_name = build_props.getProperty('event.project')
source = f"{project_name}-eval-lix"
project_name = build_props.getProperty("event.refUpdate.project") or build_props.getProperty("event.change.project")
source = f"{project_name}-eval"
attr = job.get("attr", "eval-error")
name = attr
name = f"{FLAKE_TARGET_ATTRIBUTE_FOR_JOBS}.{name}"
name = f"buildbotJobs.{attr}"
error = job.get("error")
props = Properties()
props.setProperty("virtual_builder_name", name, source)
props.setProperty("status_name", f"nix-build .#{FLAKE_TARGET_ATTRIBUTE_FOR_JOBS}.{attr}", source)
props.setProperty("status_name", f"building buildbotJobs.{attr}", source)
props.setProperty("virtual_builder_tags", "", source)
if error is not None:
@ -372,7 +391,8 @@ class NixEvalCommand(buildstep.ShellMixin, steps.BuildStep):
# run nix-eval-jobs --flake .#$FLAKE_TARGET_ATTRIBUTE_FOR_JOBS to generate the dict of stages
cmd: remotecommand.RemoteCommand = yield self.makeRemoteShellCommand()
build_props = self.build.getProperties()
project_name = build_props.get('event.project')
project_name = build_props.getProperty("event.refUpdate.project") or build_props.getProperty("event.change.project")
assert project_name is not None, "`event.refUpdate.project` or `event.change.project` is not available on the build properties, unexpected build type!"
yield self.runCommand(cmd)
@ -396,14 +416,27 @@ class NixEvalCommand(buildstep.ShellMixin, steps.BuildStep):
if not system or system in self.supported_systems: # report eval errors
filtered_jobs.append(job)
# Filter out failed evaluations
succeeded_jobs = [job for job in filtered_jobs if job.get('error') is None]
drv_show_log: Log = yield self.getLog("stdio")
drv_show_log.addStdout(f"getting derivation infos\n")
all_deps = dict()
def closure_of(key, deps):
    """Return the set of nodes reachable from `key` through `deps`.

    `deps` maps a node to an iterable of its direct dependencies. `key`
    itself is never part of the result, even when a cycle leads back to it.
    A node that has no entry in `deps` is treated as having no dependencies
    instead of raising `KeyError`.
    """
    seen = {key}
    pending = [key]
    # Worklist traversal: each node is expanded once.
    while pending:
        node = pending.pop()
        for dep in deps.get(node, ()):
            if dep not in seen:
                seen.add(dep)
                pending.append(dep)
    seen.discard(key)
    return seen
if succeeded_jobs:
drv_show_log.addStdout(f"getting derivation infos for valid derivations\n")
cmd = yield self.makeRemoteShellCommand(
stdioLogName=None,
collectStdout=True,
command=(
["nix", "derivation", "show", "--recursive"]
+ [ drv for drv in (job.get("drvPath") for job in filtered_jobs) if drv ]
+ [ drv for drv in (job.get("drvPath") for job in succeeded_jobs) if drv ]
),
)
yield self.runCommand(cmd)
@ -413,17 +446,9 @@ class NixEvalCommand(buildstep.ShellMixin, steps.BuildStep):
except json.JSONDecodeError as e:
msg = f"Failed to parse `nix derivation show` output for {cmd.command}"
raise BuildbotNixError(msg) from e
all_deps = dict()
for drv, info in drv_info.items():
all_deps[drv] = set(info.get("inputDrvs").keys())
def closure_of(key, deps):
    """Return the set of nodes reachable from `key` through `deps`.

    `deps` maps a node to an iterable of its direct dependencies. `key`
    itself is never part of the result, even when a cycle leads back to it.
    A node that has no entry in `deps` is treated as having no dependencies
    instead of raising `KeyError`.
    """
    seen = {key}
    pending = [key]
    # Worklist traversal: each node is expanded once.
    while pending:
        node = pending.pop()
        for dep in deps.get(node, ()):
            if dep not in seen:
                seen.add(dep)
                pending.append(dep)
    seen.discard(key)
    return seen
job_set = set(( drv for drv in ( job.get("drvPath") for job in filtered_jobs ) if drv ))
all_deps = { k: list(closure_of(k, all_deps).intersection(job_set)) for k in job_set }
@ -431,7 +456,7 @@ class NixEvalCommand(buildstep.ShellMixin, steps.BuildStep):
[
BuildTrigger(
builds_scheduler_group=f"{project_name}-nix-build",
name="build flake",
name="build derivations: ",
jobs=filtered_jobs,
all_deps=all_deps,
),
@ -440,6 +465,88 @@ class NixEvalCommand(buildstep.ShellMixin, steps.BuildStep):
return result
def make_job_evaluator(name: str, settings: EvaluatorSettings, flake: bool) -> NixEvalCommand:
    """Build the `NixEvalCommand` step that runs `nix-eval-jobs`.

    Args:
        name: Name given to the resulting step.
        settings: Worker count, memory limit, gc-roots directory, supported
            systems and lock used by the step.
        flake: When true the jobs come from the flake attribute named by
            `FLAKE_TARGET_ATTRIBUTE_FOR_JOBS`; otherwise from the expression
            `import ./.ci/buildbot.nix`.

    Returns:
        The configured step; it halts the build on failure and holds
        `settings.lock` with "exclusive" access.
    """
    if flake:
        entrypoint = ["--flake", f".#{FLAKE_TARGET_ATTRIBUTE_FOR_JOBS}"]
    else:
        entrypoint = ["--expr", "import ./.ci/buildbot.nix"]

    eval_command = [
        "nix-eval-jobs",
        "--workers",
        str(settings.worker_count),
        "--max-memory-size",
        str(settings.max_memory_size),
        "--gc-roots-dir",
        settings.gc_roots_dir,
        "--force-recurse",
        "--check-cache-status",
        *entrypoint,
    ]

    return NixEvalCommand(
        env={},
        name=name,
        supported_systems=settings.supported_systems,
        command=eval_command,
        haltOnFailure=True,
        locks=[settings.lock.access("exclusive")],
    )
class NixConfigure(buildstep.CommandMixin, steps.BuildStep):
    """Determine which `NixEvalCommand` step should be added after this one.

    The decision is based on the existence of, in order of precedence:

    - `build/.ci/buildbot.nix`: a non-flake evaluation step is queued;
    - `build/flake.nix`: a flake evaluation step is queued.

    When neither file exists no step is queued; the step still returns
    `SUCCESS`.
    """

    name = "determining jobs"

    def __init__(self, eval_settings: EvaluatorSettings, **kwargs: Any) -> None:
        """Store the evaluator settings and attach a buffer observer to `stdio`."""
        self.evaluator_settings = eval_settings
        super().__init__(**kwargs)
        self.observer = logobserver.BufferLogObserver()
        self.addLogObserver("stdio", self.observer)

    def _queue_evaluator(self, step_name: str, *, flake: bool) -> None:
        """Queue one evaluation step right after the current step."""
        self.build.addStepsAfterCurrentStep(
            [make_job_evaluator(step_name, self.evaluator_settings, flake)]
        )

    @defer.inlineCallbacks
    def run(self) -> Generator[Any, object, Any]:
        """Probe the checkout and queue the matching evaluation step."""
        try:
            configure_log: Log = yield self.getLog("stdio")
        except Exception:
            # NOTE(review): broad catch kept as-is; presumably `getLog` fails
            # when the log has not been created yet -- confirm the exception
            # type and narrow this.
            configure_log = yield self.addLog("stdio")

        # Takes precedence.
        configure_log.addStdout("checking if there's a .ci/buildbot.nix...\n")
        ci_buildbot_defn_exists = yield self.pathExists('build/.ci/buildbot.nix')
        if ci_buildbot_defn_exists:
            configure_log.addStdout(".ci/buildbot.nix found, configured for non-flake CI\n")
            self._queue_evaluator("evaluate `.ci/buildbot.nix` jobs", flake=False)
            return SUCCESS

        flake_exists = yield self.pathExists('build/flake.nix')
        if flake_exists:
            # Trailing newline so later output starts on its own line.
            configure_log.addStdout("flake.nix found\n")
            self._queue_evaluator("evaluate `flake.nix` jobs", flake=True)
            return SUCCESS

        configure_log.addStdout("neither flake.nix found neither .ci/buildbot.nix, no CI to run!\n")
        return SUCCESS
class NixBuildCommand(buildstep.ShellMixin, steps.BuildStep):
"""Builds a nix derivation."""
@ -481,10 +588,19 @@ def nix_eval_config(
worker_count: int,
max_memory_size: int,
) -> util.BuilderConfig:
"""Uses nix-eval-jobs to evaluate $FLAKE_TARGET_ATTRIBUTE_FOR_JOBS (`.#hydraJobs` by default) from flake.nix in parallel.
"""
Uses nix-eval-jobs to evaluate the entrypoint of this project.
For each evaluated attribute a new build pipeline is started.
"""
factory = util.BuildFactory()
gerrit_private_key = None
with open(project.private_sshkey_path, 'r') as f:
gerrit_private_key = f.read()
if gerrit_private_key is None:
raise RuntimeError('No gerrit private key to fetch the repositories')
# check out the source
factory.addStep(
steps.Gerrit(
@ -492,9 +608,10 @@ def nix_eval_config(
mode="full",
retry=[60, 60],
timeout=3600,
sshPrivateKey=project.private_sshkey_path
sshPrivateKey=gerrit_private_key
),
)
# use one gcroots directory per worker. this should be scoped to the largest unique resource
# in charge of builds (ie, buildnumber is too narrow) to not litter the system with permanent
# gcroots in case of worker restarts.
@ -503,27 +620,22 @@ def nix_eval_config(
"/nix/var/nix/gcroots/per-user/buildbot-worker/%(prop:project)s/drvs/%(prop:workername)s/",
)
factory.addStep(
NixEvalCommand(
env={},
name="evaluate flake",
eval_settings = EvaluatorSettings(
supported_systems=supported_systems,
command=[
"nix-eval-jobs",
"--workers",
str(worker_count),
"--max-memory-size",
str(max_memory_size),
"--gc-roots-dir",
drv_gcroots_dir,
"--force-recurse",
"--check-cache-status",
"--flake",
f".#{FLAKE_TARGET_ATTRIBUTE_FOR_JOBS}"
],
haltOnFailure=True,
locks=[eval_lock.access("exclusive")],
),
worker_count=worker_count,
max_memory_size=max_memory_size,
gc_roots_dir=drv_gcroots_dir,
lock=eval_lock
)
# NixConfigure will choose
# how to add a NixEvalCommand job
# based on whether there's a flake.nix or
# a .ci/buildbot.nix.
factory.addStep(
NixConfigure(
eval_settings
)
)
factory.addStep(
@ -551,12 +663,17 @@ def nix_build_config(
project: GerritProject,
worker_arch: str,
worker_names: list[str],
builders_spec: str,
build_stores: list[str],
signing_keyfile: str | None = None,
binary_cache_config: S3BinaryCacheConfig | None = None
) -> util.BuilderConfig:
"""Builds one nix flake attribute."""
factory = util.BuildFactory()
# pick a store to run the build on
# TODO proper scheduling instead of picking the first builder
build_store = build_stores[0]
factory.addStep(
NixBuildCommand(
env={},
@ -578,6 +695,10 @@ def nix_build_config(
"7200",
"--builders",
builders_spec,
"--store",
build_store,
"--eval-store",
"ssh-ng://localhost",
"--out-link",
util.Interpolate("result-%(prop:attr)s"),
util.Interpolate("%(prop:drv_path)s^*"),
@ -597,6 +718,8 @@ def nix_build_config(
"nix",
"store",
"sign",
"--store",
build_store,
"--key-file",
signing_keyfile,
util.Interpolate(
@ -613,6 +736,8 @@ def nix_build_config(
command=[
"nix",
"copy",
"--store",
build_store,
"--to",
f"s3://{binary_cache_config.bucket}?profile={binary_cache_config.profile}&region={binary_cache_config.region}&endpoint={binary_cache_config.endpoint}",
util.Property(
@ -674,7 +799,7 @@ def config_for_project(
nix_eval_worker_count: int,
nix_eval_max_memory_size: int,
eval_lock: util.MasterLock,
builders_spec: str,
nix_builders: list[NixBuilder],
signing_keyfile: str | None = None,
binary_cache_config: S3BinaryCacheConfig | None = None
) -> Project:
@ -712,12 +837,6 @@ def config_for_project(
),
],
)
gerrit_private_key = None
with open(project.private_sshkey_path, 'r') as f:
gerrit_private_key = f.read()
if gerrit_private_key is None:
raise RuntimeError('No gerrit private key to fetch the repositories')
config["builders"].extend(
[
@ -737,7 +856,7 @@ def config_for_project(
project,
arch,
[ f"{w}-{arch}" for w in worker_names ],
builders_spec,
[b.to_nix_store() for b in nix_builders if arch in b.systems or arch == "other"],
signing_keyfile=signing_keyfile,
binary_cache_config=binary_cache_config
)
@ -768,10 +887,7 @@ def gerritReviewFmt(url, data):
builderName = build['builder']['name']
if len(build['results']) != 1:
raise ValueError('this review request contains more than one build results, unexpected format request')
result = build['results'][0]
result = build['results']
if result == util.RETRY:
return dict()
@ -824,13 +940,15 @@ class GerritNixConfigurator(ConfiguratorBase):
prometheus_config: dict[str, int | str] | None = None,
binary_cache_config: dict[str, str] | None = None,
auth_method: AuthBase | None = None,
manhole: Any = None,
) -> None:
super().__init__()
self.manhole = manhole
self.allowed_origins = allowed_origins
self.gerrit_server = gerrit_server
self.gerrit_user = gerrit_user
self.gerrit_port = gerrit_port
self.gerrit_sshkey_path = gerrit_sshkey_path
self.gerrit_sshkey_path = str(gerrit_sshkey_path)
self.gerrit_config = GerritConfig(domain=self.gerrit_server,
username=self.gerrit_user,
port=self.gerrit_port)
@ -860,6 +978,9 @@ class GerritNixConfigurator(ConfiguratorBase):
worker_config = json.loads(read_secret_file(self.nix_workers_secret_name))
worker_names = []
if self.manhole is not None:
config["manhole"] = self.manhole
config.setdefault("projects", [])
config.setdefault("secretsProviders", [])
config.setdefault("www", {
@ -876,7 +997,6 @@ class GerritNixConfigurator(ConfiguratorBase):
eval_lock = util.MasterLock("nix-eval")
builders_spec = " ; ".join(builder.to_nix_line() for builder in self.nix_builders)
for project in self.projects:
config_for_project(
config,
@ -887,7 +1007,7 @@ class GerritNixConfigurator(ConfiguratorBase):
self.nix_eval_worker_count or multiprocessing.cpu_count(),
self.nix_eval_max_memory_size,
eval_lock,
builders_spec,
self.nix_builders,
signing_keyfile=self.signing_keyfile,
binary_cache_config=self.binary_cache_config
)

View file

@ -7,6 +7,9 @@
let
inherit (lib) filterAttrs;
cfg = config.services.buildbot-nix.coordinator;
# Python expression text that this module interpolates as the `manhole=`
# argument further down: a `manhole.TelnetManhole` on `cfg.debugging.port`
# with the fixed credentials admin/admin when debugging is enabled, and the
# literal `None` otherwise.
debuggingManhole = if cfg.debugging.enable then
"manhole.TelnetManhole(${toString cfg.debugging.port}, 'admin', 'admin')"
else "None";
in
{
options = {
@ -28,6 +31,14 @@ in
description = "List of local remote builders machines associated to that Buildbot instance";
};
# Opt-in debugging manhole settings; both values are consumed by the
# `debuggingManhole` binding of this module.
debugging = {
enable = lib.mkEnableOption "manhole's buildbot debugging on localhost using `admin:admin`";
# Port number passed to the manhole when debugging is enabled.
port = lib.mkOption {
type = lib.types.port;
default = 15000;
};
};
oauth2 = {
name = lib.mkOption {
type = lib.types.str;
@ -163,6 +174,14 @@ in
'';
example = [ "lix" ];
};
# List of project names (strings); presumably the Gerrit projects this
# coordinator should check -- confirm against the consumer of this option.
# NOTE(review): the description text below reads awkwardly ("which are to
# check"); consider rewording it.
projects = lib.mkOption {
type = lib.types.listOf lib.types.str;
description = ''
List of projects which are to check on Gerrit.
'';
example = [ "lix" ];
};
};
binaryCache = {
@ -216,6 +235,7 @@ in
extraImports = ''
from datetime import timedelta
from buildbot_nix import GerritNixConfigurator, read_secret_file, make_oauth2_method, OAuth2Config, assemble_secret_file_path
from buildbot import manhole
# TODO(raito): make me configurable from the NixOS module.
# how?
@ -257,7 +277,8 @@ in
auth_method=CustomOAuth2(${builtins.toJSON cfg.oauth2.clientId},
read_secret_file('buildbot-oauth2-secret'),
autologin=True
)
),
manhole=${debuggingManhole}
)
''
];

View file

@ -62,6 +62,7 @@ in
pkgs.openssh
pkgs.nix
pkgs.nix-eval-jobs
pkgs.bash
];
environment.PYTHONPATH = "${python.withPackages (_: [cfg.package])}/${python.sitePackages}";
environment.MASTER_URL = cfg.coordinatorUrl;