buildbot-nix/buildbot_nix/__init__.py

836 lines
30 KiB
Python

import json
import multiprocessing
import os
import sys
import graphlib
from collections import defaultdict
from collections.abc import Generator
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING, Any
from buildbot.configurators import ConfiguratorBase
from buildbot.plugins import reporters, schedulers, secrets, steps, util, worker
from buildbot.process import buildstep, logobserver, remotecommand
from buildbot.process.project import Project
from buildbot.process.properties import Interpolate, Properties
from buildbot.process.results import ALL_RESULTS, statusToString
from buildbot.steps.trigger import Trigger
from buildbot.util import asyncSleep
from buildbot.www.authz.endpointmatchers import EndpointMatcherBase, Match
from buildbot.www.oauth2 import OAuth2Auth
from buildbot.changes.gerritchangesource import GerritChangeSource
from buildbot.reporters.utils import getURLForBuild
from buildbot.reporters.utils import getURLForBuildrequest
from buildbot.process.buildstep import CANCELLED
from buildbot.process.buildstep import EXCEPTION
from buildbot.process.buildstep import SUCCESS
from buildbot.process.results import worst_status
if TYPE_CHECKING:
from buildbot.process.log import Log
from twisted.internet import defer, threads
from twisted.logger import Logger
from twisted.python.failure import Failure
from .github_projects import (
slugify_project_name,
)
log = Logger()
class LixSystemsOAuth2(OAuth2Auth):
    """OAuth2 login backend whose endpoints all live on identity.lix.systems."""

    # Display name and icon for this auth backend.
    name = "Lix"
    faIcon = "fa-login"

    resourceEndpoint = "https://identity.lix.systems"
    # is passing scope necessary?
    authUri = (
        "https://identity.lix.systems"
        "/realms/lix-project/protocol/openid-connect/auth"
    )
    tokenUri = (
        "https://identity.lix.systems"
        "/realms/lix-project/protocol/openid-connect/token"
    )
class BuildbotNixError(Exception):
    """Raised when a buildbot-nix step cannot make sense of a command's output."""
@dataclass
class GerritProject:
    """Identifies one Gerrit project that gets its own builders and schedulers."""

    # `project` field.
    name: str
