917 lines
34 KiB
Python
917 lines
34 KiB
Python
import json
|
||
import multiprocessing
|
||
import os
|
||
import sys
|
||
import graphlib
|
||
import base64
|
||
from collections.abc import Generator
|
||
from dataclasses import dataclass
|
||
from pathlib import Path
|
||
from typing import TYPE_CHECKING, Any
|
||
|
||
from buildbot.configurators import ConfiguratorBase
|
||
from buildbot.plugins import reporters, schedulers, secrets, steps, util, worker
|
||
from buildbot.process import buildstep, logobserver, remotecommand
|
||
from buildbot.process.project import Project
|
||
from buildbot.process.properties import Properties
|
||
from buildbot.process.results import ALL_RESULTS, statusToString
|
||
from buildbot.www.auth import AuthBase
|
||
from buildbot.www.oauth2 import OAuth2Auth
|
||
from buildbot.changes.gerritchangesource import GerritChangeSource
|
||
from buildbot.reporters.utils import getURLForBuildrequest
|
||
from buildbot.reporters.generators.build import BuildStatusGenerator
|
||
from buildbot.reporters.message import MessageFormatterFunction
|
||
from buildbot.process.buildstep import EXCEPTION
|
||
from buildbot.process.buildstep import SUCCESS
|
||
from buildbot.process.results import worst_status
|
||
|
||
if TYPE_CHECKING:
|
||
from buildbot.process.log import Log
|
||
|
||
from twisted.internet import defer
|
||
from twisted.logger import Logger
|
||
|
||
from .binary_cache import S3BinaryCacheConfig
|
||
|
||
log = Logger()
|
||
|
||
FLAKE_TARGET_ATTRIBUTE_FOR_JOBS = "buildbotJobs"
|
||
|
||
@dataclass
|
||
class NixBuilder:
|
||
protocol: str
|
||
hostName: str
|
||
maxJobs: int
|
||
speedFactor: int = 1
|
||
# without base64
|
||
publicHostKey: str | None = None
|
||
sshUser: str | None = None
|
||
sshKey: str | None = None
|
||
systems: list[str] = ["-"]
|
||
supportedFeatures: list[str] = ["-"]
|
||
mandatoryFeatures: list[str] = ["-"]
|
||
|
||
def to_nix_line(self):
|
||
encoded_public_key = base64.b64encode(self.publicHostKey.encode('ascii')).decode('ascii') if self.publicHostKey is not None else "-"
|
||
fullConnection = f"{self.protocol}://{self.sshUser}@{self.hostName}" if self.sshUser is not None else self.hostName
|
||
return f"{fullConnection} {",".join(self.systems)} {self.sshKey or "-"} {self.maxJobs} {self.speedFactor} {",".join(self.supportedFeatures)} {",".join(self.mandatoryFeatures)} {encoded_public_key}"
|
||
|
||
|
||
@dataclass
|
||
class OAuth2Config:
|
||
name: str
|
||
faIcon: str
|
||
resourceEndpoint: str
|
||
authUri: str
|
||
tokenUri: str
|
||
|
||
def make_oauth2_method(oauth2_config: OAuth2Config):
|
||
"""
|
||
This constructs dynamically a class inheriting
|
||
an OAuth2 base configured using a dataclass.
|
||
"""
|
||
return type(f'{oauth2_config.name}DynamicOAuth2',
|
||
(OAuth2Auth,),
|
||
oauth2_config.__dict__)
|
||
|
||
class BuildbotNixError(Exception):
|
||
pass
|
||
|
||
@dataclass
|
||
class GerritProject:
|
||
# `project` field.
|
||
name: str
|
||
# Private SSH key path to access Gerrit API
|
||
private_sshkey_path: str
|
||
|
||
@dataclass
|
||
class GerritConfig:
|
||
# Gerrit server domain
|
||
domain: str
|
||
port: int
|
||
username: str
|
||
|
||
@property
|
||
def repourl_template(self) -> str:
|
||
"""
|
||
Returns the prefix to build a repourl using that gerrit configuration.
|
||
"""
|
||
return 'ssh://{self.username}@{self.domain}:{self.port}/'
|
||
|
||
class BuildTrigger(steps.BuildStep):
|
||
def __init__(
|
||
self,
|
||
builds_scheduler_group: str,
|
||
jobs: list[dict[str, Any]],
|
||
all_deps: dict[str, Any],
|
||
**kwargs: Any,
|
||
) -> None:
|
||
self.jobs = jobs
|
||
self.all_deps = all_deps
|
||
self.config = None
|
||
self.builds_scheduler_group = builds_scheduler_group
|
||
self._result_list = []
|
||
self.ended = False
|
||
self.waitForFinishDeferred = None
|
||
self.brids = []
|
||
self.description = f"building {len(jobs)} hydra jobs"
|
||
super().__init__(**kwargs)
|
||
|
||
def interrupt(self, reason):
|
||
# We cancel the buildrequests, as the data api handles
|
||
# both cases:
|
||
# - build started: stop is sent,
|
||
# - build not created yet: related buildrequests are set to CANCELLED.
|
||
# Note that there is an identified race condition though (more details
|
||
# are available at buildbot.data.buildrequests).
|
||
for brid in self.brids:
|
||
self.master.data.control(
|
||
"cancel", {'reason': 'parent build was interrupted'}, ("buildrequests", brid)
|
||
)
|
||
if self.running and not self.ended:
|
||
self.ended = True
|
||
# if we are interrupted because of a connection lost, we interrupt synchronously
|
||
if self.build.conn is None and self.waitForFinishDeferred is not None:
|
||
self.waitForFinishDeferred.cancel()
|
||
|
||
def getSchedulerByName(self, name):
|
||
schedulers = self.master.scheduler_manager.namedServices
|
||
if name not in schedulers:
|
||
raise ValueError(f"unknown triggered scheduler: {repr(name)}")
|
||
sch = schedulers[name]
|
||
# todo: check ITriggerableScheduler
|
||
return sch
|
||
|
||
def schedule_one(self, build_props: Properties, job):
|
||
project_name = build_props.getProperty('event.project')
|
||
source = f"{project_name}-eval-lix"
|
||
attr = job.get("attr", "eval-error")
|
||
name = attr
|
||
name = f"{FLAKE_TARGET_ATTRIBUTE_FOR_JOBS}.{name}"
|
||
error = job.get("error")
|
||
props = Properties()
|
||
props.setProperty("virtual_builder_name", name, source)
|
||
props.setProperty("status_name", f"nix-build .#{FLAKE_TARGET_ATTRIBUTE_FOR_JOBS}.{attr}", source)
|
||
props.setProperty("virtual_builder_tags", "", source)
|
||
|
||
if error is not None:
|
||
props.setProperty("error", error, source)
|
||
return (f"{self.builds_scheduler_group}-other", props)
|
||
|
||
drv_path = job.get("drvPath")
|
||
system = job.get("system")
|
||
out_path = job.get("outputs", {}).get("out")
|
||
|
||
build_props.setProperty(f"{attr}-out_path", out_path, source)
|
||
build_props.setProperty(f"{attr}-drv_path", drv_path, source)
|
||
|
||
props.setProperty("attr", attr, source)
|
||
props.setProperty("system", system, source)
|
||
props.setProperty("drv_path", drv_path, source)
|
||
props.setProperty("out_path", out_path, source)
|
||
props.setProperty("isCached", job.get("isCached"), source)
|
||
|
||
return (f"{self.builds_scheduler_group}-{system}", props)
|
||
|
||
@defer.inlineCallbacks
|
||
def _add_results(self, brid):
|
||
@defer.inlineCallbacks
|
||
def _is_buildrequest_complete(brid):
|
||
buildrequest = yield self.master.db.buildrequests.getBuildRequest(brid)
|
||
return buildrequest['complete']
|
||
|
||
event = ('buildrequests', str(brid), 'complete')
|
||
yield self.master.mq.waitUntilEvent(event, lambda: _is_buildrequest_complete(brid))
|
||
builds = yield self.master.db.builds.getBuilds(buildrequestid=brid)
|
||
for build in builds:
|
||
self._result_list.append(build["results"])
|
||
self.updateSummary()
|
||
|
||
def prepareSourcestampListForTrigger(self):
|
||
ss_for_trigger = {}
|
||
objs_from_build = self.build.getAllSourceStamps()
|
||
for ss in objs_from_build:
|
||
ss_for_trigger[ss.codebase] = ss.asDict()
|
||
|
||
trigger_values = [ss_for_trigger[k] for k in sorted(ss_for_trigger.keys())]
|
||
return trigger_values
|
||
|
||
@defer.inlineCallbacks
|
||
def run(self):
|
||
self.running = True
|
||
build_props = self.build.getProperties()
|
||
logs: Log = yield self.addLog("build info")
|
||
|
||
builds_to_schedule = list(self.jobs)
|
||
build_schedule_order = []
|
||
sorter = graphlib.TopologicalSorter(self.all_deps)
|
||
for item in sorter.static_order():
|
||
i = 0
|
||
while i < len(builds_to_schedule):
|
||
if item == builds_to_schedule[i].get("drvPath"):
|
||
build_schedule_order.append(builds_to_schedule[i])
|
||
del builds_to_schedule[i]
|
||
else:
|
||
i += 1
|
||
|
||
done = []
|
||
scheduled = []
|
||
failed = []
|
||
all_results = SUCCESS
|
||
ss_for_trigger = self.prepareSourcestampListForTrigger()
|
||
while not self.ended and (len(build_schedule_order) > 0 or len(scheduled) > 0):
|
||
schedule_now = []
|
||
for build in list(build_schedule_order):
|
||
if self.all_deps.get(build.get("drvPath"), []) == []:
|
||
build_schedule_order.remove(build)
|
||
schedule_now.append(build)
|
||
for job in schedule_now:
|
||
if job.get('isCached'):
|
||
logs.addStdout(f"Cached {job.get('attr')} ({job.get('drvPath')}) - skipping\n")
|
||
for dep in self.all_deps:
|
||
if job.get("drvPath") in self.all_deps[dep]:
|
||
self.all_deps[dep].remove(job.get("drvPath"))
|
||
continue
|
||
logs.addStdout(f"Scheduling {job.get('attr')} ({job.get('drvPath')})\n")
|
||
(scheduler, props) = self.schedule_one(build_props, job)
|
||
scheduler = self.getSchedulerByName(scheduler)
|
||
|
||
idsDeferred, resultsDeferred = scheduler.trigger(
|
||
waited_for = True,
|
||
sourcestamps = ss_for_trigger,
|
||
set_props = props,
|
||
parent_buildid = self.build.buildid,
|
||
parent_relationship = "Triggered from",
|
||
)
|
||
|
||
brids = {}
|
||
try:
|
||
_, brids = yield idsDeferred
|
||
except Exception as e:
|
||
yield self.addLogWithException(e)
|
||
results = EXCEPTION
|
||
scheduled.append((job, brids, resultsDeferred))
|
||
|
||
for brid in brids.values():
|
||
url = getURLForBuildrequest(self.master, brid)
|
||
yield self.addURL(f"{scheduler.name} #{brid}", url)
|
||
self._add_results(brid)
|
||
self.brids.append(brid)
|
||
if len(scheduled) == 0:
|
||
if len(build_schedule_order) == 0:
|
||
logs.addStderr('Ran out of builds\n')
|
||
break
|
||
continue
|
||
wait_for_next = defer.DeferredList([results for _, _, results in scheduled], fireOnOneCallback = True, fireOnOneErrback=True)
|
||
self.waitForFinishDeferred = wait_for_next
|
||
results, index = yield wait_for_next
|
||
job, brids, _ = scheduled[index]
|
||
done.append((job, brids, results))
|
||
del scheduled[index]
|
||
result = results[0]
|
||
logs.addStdout(f'Build {job.get("attr")} ({job.get("drvPath")}) finished, result {util.Results[result].upper()}\n')
|
||
if result != SUCCESS:
|
||
failed_checks = []
|
||
failed_paths = [job.get('drvPath')]
|
||
removed = []
|
||
failed.append((
|
||
job.get("attr"),
|
||
"failed",
|
||
[ getURLForBuildrequest(self.master, brid) for brid in brids.values() ]
|
||
))
|
||
while True:
|
||
old_paths = list(failed_paths)
|
||
for build in list(build_schedule_order):
|
||
deps = self.all_deps.get(build.get("drvPath"), [])
|
||
for path in old_paths:
|
||
if path in deps:
|
||
failed_checks.append(build)
|
||
failed_paths.append(build.get("drvPath"))
|
||
build_schedule_order.remove(build)
|
||
removed.append(build.get("attr"))
|
||
failed.append((build.get("attr"), f"dependency {job.get('attr')} failed", []))
|
||
|
||
break
|
||
if old_paths == failed_paths:
|
||
break
|
||
if len(removed) > 3:
|
||
yield logs.addStdout(' Skipping jobs: ' + ', '.join(removed[:3]) + f', ... ({len(removed) - 3} more)\n')
|
||
else:
|
||
yield logs.addStdout(' Skipping jobs: ' + ', '.join(removed) + '\n')
|
||
all_results = worst_status(result, all_results)
|
||
for dep in self.all_deps:
|
||
if job.get("drvPath") in self.all_deps[dep]:
|
||
self.all_deps[dep].remove(job.get("drvPath"))
|
||
yield logs.addHeader('Done!\n')
|
||
yield logs.finish()
|
||
build_props.setProperty("failed_builds", failed, "nix-eval")
|
||
if self.ended:
|
||
return util.CANCELLED
|
||
return all_results
|
||
|
||
def getCurrentSummary(self) -> dict[str, str]: # noqa: N802
|
||
summary = []
|
||
if self._result_list:
|
||
for status in ALL_RESULTS:
|
||
count = self._result_list.count(status)
|
||
if count:
|
||
summary.append(
|
||
f"{self._result_list.count(status)} {statusToString(status, count)}",
|
||
)
|
||
return {"step": f"({', '.join(summary)})"}
|
||
|
||
|
||
class NixEvalCommand(buildstep.ShellMixin, steps.BuildStep):
|
||
"""Parses the output of `nix-eval-jobs` and triggers a `nix-build` build for
|
||
every attribute.
|
||
"""
|
||
|
||
def __init__(self, supported_systems: list[str], **kwargs: Any) -> None:
|
||
kwargs = self.setupShellMixin(kwargs)
|
||
super().__init__(**kwargs)
|
||
self.observer = logobserver.BufferLogObserver()
|
||
self.addLogObserver("stdio", self.observer)
|
||
self.supported_systems = supported_systems
|
||
|
||
@defer.inlineCallbacks
|
||
def run(self) -> Generator[Any, object, Any]:
|
||
# run nix-eval-jobs --flake .#$FLAKE_TARGET_ATTRIBUTE_FOR_JOBS to generate the dict of stages
|
||
cmd: remotecommand.RemoteCommand = yield self.makeRemoteShellCommand()
|
||
build_props = self.build.getProperties()
|
||
project_name = build_props.get('event.project')
|
||
|
||
yield self.runCommand(cmd)
|
||
|
||
# if the command passes extract the list of stages
|
||
result = cmd.results()
|
||
if result == util.SUCCESS:
|
||
# create a ShellCommand for each stage and add them to the build
|
||
jobs = []
|
||
|
||
for line in self.observer.getStdout().split("\n"):
|
||
if line != "":
|
||
try:
|
||
job = json.loads(line)
|
||
except json.JSONDecodeError as e:
|
||
msg = f"Failed to parse line: {line}"
|
||
raise BuildbotNixError(msg) from e
|
||
jobs.append(job)
|
||
filtered_jobs = []
|
||
for job in jobs:
|
||
system = job.get("system")
|
||
if not system or system in self.supported_systems: # report eval errors
|
||
filtered_jobs.append(job)
|
||
|
||
drv_show_log: Log = yield self.getLog("stdio")
|
||
drv_show_log.addStdout(f"getting derivation infos\n")
|
||
cmd = yield self.makeRemoteShellCommand(
|
||
stdioLogName=None,
|
||
collectStdout=True,
|
||
command=(
|
||
["nix", "derivation", "show", "--recursive"]
|
||
+ [ drv for drv in (job.get("drvPath") for job in filtered_jobs) if drv ]
|
||
),
|
||
)
|
||
yield self.runCommand(cmd)
|
||
drv_show_log.addStdout(f"done\n")
|
||
try:
|
||
drv_info = json.loads(cmd.stdout)
|
||
except json.JSONDecodeError as e:
|
||
msg = f"Failed to parse `nix derivation show` output for {cmd.command}"
|
||
raise BuildbotNixError(msg) from e
|
||
all_deps = dict()
|
||
for drv, info in drv_info.items():
|
||
all_deps[drv] = set(info.get("inputDrvs").keys())
|
||
|
||
def closure_of(key, deps):
|
||
r, size = set([key]), 0
|
||
while len(r) != size:
|
||
size = len(r)
|
||
r.update(*[ deps[k] for k in r ])
|
||
return r.difference([key])
|
||
|
||
job_set = set(( drv for drv in ( job.get("drvPath") for job in filtered_jobs ) if drv ))
|
||
all_deps = { k: list(closure_of(k, all_deps).intersection(job_set)) for k in job_set }
|
||
|
||
self.build.addStepsAfterCurrentStep(
|
||
[
|
||
BuildTrigger(
|
||
builds_scheduler_group=f"{project_name}-nix-build",
|
||
name="build flake",
|
||
jobs=filtered_jobs,
|
||
all_deps=all_deps,
|
||
),
|
||
],
|
||
)
|
||
|
||
return result
|
||
|
||
|
||
class NixBuildCommand(buildstep.ShellMixin, steps.BuildStep):
|
||
"""Builds a nix derivation."""
|
||
|
||
def __init__(self, **kwargs: Any) -> None:
|
||
kwargs = self.setupShellMixin(kwargs)
|
||
super().__init__(**kwargs)
|
||
|
||
@defer.inlineCallbacks
|
||
def run(self) -> Generator[Any, object, Any]:
|
||
if error := self.getProperty("error"):
|
||
attr = self.getProperty("attr")
|
||
# show eval error
|
||
error_log: Log = yield self.addLog("nix_error")
|
||
error_log.addStderr(f"{attr} failed to evaluate:\n{error}")
|
||
return util.FAILURE
|
||
|
||
if self.getProperty("isCached"):
|
||
yield self.addCompleteLog(
|
||
"cached outpath from previous builds",
|
||
# buildbot apparently hides the first line in the ui?
|
||
f'\n{self.getProperty("out_path")}\n')
|
||
return util.SKIPPED
|
||
|
||
# run `nix build`
|
||
cmd: remotecommand.RemoteCommand = yield self.makeRemoteShellCommand()
|
||
yield self.runCommand(cmd)
|
||
|
||
return cmd.results()
|
||
|
||
|
||
|
||
def nix_eval_config(
|
||
gerrit_config: GerritConfig,
|
||
project: GerritProject,
|
||
worker_names: list[str],
|
||
supported_systems: list[str],
|
||
eval_lock: util.MasterLock,
|
||
worker_count: int,
|
||
max_memory_size: int,
|
||
) -> util.BuilderConfig:
|
||
"""Uses nix-eval-jobs to evaluate $FLAKE_TARGET_ATTRIBUTE_FOR_JOBS (`.#hydraJobs` by default) from flake.nix in parallel.
|
||
For each evaluated attribute a new build pipeline is started.
|
||
"""
|
||
factory = util.BuildFactory()
|
||
# check out the source
|
||
factory.addStep(
|
||
steps.Gerrit(
|
||
repourl=f'{gerrit_config.repourl_template}/{project.name}',
|
||
mode="full",
|
||
retry=[60, 60],
|
||
timeout=3600,
|
||
sshPrivateKey=project.private_sshkey_path
|
||
),
|
||
)
|
||
# use one gcroots directory per worker. this should be scoped to the largest unique resource
|
||
# in charge of builds (ie, buildnumber is too narrow) to not litter the system with permanent
|
||
# gcroots in case of worker restarts.
|
||
# TODO perhaps we should clean the entire /drvs/ directory up too during startup.
|
||
drv_gcroots_dir = util.Interpolate(
|
||
"/nix/var/nix/gcroots/per-user/buildbot-worker/%(prop:project)s/drvs/%(prop:workername)s/",
|
||
)
|
||
|
||
factory.addStep(
|
||
NixEvalCommand(
|
||
env={},
|
||
name="evaluate flake",
|
||
supported_systems=supported_systems,
|
||
command=[
|
||
"nix-eval-jobs",
|
||
"--workers",
|
||
str(worker_count),
|
||
"--max-memory-size",
|
||
str(max_memory_size),
|
||
"--option",
|
||
"accept-flake-config",
|
||
"true",
|
||
"--gc-roots-dir",
|
||
drv_gcroots_dir,
|
||
"--force-recurse",
|
||
"--check-cache-status",
|
||
"--flake",
|
||
f".#{FLAKE_TARGET_ATTRIBUTE_FOR_JOBS}"
|
||
],
|
||
haltOnFailure=True,
|
||
locks=[eval_lock.access("exclusive")],
|
||
),
|
||
)
|
||
|
||
factory.addStep(
|
||
steps.ShellCommand(
|
||
name="Cleanup drv paths",
|
||
command=[
|
||
"rm",
|
||
"-rf",
|
||
drv_gcroots_dir,
|
||
],
|
||
alwaysRun=True,
|
||
),
|
||
)
|
||
|
||
return util.BuilderConfig(
|
||
name=f"{project.name}/nix-eval",
|
||
workernames=worker_names,
|
||
project=project.name,
|
||
factory=factory,
|
||
properties=dict(status_name="nix-eval"),
|
||
)
|
||
|
||
|
||
def nix_build_config(
|
||
project: GerritProject,
|
||
worker_arch: str,
|
||
worker_names: list[str],
|
||
builders_spec: str,
|
||
signing_keyfile: str | None = None,
|
||
binary_cache_config: S3BinaryCacheConfig | None = None
|
||
) -> util.BuilderConfig:
|
||
"""Builds one nix flake attribute."""
|
||
factory = util.BuildFactory()
|
||
factory.addStep(
|
||
NixBuildCommand(
|
||
env={},
|
||
name="Build flake attr",
|
||
command=[
|
||
"nix",
|
||
"build",
|
||
"-L",
|
||
"--option",
|
||
"keep-going",
|
||
"true",
|
||
# do not build directly on the coordinator
|
||
"--max-jobs", "0",
|
||
"--option",
|
||
# stop stuck builds after 20 minutes
|
||
"--max-silent-time",
|
||
str(60 * 20),
|
||
"--accept-flake-config",
|
||
"--builders",
|
||
builders_spec,
|
||
"--out-link",
|
||
util.Interpolate("result-%(prop:attr)s"),
|
||
util.Interpolate("%(prop:drv_path)s^*"),
|
||
],
|
||
# 3 hours, defaults to 20 minutes
|
||
# We increase this over the default since the build output might end up in a different `nix build`.
|
||
timeout=60 * 60 * 3,
|
||
haltOnFailure=True,
|
||
),
|
||
)
|
||
|
||
if signing_keyfile is not None:
|
||
factory.addStep(
|
||
steps.ShellCommand(
|
||
name="Sign the store path",
|
||
command=[
|
||
"nix",
|
||
"store",
|
||
"sign",
|
||
"--key-file",
|
||
signing_keyfile,
|
||
util.Interpolate(
|
||
"%(prop:drv_path)s^*"
|
||
)
|
||
]
|
||
),
|
||
)
|
||
|
||
if binary_cache_config is not None:
|
||
factory.addStep(
|
||
steps.ShellCommand(
|
||
name="Upload the store path to the cache",
|
||
command=[
|
||
"nix",
|
||
"copy",
|
||
"--to",
|
||
f"s3://{binary_cache_config.bucket}?profile={binary_cache_config.profile}®ion={binary_cache_config.region}&endpoint={binary_cache_config.endpoint}",
|
||
util.Property(
|
||
"out_path"
|
||
)
|
||
]
|
||
)
|
||
)
|
||
|
||
factory.addStep(
|
||
steps.ShellCommand(
|
||
name="Register gcroot",
|
||
command=[
|
||
"nix-store",
|
||
"--add-root",
|
||
# FIXME: cleanup old build attributes
|
||
util.Interpolate(
|
||
"/nix/var/nix/gcroots/per-user/buildbot-worker/%(prop:project)s/%(prop:attr)s",
|
||
),
|
||
"-r",
|
||
util.Property("out_path"),
|
||
],
|
||
doStepIf=lambda s: s.getProperty("branch")
|
||
== s.getProperty("github.repository.default_branch"),
|
||
),
|
||
)
|
||
factory.addStep(
|
||
steps.ShellCommand(
|
||
name="Delete temporary gcroots",
|
||
command=["rm", "-f", util.Interpolate("result-%(prop:attr)s")],
|
||
),
|
||
)
|
||
|
||
return util.BuilderConfig(
|
||
name=f"{project.name}/nix-build/{worker_arch}",
|
||
project=project.name,
|
||
workernames=worker_names,
|
||
collapseRequests=False,
|
||
env={},
|
||
factory=factory,
|
||
)
|
||
|
||
def assemble_secret_file_path(secret_name: str) -> Path:
|
||
directory = os.environ.get("CREDENTIALS_DIRECTORY")
|
||
if directory is None:
|
||
print("directory not set", file=sys.stderr)
|
||
sys.exit(1)
|
||
return Path(directory).joinpath(secret_name)
|
||
|
||
def read_secret_file(secret_name: str) -> str:
|
||
return assemble_secret_file_path(secret_name).read_text().rstrip()
|
||
|
||
def config_for_project(
|
||
config: dict[str, Any],
|
||
gerrit_config: GerritConfig,
|
||
project: GerritProject,
|
||
worker_names: list[str],
|
||
nix_supported_systems: list[str],
|
||
nix_eval_worker_count: int,
|
||
nix_eval_max_memory_size: int,
|
||
eval_lock: util.MasterLock,
|
||
builders_spec: str,
|
||
signing_keyfile: str | None = None,
|
||
binary_cache_config: S3BinaryCacheConfig | None = None
|
||
) -> Project:
|
||
config["projects"].append(Project(project.name))
|
||
config["schedulers"].extend(
|
||
[
|
||
# build everything pertaining to a project
|
||
# TODO(raito): will this catch also post-merge? we don't really care about that… do we?
|
||
schedulers.SingleBranchScheduler(
|
||
name=f"{project.name}-changes",
|
||
change_filter=util.ChangeFilter(
|
||
project=project.name,
|
||
),
|
||
builderNames=[f"{project.name}/nix-eval"],
|
||
),
|
||
# this is triggered from `nix-eval`
|
||
*(
|
||
schedulers.Triggerable(
|
||
name=f"{project.name}-nix-build-{arch}",
|
||
builderNames=[f"{project.name}/nix-build/{arch}"],
|
||
)
|
||
for arch in nix_supported_systems + [ "other" ]
|
||
),
|
||
# allow to manually trigger a nix-build
|
||
schedulers.ForceScheduler(
|
||
name=f"{project.name}-force",
|
||
builderNames=[f"{project.name}/nix-eval"],
|
||
properties=[
|
||
util.StringParameter(
|
||
name="project",
|
||
label="Name of the Gerrit repository.",
|
||
default=project.name,
|
||
),
|
||
],
|
||
),
|
||
],
|
||
)
|
||
gerrit_private_key = None
|
||
with open(project.private_sshkey_path, 'r') as f:
|
||
gerrit_private_key = f.read()
|
||
|
||
if gerrit_private_key is None:
|
||
raise RuntimeError('No gerrit private key to fetch the repositories')
|
||
|
||
config["builders"].extend(
|
||
[
|
||
# Since all workers run on the same machine, we only assign one of them to do the evaluation.
|
||
# This should prevent exessive memory usage.
|
||
nix_eval_config(
|
||
gerrit_config,
|
||
project,
|
||
[ f"{w}-other" for w in worker_names ],
|
||
supported_systems=nix_supported_systems,
|
||
worker_count=nix_eval_worker_count,
|
||
max_memory_size=nix_eval_max_memory_size,
|
||
eval_lock=eval_lock,
|
||
),
|
||
*(
|
||
nix_build_config(
|
||
project,
|
||
arch,
|
||
[ f"{w}-{arch}" for w in worker_names ],
|
||
builders_spec,
|
||
signing_keyfile=signing_keyfile,
|
||
binary_cache_config=binary_cache_config
|
||
)
|
||
for arch in nix_supported_systems + [ "other" ]
|
||
),
|
||
],
|
||
)
|
||
|
||
|
||
class PeriodicWithStartup(schedulers.Periodic):
|
||
def __init__(self, *args: Any, run_on_startup: bool = False, **kwargs: Any) -> None:
|
||
super().__init__(*args, **kwargs)
|
||
self.run_on_startup = run_on_startup
|
||
|
||
@defer.inlineCallbacks
|
||
def activate(self) -> Generator[Any, object, Any]:
|
||
if self.run_on_startup:
|
||
yield self.setState("last_build", None)
|
||
yield super().activate()
|
||
|
||
def gerritReviewFmt(url, data):
|
||
if 'build' not in data:
|
||
raise ValueError('`build` is supposed to be present to format a build')
|
||
|
||
build = data['build']
|
||
if 'builder' not in build and 'name' not in build['builder']:
|
||
raise ValueError('either `builder` or `builder.name` is not present in the build dictionary, unexpected format request')
|
||
|
||
builderName = build['builder']['name']
|
||
|
||
if len(build['results']) != 1:
|
||
raise ValueError('this review request contains more than one build results, unexpected format request')
|
||
|
||
result = build['results'][0]
|
||
if result == util.RETRY:
|
||
return dict()
|
||
|
||
if builderName != f'{build["properties"].get("event.project")}/nix-eval':
|
||
return dict()
|
||
|
||
failed = build['properties'].get('failed_builds', [[]])[0]
|
||
|
||
labels = {
|
||
'Verified': -1 if result != util.SUCCESS else 1,
|
||
}
|
||
|
||
message = "Buildbot finished compiling your patchset!\n"
|
||
message += "The result is: %s\n" % util.Results[result].upper()
|
||
if result != util.SUCCESS:
|
||
message += "\nFailed checks:\n"
|
||
for check, how, urls in failed:
|
||
if not urls:
|
||
message += " "
|
||
message += f" - {check}: {how}"
|
||
if urls:
|
||
message += f" (see {', '.join(urls)})"
|
||
message += "\n"
|
||
|
||
if url:
|
||
message += "\nFor more details visit:\n"
|
||
message += build['url'] + "\n"
|
||
|
||
return dict(message=message, labels=labels)
|
||
|
||
class GerritNixConfigurator(ConfiguratorBase):
|
||
"""Janitor is a configurator which create a Janitor Builder with all needed Janitor steps"""
|
||
|
||
def __init__(
|
||
self,
|
||
# Shape of this file: [ { "name": "<worker-name>", "pass": "<worker-password>", "cores": "<cpu-cores>" } ]
|
||
gerrit_server: str,
|
||
gerrit_user: str,
|
||
gerrit_port: int,
|
||
gerrit_sshkey_path: str,
|
||
projects: list[str],
|
||
url: str,
|
||
allowed_origins: list[str],
|
||
nix_builders: list[dict[str, Any]],
|
||
nix_supported_systems: list[str],
|
||
nix_eval_worker_count: int | None,
|
||
nix_eval_max_memory_size: int,
|
||
nix_workers_secret_name: str = "buildbot-nix-workers", # noqa: S107
|
||
signing_keyfile: str | None = None,
|
||
binary_cache_config: dict[str, str] | None = None,
|
||
auth_method: AuthBase | None = None,
|
||
) -> None:
|
||
super().__init__()
|
||
self.allowed_origins = allowed_origins
|
||
self.gerrit_server = gerrit_server
|
||
self.gerrit_user = gerrit_user
|
||
self.gerrit_port = gerrit_port
|
||
self.gerrit_sshkey_path = gerrit_sshkey_path
|
||
self.gerrit_config = GerritConfig(domain=self.gerrit_server,
|
||
username=self.gerrit_user,
|
||
port=self.gerrit_port)
|
||
self.projects = projects
|
||
|
||
self.nix_workers_secret_name = nix_workers_secret_name
|
||
self.nix_eval_max_memory_size = nix_eval_max_memory_size
|
||
self.nix_eval_worker_count = nix_eval_worker_count
|
||
self.nix_supported_systems = nix_supported_systems
|
||
self.nix_builders: list[NixBuilder] = [NixBuilder(**builder_cfg) for builder_cfg in nix_builders]
|
||
|
||
self.gerrit_change_source = GerritChangeSource(gerrit_server, gerrit_user, gerritport=gerrit_port, identity_file=gerrit_sshkey_path)
|
||
|
||
self.url = url
|
||
|
||
if binary_cache_config is not None:
|
||
self.binary_cache_config = S3BinaryCacheConfig(**binary_cache_config)
|
||
else:
|
||
self.binary_cache_config = None
|
||
|
||
self.signing_keyfile = signing_keyfile
|
||
|
||
self.auth_method = auth_method
|
||
|
||
def configure(self, config: dict[str, Any]) -> None:
|
||
worker_config = json.loads(read_secret_file(self.nix_workers_secret_name))
|
||
worker_names = []
|
||
|
||
config.setdefault("projects", [])
|
||
config.setdefault("secretsProviders", [])
|
||
config.setdefault("www", {
|
||
'allowed_origins': self.allowed_origins
|
||
})
|
||
|
||
for item in worker_config:
|
||
cores = item.get("cores", 0)
|
||
for i in range(cores):
|
||
for arch in self.nix_supported_systems + ["other"]:
|
||
worker_name = f"{item['name']}-{i:03}"
|
||
config["workers"].append(worker.Worker(f"{worker_name}-{arch}", item["pass"]))
|
||
worker_names.append(worker_name)
|
||
|
||
eval_lock = util.MasterLock("nix-eval")
|
||
|
||
builders_spec = " ; ".join(builder.to_nix_line() for builder in self.nix_builders)
|
||
for project in self.projects:
|
||
config_for_project(
|
||
config,
|
||
self.gerrit_config,
|
||
GerritProject(name=project, private_sshkey_path=self.gerrit_sshkey_path),
|
||
worker_names,
|
||
self.nix_supported_systems,
|
||
self.nix_eval_worker_count or multiprocessing.cpu_count(),
|
||
self.nix_eval_max_memory_size,
|
||
eval_lock,
|
||
builders_spec,
|
||
signing_keyfile=self.signing_keyfile,
|
||
binary_cache_config=self.binary_cache_config
|
||
)
|
||
|
||
config["change_source"] = self.gerrit_change_source
|
||
config["services"].append(
|
||
reporters.GerritStatusPush(self.gerrit_server, self.gerrit_user,
|
||
port=self.gerrit_port,
|
||
identity_file=self.gerrit_sshkey_path,
|
||
generators=[
|
||
# gerritReviewCB / self.url
|
||
BuildStatusGenerator(
|
||
message_formatter=MessageFormatterFunction(
|
||
lambda data: gerritReviewFmt(self.url, data),
|
||
"plain",
|
||
want_properties=True,
|
||
want_steps=True
|
||
),
|
||
),
|
||
])
|
||
# startCB, summaryCB are too noisy, we won't use them.
|
||
)
|
||
|
||
# Upstream defaults pretend they already do something similar
|
||
# but they didn't work, hence the custom function.
|
||
def gerritBranchKey(b):
|
||
ref = b['branch']
|
||
if not ref.startswith('refs/changes/'):
|
||
return ref
|
||
return ref.rsplit('/', 1)[0]
|
||
|
||
config["services"].append(
|
||
util.OldBuildCanceller(
|
||
"build_canceller",
|
||
filters=[
|
||
(
|
||
[
|
||
f"{project}/nix-{kind}"
|
||
for kind in [ "eval" ] + [
|
||
f"build/{arch}"
|
||
for arch in self.nix_supported_systems + [ "other" ]
|
||
]
|
||
],
|
||
util.SourceStampFilter(project_eq=[project])
|
||
)
|
||
for project in self.projects
|
||
],
|
||
branch_key=gerritBranchKey
|
||
)
|
||
)
|
||
|
||
systemd_secrets = secrets.SecretInAFile(
|
||
dirname=os.environ["CREDENTIALS_DIRECTORY"],
|
||
)
|
||
config["secretsProviders"].append(systemd_secrets)
|
||
|
||
config["www"].setdefault("plugins", {})
|
||
|
||
if "auth" not in config["www"] and self.auth_method is not None:
|
||
config["www"]["auth"] = self.auth_method
|