buildbot-nix/buildbot_nix/__init__.py
jade 4edf5a6f33 feat: Report evaluation errors and fail the damn build
I am being the fallback maintainer on this one and putting my foot down
and fixing it. We should have fixed this months ago, but such as it is.

Fixes: lix-project/lix#237
2024-10-18 16:55:33 -07:00

1113 lines
42 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import multiprocessing
import os
import sys
import graphlib
import base64
from collections.abc import Generator
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
import buildbot
from buildbot.configurators import ConfiguratorBase
from buildbot.plugins import reporters, schedulers, secrets, steps, util, worker
from buildbot.process import buildstep, logobserver, remotecommand
from buildbot.process.log import StreamLog
from buildbot.process.project import Project
from buildbot.process.properties import Properties
from buildbot.process.results import ALL_RESULTS, statusToString
from buildbot.www.auth import AuthBase
from buildbot.www.oauth2 import OAuth2Auth
from buildbot.changes.gerritchangesource import GerritChangeSource
from buildbot.reporters.generators.build import BuildStatusGenerator
from buildbot.reporters.utils import getURLForBuildrequest
from buildbot.process.buildstep import EXCEPTION
from buildbot.process.buildstep import SUCCESS
from buildbot.process.results import worst_status
import requests
from twisted.internet import defer
from twisted.logger import Logger
from .binary_cache import S3BinaryCacheConfig
from .message_formatter import ReasonableMessageFormatter, CallbackPayloadBuild, CallbackPayloadBuildSet, CallbackReturn
log = Logger()
# util is a plugin variable, not a module...
BuilderConfig = util.BuilderConfig
MasterLock = util.MasterLock
FLAKE_TARGET_ATTRIBUTE_FOR_JOBS = "hydraJobs"
@dataclass
class EvaluatorSettings:
supported_systems: list[str]
worker_count: int
max_memory_size: int
gc_roots_dir: str
lock: MasterLock
@dataclass
class NixBuilder:
protocol: str
hostName: str
maxJobs: int
speedFactor: int = 1
# without base64
publicHostKey: str | None = None
sshUser: str | None = None
sshKey: str | None = None
systems: list[str] = field(default_factory=lambda: [])
supportedFeatures: list[str] = field(default_factory=lambda: [])
mandatoryFeatures: list[str] = field(default_factory=lambda: [])
def to_nix_store(self):
fullConnection = f"{self.sshUser}@{self.hostName}" if self.sshUser is not None else self.hostName
fullConnection = f"{self.protocol}://{fullConnection}"
params = []
if self.sshKey is not None:
params.append(f"ssh-key={self.sshKey}")
if self.publicHostKey is not None:
encoded_public_key = base64.b64encode(self.publicHostKey.encode('ascii')).decode('ascii')
params.append(f"base64-ssh-public-host-key={encoded_public_key}")
if params != []:
fullConnection += "?"
fullConnection += "&".join(params)
return fullConnection
@dataclass
class OAuth2Config:
name: str
faIcon: str
resourceEndpoint: str
authUri: str
tokenUri: str
userinfoUri: str
sslVerify: bool = True
debug: bool = False
class KeycloakOAuth2Auth(OAuth2Auth):
userinfoUri: str
def __init__(self, *args, debug=False, **kwargs):
super().__init__(*args, **kwargs)
self.debug = debug
def createSessionFromToken(self, token):
s = requests.Session()
s.headers = {
'Authorization': 'Bearer ' + token['access_token'],
'User-Agent': f'buildbot/{buildbot.version}',
}
if self.debug:
log.info("Token obtained: {}".format(token))
s.verify = self.sslVerify
return s
def getUserInfoFromOAuthClient(self, c):
userinfo_resp = c.get(self.userinfoUri)
log.info("Userinfo request to OAuth2: {}".format(userinfo_resp.status_code))
if userinfo_resp.status_code != 200:
log.error("Userinfo failure: {}".format(userinfo_resp.headers["www-authenticate"]))
userinfo_resp.raise_for_status()
userinfo_data = userinfo_resp.json()
return {
'groups': userinfo_data['buildbot_roles']
}
def make_oauth2_method(oauth2_config: OAuth2Config):
"""
This constructs dynamically a class inheriting
an OAuth2 base configured using a dataclass.
"""
return type(f'{oauth2_config.name}DynamicOAuth2',
(KeycloakOAuth2Auth,),
oauth2_config.__dict__)
class BuildbotNixError(Exception):
pass
@dataclass
class GerritProject:
# `project` field.
name: str
# Private SSH key path to access Gerrit API
private_sshkey_path: str
@dataclass
class GerritConfig:
# Gerrit server domain
domain: str
port: int
username: str
@property
def repourl_template(self) -> str:
"""
Returns the prefix to build a repourl using that gerrit configuration.
"""
return f'ssh://{self.username}@{self.domain}:{self.port}/'
class BuildTrigger(steps.BuildStep):
def __init__(
self,
builds_scheduler_group: str,
jobs: list[dict[str, Any]],
all_deps: dict[str, Any],
**kwargs: Any,
) -> None:
self.jobs = jobs
self.all_deps = all_deps
self.config = None
self.builds_scheduler_group = builds_scheduler_group
self._result_list = []
self.ended = False
self.waitForFinishDeferred = None
self.brids = []
self.description = f"building {len(jobs)} jobs"
super().__init__(**kwargs)
def interrupt(self, reason):
# We cancel the buildrequests, as the data api handles
# both cases:
# - build started: stop is sent,
# - build not created yet: related buildrequests are set to CANCELLED.
# Note that there is an identified race condition though (more details
# are available at buildbot.data.buildrequests).
for brid in self.brids:
self.master.data.control(
"cancel", {'reason': 'parent build was interrupted'}, ("buildrequests", brid)
)
if self.running and not self.ended:
self.ended = True
# if we are interrupted because of a connection lost, we interrupt synchronously
if self.build.conn is None and self.waitForFinishDeferred is not None:
self.waitForFinishDeferred.cancel()
def getSchedulerByName(self, name):
schedulers = self.master.scheduler_manager.namedServices
if name not in schedulers:
raise ValueError(f"unknown triggered scheduler: {repr(name)}")
sch = schedulers[name]
# todo: check ITriggerableScheduler
return sch
def schedule_one(self, build_props: Properties, job):
project_name = build_props.getProperty("event.refUpdate.project") or build_props.getProperty("event.change.project")
source = f"{project_name}-eval"
attr = job.get("attr", "eval-error")
# FIXME(raito): this was named this way for backward compatibility with Lix deployment.
# We should just parametrize this.
name = f"hydraJobs.{attr}"
error = job.get("error")
props = Properties()
props.setProperty("virtual_builder_name", name, source)
props.setProperty("status_name", f"building hydraJobs.{attr}", source)
props.setProperty("virtual_builder_tags", "", source)
if error is not None:
props.setProperty("error", error, source)
return (f"{self.builds_scheduler_group}-other", props)
drv_path = job.get("drvPath")
system = job.get("system")
out_path = job.get("outputs", {}).get("out")
build_props.setProperty(f"{attr}-out_path", out_path, source)
build_props.setProperty(f"{attr}-drv_path", drv_path, source)
props.setProperty("attr", attr, source)
props.setProperty("system", system, source)
props.setProperty("drv_path", drv_path, source)
props.setProperty("out_path", out_path, source)
props.setProperty("isCached", job.get("isCached"), source)
return (f"{self.builds_scheduler_group}-{system}", props)
@defer.inlineCallbacks
def _add_results(self, brid):
@defer.inlineCallbacks
def _is_buildrequest_complete(brid):
buildrequest = yield self.master.db.buildrequests.getBuildRequest(brid)
return buildrequest['complete']
event = ('buildrequests', str(brid), 'complete')
yield self.master.mq.waitUntilEvent(event, lambda: _is_buildrequest_complete(brid))
builds = yield self.master.db.builds.getBuilds(buildrequestid=brid)
for build in builds:
self._result_list.append(build["results"])
self.updateSummary()
def prepareSourcestampListForTrigger(self):
ss_for_trigger = {}
objs_from_build = self.build.getAllSourceStamps()
for ss in objs_from_build:
ss_for_trigger[ss.codebase] = ss.asDict()
trigger_values = [ss_for_trigger[k] for k in sorted(ss_for_trigger.keys())]
return trigger_values
@defer.inlineCallbacks
def run(self):
self.running = True
build_props = self.build.getProperties()
logs: StreamLog = yield self.addLog("build info")
builds_to_schedule = list(self.jobs)
build_schedule_order = []
sorter = graphlib.TopologicalSorter(self.all_deps)
for item in sorter.static_order():
i = 0
while i < len(builds_to_schedule):
if item == builds_to_schedule[i].get("drvPath"):
build_schedule_order.append(builds_to_schedule[i])
del builds_to_schedule[i]
else:
i += 1
done = []
scheduled = []
failed = []
all_results = SUCCESS
ss_for_trigger = self.prepareSourcestampListForTrigger()
while not self.ended and (len(build_schedule_order) > 0 or len(scheduled) > 0):
schedule_now = []
for build in list(build_schedule_order):
if self.all_deps.get(build.get("drvPath"), []) == []:
build_schedule_order.remove(build)
schedule_now.append(build)
for job in schedule_now:
if job.get('isCached'):
logs.addStdout(f"Cached {job.get('attr')} ({job.get('drvPath')}) - skipping\n")
for dep in self.all_deps:
if job.get("drvPath") in self.all_deps[dep]:
self.all_deps[dep].remove(job.get("drvPath"))
continue
logs.addStdout(f"Scheduling {job.get('attr')} ({job.get('drvPath')})\n")
(scheduler, props) = self.schedule_one(build_props, job)
scheduler = self.getSchedulerByName(scheduler)
idsDeferred, resultsDeferred = scheduler.trigger(
waited_for = True,
sourcestamps = ss_for_trigger,
set_props = props,
parent_buildid = self.build.buildid,
parent_relationship = "Triggered from",
)
brids = {}
try:
_, brids = yield idsDeferred
except Exception as e:
yield self.addLogWithException(e)
results = EXCEPTION
scheduled.append((job, brids, resultsDeferred))
for brid in brids.values():
url = getURLForBuildrequest(self.master, brid)
yield self.addURL(f"{scheduler.name} #{brid}", url)
self._add_results(brid)
self.brids.append(brid)
if len(scheduled) == 0:
if len(build_schedule_order) == 0:
logs.addStderr('Ran out of builds\n')
break
continue
wait_for_next = defer.DeferredList([results for _, _, results in scheduled], fireOnOneCallback = True, fireOnOneErrback=True)
self.waitForFinishDeferred = wait_for_next
results, index = yield wait_for_next
job, brids, _ = scheduled[index]
done.append((job, brids, results))
del scheduled[index]
result = results[0]
logs.addStdout(f'Build {job.get("attr")} ({job.get("drvPath")}) finished, result {util.Results[result].upper()}\n')
if result != SUCCESS:
failed_checks = []
failed_paths = [job.get('drvPath')]
removed = []
failed.append((
job.get("attr"),
"failed",
[ getURLForBuildrequest(self.master, brid) for brid in brids.values() ]
))
while True:
old_paths = list(failed_paths)
for build in list(build_schedule_order):
deps = self.all_deps.get(build.get("drvPath"), [])
for path in old_paths:
if path in deps:
failed_checks.append(build)
failed_paths.append(build.get("drvPath"))
build_schedule_order.remove(build)
removed.append(build.get("attr"))
failed.append((build.get("attr"), f"dependency {job.get('attr')} failed", []))
break
if old_paths == failed_paths:
break
if len(removed) > 3:
yield logs.addStdout(' Skipping jobs: ' + ', '.join(removed[:3]) + f', ... ({len(removed) - 3} more)\n')
else:
yield logs.addStdout(' Skipping jobs: ' + ', '.join(removed) + '\n')
all_results = worst_status(result, all_results)
for dep in self.all_deps:
if job.get("drvPath") in self.all_deps[dep]:
self.all_deps[dep].remove(job.get("drvPath"))
yield logs.addHeader('Done!\n')
yield logs.finish()
build_props.setProperty("failed_builds", failed, "nix-eval")
if self.ended:
return util.CANCELLED
return all_results
def getCurrentSummary(self) -> dict[str, str]: # noqa: N802
summary = []
if self._result_list:
for status in ALL_RESULTS:
count = self._result_list.count(status)
if count:
summary.append(
f"{self._result_list.count(status)} {statusToString(status, count)}",
)
return {"step": f"({', '.join(summary)})"}
class NixEvalCommand(buildstep.ShellMixin, steps.BuildStep):
"""Parses the output of `nix-eval-jobs` and triggers a `nix-build` build for
every attribute.
"""
def __init__(self, supported_systems: list[str], **kwargs: Any) -> None:
kwargs = self.setupShellMixin(kwargs)
super().__init__(**kwargs)
self.observer = logobserver.BufferLogObserver()
self.addLogObserver("stdio", self.observer)
self.supported_systems = supported_systems
@defer.inlineCallbacks
def run(self) -> Generator[Any, object, Any]:
# run nix-eval-jobs --flake .#$FLAKE_TARGET_ATTRIBUTE_FOR_JOBS to generate the dict of stages
cmd: remotecommand.RemoteCommand = yield self.makeRemoteShellCommand()
build_props = self.build.getProperties()
project_name = build_props.getProperty("event.refUpdate.project") or build_props.getProperty("event.change.project")
assert project_name is not None, "`event.refUpdate.project` or `event.change.project` is not available on the build properties, unexpected build type!"
yield self.runCommand(cmd)
# if the command passes extract the list of stages
result = cmd.results()
if result == util.SUCCESS:
# create a ShellCommand for each stage and add them to the build
jobs = []
eval_errors: list[tuple[str, str]] = []
for line in self.observer.getStdout().split("\n"):
if line != "":
try:
job = json.loads(line)
except json.JSONDecodeError as e:
msg = f"Failed to parse line: {line}"
raise BuildbotNixError(msg) from e
jobs.append(job)
filtered_jobs = []
for job in jobs:
if err := job.get("error"):
eval_errors.append((job.get('attr'), err))
system = job.get("system")
if not system or system in self.supported_systems:
filtered_jobs.append(job)
# Filter out failed evaluations
succeeded_jobs = [job for job in filtered_jobs if job.get('error') is None]
drv_show_log: StreamLog = yield self.getLog("stdio")
if eval_errors:
msg = "Failing job due to evaluation errors!\n"
msg += "\n".join(
f"- {attr}: {failure}" for (attr, failure) in eval_errors)
drv_show_log.addStdout(msg)
raise BuildbotNixError("Evaluation error in attributes: " + ", ".join(attr for (attr, _) in eval_errors))
all_deps = dict()
def closure_of(key, deps):
r, size = set([key]), 0
while len(r) != size:
size = len(r)
r.update(*[ deps[k] for k in r ])
return r.difference([key])
if succeeded_jobs:
drv_show_log.addStdout(f"getting derivation infos for valid derivations\n")
cmd = yield self.makeRemoteShellCommand(
stdioLogName=None,
collectStdout=True,
command=(
["nix", "derivation", "show", "--recursive"]
+ [ drv for drv in (job.get("drvPath") for job in succeeded_jobs) if drv ]
),
)
yield self.runCommand(cmd)
drv_show_log.addStdout(f"done\n")
try:
drv_info = json.loads(cmd.stdout)
except json.JSONDecodeError as e:
msg = f"Failed to parse `nix derivation show` output for {cmd.command}"
raise BuildbotNixError(msg) from e
for drv, info in drv_info.items():
all_deps[drv] = set(info.get("inputDrvs").keys())
job_set = set(( drv for drv in ( job.get("drvPath") for job in filtered_jobs ) if drv ))
all_deps = { k: list(closure_of(k, all_deps).intersection(job_set)) for k in job_set }
self.build.addStepsAfterCurrentStep(
[
BuildTrigger(
builds_scheduler_group=f"{project_name}-nix-build",
name="build derivations",
jobs=filtered_jobs,
all_deps=all_deps,
),
],
)
return result
def make_job_evaluator(name: str, settings: EvaluatorSettings, flake: bool, incoming_ref_filename: str) -> NixEvalCommand:
actual_command = []
if flake:
actual_command += ["--flake", f".#{FLAKE_TARGET_ATTRIBUTE_FOR_JOBS}"]
else:
actual_command += ["--expr",
f"import ./.ci/buildbot.nix {{ incoming_ref_data = builtins.fromJSON (builtins.readFile {incoming_ref_filename}); }}"]
return NixEvalCommand(
env={},
name=name,
supported_systems=settings.supported_systems,
command=[
"nix-eval-jobs",
"--workers",
str(settings.worker_count),
"--max-memory-size",
str(settings.max_memory_size),
"--gc-roots-dir",
settings.gc_roots_dir,
"--force-recurse",
"--check-cache-status",
] + actual_command,
haltOnFailure=True,
locks=[settings.lock.access("exclusive")]
)
class NixConfigure(buildstep.CommandMixin, steps.BuildStep):
name = "determining jobs"
"""
Determine what `NixEvalCommand` step should be added after
based on the existence of:
- flake.nix
- .ci/buildbot.nix
"""
def __init__(self, eval_settings: EvaluatorSettings, **kwargs: Any) -> None:
self.evaluator_settings = eval_settings
super().__init__(**kwargs)
self.observer = logobserver.BufferLogObserver()
self.addLogObserver("stdio", self.observer)
@defer.inlineCallbacks
def run(self) -> Generator[Any, object, Any]:
try:
configure_log: StreamLog = yield self.getLog("stdio")
except Exception:
configure_log: StreamLog = yield self.addLog("stdio")
# Takes precedence.
configure_log.addStdout("checking if there's a .ci/buildbot.nix...\n")
ci_buildbot_defn_exists = yield self.pathExists('build/.ci/buildbot.nix')
if ci_buildbot_defn_exists:
configure_log.addStdout(".ci/buildbot.nix found, configured for non-flake CI\n")
self.build.addStepsAfterCurrentStep(
[
make_job_evaluator(
"evaluate `.ci/buildbot.nix` jobs",
self.evaluator_settings,
False,
"./incoming-ref.json"
)
]
)
return SUCCESS
flake_exists = yield self.pathExists('build/flake.nix')
if flake_exists:
configure_log.addStdout(f"flake.nix found")
self.build.addStepsAfterCurrentStep([
make_job_evaluator(
"evaluate `flake.nix` jobs",
self.evaluator_settings,
True,
"./incoming-ref.json"
)
]
)
return SUCCESS
configure_log.addStdout("neither flake.nix found neither .ci/buildbot.nix, no CI to run!")
return SUCCESS
class NixBuildCommand(buildstep.ShellMixin, steps.BuildStep):
"""Builds a nix derivation."""
def __init__(self, **kwargs: Any) -> None:
kwargs = self.setupShellMixin(kwargs)
super().__init__(**kwargs)
@defer.inlineCallbacks
def run(self) -> Generator[Any, object, Any]:
if error := self.getProperty("error"):
attr = self.getProperty("attr")
# show eval error
error_log: StreamLog = yield self.addLog("nix_error")
error_log.addStderr(f"{attr} failed to evaluate:\n{error}")
return util.FAILURE
if self.getProperty("isCached"):
yield self.addCompleteLog(
"cached outpath from previous builds",
# buildbot apparently hides the first line in the ui?
f'\n{self.getProperty("out_path")}\n')
return util.SKIPPED
# run `nix build`
cmd: remotecommand.RemoteCommand = yield self.makeRemoteShellCommand()
yield self.runCommand(cmd)
return cmd.results()
def nix_eval_config(
gerrit_config: GerritConfig,
project: GerritProject,
worker_names: list[str],
supported_systems: list[str],
eval_lock: MasterLock,
worker_count: int,
max_memory_size: int,
) -> BuilderConfig:
"""
Uses nix-eval-jobs to evaluate the entrypoint of this project.
For each evaluated attribute a new build pipeline is started.
"""
factory = util.BuildFactory()
gerrit_private_key = None
with open(project.private_sshkey_path, 'r') as f:
gerrit_private_key = f.read()
if gerrit_private_key is None:
raise RuntimeError('No gerrit private key to fetch the repositories')
# check out the source
factory.addStep(
steps.Gerrit(
repourl=f'{gerrit_config.repourl_template}/{project.name}',
mode="full",
retry=[60, 60],
timeout=3600,
sshPrivateKey=gerrit_private_key
),
)
# use one gcroots directory per worker. this should be scoped to the largest unique resource
# in charge of builds (ie, buildnumber is too narrow) to not litter the system with permanent
# gcroots in case of worker restarts.
# TODO perhaps we should clean the entire /drvs/ directory up too during startup.
drv_gcroots_dir = util.Interpolate(
"/nix/var/nix/gcroots/per-user/buildbot-worker/%(prop:project)s/drvs/%(prop:workername)s/",
)
eval_settings = EvaluatorSettings(
supported_systems=supported_systems,
worker_count=worker_count,
max_memory_size=max_memory_size,
gc_roots_dir=drv_gcroots_dir,
lock=eval_lock
)
# This information can be passed at job evaluation time
# to skip some jobs, e.g. expensive jobs, etc.
# Transfer incoming ref data to the target.
factory.addStep(steps.JSONPropertiesDownload(workerdest="incoming-ref.json"))
# NixConfigure will choose
# how to add a NixEvalCommand job
# based on whether there's a flake.nix or
# a .ci/buildbot.nix.
factory.addStep(
NixConfigure(
eval_settings
)
)
factory.addStep(
steps.ShellCommand(
name="Cleanup drv paths",
command=[
"rm",
"-rf",
drv_gcroots_dir,
],
alwaysRun=True,
),
)
return util.BuilderConfig(
name=f"{project.name}/nix-eval",
workernames=worker_names,
project=project.name,
factory=factory,
properties=dict(status_name="nix-eval"),
)
def nix_build_config(
project: GerritProject,
worker_arch: str,
worker_names: list[str],
build_stores: list[str],
signing_keyfile: str | None = None,
binary_cache_config: S3BinaryCacheConfig | None = None
) -> BuilderConfig:
"""Builds one nix flake attribute."""
factory = util.BuildFactory()
# pick a store to run the build on
# TODO proper scheduling instead of picking the first builder
build_store = build_stores[0]
factory.addStep(
NixBuildCommand(
env={},
name="Build flake attr",
command=[
"nix",
"build",
"-L",
"--option",
"keep-going",
"true",
# do not build directly on the coordinator
"--max-jobs", "0",
# stop stuck builds after 20 minutes
"--max-silent-time",
str(60 * 20),
# kill builds after two hours regardless of activity
"--timeout",
"7200",
"--store",
build_store,
"--eval-store",
"ssh-ng://localhost",
"--out-link",
util.Interpolate("result-%(prop:attr)s"),
util.Interpolate("%(prop:drv_path)s^*"),
],
# 3 hours, defaults to 20 minutes
# We increase this over the default since the build output might end up in a different `nix build`.
timeout=60 * 60 * 3,
haltOnFailure=True,
),
)
if signing_keyfile is not None:
factory.addStep(
steps.ShellCommand(
name="Sign the store path",
command=[
"nix",
"store",
"sign",
"--store",
build_store,
"--key-file",
signing_keyfile,
util.Interpolate(
"%(prop:drv_path)s^*"
)
]
),
)
if binary_cache_config is not None:
factory.addStep(
steps.ShellCommand(
name="Upload the store path to the cache",
command=[
"nix",
"copy",
"--store",
build_store,
"--to",
f"s3://{binary_cache_config.bucket}?profile={binary_cache_config.profile}&region={binary_cache_config.region}&endpoint={binary_cache_config.endpoint}",
util.Property(
"out_path"
)
]
)
)
factory.addStep(
steps.ShellCommand(
name="Register gcroot",
command=[
"nix-store",
"--add-root",
# FIXME: cleanup old build attributes
util.Interpolate(
"/nix/var/nix/gcroots/per-user/buildbot-worker/%(prop:project)s/%(prop:attr)s",
),
"-r",
util.Property("out_path"),
],
doStepIf=lambda s: s.getProperty("branch")
== s.getProperty("github.repository.default_branch"),
),
)
factory.addStep(
steps.ShellCommand(
name="Delete temporary gcroots",
command=["rm", "-f", util.Interpolate("result-%(prop:attr)s")],
),
)
return util.BuilderConfig(
name=f"{project.name}/nix-build/{worker_arch}",
project=project.name,
workernames=worker_names,
collapseRequests=False,
env={},
factory=factory,
)
def assemble_secret_file_path(secret_name: str) -> Path:
directory = os.environ.get("CREDENTIALS_DIRECTORY")
if directory is None:
print("directory not set", file=sys.stderr)
sys.exit(1)
return Path(directory).joinpath(secret_name)
def read_secret_file(secret_name: str) -> str:
return assemble_secret_file_path(secret_name).read_text().rstrip()
def config_for_project(
config: dict[str, Any],
gerrit_config: GerritConfig,
project: GerritProject,
worker_names: list[str],
nix_supported_systems: list[str],
nix_eval_worker_count: int,
nix_eval_max_memory_size: int,
eval_lock: MasterLock,
nix_builders: list[NixBuilder],
signing_keyfile: str | None = None,
binary_cache_config: S3BinaryCacheConfig | None = None
) -> None:
config["projects"].append(Project(project.name))
config["schedulers"].extend(
[
# build everything pertaining to a project
# TODO(raito): will this catch also post-merge? we don't really care about that… do we?
schedulers.SingleBranchScheduler(
name=f"{project.name}-changes",
change_filter=util.ChangeFilter(
project=project.name,
),
builderNames=[f"{project.name}/nix-eval"],
),
# this is triggered from `nix-eval`
*(
schedulers.Triggerable(
name=f"{project.name}-nix-build-{arch}",
builderNames=[f"{project.name}/nix-build/{arch}"],
)
for arch in nix_supported_systems + [ "other" ]
),
# allow to manually trigger a nix-build
schedulers.ForceScheduler(
name=f"{project.name}-force",
builderNames=[f"{project.name}/nix-eval"],
properties=[
util.StringParameter(
name="project",
label="Name of the Gerrit repository.",
default=project.name,
),
],
),
],
)
config["builders"].extend(
[
# Since all workers run on the same machine, we only assign one of them to do the evaluation.
# This should prevent exessive memory usage.
nix_eval_config(
gerrit_config,
project,
[ f"{w}-other" for w in worker_names ],
supported_systems=nix_supported_systems,
worker_count=nix_eval_worker_count,
max_memory_size=nix_eval_max_memory_size,
eval_lock=eval_lock,
),
*(
nix_build_config(
project,
arch,
[ f"{w}-{arch}" for w in worker_names ],
[b.to_nix_store() for b in nix_builders if arch in b.systems or arch == "other"],
signing_keyfile=signing_keyfile,
binary_cache_config=binary_cache_config
)
for arch in nix_supported_systems + [ "other" ]
),
],
)
class PeriodicWithStartup(schedulers.Periodic):
def __init__(self, *args: Any, run_on_startup: bool = False, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.run_on_startup = run_on_startup
@defer.inlineCallbacks
def activate(self) -> Generator[Any, object, Any]:
if self.run_on_startup:
yield self.setState("last_build", None)
yield super().activate()
def gerritReviewFmt(url: str, payload: CallbackPayloadBuild | CallbackPayloadBuildSet) -> CallbackReturn:
assert isinstance(payload, CallbackPayloadBuild), "BuildSet are not handled yet!"
build = payload.build
if 'builder' not in build and 'name' not in build['builder']:
raise ValueError('either `builder` or `builder.name` is not present in the build dictionary, unexpected format request')
builderName = build['builder']['name']
result = build['results']
log.info("Formatting a message for a Gerrit build: {} -- result is {}".format(builderName, result))
if result == util.RETRY:
return CallbackReturn()
expectedBuilderName = f'{build["properties"].get("event.project")[0]}/nix-eval'
if builderName != expectedBuilderName:
log.info("Passing {} builder which is not of the form '{}'".format(builderName, expectedBuilderName))
return CallbackReturn()
failed = build['properties'].get('failed_builds', [[]])[0]
labels = {
'Verified': -1 if result != util.SUCCESS else 1,
}
message = "Buildbot finished compiling your patchset!\n"
message += "The result is: %s\n" % util.Results[result].upper()
if result != util.SUCCESS:
message += "\nFailed checks:\n"
for check, how, urls in failed:
if not urls:
message += " "
message += f" - {check}: {how}"
if urls:
message += f" (see {', '.join(urls)})"
message += "\n"
if url:
message += "\nFor more details visit:\n"
message += build['url'] + "\n"
log.info("Message formatted: {}, labels: Verified={}".format(message, labels['Verified']))
return CallbackReturn(body=message, extra_info={'labels': labels})
class GerritNixConfigurator(ConfiguratorBase):
"""Janitor is a configurator which create a Janitor Builder with all needed Janitor steps"""
def __init__(
self,
# Shape of this file: [ { "name": "<worker-name>", "pass": "<worker-password>", "cores": "<cpu-cores>" } ]
gerrit_server: str,
gerrit_user: str,
gerrit_port: int,
gerrit_sshkey_path: str,
projects: list[str],
url: str,
allowed_origins: list[str],
nix_builders: list[dict[str, Any]],
nix_supported_systems: list[str],
nix_eval_worker_count: int | None,
nix_eval_max_memory_size: int,
nix_workers_secret_name: str = "buildbot-nix-workers", # noqa: S107
signing_keyfile: str | None = None,
prometheus_config: dict[str, int | str] | None = None,
binary_cache_config: dict[str, str] | None = None,
auth_method: AuthBase | None = None,
manhole: Any = None,
) -> None:
super().__init__()
self.manhole = manhole
self.allowed_origins = allowed_origins
self.gerrit_server = gerrit_server
self.gerrit_user = gerrit_user
self.gerrit_port = gerrit_port
self.gerrit_sshkey_path = str(gerrit_sshkey_path)
self.gerrit_config = GerritConfig(domain=self.gerrit_server,
username=self.gerrit_user,
port=self.gerrit_port)
self.projects = projects
self.nix_workers_secret_name = nix_workers_secret_name
self.nix_eval_max_memory_size = nix_eval_max_memory_size
self.nix_eval_worker_count = nix_eval_worker_count
self.nix_supported_systems = nix_supported_systems
self.nix_builders: list[NixBuilder] = [NixBuilder(**builder_cfg) for builder_cfg in nix_builders]
self.gerrit_change_source = GerritChangeSource(gerrit_server, gerrit_user, gerritport=gerrit_port, identity_file=gerrit_sshkey_path)
self.url = url
self.prometheus_config = prometheus_config
if binary_cache_config is not None:
self.binary_cache_config = S3BinaryCacheConfig(**binary_cache_config)
else:
self.binary_cache_config = None
self.signing_keyfile = signing_keyfile
self.auth_method = auth_method
def configure(self, config_dict: dict[str, Any]) -> None:
worker_config_dict = json.loads(read_secret_file(self.nix_workers_secret_name))
worker_names = []
if self.manhole is not None:
config_dict["manhole"] = self.manhole
config_dict.setdefault("projects", [])
config_dict.setdefault("secretsProviders", [])
print('Default allowed origins for this Buildbot server: {}'.format(', '.join(self.allowed_origins)))
config_dict["www"]["allowed_origins"] = self.allowed_origins
for item in worker_config_dict:
cores = item.get("cores", 0)
for i in range(cores):
for arch in self.nix_supported_systems + ["other"]:
worker_name = f"{item['name']}-{i:03}"
config_dict["workers"].append(worker.Worker(f"{worker_name}-{arch}", item["pass"]))
worker_names.append(worker_name)
eval_lock = util.MasterLock("nix-eval")
for project in self.projects:
config_for_project(
config_dict,
self.gerrit_config,
GerritProject(name=project, private_sshkey_path=self.gerrit_sshkey_path),
worker_names,
self.nix_supported_systems,
self.nix_eval_worker_count or multiprocessing.cpu_count(),
self.nix_eval_max_memory_size,
eval_lock,
self.nix_builders,
signing_keyfile=self.signing_keyfile,
binary_cache_config=self.binary_cache_config
)
config_dict["change_source"] = self.gerrit_change_source
config_dict["services"].append(
reporters.GerritStatusPush(self.gerrit_server, self.gerrit_user,
port=self.gerrit_port,
identity_file=self.gerrit_sshkey_path,
generators=[
BuildStatusGenerator(
mode='all',
message_formatter=ReasonableMessageFormatter(
lambda data: gerritReviewFmt(self.url, data),
"plain",
want_properties=True,
want_steps=True
),
),
])
)
if self.prometheus_config is not None:
config_dict['services'].append(reporters.Prometheus(port=self.prometheus_config.get('port', 9100), interface=self.prometheus_config.get('address', '')))
# Upstream defaults pretend they already do something similar
# but they didn't work, hence the custom function.
def gerritBranchKey(b):
ref = b['branch']
if not ref.startswith('refs/changes/'):
return ref
return ref.rsplit('/', 1)[0]
config_dict["services"].append(
util.OldBuildCanceller(
"build_canceller",
filters=[
(
[
f"{project}/nix-{kind}"
for kind in [ "eval" ] + [
f"build/{arch}"
for arch in self.nix_supported_systems + [ "other" ]
]
],
util.SourceStampFilter(project_eq=[project])
)
for project in self.projects
],
branch_key=gerritBranchKey
)
)
systemd_secrets = secrets.SecretInAFile(
dirname=os.environ["CREDENTIALS_DIRECTORY"],
)
config_dict["secretsProviders"].append(systemd_secrets)
config_dict["www"].setdefault("plugins", {})
if "authz" not in config_dict["www"]:
config_dict["www"]["authz"] = util.Authz(
allowRules=[
util.AnyEndpointMatcher(role="admin", defaultDeny=False),
util.StopBuildEndpointMatcher(role="owner"),
util.AnyControlEndpointMatcher(role="admin"),
],
roleMatchers=[
# A user must have buildbot-<something> to have the role <something>
# e.g. buildbot-admin to be admin.
util.RolesFromGroups(groupPrefix="buildbot-"),
util.RolesFromOwner(role="owner")
],
)
if "auth" not in config_dict["www"] and self.auth_method is not None:
config_dict["www"]["auth"] = self.auth_method