class BuildTrigger(steps.BuildStep):
    """Trigger one child build per evaluated job, in dependency order.

    ``jobs`` is the list of job dicts produced by the evaluation step and
    ``drv_info`` is the parsed output of ``nix derivation show --recursive``
    (a mapping from derivation path to its description, including
    ``inputDrvs``).  A job is only triggered once every other job it depends
    on has finished successfully; when a job fails, the jobs depending on it
    are dropped from the schedule instead of being built.
    """

    def __init__(
        self,
        builds_scheduler: str,
        jobs: list[dict[str, Any]],
        drv_info: dict[str, Any],
        **kwargs: Any,
    ) -> None:
        self.jobs = jobs
        self.drv_info = drv_info
        self.config = None
        self.builds_scheduler = builds_scheduler
        self._result_list = []
        # Build request ids of every child build triggered so far.  interrupt()
        # iterates this list, so it has to exist before run() is ever called.
        self.brids = []
        self.ended = False
        self.waitForFinishDeferred = None
        super().__init__(**kwargs)

    def interrupt(self, reason):
        """Cancel all triggered build requests and stop waiting for them."""
        # We cancel the buildrequests, as the data api handles
        # both cases:
        # - build started: stop is sent,
        # - build not created yet: related buildrequests are set to CANCELLED.
        # Note that there is an identified race condition though (more details
        # are available at buildbot.data.buildrequests).
        for brid in self.brids:
            self.master.data.control(
                "cancel",
                {'reason': 'parent build was interrupted'},
                ("buildrequests", brid),
            )
        if self.running and not self.ended:
            self.ended = True
            # if we are interrupted because of a connection lost, we interrupt synchronously
            if self.build.conn is None and self.waitForFinishDeferred is not None:
                self.waitForFinishDeferred.cancel()

    def getSchedulerByName(self, name):
        """Return the scheduler registered under ``name``.

        Raises ValueError when no such scheduler exists on the master.
        """
        known = self.master.scheduler_manager.namedServices
        if name not in known:
            raise ValueError(f"unknown triggered scheduler: {repr(name)}")
        # todo: check ITriggerableScheduler
        return known[name]

    def schedule_one(self, build_props, job):
        """Build the properties for one child build.

        Returns ``(scheduler_name, props)``.  As a side effect the job's
        output and derivation paths are recorded on ``build_props`` (the
        parent build's properties).  Jobs that failed to evaluate only carry
        the ``error`` property so the child build can report it.
        """
        source = "nix-eval-lix"
        attr = job.get("attr", "eval-error")
        name = f"hydraJobs.{attr}"
        error = job.get("error")
        props = Properties()
        props.setProperty("virtual_builder_name", name, source)
        props.setProperty("status_name", f"nix-build .#hydraJobs.{attr}", source)
        props.setProperty("virtual_builder_tags", "", source)

        if error is not None:
            props.setProperty("error", error, source)
            return (self.builds_scheduler, props)

        drv_path = job.get("drvPath")
        system = job.get("system")
        out_path = job.get("outputs", {}).get("out")

        build_props.setProperty(f"{attr}-out_path", out_path, source)
        build_props.setProperty(f"{attr}-drv_path", drv_path, source)

        props.setProperty("attr", attr, source)
        props.setProperty("system", system, source)
        props.setProperty("drv_path", drv_path, source)
        props.setProperty("out_path", out_path, source)
        props.setProperty("isCached", job.get("isCached"), source)

        return (self.builds_scheduler, props)

    @defer.inlineCallbacks
    def _add_results(self, brid):
        """Wait for build request ``brid`` to complete and record its build results."""

        @defer.inlineCallbacks
        def _is_buildrequest_complete(brid):
            buildrequest = yield self.master.db.buildrequests.getBuildRequest(brid)
            return buildrequest['complete']

        event = ('buildrequests', str(brid), 'complete')
        yield self.master.mq.waitUntilEvent(event, lambda: _is_buildrequest_complete(brid))
        builds = yield self.master.db.builds.getBuilds(buildrequestid=brid)
        for build in builds:
            self._result_list.append(build["results"])
        self.updateSummary()

    def prepareSourcestampListForTrigger(self):
        """Return the parent build's source stamps as dicts, ordered by codebase."""
        ss_for_trigger = {}
        for ss in self.build.getAllSourceStamps():
            ss_for_trigger[ss.codebase] = ss.asDict()
        return [ss_for_trigger[k] for k in sorted(ss_for_trigger.keys())]

    @defer.inlineCallbacks
    def run(self):
        """Trigger every job once its dependencies succeeded; return the worst result."""
        build_props = self.build.getProperties()

        # Direct input derivations of every derivation reported by nix.
        direct_deps = {
            drv: set(info.get("inputDrvs").keys())
            for drv, info in self.drv_info.items()
        }

        def closure_of(key, deps):
            # Transitive closure of `key`'s dependencies, excluding `key` itself.
            r, size = {key}, 0
            while len(r) != size:
                size = len(r)
                r.update(*[deps[k] for k in r])
            return r.difference([key])

        # Only dependencies that are jobs themselves matter for scheduling.
        job_set = {drv for drv in (job.get("drvPath") for job in self.jobs) if drv}
        all_deps = {
            drv: list(closure_of(drv, direct_deps).intersection(job_set))
            for drv in job_set
        }

        # Group jobs by derivation.  Jobs without a drvPath (evaluation
        # errors) have no dependencies and go first so that the error gets
        # reported by a child build instead of being silently dropped.
        jobs_by_drv = defaultdict(list)
        build_schedule_order = []
        for job in self.jobs:
            drv = job.get("drvPath")
            if drv:
                jobs_by_drv[drv].append(job)
            else:
                build_schedule_order.append(job)
        for drv in graphlib.TopologicalSorter(all_deps).static_order():
            build_schedule_order.extend(jobs_by_drv.pop(drv, []))

        scheduled = []
        all_results = SUCCESS
        ss_for_trigger = self.prepareSourcestampListForTrigger()

        while build_schedule_order or scheduled:
            print('Scheduling..')
            # A job is ready as soon as none of its dependencies is pending.
            schedule_now = []
            still_waiting = []
            for build in build_schedule_order:
                if all_deps.get(build.get("drvPath"), []) == []:
                    schedule_now.append(build)
                else:
                    still_waiting.append(build)
            build_schedule_order = still_waiting

            if len(schedule_now) == 0:
                print(' No builds to schedule found.')

            for job in schedule_now:
                print(f" - {job.get('attr')}")
                (scheduler, props) = self.schedule_one(build_props, job)
                scheduler = self.getSchedulerByName(scheduler)

                idsDeferred, resultsDeferred = scheduler.trigger(
                    waited_for=True,
                    sourcestamps=ss_for_trigger,
                    set_props=props,
                    parent_buildid=self.build.buildid,
                    parent_relationship="Triggered from",
                )

                brids = {}
                try:
                    _, brids = yield idsDeferred
                except Exception as e:
                    # Triggering failed: log it and carry on with no build
                    # requests for this job.
                    yield self.addLogWithException(e)

                # Remember the ids so interrupt() can cancel the child builds.
                self.brids.extend(brids.values())
                scheduled.append((job, brids, resultsDeferred))

                for brid in brids.values():
                    url = getURLForBuildrequest(self.master, brid)
                    yield self.addURL(f"{scheduler.name} #{brid}", url)
                    # Not yielded on purpose: results are collected in the
                    # background and only feed the step summary.
                    self._add_results(brid)

            print('Waiting..')
            wait_for_next = defer.DeferredList(
                [results for _, _, results in scheduled],
                fireOnOneCallback=True,
                fireOnOneErrback=True,
            )
            self.waitForFinishDeferred = wait_for_next
            results, index = yield wait_for_next

            job, brids, _ = scheduled[index]
            del scheduled[index]
            result = results[0]
            finished_drv = job.get("drvPath")
            print(f' Found finished build {job.get("attr")}, result {util.Results[result].upper()}')

            if result != SUCCESS:
                failed_checks = []
                # Seed with the job that just failed; without it nothing ever
                # matches below and dependents would still get scheduled.
                failed_paths = [finished_drv] if finished_drv else []
                removed = []
                # Iterate to a fixed point: dropping a job also invalidates
                # every job that depends on the dropped one.
                while True:
                    old_paths = list(failed_paths)
                    print(failed_checks, old_paths)
                    for build in list(build_schedule_order):
                        deps = all_deps.get(build.get("drvPath"), [])
                        for path in old_paths:
                            if path in deps:
                                failed_checks.append(build)
                                failed_paths.append(build.get("drvPath"))
                                build_schedule_order.remove(build)
                                removed.append(build.get("attr"))
                                break
                    if old_paths == failed_paths:
                        break
                print(' Removed jobs: ' + ', '.join(removed))
                all_results = worst_status(result, all_results)
                print(f' New result: {util.Results[all_results].upper()}')

            # The finished job no longer blocks anything that is still queued.
            for deps in all_deps.values():
                if finished_drv in deps:
                    deps.remove(finished_drv)

        print('Done!')
        return all_results

    def getCurrentSummary(self) -> dict[str, str]:  # noqa: N802
        """Summarise the child build results collected so far, grouped by status."""
        summary = []
        if self._result_list:
            for status in ALL_RESULTS:
                count = self._result_list.count(status)
                if count:
                    summary.append(
                        f"{self._result_list.count(status)} {statusToString(status, count)}",
                    )
        return {"step": f"({', '.join(summary)})"}
