feat: enable per-worker local store slots #24

Open
raito wants to merge 20 commits from local-stores into main
3 changed files with 217 additions and 75 deletions

View file

@ -4,6 +4,7 @@ import os
import sys
import graphlib
import base64
import random
from collections.abc import Generator
from dataclasses import dataclass, field
from pathlib import Path
@ -24,6 +25,7 @@ from buildbot.reporters.generators.build import BuildStatusGenerator
from buildbot.reporters.message import MessageFormatterFunction
from buildbot.process.buildstep import EXCEPTION
from buildbot.process.buildstep import SUCCESS
from buildbot.process.buildstep import BuildStepFailed
from buildbot.process.results import worst_status
import requests
@ -39,6 +41,14 @@ log = Logger()
FLAKE_TARGET_ATTRIBUTE_FOR_JOBS = "buildbotJobs"
@dataclass
class EvaluatorSettings:
supported_systems: list[str]
worker_count: int
max_memory_size: int
gc_roots_dir: str
lock: util.MasterLock
@dataclass
class NixBuilder:
protocol: str
@ -49,14 +59,24 @@ class NixBuilder:
publicHostKey: str | None = None
sshUser: str | None = None
sshKey: str | None = None
systems: list[str] = field(default_factory=lambda: ["-"])
supportedFeatures: list[str] = field(default_factory=lambda: ["-"])
mandatoryFeatures: list[str] = field(default_factory=lambda: ["-"])
systems: list[str] = field(default_factory=lambda: [])
supportedFeatures: list[str] = field(default_factory=lambda: [])
mandatoryFeatures: list[str] = field(default_factory=lambda: [])
def to_nix_line(self):
encoded_public_key = base64.b64encode(self.publicHostKey.encode('ascii')).decode('ascii') if self.publicHostKey is not None else "-"
fullConnection = f"{self.protocol}://{self.sshUser}@{self.hostName}" if self.sshUser is not None else self.hostName
return f"{fullConnection} {",".join(self.systems)} {self.sshKey or "-"} {self.maxJobs} {self.speedFactor} {",".join(self.supportedFeatures)} {",".join(self.mandatoryFeatures)} {encoded_public_key}"
def to_nix_store(self):
fullConnection = f"{self.sshUser}@{self.hostName}" if self.sshUser is not None else self.hostName
fullConnection = f"{self.protocol}://{fullConnection}"
params = []
if self.sshKey is not None:
params.append(f"ssh-key={self.sshKey}")
if self.publicHostKey is not None:
encoded_public_key = base64.b64encode(self.publicHostKey.encode('ascii')).decode('ascii')
params.append(f"base64-ssh-public-host-key={encoded_public_key}")
if params != []:
fullConnection += "?"
fullConnection += "&".join(params)
return fullConnection
@dataclass
@ -130,7 +150,7 @@ class GerritConfig:
"""
Returns the prefix to build a repourl using that gerrit configuration.
"""
return 'ssh://{self.username}@{self.domain}:{self.port}/'
return f'ssh://{self.username}@{self.domain}:{self.port}/'
class BuildTrigger(steps.BuildStep):
def __init__(
@ -148,7 +168,7 @@ class BuildTrigger(steps.BuildStep):
self.ended = False
self.waitForFinishDeferred = None
self.brids = []
self.description = f"building {len(jobs)} hydra jobs"
self.description = f"building {len(jobs)} jobs"
super().__init__(**kwargs)
def interrupt(self, reason):
@ -177,15 +197,14 @@ class BuildTrigger(steps.BuildStep):
return sch
def schedule_one(self, build_props: Properties, job):
project_name = build_props.getProperty('event.project')
source = f"{project_name}-eval-lix"
project_name = build_props.getProperty("event.refUpdate.project") or build_props.getProperty("event.change.project")
source = f"{project_name}-eval"
attr = job.get("attr", "eval-error")
name = attr
name = f"{FLAKE_TARGET_ATTRIBUTE_FOR_JOBS}.{name}"
name = f"buildbotJobs.{attr}"
error = job.get("error")
props = Properties()
props.setProperty("virtual_builder_name", name, source)
props.setProperty("status_name", f"nix-build .#{FLAKE_TARGET_ATTRIBUTE_FOR_JOBS}.{attr}", source)
props.setProperty("status_name", f"building buildbotJobs.{attr}", source)
props.setProperty("virtual_builder_tags", "", source)
if error is not None:
@ -372,7 +391,8 @@ class NixEvalCommand(buildstep.ShellMixin, steps.BuildStep):
# run nix-eval-jobs --flake .#$FLAKE_TARGET_ATTRIBUTE_FOR_JOBS to generate the dict of stages
cmd: remotecommand.RemoteCommand = yield self.makeRemoteShellCommand()
build_props = self.build.getProperties()
project_name = build_props.get('event.project')
project_name = build_props.getProperty("event.refUpdate.project") or build_props.getProperty("event.change.project")
assert project_name is not None, "`event.refUpdate.project` or `event.change.project` is not available on the build properties, unexpected build type!"
yield self.runCommand(cmd)
@ -396,14 +416,27 @@ class NixEvalCommand(buildstep.ShellMixin, steps.BuildStep):
if not system or system in self.supported_systems: # report eval errors
filtered_jobs.append(job)
# Filter out failed evaluations
succeeded_jobs = [job for job in filtered_jobs if job.get('error') is None]
drv_show_log: Log = yield self.getLog("stdio")
drv_show_log.addStdout(f"getting derivation infos\n")
all_deps = dict()
def closure_of(key, deps):
r, size = set([key]), 0
while len(r) != size:
size = len(r)
r.update(*[ deps[k] for k in r ])
return r.difference([key])
if succeeded_jobs:
drv_show_log.addStdout(f"getting derivation infos for valid derivations\n")
cmd = yield self.makeRemoteShellCommand(
stdioLogName=None,
collectStdout=True,
command=(
["nix", "derivation", "show", "--recursive"]
+ [ drv for drv in (job.get("drvPath") for job in filtered_jobs) if drv ]
+ [ drv for drv in (job.get("drvPath") for job in succeeded_jobs) if drv ]
),
)
yield self.runCommand(cmd)
@ -413,17 +446,9 @@ class NixEvalCommand(buildstep.ShellMixin, steps.BuildStep):
except json.JSONDecodeError as e:
msg = f"Failed to parse `nix derivation show` output for {cmd.command}"
raise BuildbotNixError(msg) from e
all_deps = dict()
for drv, info in drv_info.items():
all_deps[drv] = set(info.get("inputDrvs").keys())
def closure_of(key, deps):
r, size = set([key]), 0
while len(r) != size:
size = len(r)
r.update(*[ deps[k] for k in r ])
return r.difference([key])
job_set = set(( drv for drv in ( job.get("drvPath") for job in filtered_jobs ) if drv ))
all_deps = { k: list(closure_of(k, all_deps).intersection(job_set)) for k in job_set }
@ -431,7 +456,7 @@ class NixEvalCommand(buildstep.ShellMixin, steps.BuildStep):
[
BuildTrigger(
builds_scheduler_group=f"{project_name}-nix-build",
name="build flake",
name="build derivations",
jobs=filtered_jobs,
all_deps=all_deps,
),
@ -440,6 +465,88 @@ class NixEvalCommand(buildstep.ShellMixin, steps.BuildStep):
return result
def make_job_evaluator(name: str, settings: EvaluatorSettings, flake: bool) -> NixEvalCommand:
actual_command = []
if flake:
actual_command += ["--flake", f".#{FLAKE_TARGET_ATTRIBUTE_FOR_JOBS}"]
else:
actual_command += ["--expr", "import ./.ci/buildbot.nix"]
return NixEvalCommand(
env={},
name=name,
supported_systems=settings.supported_systems,
command=[
"nix-eval-jobs",
"--workers",
str(settings.worker_count),
"--max-memory-size",
str(settings.max_memory_size),
"--gc-roots-dir",
settings.gc_roots_dir,
"--force-recurse",
"--check-cache-status",
] + actual_command,
haltOnFailure=True,
locks=[settings.lock.access("exclusive")]
)
class NixConfigure(buildstep.CommandMixin, steps.BuildStep):
name = "determining jobs"
"""
Determine what `NixEvalCommand` step should be added after
based on the existence of:
- flake.nix
- .ci/buildbot.nix
"""
def __init__(self, eval_settings: EvaluatorSettings, **kwargs: Any) -> None:
self.evaluator_settings = eval_settings
super().__init__(**kwargs)
self.observer = logobserver.BufferLogObserver()
self.addLogObserver("stdio", self.observer)
@defer.inlineCallbacks
def run(self) -> Generator[Any, object, Any]:
try:
configure_log: Log = yield self.getLog("stdio")
except Exception:
configure_log: Log = yield self.addLog("stdio")
# Takes precedence.
configure_log.addStdout("checking if there's a .ci/buildbot.nix...\n")
ci_buildbot_defn_exists = yield self.pathExists('build/.ci/buildbot.nix')
if ci_buildbot_defn_exists:
configure_log.addStdout(".ci/buildbot.nix found, configured for non-flake CI\n")
self.build.addStepsAfterCurrentStep(
[
make_job_evaluator(
"evaluate `.ci/buildbot.nix` jobs",
self.evaluator_settings,
False
)
]
)
return SUCCESS
flake_exists = yield self.pathExists('build/flake.nix')
if flake_exists:
configure_log.addStdout(f"flake.nix found")
self.build.addStepsAfterCurrentStep([
make_job_evaluator(
"evaluate `flake.nix` jobs",
self.evaluator_settings,
True
)
]
)
return SUCCESS
configure_log.addStdout("neither flake.nix found neither .ci/buildbot.nix, no CI to run!")
return SUCCESS
class NixBuildCommand(buildstep.ShellMixin, steps.BuildStep):
"""Builds a nix derivation."""
@ -481,10 +588,19 @@ def nix_eval_config(
worker_count: int,
max_memory_size: int,
) -> util.BuilderConfig:
"""Uses nix-eval-jobs to evaluate $FLAKE_TARGET_ATTRIBUTE_FOR_JOBS (`.#hydraJobs` by default) from flake.nix in parallel.
"""
Uses nix-eval-jobs to evaluate the entrypoint of this project.
For each evaluated attribute a new build pipeline is started.
"""
factory = util.BuildFactory()
gerrit_private_key = None
with open(project.private_sshkey_path, 'r') as f:
gerrit_private_key = f.read()
if gerrit_private_key is None:
raise RuntimeError('No gerrit private key to fetch the repositories')
# check out the source
factory.addStep(
steps.Gerrit(
@ -492,9 +608,10 @@ def nix_eval_config(
mode="full",
retry=[60, 60],
timeout=3600,
sshPrivateKey=project.private_sshkey_path
sshPrivateKey=gerrit_private_key
),
)
# use one gcroots directory per worker. this should be scoped to the largest unique resource
# in charge of builds (ie, buildnumber is too narrow) to not litter the system with permanent
# gcroots in case of worker restarts.
@ -503,27 +620,22 @@ def nix_eval_config(
"/nix/var/nix/gcroots/per-user/buildbot-worker/%(prop:project)s/drvs/%(prop:workername)s/",
)
factory.addStep(
NixEvalCommand(
env={},
name="evaluate flake",
eval_settings = EvaluatorSettings(
supported_systems=supported_systems,
command=[
"nix-eval-jobs",
"--workers",
str(worker_count),
"--max-memory-size",
str(max_memory_size),
"--gc-roots-dir",
drv_gcroots_dir,
"--force-recurse",
"--check-cache-status",
"--flake",
f".#{FLAKE_TARGET_ATTRIBUTE_FOR_JOBS}"
],
haltOnFailure=True,
locks=[eval_lock.access("exclusive")],
),
worker_count=worker_count,
max_memory_size=max_memory_size,
gc_roots_dir=drv_gcroots_dir,
lock=eval_lock
)
# NixConfigure will choose
# how to add a NixEvalCommand job
# based on whether there's a flake.nix or
# a .ci/buildbot.nix.
factory.addStep(
NixConfigure(
eval_settings
)
)
factory.addStep(
@ -551,12 +663,17 @@ def nix_build_config(
project: GerritProject,
worker_arch: str,
worker_names: list[str],
builders_spec: str,
build_stores: list[str],
signing_keyfile: str | None = None,
binary_cache_config: S3BinaryCacheConfig | None = None
) -> util.BuilderConfig:
"""Builds one nix flake attribute."""
factory = util.BuildFactory()
# pick a store to run the build on
# TODO proper scheduling instead of picking the first builder
build_store = build_stores[0]
factory.addStep(
NixBuildCommand(
env={},
@ -578,6 +695,10 @@ def nix_build_config(
"7200",
"--builders",
builders_spec,
"--store",
build_store,
"--eval-store",
"ssh-ng://localhost",
Review

does this require trusted user on localhost? or wait, are we doing this to prevent needing trusted user while still doing remote builds? am confuse

does this require trusted user on localhost? or wait, are we doing this to prevent needing trusted user while still doing remote builds? am confuse
Review

no, it doesn't anymore!

no, it doesn't anymore!
Review

(and yes, that's correct, that's untrusted remote building!)

(and yes, that's correct, that's untrusted remote building!)
"--out-link",
util.Interpolate("result-%(prop:attr)s"),
util.Interpolate("%(prop:drv_path)s^*"),
@ -597,6 +718,8 @@ def nix_build_config(
"nix",
"store",
"sign",
"--store",
build_store,
"--key-file",
signing_keyfile,
util.Interpolate(
@ -613,6 +736,8 @@ def nix_build_config(
command=[
"nix",
"copy",
"--store",
build_store,
"--to",
f"s3://{binary_cache_config.bucket}?profile={binary_cache_config.profile}&region={binary_cache_config.region}&endpoint={binary_cache_config.endpoint}",
util.Property(
@ -674,7 +799,7 @@ def config_for_project(
nix_eval_worker_count: int,
nix_eval_max_memory_size: int,
eval_lock: util.MasterLock,
builders_spec: str,
nix_builders: list[NixBuilder],
signing_keyfile: str | None = None,
binary_cache_config: S3BinaryCacheConfig | None = None
) -> Project:
@ -712,12 +837,6 @@ def config_for_project(
),
],
)
gerrit_private_key = None
with open(project.private_sshkey_path, 'r') as f:
gerrit_private_key = f.read()
if gerrit_private_key is None:
raise RuntimeError('No gerrit private key to fetch the repositories')
config["builders"].extend(
[
@ -737,7 +856,7 @@ def config_for_project(
project,
arch,
[ f"{w}-{arch}" for w in worker_names ],
builders_spec,
[b.to_nix_store() for b in nix_builders if arch in b.systems or arch == "other"],
signing_keyfile=signing_keyfile,
binary_cache_config=binary_cache_config
)
@ -768,10 +887,7 @@ def gerritReviewFmt(url, data):
builderName = build['builder']['name']
if len(build['results']) != 1:
raise ValueError('this review request contains more than one build results, unexpected format request')
result = build['results'][0]
result = build['results']
if result == util.RETRY:
return dict()
@ -824,13 +940,15 @@ class GerritNixConfigurator(ConfiguratorBase):
prometheus_config: dict[str, int | str] | None = None,
binary_cache_config: dict[str, str] | None = None,
auth_method: AuthBase | None = None,
manhole: Any = None,
) -> None:
super().__init__()
self.manhole = manhole
self.allowed_origins = allowed_origins
self.gerrit_server = gerrit_server
self.gerrit_user = gerrit_user
self.gerrit_port = gerrit_port
self.gerrit_sshkey_path = gerrit_sshkey_path
self.gerrit_sshkey_path = str(gerrit_sshkey_path)
self.gerrit_config = GerritConfig(domain=self.gerrit_server,
username=self.gerrit_user,
port=self.gerrit_port)
@ -860,6 +978,9 @@ class GerritNixConfigurator(ConfiguratorBase):
worker_config = json.loads(read_secret_file(self.nix_workers_secret_name))
worker_names = []
if self.manhole is not None:
config["manhole"] = self.manhole
config.setdefault("projects", [])
config.setdefault("secretsProviders", [])
config.setdefault("www", {
@ -876,7 +997,6 @@ class GerritNixConfigurator(ConfiguratorBase):
eval_lock = util.MasterLock("nix-eval")
builders_spec = " ; ".join(builder.to_nix_line() for builder in self.nix_builders)
for project in self.projects:
config_for_project(
config,
@ -887,7 +1007,7 @@ class GerritNixConfigurator(ConfiguratorBase):
self.nix_eval_worker_count or multiprocessing.cpu_count(),
self.nix_eval_max_memory_size,
eval_lock,
builders_spec,
self.nix_builders,
signing_keyfile=self.signing_keyfile,
binary_cache_config=self.binary_cache_config
)

View file

@ -7,6 +7,9 @@
let
inherit (lib) filterAttrs;
cfg = config.services.buildbot-nix.coordinator;
debuggingManhole = if cfg.debugging.enable then
"manhole.TelnetManhole(${toString cfg.debugging.port}, 'admin', 'admin')"
else "None";
in
{
options = {
@ -28,6 +31,14 @@ in
description = "List of local remote builders machines associated to that Buildbot instance";
};
debugging = {
enable = lib.mkEnableOption "manhole's buildbot debugging on localhost using `admin:admin`";
port = lib.mkOption {
type = lib.types.port;
default = 15000;
};
};
oauth2 = {
name = lib.mkOption {
type = lib.types.str;
@ -163,6 +174,14 @@ in
'';
example = [ "lix" ];
};
projects = lib.mkOption {
type = lib.types.listOf lib.types.str;
description = ''
List of projects which are to check on Gerrit.
'';
example = [ "lix" ];
};
};
binaryCache = {
@ -216,6 +235,7 @@ in
extraImports = ''
from datetime import timedelta
from buildbot_nix import GerritNixConfigurator, read_secret_file, make_oauth2_method, OAuth2Config, assemble_secret_file_path
from buildbot import manhole
# TODO(raito): make me configurable from the NixOS module.
# how?
@ -257,7 +277,8 @@ in
auth_method=CustomOAuth2(${builtins.toJSON cfg.oauth2.clientId},
read_secret_file('buildbot-oauth2-secret'),
autologin=True
)
),
manhole=${debuggingManhole}
)
''
];

View file

@ -62,6 +62,7 @@ in
pkgs.openssh
pkgs.nix
pkgs.nix-eval-jobs
pkgs.bash
];
environment.PYTHONPATH = "${python.withPackages (_: [cfg.package])}/${python.sitePackages}";
environment.MASTER_URL = cfg.coordinatorUrl;