feat: enable per-worker local store slots #24

Open
raito wants to merge 20 commits from local-stores into main
3 changed files with 217 additions and 75 deletions

View file

@ -4,6 +4,7 @@ import os
import sys import sys
import graphlib import graphlib
import base64 import base64
import random
from collections.abc import Generator from collections.abc import Generator
from dataclasses import dataclass, field from dataclasses import dataclass, field
from pathlib import Path from pathlib import Path
@ -24,6 +25,7 @@ from buildbot.reporters.generators.build import BuildStatusGenerator
from buildbot.reporters.message import MessageFormatterFunction from buildbot.reporters.message import MessageFormatterFunction
from buildbot.process.buildstep import EXCEPTION from buildbot.process.buildstep import EXCEPTION
from buildbot.process.buildstep import SUCCESS from buildbot.process.buildstep import SUCCESS
from buildbot.process.buildstep import BuildStepFailed
from buildbot.process.results import worst_status from buildbot.process.results import worst_status
import requests import requests
@ -39,6 +41,14 @@ log = Logger()
FLAKE_TARGET_ATTRIBUTE_FOR_JOBS = "buildbotJobs" FLAKE_TARGET_ATTRIBUTE_FOR_JOBS = "buildbotJobs"
@dataclass
class EvaluatorSettings:
supported_systems: list[str]
worker_count: int
max_memory_size: int
gc_roots_dir: str
lock: util.MasterLock
@dataclass @dataclass
class NixBuilder: class NixBuilder:
protocol: str protocol: str
@ -49,14 +59,24 @@ class NixBuilder:
publicHostKey: str | None = None publicHostKey: str | None = None
sshUser: str | None = None sshUser: str | None = None
sshKey: str | None = None sshKey: str | None = None
systems: list[str] = field(default_factory=lambda: ["-"]) systems: list[str] = field(default_factory=lambda: [])
supportedFeatures: list[str] = field(default_factory=lambda: ["-"]) supportedFeatures: list[str] = field(default_factory=lambda: [])
mandatoryFeatures: list[str] = field(default_factory=lambda: ["-"]) mandatoryFeatures: list[str] = field(default_factory=lambda: [])
def to_nix_line(self): def to_nix_store(self):
encoded_public_key = base64.b64encode(self.publicHostKey.encode('ascii')).decode('ascii') if self.publicHostKey is not None else "-" fullConnection = f"{self.sshUser}@{self.hostName}" if self.sshUser is not None else self.hostName
fullConnection = f"{self.protocol}://{self.sshUser}@{self.hostName}" if self.sshUser is not None else self.hostName fullConnection = f"{self.protocol}://{fullConnection}"
return f"{fullConnection} {",".join(self.systems)} {self.sshKey or "-"} {self.maxJobs} {self.speedFactor} {",".join(self.supportedFeatures)} {",".join(self.mandatoryFeatures)} {encoded_public_key}" params = []
if self.sshKey is not None:
params.append(f"ssh-key={self.sshKey}")
if self.publicHostKey is not None:
encoded_public_key = base64.b64encode(self.publicHostKey.encode('ascii')).decode('ascii')
params.append(f"base64-ssh-public-host-key={encoded_public_key}")
if params != []:
fullConnection += "?"
fullConnection += "&".join(params)
return fullConnection
@dataclass @dataclass
@ -130,7 +150,7 @@ class GerritConfig:
""" """
Returns the prefix to build a repourl using that gerrit configuration. Returns the prefix to build a repourl using that gerrit configuration.
""" """
return 'ssh://{self.username}@{self.domain}:{self.port}/' return f'ssh://{self.username}@{self.domain}:{self.port}/'
class BuildTrigger(steps.BuildStep): class BuildTrigger(steps.BuildStep):
def __init__( def __init__(
@ -148,7 +168,7 @@ class BuildTrigger(steps.BuildStep):
self.ended = False self.ended = False
self.waitForFinishDeferred = None self.waitForFinishDeferred = None
self.brids = [] self.brids = []
self.description = f"building {len(jobs)} hydra jobs" self.description = f"building {len(jobs)} jobs"
super().__init__(**kwargs) super().__init__(**kwargs)
def interrupt(self, reason): def interrupt(self, reason):
@ -177,15 +197,14 @@ class BuildTrigger(steps.BuildStep):
return sch return sch
def schedule_one(self, build_props: Properties, job): def schedule_one(self, build_props: Properties, job):
project_name = build_props.getProperty('event.project') project_name = build_props.getProperty("event.refUpdate.project") or build_props.getProperty("event.change.project")
source = f"{project_name}-eval-lix" source = f"{project_name}-eval"
attr = job.get("attr", "eval-error") attr = job.get("attr", "eval-error")
name = attr name = f"buildbotJobs.{attr}"
name = f"{FLAKE_TARGET_ATTRIBUTE_FOR_JOBS}.{name}"
error = job.get("error") error = job.get("error")
props = Properties() props = Properties()
props.setProperty("virtual_builder_name", name, source) props.setProperty("virtual_builder_name", name, source)
props.setProperty("status_name", f"nix-build .#{FLAKE_TARGET_ATTRIBUTE_FOR_JOBS}.{attr}", source) props.setProperty("status_name", f"building buildbotJobs.{attr}", source)
props.setProperty("virtual_builder_tags", "", source) props.setProperty("virtual_builder_tags", "", source)
if error is not None: if error is not None:
@ -372,7 +391,8 @@ class NixEvalCommand(buildstep.ShellMixin, steps.BuildStep):
# run nix-eval-jobs --flake .#$FLAKE_TARGET_ATTRIBUTE_FOR_JOBS to generate the dict of stages # run nix-eval-jobs --flake .#$FLAKE_TARGET_ATTRIBUTE_FOR_JOBS to generate the dict of stages
cmd: remotecommand.RemoteCommand = yield self.makeRemoteShellCommand() cmd: remotecommand.RemoteCommand = yield self.makeRemoteShellCommand()
build_props = self.build.getProperties() build_props = self.build.getProperties()
project_name = build_props.get('event.project') project_name = build_props.getProperty("event.refUpdate.project") or build_props.getProperty("event.change.project")
assert project_name is not None, "`event.refUpdate.project` or `event.change.project` is not available on the build properties, unexpected build type!"
yield self.runCommand(cmd) yield self.runCommand(cmd)
@ -396,26 +416,11 @@ class NixEvalCommand(buildstep.ShellMixin, steps.BuildStep):
if not system or system in self.supported_systems: # report eval errors if not system or system in self.supported_systems: # report eval errors
filtered_jobs.append(job) filtered_jobs.append(job)
# Filter out failed evaluations
succeeded_jobs = [job for job in filtered_jobs if job.get('error') is None]
drv_show_log: Log = yield self.getLog("stdio") drv_show_log: Log = yield self.getLog("stdio")
drv_show_log.addStdout(f"getting derivation infos\n")
cmd = yield self.makeRemoteShellCommand(
stdioLogName=None,
collectStdout=True,
command=(
["nix", "derivation", "show", "--recursive"]
+ [ drv for drv in (job.get("drvPath") for job in filtered_jobs) if drv ]
),
)
yield self.runCommand(cmd)
drv_show_log.addStdout(f"done\n")
try:
drv_info = json.loads(cmd.stdout)
except json.JSONDecodeError as e:
msg = f"Failed to parse `nix derivation show` output for {cmd.command}"
raise BuildbotNixError(msg) from e
all_deps = dict() all_deps = dict()
for drv, info in drv_info.items():
all_deps[drv] = set(info.get("inputDrvs").keys())
def closure_of(key, deps): def closure_of(key, deps):
r, size = set([key]), 0 r, size = set([key]), 0
@ -424,14 +429,34 @@ class NixEvalCommand(buildstep.ShellMixin, steps.BuildStep):
r.update(*[ deps[k] for k in r ]) r.update(*[ deps[k] for k in r ])
return r.difference([key]) return r.difference([key])
job_set = set(( drv for drv in ( job.get("drvPath") for job in filtered_jobs ) if drv )) if succeeded_jobs:
all_deps = { k: list(closure_of(k, all_deps).intersection(job_set)) for k in job_set } drv_show_log.addStdout(f"getting derivation infos for valid derivations\n")
cmd = yield self.makeRemoteShellCommand(
stdioLogName=None,
collectStdout=True,
command=(
["nix", "derivation", "show", "--recursive"]
+ [ drv for drv in (job.get("drvPath") for job in succeeded_jobs) if drv ]
),
)
yield self.runCommand(cmd)
drv_show_log.addStdout(f"done\n")
try:
drv_info = json.loads(cmd.stdout)
except json.JSONDecodeError as e:
msg = f"Failed to parse `nix derivation show` output for {cmd.command}"
raise BuildbotNixError(msg) from e
for drv, info in drv_info.items():
all_deps[drv] = set(info.get("inputDrvs").keys())
job_set = set(( drv for drv in ( job.get("drvPath") for job in filtered_jobs ) if drv ))
all_deps = { k: list(closure_of(k, all_deps).intersection(job_set)) for k in job_set }
self.build.addStepsAfterCurrentStep( self.build.addStepsAfterCurrentStep(
[ [
BuildTrigger( BuildTrigger(
builds_scheduler_group=f"{project_name}-nix-build", builds_scheduler_group=f"{project_name}-nix-build",
name="build flake", name="build derivations",
jobs=filtered_jobs, jobs=filtered_jobs,
all_deps=all_deps, all_deps=all_deps,
), ),
@ -440,6 +465,88 @@ class NixEvalCommand(buildstep.ShellMixin, steps.BuildStep):
return result return result
def make_job_evaluator(name: str, settings: EvaluatorSettings, flake: bool) -> NixEvalCommand:
actual_command = []
if flake:
actual_command += ["--flake", f".#{FLAKE_TARGET_ATTRIBUTE_FOR_JOBS}"]
else:
actual_command += ["--expr", "import ./.ci/buildbot.nix"]
return NixEvalCommand(
env={},
name=name,
supported_systems=settings.supported_systems,
command=[
"nix-eval-jobs",
"--workers",
str(settings.worker_count),
"--max-memory-size",
str(settings.max_memory_size),
"--gc-roots-dir",
settings.gc_roots_dir,
"--force-recurse",
"--check-cache-status",
] + actual_command,
haltOnFailure=True,
locks=[settings.lock.access("exclusive")]
)
class NixConfigure(buildstep.CommandMixin, steps.BuildStep):
name = "determining jobs"
"""
Determine what `NixEvalCommand` step should be added after
based on the existence of:
- flake.nix
- .ci/buildbot.nix
"""
def __init__(self, eval_settings: EvaluatorSettings, **kwargs: Any) -> None:
self.evaluator_settings = eval_settings
super().__init__(**kwargs)
self.observer = logobserver.BufferLogObserver()
self.addLogObserver("stdio", self.observer)
@defer.inlineCallbacks
def run(self) -> Generator[Any, object, Any]:
try:
configure_log: Log = yield self.getLog("stdio")
except Exception:
configure_log: Log = yield self.addLog("stdio")
# Takes precedence.
configure_log.addStdout("checking if there's a .ci/buildbot.nix...\n")
ci_buildbot_defn_exists = yield self.pathExists('build/.ci/buildbot.nix')
if ci_buildbot_defn_exists:
configure_log.addStdout(".ci/buildbot.nix found, configured for non-flake CI\n")
self.build.addStepsAfterCurrentStep(
[
make_job_evaluator(
"evaluate `.ci/buildbot.nix` jobs",
self.evaluator_settings,
False
)
]
)
return SUCCESS
flake_exists = yield self.pathExists('build/flake.nix')
if flake_exists:
configure_log.addStdout(f"flake.nix found")
self.build.addStepsAfterCurrentStep([
make_job_evaluator(
"evaluate `flake.nix` jobs",
self.evaluator_settings,
True
)
]
)
return SUCCESS
configure_log.addStdout("neither flake.nix found neither .ci/buildbot.nix, no CI to run!")
return SUCCESS
class NixBuildCommand(buildstep.ShellMixin, steps.BuildStep): class NixBuildCommand(buildstep.ShellMixin, steps.BuildStep):
"""Builds a nix derivation.""" """Builds a nix derivation."""
@ -481,10 +588,19 @@ def nix_eval_config(
worker_count: int, worker_count: int,
max_memory_size: int, max_memory_size: int,
) -> util.BuilderConfig: ) -> util.BuilderConfig:
"""Uses nix-eval-jobs to evaluate $FLAKE_TARGET_ATTRIBUTE_FOR_JOBS (`.#hydraJobs` by default) from flake.nix in parallel. """
Uses nix-eval-jobs to evaluate the entrypoint of this project.
For each evaluated attribute a new build pipeline is started. For each evaluated attribute a new build pipeline is started.
""" """
factory = util.BuildFactory() factory = util.BuildFactory()
gerrit_private_key = None
with open(project.private_sshkey_path, 'r') as f:
gerrit_private_key = f.read()
if gerrit_private_key is None:
raise RuntimeError('No gerrit private key to fetch the repositories')
# check out the source # check out the source
factory.addStep( factory.addStep(
steps.Gerrit( steps.Gerrit(
@ -492,9 +608,10 @@ def nix_eval_config(
mode="full", mode="full",
retry=[60, 60], retry=[60, 60],
timeout=3600, timeout=3600,
sshPrivateKey=project.private_sshkey_path sshPrivateKey=gerrit_private_key
), ),
) )
# use one gcroots directory per worker. this should be scoped to the largest unique resource # use one gcroots directory per worker. this should be scoped to the largest unique resource
# in charge of builds (ie, buildnumber is too narrow) to not litter the system with permanent # in charge of builds (ie, buildnumber is too narrow) to not litter the system with permanent
# gcroots in case of worker restarts. # gcroots in case of worker restarts.
@ -503,27 +620,22 @@ def nix_eval_config(
"/nix/var/nix/gcroots/per-user/buildbot-worker/%(prop:project)s/drvs/%(prop:workername)s/", "/nix/var/nix/gcroots/per-user/buildbot-worker/%(prop:project)s/drvs/%(prop:workername)s/",
) )
eval_settings = EvaluatorSettings(
supported_systems=supported_systems,
worker_count=worker_count,
max_memory_size=max_memory_size,
gc_roots_dir=drv_gcroots_dir,
lock=eval_lock
)
# NixConfigure will choose
# how to add a NixEvalCommand job
# based on whether there's a flake.nix or
# a .ci/buildbot.nix.
factory.addStep( factory.addStep(
NixEvalCommand( NixConfigure(
env={}, eval_settings
name="evaluate flake", )
supported_systems=supported_systems,
command=[
"nix-eval-jobs",
"--workers",
str(worker_count),
"--max-memory-size",
str(max_memory_size),
"--gc-roots-dir",
drv_gcroots_dir,
"--force-recurse",
"--check-cache-status",
"--flake",
f".#{FLAKE_TARGET_ATTRIBUTE_FOR_JOBS}"
],
haltOnFailure=True,
locks=[eval_lock.access("exclusive")],
),
) )
factory.addStep( factory.addStep(
@ -551,12 +663,17 @@ def nix_build_config(
project: GerritProject, project: GerritProject,
worker_arch: str, worker_arch: str,
worker_names: list[str], worker_names: list[str],
builders_spec: str, build_stores: list[str],
signing_keyfile: str | None = None, signing_keyfile: str | None = None,
binary_cache_config: S3BinaryCacheConfig | None = None binary_cache_config: S3BinaryCacheConfig | None = None
) -> util.BuilderConfig: ) -> util.BuilderConfig:
"""Builds one nix flake attribute.""" """Builds one nix flake attribute."""
factory = util.BuildFactory() factory = util.BuildFactory()
# pick a store to run the build on
# TODO proper scheduling instead of picking the first builder
build_store = build_stores[0]
factory.addStep( factory.addStep(
NixBuildCommand( NixBuildCommand(
env={}, env={},
@ -578,6 +695,10 @@ def nix_build_config(
"7200", "7200",
"--builders", "--builders",
builders_spec, builders_spec,
"--store",
build_store,
"--eval-store",
"ssh-ng://localhost",
Review

does this require trusted user on localhost? or wait, are we doing this to prevent needing trusted user while still doing remote builds? am confuse

does this require trusted user on localhost? or wait, are we doing this to prevent needing trusted user while still doing remote builds? am confuse
Review

no, it doesn't anymore!

no, it doesn't anymore!
Review

(and yes, that's correct, that's untrusted remote building!)

(and yes, that's correct, that's untrusted remote building!)
"--out-link", "--out-link",
util.Interpolate("result-%(prop:attr)s"), util.Interpolate("result-%(prop:attr)s"),
util.Interpolate("%(prop:drv_path)s^*"), util.Interpolate("%(prop:drv_path)s^*"),
@ -597,6 +718,8 @@ def nix_build_config(
"nix", "nix",
"store", "store",
"sign", "sign",
"--store",
build_store,
"--key-file", "--key-file",
signing_keyfile, signing_keyfile,
util.Interpolate( util.Interpolate(
@ -613,6 +736,8 @@ def nix_build_config(
command=[ command=[
"nix", "nix",
"copy", "copy",
"--store",
build_store,
"--to", "--to",
f"s3://{binary_cache_config.bucket}?profile={binary_cache_config.profile}&region={binary_cache_config.region}&endpoint={binary_cache_config.endpoint}", f"s3://{binary_cache_config.bucket}?profile={binary_cache_config.profile}&region={binary_cache_config.region}&endpoint={binary_cache_config.endpoint}",
util.Property( util.Property(
@ -674,7 +799,7 @@ def config_for_project(
nix_eval_worker_count: int, nix_eval_worker_count: int,
nix_eval_max_memory_size: int, nix_eval_max_memory_size: int,
eval_lock: util.MasterLock, eval_lock: util.MasterLock,
builders_spec: str, nix_builders: list[NixBuilder],
signing_keyfile: str | None = None, signing_keyfile: str | None = None,
binary_cache_config: S3BinaryCacheConfig | None = None binary_cache_config: S3BinaryCacheConfig | None = None
) -> Project: ) -> Project:
@ -712,12 +837,6 @@ def config_for_project(
), ),
], ],
) )
gerrit_private_key = None
with open(project.private_sshkey_path, 'r') as f:
gerrit_private_key = f.read()
if gerrit_private_key is None:
raise RuntimeError('No gerrit private key to fetch the repositories')
config["builders"].extend( config["builders"].extend(
[ [
@ -737,7 +856,7 @@ def config_for_project(
project, project,
arch, arch,
[ f"{w}-{arch}" for w in worker_names ], [ f"{w}-{arch}" for w in worker_names ],
builders_spec, [b.to_nix_store() for b in nix_builders if arch in b.systems or arch == "other"],
signing_keyfile=signing_keyfile, signing_keyfile=signing_keyfile,
binary_cache_config=binary_cache_config binary_cache_config=binary_cache_config
) )
@ -768,10 +887,7 @@ def gerritReviewFmt(url, data):
builderName = build['builder']['name'] builderName = build['builder']['name']
if len(build['results']) != 1: result = build['results']
raise ValueError('this review request contains more than one build results, unexpected format request')
result = build['results'][0]
if result == util.RETRY: if result == util.RETRY:
return dict() return dict()
@ -824,13 +940,15 @@ class GerritNixConfigurator(ConfiguratorBase):
prometheus_config: dict[str, int | str] | None = None, prometheus_config: dict[str, int | str] | None = None,
binary_cache_config: dict[str, str] | None = None, binary_cache_config: dict[str, str] | None = None,
auth_method: AuthBase | None = None, auth_method: AuthBase | None = None,
manhole: Any = None,
) -> None: ) -> None:
super().__init__() super().__init__()
self.manhole = manhole
self.allowed_origins = allowed_origins self.allowed_origins = allowed_origins
self.gerrit_server = gerrit_server self.gerrit_server = gerrit_server
self.gerrit_user = gerrit_user self.gerrit_user = gerrit_user
self.gerrit_port = gerrit_port self.gerrit_port = gerrit_port
self.gerrit_sshkey_path = gerrit_sshkey_path self.gerrit_sshkey_path = str(gerrit_sshkey_path)
self.gerrit_config = GerritConfig(domain=self.gerrit_server, self.gerrit_config = GerritConfig(domain=self.gerrit_server,
username=self.gerrit_user, username=self.gerrit_user,
port=self.gerrit_port) port=self.gerrit_port)
@ -860,6 +978,9 @@ class GerritNixConfigurator(ConfiguratorBase):
worker_config = json.loads(read_secret_file(self.nix_workers_secret_name)) worker_config = json.loads(read_secret_file(self.nix_workers_secret_name))
worker_names = [] worker_names = []
if self.manhole is not None:
config["manhole"] = self.manhole
config.setdefault("projects", []) config.setdefault("projects", [])
config.setdefault("secretsProviders", []) config.setdefault("secretsProviders", [])
config.setdefault("www", { config.setdefault("www", {
@ -876,7 +997,6 @@ class GerritNixConfigurator(ConfiguratorBase):
eval_lock = util.MasterLock("nix-eval") eval_lock = util.MasterLock("nix-eval")
builders_spec = " ; ".join(builder.to_nix_line() for builder in self.nix_builders)
for project in self.projects: for project in self.projects:
config_for_project( config_for_project(
config, config,
@ -887,7 +1007,7 @@ class GerritNixConfigurator(ConfiguratorBase):
self.nix_eval_worker_count or multiprocessing.cpu_count(), self.nix_eval_worker_count or multiprocessing.cpu_count(),
self.nix_eval_max_memory_size, self.nix_eval_max_memory_size,
eval_lock, eval_lock,
builders_spec, self.nix_builders,
signing_keyfile=self.signing_keyfile, signing_keyfile=self.signing_keyfile,
binary_cache_config=self.binary_cache_config binary_cache_config=self.binary_cache_config
) )

View file

@ -7,6 +7,9 @@
let let
inherit (lib) filterAttrs; inherit (lib) filterAttrs;
cfg = config.services.buildbot-nix.coordinator; cfg = config.services.buildbot-nix.coordinator;
debuggingManhole = if cfg.debugging.enable then
"manhole.TelnetManhole(${toString cfg.debugging.port}, 'admin', 'admin')"
else "None";
in in
{ {
options = { options = {
@ -28,6 +31,14 @@ in
description = "List of local remote builders machines associated to that Buildbot instance"; description = "List of local remote builders machines associated to that Buildbot instance";
}; };
debugging = {
enable = lib.mkEnableOption "manhole's buildbot debugging on localhost using `admin:admin`";
port = lib.mkOption {
type = lib.types.port;
default = 15000;
};
};
oauth2 = { oauth2 = {
name = lib.mkOption { name = lib.mkOption {
type = lib.types.str; type = lib.types.str;
@ -163,6 +174,14 @@ in
''; '';
example = [ "lix" ]; example = [ "lix" ];
}; };
projects = lib.mkOption {
type = lib.types.listOf lib.types.str;
description = ''
List of projects which are to check on Gerrit.
'';
example = [ "lix" ];
};
}; };
binaryCache = { binaryCache = {
@ -216,6 +235,7 @@ in
extraImports = '' extraImports = ''
from datetime import timedelta from datetime import timedelta
from buildbot_nix import GerritNixConfigurator, read_secret_file, make_oauth2_method, OAuth2Config, assemble_secret_file_path from buildbot_nix import GerritNixConfigurator, read_secret_file, make_oauth2_method, OAuth2Config, assemble_secret_file_path
from buildbot import manhole
# TODO(raito): make me configurable from the NixOS module. # TODO(raito): make me configurable from the NixOS module.
# how? # how?
@ -257,7 +277,8 @@ in
auth_method=CustomOAuth2(${builtins.toJSON cfg.oauth2.clientId}, auth_method=CustomOAuth2(${builtins.toJSON cfg.oauth2.clientId},
read_secret_file('buildbot-oauth2-secret'), read_secret_file('buildbot-oauth2-secret'),
autologin=True autologin=True
) ),
manhole=${debuggingManhole}
) )
'' ''
]; ];

View file

@ -62,6 +62,7 @@ in
pkgs.openssh pkgs.openssh
pkgs.nix pkgs.nix
pkgs.nix-eval-jobs pkgs.nix-eval-jobs
pkgs.bash
]; ];
environment.PYTHONPATH = "${python.withPackages (_: [cfg.package])}/${python.sitePackages}"; environment.PYTHONPATH = "${python.withPackages (_: [cfg.package])}/${python.sitePackages}";
environment.MASTER_URL = cfg.coordinatorUrl; environment.MASTER_URL = cfg.coordinatorUrl;