class NixEvalCommand(buildstep.ShellMixin, steps.BuildStep):
    """Run the evaluation command and queue a `BuildTrigger` for its jobs.

    The command's stdout is expected to hold one JSON document per line
    (`nix-eval-jobs` output); each line describes one job.
    """

    def __init__(self, supported_systems: list[str], **kwargs: Any) -> None:
        kwargs = self.setupShellMixin(kwargs)
        super().__init__(**kwargs)
        # Buffer stdio so run() can parse the evaluator output afterwards.
        self.observer = logobserver.BufferLogObserver()
        self.addLogObserver("stdio", self.observer)
        self.supported_systems = supported_systems

    @defer.inlineCallbacks
    def run(self) -> Generator[Any, object, Any]:
        # run nix-eval-jobs --flake .#hydraJobs to generate the dict of stages
        eval_cmd: remotecommand.RemoteCommand = yield self.makeRemoteShellCommand()
        yield self.runCommand(eval_cmd)

        result = eval_cmd.results()
        if result != util.SUCCESS:
            # Evaluation itself failed: nothing to parse, nothing to trigger.
            return result

        # One JSON object per non-empty output line.
        jobs = []
        for line in self.observer.getStdout().split("\n"):
            if line == "":
                continue
            try:
                jobs.append(json.loads(line))
            except json.JSONDecodeError as e:
                msg = f"Failed to parse line: {line}"
                raise BuildbotNixError(msg) from e

        # Keep jobs for supported systems, plus jobs without a system at all
        # so that eval errors are reported.
        filtered_jobs = [
            job
            for job in jobs
            if not job.get("system") or job.get("system") in self.supported_systems
        ]

        drv_paths = [
            job.get("drvPath") for job in filtered_jobs if job.get("drvPath")
        ]

        drv_show_log: Log = yield self.getLog("stdio")
        drv_show_log.addStdout("getting derivation infos\n")
        cmd = yield self.makeRemoteShellCommand(
            stdioLogName=None,
            collectStdout=True,
            command=["nix", "derivation", "show", "--recursive", *drv_paths],
        )
        yield self.runCommand(cmd)
        drv_show_log.addStdout("done\n")

        try:
            drv_info = json.loads(cmd.stdout)
        except json.JSONDecodeError as e:
            msg = f"Failed to parse `nix derivation show` output for {cmd.command}"
            raise BuildbotNixError(msg) from e

        # The actual builds happen in a follow-up step of this same build.
        trigger = BuildTrigger(
            builds_scheduler="lix-nix-build",
            name="build flake",
            jobs=filtered_jobs,
            drv_info=drv_info,
        )
        self.build.addStepsAfterCurrentStep([trigger])

        return result
class NixBuildCommand(buildstep.ShellMixin, steps.BuildStep):
    """Build one nix derivation, or report why it does not need building.

    Outcomes, in the order they are checked:
    - the `error` property is set: log the evaluation error and fail;
    - the `isCached` property is set: log the output path and skip;
    - otherwise run the configured shell command and return its result.
    """

    def __init__(self, **kwargs: Any) -> None:
        kwargs = self.setupShellMixin(kwargs)
        super().__init__(**kwargs)

    @defer.inlineCallbacks
    def run(self) -> Generator[Any, object, Any]:
        error = self.getProperty("error")
        if error:
            # show eval error
            attr = self.getProperty("attr")
            error_log: Log = yield self.addLog("nix_error")
            error_log.addStderr(f"{attr} failed to evaluate:\n{error}")
            return util.FAILURE

        if self.getProperty("isCached"):
            # buildbot apparently hides the first line in the ui?
            text = f'\n{self.getProperty("out_path")}\n'
            yield self.addCompleteLog("cached outpath from previous builds", text)
            return util.SKIPPED

        # run `nix build`
        build_cmd: remotecommand.RemoteCommand = yield self.makeRemoteShellCommand()
        yield self.runCommand(build_cmd)
        return build_cmd.results()
class UpdateBuildOutput(steps.BuildStep):
    """Record the built store path of an attribute in a directory of files.

    For builds of the default branch, writes `<path>/<attr>` containing the
    build's `out_path`.  This is useful to prefetch updates without having
    to evaluate on the target machine.
    """

    def __init__(self, path: Path, **kwargs: Any) -> None:
        super().__init__(**kwargs)
        self.path = path

    def run(self) -> Generator[Any, object, Any]:
        props = self.build.getProperties()

        branch = props.getProperty("branch")
        default_branch = props.getProperty("github.repository.default_branch")
        if branch != default_branch:
            # Only default-branch builds get published.
            return util.SKIPPED

        # Keep only the last path component of the attribute as file name.
        file_name = Path(props.getProperty("attr")).name
        store_path = props.getProperty("out_path")

        # XXX don't hardcode this
        self.path.mkdir(parents=True, exist_ok=True)
        target = self.path / file_name
        target.write_text(store_path)
        return util.SUCCESS
def nix_eval_config(
    project: GerritProject,
    gerrit_private_key: str,
    worker_names: list[str],
    supported_systems: list[str],
    eval_lock: util.MasterLock,
    worker_count: int,
    max_memory_size: int,
) -> util.BuilderConfig:
    """Create the `<project>/nix-eval` builder.

    The builder checks out the change from Gerrit, evaluates `.#hydraJobs`
    with nix-eval-jobs (holding `eval_lock` exclusively) and finally removes
    the temporary derivation gcroots.  For each evaluated attribute a new
    build pipeline is started by the evaluation step.
    """
    # check out the source
    checkout_step = steps.Gerrit(
        repourl="ssh://buildbot@gerrit.lix.systems:2022/lix",
        mode="full",
        retry=[60, 60],
        timeout=3600,
        sshPrivateKey=gerrit_private_key,
    )

    # use one gcroots directory per worker. this should be scoped to the largest unique resource
    # in charge of builds (ie, buildnumber is too narrow) to not litter the system with permanent
    # gcroots in case of worker restarts.
    # TODO perhaps we should clean the entire /drvs/ directory up too during startup.
    drv_gcroots_dir = util.Interpolate(
        "/nix/var/nix/gcroots/per-user/buildbot-worker/%(prop:project)s/drvs/%(prop:workername)s/",
    )

    eval_command = [
        "nix-eval-jobs",
        "--workers",
        str(worker_count),
        "--max-memory-size",
        str(max_memory_size),
        "--option",
        "accept-flake-config",
        "true",
        "--gc-roots-dir",
        drv_gcroots_dir,
        "--force-recurse",
        "--check-cache-status",
        "--flake",
        ".#hydraJobs",
    ]
    eval_step = NixEvalCommand(
        env={},
        name="evaluate flake",
        supported_systems=supported_systems,
        command=eval_command,
        haltOnFailure=True,
        locks=[eval_lock.access("exclusive")],
    )

    # alwaysRun: the gcroots must go away even when evaluation failed.
    cleanup_step = steps.ShellCommand(
        name="Cleanup drv paths",
        command=["rm", "-rf", drv_gcroots_dir],
        alwaysRun=True,
    )

    factory = util.BuildFactory()
    for step in (checkout_step, eval_step, cleanup_step):
        factory.addStep(step)

    return util.BuilderConfig(
        name=f"{project.name}/nix-eval",
        workernames=worker_names,
        project=project.name,
        factory=factory,
        properties={"status_name": "nix-eval"},
    )
def nix_build_config(
    project: GerritProject,
    worker_names: list[str],
    outputs_path: Path | None = None,
) -> util.BuilderConfig:
    """Create the `<project>/nix-build` builder, which builds one flake attribute.

    Steps: build the derivation given by the `drv_path` property, register a
    gcroot for the output (default branch only), delete the temporary
    `result-<attr>` link and, when `outputs_path` is given, record the output
    path there.
    """
    factory = util.BuildFactory()
    factory.addStep(
        NixBuildCommand(
            env={},
            name="Build flake attr",
            command=[
                "nix",
                "build",
                "-L",
                "--option",
                "keep-going",
                "true",
                # stop stuck builds after 20 minutes
                # (this is a flag of its own; it must not be preceded by a
                # bare `--option`, which would swallow it as a setting name)
                "--max-silent-time",
                str(60 * 20),
                "--accept-flake-config",
                "--out-link",
                util.Interpolate("result-%(prop:attr)s"),
                util.Interpolate("%(prop:drv_path)s^*"),
            ],
            # 3 hours, defaults to 20 minutes
            # We increase this over the default since the build output might end up in a different `nix build`.
            timeout=60 * 60 * 3,
            haltOnFailure=True,
        ),
    )
    factory.addStep(
        steps.ShellCommand(
            name="Register gcroot",
            command=[
                "nix-store",
                "--add-root",
                # FIXME: cleanup old build attributes
                util.Interpolate(
                    "/nix/var/nix/gcroots/per-user/buildbot-worker/%(prop:project)s/%(prop:attr)s",
                ),
                "-r",
                util.Property("out_path"),
            ],
            # Only outputs of the default branch are kept alive permanently.
            doStepIf=lambda s: s.getProperty("branch")
            == s.getProperty("github.repository.default_branch"),
        ),
    )
    factory.addStep(
        steps.ShellCommand(
            name="Delete temporary gcroots",
            command=["rm", "-f", util.Interpolate("result-%(prop:attr)s")],
        ),
    )
    if outputs_path is not None:
        factory.addStep(
            UpdateBuildOutput(
                name="Update build output",
                path=outputs_path,
            ),
        )
    return util.BuilderConfig(
        name=f"{project.name}/nix-build",
        project=project.name,
        workernames=worker_names,
        collapseRequests=False,
        env={},
        factory=factory,
    )
def read_secret_file(secret_name: str) -> str:
    """Return the content of `$CREDENTIALS_DIRECTORY/<secret_name>`.

    Trailing whitespace is stripped.  When CREDENTIALS_DIRECTORY is not set,
    a message is printed to stderr and the process exits with status 1.
    """
    try:
        credentials_dir = os.environ["CREDENTIALS_DIRECTORY"]
    except KeyError:
        print("directory not set", file=sys.stderr)
        sys.exit(1)
    secret_path = Path(credentials_dir) / secret_name
    return secret_path.read_text().rstrip()
def config_for_project(
    config: dict[str, Any],
    project: GerritProject,
    worker_names: list[str],
    nix_supported_systems: list[str],
    nix_eval_worker_count: int,
    nix_eval_max_memory_size: int,
    eval_lock: util.MasterLock,
    outputs_path: Path | None = None,
) -> None:
    """Register the project, its schedulers and its builders in `config`.

    `config` is modified in place; its "projects", "schedulers" and
    "builders" lists must already exist.

    Raises:
        RuntimeError: the Gerrit private key file cannot be read or is empty.
    """
    config["projects"].append(Project(project.name))
    config["schedulers"].extend(
        [
            # build everything pertaining to a project
            # TODO(raito): will this catch also post-merge? we don't really care about that… do we?
            schedulers.SingleBranchScheduler(
                name=f"{project.name}-changes",
                change_filter=util.ChangeFilter(
                    project=project.name,
                ),
                builderNames=[f"{project.name}/nix-eval"],
            ),
            # this is triggered from `nix-eval`
            schedulers.Triggerable(
                name=f"{project.name}-nix-build",
                builderNames=[f"{project.name}/nix-build"],
            ),
            # allow to manually trigger a nix-build
            schedulers.ForceScheduler(
                name=f"{project.name}-force",
                builderNames=[f"{project.name}/nix-eval"],
                properties=[
                    util.StringParameter(
                        name="project",
                        label="Name of the Gerrit repository.",
                        default=project.name,
                    ),
                ],
            ),
        ],
    )

    # A missing or unreadable key surfaces as the RuntimeError below; reading
    # a file never yields None, so testing for None could not catch that.
    try:
        with open('/var/lib/buildbot/master/id_gerrit', 'r') as f:
            gerrit_private_key = f.read()
    except OSError as e:
        raise RuntimeError('No gerrit private key to fetch the repositories') from e
    if not gerrit_private_key.strip():
        raise RuntimeError('No gerrit private key to fetch the repositories')

    config["builders"].extend(
        [
            # Since all workers run on the same machine, we only assign one of them to do the evaluation.
            # This should prevent excessive memory usage.
            nix_eval_config(
                project,
                gerrit_private_key,
                worker_names,
                supported_systems=nix_supported_systems,
                worker_count=nix_eval_worker_count,
                max_memory_size=nix_eval_max_memory_size,
                eval_lock=eval_lock,
            ),
            nix_build_config(
                project,
                worker_names,
                outputs_path=outputs_path,
            ),
        ],
    )
class PeriodicWithStartup(schedulers.Periodic):
    """Periodic scheduler that can forget its last build when activated.

    With `run_on_startup=True` the stored "last_build" state is reset to
    None on every activation, before the regular activation runs.
    """

    def __init__(self, *args: Any, run_on_startup: bool = False, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)
        self.run_on_startup = run_on_startup

    @defer.inlineCallbacks
    def activate(self) -> Generator[Any, object, Any]:
        reset_needed = self.run_on_startup
        if reset_needed:
            # Must happen before the parent's activate() runs.
            yield self.setState("last_build", None)
        yield super().activate()
def gerritReviewCB(builderName, build, result, master, arg):
    """Compose the Gerrit review (message and labels) for a finished build.

    Only builds of the 'lix/nix-eval' builder that are not being retried
    produce a review; everything else yields an empty dict.  Checks are read
    from the URLs attached to the 'build flake' step, whose names are
    expected to look like 'success: hydraJobs.<path>' or
    'failure: hydraJobs.<path>'.
    """
    if result == util.RETRY:
        return dict()
    if builderName != 'lix/nix-eval':
        return dict()

    # check path -> (passed, url)
    outcome_prefixes = (
        ('success: hydraJobs.', True),
        ('failure: hydraJobs.', False),
    )
    all_checks = {}
    for step in build['steps']:
        if step['name'] == 'build flake':
            for link in step['urls']:
                for prefix, passed in outcome_prefixes:
                    if link['name'].startswith(prefix):
                        check_path = link['name'].split(' ')[1]
                        all_checks[check_path] = (passed, link['url'])
                        break

    # platform -> (number of successes, number of failures)
    tally = {}
    for check, (passed, _url) in all_checks.items():
        suffix = check.split('.')[-1]
        if suffix.endswith('-linux') or suffix.endswith('-darwin'):
            platform = suffix.split('-')[1]
        else:
            # Not an architecture-specific job, just a test
            platform = "test"
        good, bad = tally.get(platform, (0, 0))
        tally[platform] = (good + 1, bad) if passed else (good, bad + 1)

    # A platform is verified only when something passed and nothing failed.
    labels = {}
    for platform, label in (
        ('linux', 'Verified-On-Linux'),
        ('darwin', 'Verified-On-Darwin'),
    ):
        if platform not in tally:
            continue
        good, bad = tally[platform]
        if good > 0 and bad == 0:
            labels[label] = 1
        elif bad > 0:
            labels[label] = -1

    message = "Buildbot finished compiling your patchset!\n"
    message += "The result is: %s\n" % util.Results[result].upper()

    if result != util.SUCCESS:
        failed_checks = [
            f" - {check} (see {url})"
            for check, (passed, url) in all_checks.items()
            if not passed
        ]
        if failed_checks:
            message += "Failed checks:\n" + "\n".join(failed_checks) + "\n"

    if arg:
        message += "\nFor more details visit:\n"
        message += build['url'] + "\n"

    return dict(message=message, labels=labels)
def gerritStartCB(builderName, build, arg):
    """Compose the Gerrit message announcing that a build has started."""
    lines = (
        "Buildbot started compiling your patchset",
        f"on configuration: {builderName}",
        f"See your build here: {build['url']}",
    )
    return {"message": "\n".join(lines)}
def gerritSummaryCB(buildInfoList, results, status, arg):
    """Compose one Gerrit summary for a set of builds.

    The message holds one paragraph per build; the 'Verified' label is 1
    only when there is at least one build and every build succeeded,
    otherwise -1.
    """
    paragraphs = []
    outcomes = []
    for info in buildInfoList:
        text = "Builder %(name)s %(resultText)s (%(text)s)" % info
        url = info.get('url', None)
        # Finish the sentence with the build link when there is one.
        text += (" - " + url) if url else "."
        paragraphs.append(text)
        outcomes.append(info['result'] == util.SUCCESS)

    all_passed = bool(outcomes) and all(outcomes)
    return {
        'message': '\n\n'.join(paragraphs),
        'labels': {'Verified': 1 if all_passed else -1},
    }
class GerritNixConfigurator(ConfiguratorBase):
    """Configurator that sets up the Lix project on a Buildbot master.

    `configure` adds the workers listed in the workers secret, the
    nix-eval/nix-build builders and schedulers for the "lix" project, the
    Gerrit change source, a secrets provider backed by
    $CREDENTIALS_DIRECTORY and, unless already configured, OAuth2 login.
    """

    def __init__(
        self,
        gerrit_server: str,
        gerrit_user: str,
        gerrit_port: int,
        gerrit_sshkey_path: str,
        url: str,
        nix_supported_systems: list[str],
        nix_eval_worker_count: int | None,
        nix_eval_max_memory_size: int,
        # Name of the secret file describing the workers.
        # Shape of this file: [ { "name": "<worker-name>", "pass": "<worker-password>", "cores": "<cpu-cores>" } ]
        nix_workers_secret_name: str = "buildbot-nix-workers",  # noqa: S107
        outputs_path: str | None = None,
    ) -> None:
        super().__init__()
        self.gerrit_server = gerrit_server
        self.gerrit_user = gerrit_user
        self.gerrit_port = gerrit_port
        self.nix_workers_secret_name = nix_workers_secret_name
        self.nix_eval_max_memory_size = nix_eval_max_memory_size
        self.nix_eval_worker_count = nix_eval_worker_count
        self.nix_supported_systems = nix_supported_systems
        self.gerrit_change_source = GerritChangeSource(
            gerrit_server,
            gerrit_user,
            gerritport=gerrit_port,
            identity_file=gerrit_sshkey_path,
        )
        self.url = url
        self.outputs_path = None if outputs_path is None else Path(outputs_path)

    def configure(self, config: dict[str, Any]) -> None:
        """Fill `config` (the master configuration dict) in place."""
        worker_config = json.loads(read_secret_file(self.nix_workers_secret_name))
        worker_names = []

        # Every list/dict touched below must exist, whether or not the
        # master.cfg defined it.
        config.setdefault("projects", [])
        config.setdefault("secretsProviders", [])
        config.setdefault("www", {})
        config.setdefault("workers", [])
        config.setdefault("schedulers", [])
        config.setdefault("builders", [])

        # One Buildbot worker per core: "<name>-000", "<name>-001", ...
        for item in worker_config:
            cores = item.get("cores", 0)
            for i in range(cores):
                worker_name = f"{item['name']}-{i:03}"
                config["workers"].append(worker.Worker(worker_name, item["pass"]))
                worker_names.append(worker_name)

        eval_lock = util.MasterLock("nix-eval")

        # Configure the Lix project.
        config_for_project(
            config,
            GerritProject(name="lix"),
            worker_names,
            self.nix_supported_systems,
            self.nix_eval_worker_count or multiprocessing.cpu_count(),
            self.nix_eval_max_memory_size,
            eval_lock,
            self.outputs_path,
        )

        config["change_source"] = self.gerrit_change_source

        # Reporting back to Gerrit is currently disabled:
        #
        # config["services"].append(
        #     reporters.GerritStatusPush(self.gerrit_server, self.gerrit_user,
        #                                port=2022,
        #                                identity_file='/var/lib/buildbot/master/id_gerrit',
        #                                summaryCB=None,
        #                                startCB=None,
        #                                wantSteps=True,
        #                                reviewCB=gerritReviewCB,
        #                                reviewArg=self.url)
        #                                # startCB=gerritStartCB,
        #                                # startArg=self.url,
        #                                # summaryCB=gerritSummaryCB,
        #                                # summaryArg=self.url)
        # )

        systemd_secrets = secrets.SecretInAFile(
            dirname=os.environ["CREDENTIALS_DIRECTORY"],
        )
        config["secretsProviders"].append(systemd_secrets)

        config["www"].setdefault("plugins", {})
        config["www"]["plugins"].update(dict(base_react={}))

        # Respect an auth backend configured elsewhere.
        if "auth" not in config["www"]:
            config["www"]["auth"] = LixSystemsOAuth2(
                'buildbot',
                read_secret_file('buildbot-oauth2-secret'),
                autologin=True,
            )