"""Buildbot master configuration that evaluates and builds the Lix flake from Gerrit.

The module wires together:

* a ``nix-eval`` builder that runs ``nix-eval-jobs`` on ``.#hydraJobs``,
* a ``nix-build`` builder that is triggered once per evaluated attribute,
* Gerrit change source / status reporter plumbing, and
* a :class:`GerritNixConfigurator` that installs all of the above into a
  Buildbot ``config`` dictionary.
"""

import json
import multiprocessing
import os
import sys
from collections import defaultdict
from collections.abc import Generator
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING, Any

from buildbot.configurators import ConfiguratorBase
from buildbot.plugins import reporters, schedulers, secrets, steps, util, worker
from buildbot.process import buildstep, logobserver, remotecommand
from buildbot.process.project import Project
from buildbot.process.properties import Interpolate, Properties
from buildbot.process.results import ALL_RESULTS, statusToString
from buildbot.steps.trigger import Trigger
from buildbot.util import asyncSleep
from buildbot.www.authz.endpointmatchers import EndpointMatcherBase, Match
from buildbot.www.oauth2 import OAuth2Auth
from buildbot.changes.gerritchangesource import GerritChangeSource

if TYPE_CHECKING:
    from buildbot.process.log import Log

from twisted.internet import defer, threads
from twisted.logger import Logger
from twisted.python.failure import Failure

from .github_projects import (
    slugify_project_name,
)

log = Logger()

# SSH identity used by the master to talk to Gerrit (checkout key and status reporter).
_GERRIT_IDENTITY_FILE = "/var/lib/buildbot/master/id_gerrit"


class LixSystemsOAuth2(OAuth2Auth):
    """OAuth2 login against the Lix identity provider (``identity.lix.systems``)."""

    name = 'Lix'
    faIcon = 'fa-login'
    resourceEndpoint = "https://identity.lix.systems"
    # is passing scope necessary?
    authUri = 'https://identity.lix.systems/realms/lix-project/protocol/openid-connect/auth'
    tokenUri = 'https://identity.lix.systems/realms/lix-project/protocol/openid-connect/token'


class BuildbotNixError(Exception):
    """Raised when output of a Nix command cannot be interpreted."""


@dataclass
class GerritProject:
    """A Gerrit project that is built by this configuration."""

    # `project` field.
    name: str


def _dependency_closure(key: str, deps: dict[str, set[str]]) -> set[str]:
    """Return every derivation transitively reachable from *key*, excluding *key* itself.

    Derivations that have no entry in *deps* are treated as having no inputs.
    """
    seen: set[str] = set()
    pending = [key]
    while pending:
        for dep in deps.get(pending.pop(), ()):
            if dep not in seen:
                seen.add(dep)
                pending.append(dep)
    seen.discard(key)
    return seen


def _parse_json_lines(output: str) -> list[dict[str, Any]]:
    """Parse newline-delimited JSON, skipping empty lines.

    Raises:
        BuildbotNixError: if a non-empty line is not valid JSON.
    """
    parsed = []
    for line in output.split("\n"):
        if line == "":
            continue
        try:
            parsed.append(json.loads(line))
        except json.JSONDecodeError as e:
            msg = f"Failed to parse line: {line}"
            raise BuildbotNixError(msg) from e
    return parsed


class BuildTrigger(Trigger):
    """Dynamic trigger that creates a build for every attribute."""

    def __init__(
        self,
        builds_scheduler: str,
        jobs: list[dict[str, Any]],
        drv_info: dict[str, Any],
        **kwargs: Any,
    ) -> None:
        """Store the evaluated jobs and configure the underlying ``Trigger``.

        Args:
            builds_scheduler: name of the scheduler every job is submitted to.
            jobs: job dictionaries as emitted by ``nix-eval-jobs``.
            drv_info: parsed output of ``nix derivation show --recursive``,
                keyed by derivation path.
            **kwargs: forwarded to ``Trigger``; ``name`` defaults to ``"trigger"``.
        """
        if "name" not in kwargs:
            kwargs["name"] = "trigger"
        self.jobs = jobs
        self.drv_info = drv_info
        self.config = None
        self.builds_scheduler = builds_scheduler
        Trigger.__init__(
            self,
            waitForFinish=True,
            schedulerNames=[builds_scheduler],
            haltOnFailure=True,
            flunkOnFailure=True,
            sourceStamps=[],
            alwaysUseLatest=False,
            updateSourceStamp=False,
            **kwargs,
        )

    def createTriggerProperties(self, props: Any) -> Any:  # noqa: N802
        """Pass the per-job properties through unchanged."""
        return props

    def getSchedulersAndProperties(self) -> list[tuple[str, Properties]]:  # noqa: N802
        """Build one ``(scheduler, properties)`` pair per evaluated job.

        Also records, on the parent build, a ``sched_state`` property mapping
        each job derivation to the other job derivations it transitively
        depends on, plus ``<attr>-out_path`` / ``<attr>-drv_path`` properties.
        """
        build_props = self.build.getProperties()
        source = "nix-eval-lix"

        # Direct inputs of every known derivation; a missing "inputDrvs" means no inputs.
        direct_deps = {
            drv: set(info.get("inputDrvs") or {})
            for drv, info in self.drv_info.items()
        }
        job_set = {drv for drv in (job.get("drvPath") for job in self.jobs) if drv}
        # Restrict each closure to derivations that are themselves jobs; sorted so the
        # stored property is deterministic.
        all_deps = {
            drv: sorted(_dependency_closure(drv, direct_deps) & job_set)
            for drv in job_set
        }

        build_props.setProperty("sched_state", all_deps, source, True)

        triggered_schedulers = []
        for job in self.jobs:
            attr = job.get("attr", "eval-error")
            name = f"hydraJobs.{attr}"
            error = job.get("error")

            props = Properties()
            props.setProperty("virtual_builder_name", name, source)
            props.setProperty("status_name", f"nix-build .#hydraJobs.{attr}", source)
            props.setProperty("virtual_builder_tags", "", source)

            if error is not None:
                # Evaluation failed: still trigger a build so the error gets reported.
                props.setProperty("error", error, source)
                triggered_schedulers.append((self.builds_scheduler, props))
                continue

            drv_path = job.get("drvPath")
            system = job.get("system")
            out_path = job.get("outputs", {}).get("out")

            build_props.setProperty(f"{attr}-out_path", out_path, source)
            build_props.setProperty(f"{attr}-drv_path", drv_path, source)

            props.setProperty("attr", attr, source)
            props.setProperty("system", system, source)
            props.setProperty("drv_path", drv_path, source)
            props.setProperty("out_path", out_path, source)
            props.setProperty("isCached", job.get("isCached"), source)

            triggered_schedulers.append((self.builds_scheduler, props))
        return triggered_schedulers

    def getCurrentSummary(self) -> dict[str, str]:  # noqa: N802
        """Summarise triggered builds as per-result counts.

        The original build trigger would show the generic builder name
        `nix-build` in this case, which is not helpful.
        """
        if not self.triggeredNames:
            return {"step": "running"}
        summary = []
        if self._result_list:
            for status in ALL_RESULTS:
                count = self._result_list.count(status)
                if count:
                    summary.append(f"{count} {statusToString(status, count)}")
        return {"step": f"({', '.join(summary)})"}


class NixEvalCommand(buildstep.ShellMixin, steps.BuildStep):
    """Parses the output of `nix-eval-jobs` and triggers a `nix-build` build for
    every attribute.
    """

    def __init__(self, supported_systems: list[str], **kwargs: Any) -> None:
        """Set up the shell step and capture its stdio for later parsing.

        Args:
            supported_systems: Nix systems whose jobs should be built.
            **kwargs: forwarded to the shell mixin / ``BuildStep``.
        """
        kwargs = self.setupShellMixin(kwargs)
        super().__init__(**kwargs)
        self.observer = logobserver.BufferLogObserver()
        self.addLogObserver("stdio", self.observer)
        self.supported_systems = supported_systems

    @defer.inlineCallbacks
    def run(self) -> Generator[Any, object, Any]:
        """Run the evaluation and schedule a ``BuildTrigger`` for the resulting jobs.

        Returns the result of the evaluation command.

        Raises:
            BuildbotNixError: if the evaluation output or the
                ``nix derivation show`` output is not valid JSON.
        """
        # run nix-eval-jobs --flake .#hydraJobs to generate the dict of stages
        cmd: remotecommand.RemoteCommand = yield self.makeRemoteShellCommand()
        yield self.runCommand(cmd)

        # if the command passes extract the list of stages
        result = cmd.results()
        if result != util.SUCCESS:
            return result

        jobs = _parse_json_lines(self.observer.getStdout())

        # Jobs without a system are kept so that eval errors are reported.
        filtered_jobs = [
            job
            for job in jobs
            if not job.get("system") or job.get("system") in self.supported_systems
        ]
        drv_paths = [
            drv for drv in (job.get("drvPath") for job in filtered_jobs) if drv
        ]

        drv_info: dict[str, Any] = {}
        # Without any derivation there is nothing to inspect; running
        # `nix derivation show` with no installable would not describe our jobs.
        if drv_paths:
            drv_show_log: Log = yield self.getLog("stdio")
            drv_show_log.addStdout("getting derivation infos\n")
            cmd = yield self.makeRemoteShellCommand(
                stdioLogName=None,
                collectStdout=True,
                command=["nix", "derivation", "show", "--recursive", *drv_paths],
            )
            yield self.runCommand(cmd)
            drv_show_log.addStdout("done\n")
            try:
                drv_info = json.loads(cmd.stdout)
            except json.JSONDecodeError as e:
                msg = f"Failed to parse `nix derivation show` output for {cmd.command}"
                raise BuildbotNixError(msg) from e

        # NOTE(review): the scheduler name is hard-coded to the "lix" project.
        self.build.addStepsAfterCurrentStep(
            [
                BuildTrigger(
                    builds_scheduler="lix-nix-build",
                    name="build flake",
                    jobs=filtered_jobs,
                    drv_info=drv_info,
                ),
            ],
        )

        return result


class NixBuildCommand(buildstep.ShellMixin, steps.BuildStep):
    """Builds a nix derivation."""

    def __init__(self, **kwargs: Any) -> None:
        """Forward all arguments to the shell mixin / ``BuildStep``."""
        kwargs = self.setupShellMixin(kwargs)
        super().__init__(**kwargs)

    @defer.inlineCallbacks
    def run(self) -> Generator[Any, object, Any]:
        """Report an eval error, skip a cached output, or run the build command."""
        if error := self.getProperty("error"):
            attr = self.getProperty("attr")
            # show eval error
            error_log: Log = yield self.addLog("nix_error")
            error_log.addStderr(f"{attr} failed to evaluate:\n{error}")
            return util.FAILURE

        if self.getProperty("isCached"):
            yield self.addCompleteLog(
                "cached outpath from previous builds",
                # buildbot apparently hides the first line in the ui?
                f'\n{self.getProperty("out_path")}\n',
            )
            return util.SKIPPED

        # run `nix build`
        cmd: remotecommand.RemoteCommand = yield self.makeRemoteShellCommand()
        yield self.runCommand(cmd)

        return cmd.results()


class UpdateBuildOutput(steps.BuildStep):
    """Updates store paths in a public www directory.
    This is useful to prefetch updates without having to evaluate
    on the target machine.
    """

    def __init__(self, path: Path, **kwargs: Any) -> None:
        """Remember the directory *path* that output links are written to."""
        super().__init__(**kwargs)
        self.path = path

    def run(self) -> int:
        """Write the build's ``out_path`` to ``<path>/<attr>`` for default-branch builds.

        Returns ``util.SKIPPED`` when the build's branch differs from the
        ``github.repository.default_branch`` property, ``util.SUCCESS`` otherwise.
        """
        props = self.build.getProperties()
        # NOTE(review): this compares against a GitHub-specific property; confirm it
        # is ever set for Gerrit-sourced builds.
        if props.getProperty("branch") != props.getProperty(
            "github.repository.default_branch",
        ):
            return util.SKIPPED

        attr = Path(props.getProperty("attr")).name
        out_path = props.getProperty("out_path")
        # XXX don't hardcode this
        self.path.mkdir(parents=True, exist_ok=True)
        (self.path / attr).write_text(out_path)
        return util.SUCCESS


def nix_eval_config(
    project: GerritProject,
    gerrit_private_key: str,
    worker_names: list[str],
    supported_systems: list[str],
    eval_lock: util.MasterLock,
    worker_count: int,
    max_memory_size: int,
) -> util.BuilderConfig:
    """Uses nix-eval-jobs to evaluate hydraJobs from flake.nix in parallel.
    For each evaluated attribute a new build pipeline is started.

    Args:
        project: project the builder belongs to (used for the builder name).
        gerrit_private_key: SSH private key contents used for the checkout.
        worker_names: workers allowed to run the evaluation.
        supported_systems: systems whose jobs are forwarded to the build trigger.
        eval_lock: master lock held exclusively while evaluating.
        worker_count: value passed to ``nix-eval-jobs --workers``.
        max_memory_size: value passed to ``nix-eval-jobs --max-memory-size``.
    """
    factory = util.BuildFactory()
    # check out the source
    # NOTE(review): the repository URL is hard-coded rather than derived from `project`.
    factory.addStep(
        steps.Gerrit(
            repourl="ssh://buildbot@gerrit.lix.systems:2022/lix",
            mode="full",
            retry=[60, 60],
            timeout=3600,
            sshPrivateKey=gerrit_private_key,
        ),
    )
    # use one gcroots directory per worker. this should be scoped to the largest unique resource
    # in charge of builds (ie, buildnumber is too narrow) to not litter the system with permanent
    # gcroots in case of worker restarts.
    # TODO perhaps we should clean the entire /drvs/ directory up too during startup.
    drv_gcroots_dir = util.Interpolate(
        "/nix/var/nix/gcroots/per-user/buildbot-worker/%(prop:project)s/drvs/%(prop:workername)s/",
    )

    factory.addStep(
        NixEvalCommand(
            env={},
            name="evaluate flake",
            supported_systems=supported_systems,
            command=[
                "nix-eval-jobs",
                "--workers",
                str(worker_count),
                "--max-memory-size",
                str(max_memory_size),
                "--option",
                "accept-flake-config",
                "true",
                "--gc-roots-dir",
                drv_gcroots_dir,
                "--force-recurse",
                "--check-cache-status",
                "--flake",
                ".#hydraJobs",
            ],
            haltOnFailure=True,
            locks=[eval_lock.access("exclusive")],
        ),
    )

    factory.addStep(
        steps.ShellCommand(
            name="Cleanup drv paths",
            command=[
                "rm",
                "-rf",
                drv_gcroots_dir,
            ],
            alwaysRun=True,
        ),
    )

    return util.BuilderConfig(
        name=f"{project.name}/nix-eval",
        workernames=worker_names,
        project=project.name,
        factory=factory,
        properties=dict(status_name="nix-eval"),
    )


def nix_build_config(
    project: GerritProject,
    worker_names: list[str],
    outputs_path: Path | None = None,
) -> util.BuilderConfig:
    """Builds one nix flake attribute.

    Args:
        project: project the builder belongs to (used for the builder name).
        worker_names: workers allowed to run builds.
        outputs_path: when given, an ``UpdateBuildOutput`` step writing to this
            directory is appended.
    """
    factory = util.BuildFactory()
    factory.addStep(
        NixBuildCommand(
            env={},
            name="Build flake attr",
            command=[
                "nix",
                "build",
                "-L",
                "--option",
                "keep-going",
                "true",
                # stop stuck builds after 20 minutes
                # (`--max-silent-time` is a flag of its own; prefixing it with
                # `--option` would make Nix treat it as an unknown setting name)
                "--max-silent-time",
                str(60 * 20),
                "--accept-flake-config",
                "--out-link",
                util.Interpolate("result-%(prop:attr)s"),
                util.Interpolate("%(prop:drv_path)s^*"),
            ],
            # 3 hours, defaults to 20 minutes
            # We increase this over the default since the build output might end up in a different `nix build`.
            timeout=60 * 60 * 3,
            haltOnFailure=True,
        ),
    )

    factory.addStep(
        steps.ShellCommand(
            name="Register gcroot",
            command=[
                "nix-store",
                "--add-root",
                # FIXME: cleanup old build attributes
                util.Interpolate(
                    "/nix/var/nix/gcroots/per-user/buildbot-worker/%(prop:project)s/%(prop:attr)s",
                ),
                "-r",
                util.Property("out_path"),
            ],
            # NOTE(review): GitHub-specific property; confirm it is set for Gerrit builds.
            doStepIf=lambda s: s.getProperty("branch")
            == s.getProperty("github.repository.default_branch"),
        ),
    )
    factory.addStep(
        steps.ShellCommand(
            name="Delete temporary gcroots",
            command=["rm", "-f", util.Interpolate("result-%(prop:attr)s")],
        ),
    )
    if outputs_path is not None:
        factory.addStep(
            UpdateBuildOutput(
                name="Update build output",
                path=outputs_path,
            ),
        )
    return util.BuilderConfig(
        name=f"{project.name}/nix-build",
        project=project.name,
        workernames=worker_names,
        collapseRequests=False,
        env={},
        factory=factory,
    )


def read_secret_file(secret_name: str) -> str:
    """Read *secret_name* from ``$CREDENTIALS_DIRECTORY``, stripping trailing whitespace.

    Exits the process with status 1 when ``CREDENTIALS_DIRECTORY`` is unset.
    """
    directory = os.environ.get("CREDENTIALS_DIRECTORY")
    if directory is None:
        print("directory not set", file=sys.stderr)
        sys.exit(1)
    return Path(directory).joinpath(secret_name).read_text().rstrip()


def config_for_project(
    config: dict[str, Any],
    project: GerritProject,
    worker_names: list[str],
    nix_supported_systems: list[str],
    nix_eval_worker_count: int,
    nix_eval_max_memory_size: int,
    eval_lock: util.MasterLock,
    outputs_path: Path | None = None,
) -> None:
    """Register the project, its schedulers and its builders in *config*.

    *config* is mutated in place; its ``projects``, ``schedulers`` and
    ``builders`` lists must already exist.

    Raises:
        RuntimeError: if the Gerrit private key file is empty.
        OSError: if the Gerrit private key file cannot be read.
    """
    config["projects"].append(Project(project.name))
    config["schedulers"].extend(
        [
            # build everything pertaining to a project
            # TODO(raito): will this catch also post-merge? we don't really care about that… do we?
            schedulers.SingleBranchScheduler(
                name=f"{project.name}-changes",
                change_filter=util.ChangeFilter(
                    project=project.name,
                ),
                builderNames=[f"{project.name}/nix-eval"],
            ),
            # this is triggered from `nix-eval`
            schedulers.Triggerable(
                name=f"{project.name}-nix-build",
                builderNames=[f"{project.name}/nix-build"],
            ),
            # allow to manually trigger a nix-build
            schedulers.ForceScheduler(
                name=f"{project.name}-force",
                builderNames=[f"{project.name}/nix-eval"],
                properties=[
                    util.StringParameter(
                        name="project",
                        label="Name of the Gerrit repository.",
                        default=project.name,
                    ),
                ],
            ),
        ],
    )

    gerrit_private_key = Path(_GERRIT_IDENTITY_FILE).read_text()
    # A successful read never yields None, so check for an empty key instead.
    if not gerrit_private_key.strip():
        raise RuntimeError('No gerrit private key to fetch the repositories')

    config["builders"].extend(
        [
            # Since all workers run on the same machine, we only assign one of them to do the evaluation.
            # This should prevent excessive memory usage.
            nix_eval_config(
                project,
                gerrit_private_key,
                worker_names,
                supported_systems=nix_supported_systems,
                worker_count=nix_eval_worker_count,
                max_memory_size=nix_eval_max_memory_size,
                eval_lock=eval_lock,
            ),
            nix_build_config(
                project,
                worker_names,
                outputs_path=outputs_path,
            ),
        ],
    )


class PeriodicWithStartup(schedulers.Periodic):
    """Periodic scheduler that can reset its ``last_build`` state on activation."""

    def __init__(self, *args: Any, run_on_startup: bool = False, **kwargs: Any) -> None:
        """Accept the ``Periodic`` arguments plus the *run_on_startup* flag."""
        super().__init__(*args, **kwargs)
        self.run_on_startup = run_on_startup

    @defer.inlineCallbacks
    def activate(self) -> Generator[Any, object, Any]:
        """Clear ``last_build`` first when *run_on_startup* is set, then activate.

        Presumably this makes the scheduler fire right after startup — confirm
        against ``Periodic``.
        """
        if self.run_on_startup:
            yield self.setState("last_build", None)
        yield super().activate()


# (name prefix of a step URL, whether it denotes a passing check)
_CHECK_URL_PREFIXES = (
    ('success: hydraJobs.', True),
    ('failure: hydraJobs.', False),
)

# (OS component of the Nix system, Gerrit label voted on)
_OS_LABELS = (
    ('linux', 'Verified-On-Linux'),
    ('darwin', 'Verified-On-Darwin'),
)


def gerritReviewCB(builderName, build, result, master, arg):
    """Compute the Gerrit review message and per-OS labels for a finished build.

    Only ``lix/nix-eval`` builds that are not being retried produce a review;
    everything else yields an empty dict.

    Args:
        builderName: name of the builder that finished.
        build: build dictionary; its ``steps`` (with ``urls``) and ``url`` are read.
        result: Buildbot result code of the build.
        master: unused.
        arg: when truthy, a link to the build is appended to the message.
    """
    if result == util.RETRY:
        return dict()

    if builderName != 'lix/nix-eval':
        return dict()

    # check name -> (passed, url), gathered from the URLs of the "build flake" step
    all_checks = {}
    for step in build['steps']:
        if step['name'] != 'build flake':
            continue

        for url in step['urls']:
            for prefix, passed in _CHECK_URL_PREFIXES:
                if url['name'].startswith(prefix):
                    path = url['name'].split(' ')[1]
                    all_checks[path] = (passed, url['url'])
                    break

    # OS name -> (number of successes, number of failures)
    collected_oses = {}
    for check, (passed, _url) in all_checks.items():
        arch = check.split('.')[-1]
        if arch.endswith(('-linux', '-darwin')):
            os_name = arch.rsplit('-', 1)[1]
        else:
            # Not an architecture-specific job, just a test
            os_name = "test"
        success, failure = collected_oses.get(os_name, (0, 0))
        if passed:
            success += 1
        else:
            failure += 1
        collected_oses[os_name] = (success, failure)

    labels = {}
    for os_name, label in _OS_LABELS:
        if os_name not in collected_oses:
            continue
        success, failure = collected_oses[os_name]
        if success > 0 and failure == 0:
            labels[label] = 1
        elif failure > 0:
            labels[label] = -1

    message = "Buildbot finished compiling your patchset!\n"
    message += "The result is: %s\n" % util.Results[result].upper()
    if result != util.SUCCESS:
        failed_checks = [
            f" - {check} (see {url})"
            for check, (passed, url) in all_checks.items()
            if not passed
        ]
        if failed_checks:
            message += "Failed checks:\n" + "\n".join(failed_checks) + "\n"

    if arg:
        message += "\nFor more details visit:\n"
        message += build['url'] + "\n"

    return dict(message=message, labels=labels)


def gerritStartCB(builderName, build, arg):
    """Return the Gerrit message announcing that a build has started."""
    message = "Buildbot started compiling your patchset\n"
    message += "on configuration: %s\n" % builderName
    message += "See your build here: %s" % build['url']

    return dict(message=message)


def gerritSummaryCB(buildInfoList, results, status, arg):
    """Summarise a set of builds into one Gerrit message and a ``Verified`` vote.

    The vote is ``1`` only when at least one build succeeded and none failed,
    otherwise ``-1``.
    """
    success = False
    failure = False

    msgs = []

    for buildInfo in buildInfoList:
        msg = "Builder %(name)s %(resultText)s (%(text)s)" % buildInfo
        link = buildInfo.get('url', None)
        if link:
            msg += " - " + link
        else:
            msg += "."

        msgs.append(msg)

        if buildInfo['result'] == util.SUCCESS:
            success = True
        else:
            failure = True

    verified = 1 if success and not failure else -1

    return dict(message='\n\n'.join(msgs), labels={
        'Verified': verified
    })


class GerritNixConfigurator(ConfiguratorBase):
    """Configurator that installs the Gerrit-driven Nix eval/build setup for Lix."""

    def __init__(
        self,
        gerrit_server: str,
        gerrit_user: str,
        gerrit_port: int,
        gerrit_sshkey_path: str,
        url: str,
        nix_supported_systems: list[str],
        nix_eval_worker_count: int | None,
        nix_eval_max_memory_size: int,
        nix_workers_secret_name: str = "buildbot-nix-workers",  # noqa: S107
        outputs_path: str | None = None,
    ) -> None:
        """Store the settings and create the Gerrit change source.

        Args:
            gerrit_server: Gerrit host name.
            gerrit_user: user used for the Gerrit SSH connections.
            gerrit_port: Gerrit SSH port.
            gerrit_sshkey_path: identity file used by the change source.
            url: passed to the review callback as its argument.
            nix_supported_systems: systems whose jobs are built.
            nix_eval_worker_count: ``nix-eval-jobs`` worker count; a falsy value
                means "use the CPU count".
            nix_eval_max_memory_size: ``nix-eval-jobs`` max memory size.
            nix_workers_secret_name: name of the secret file describing the
                workers. Shape of this file:
                ``[ { "name": "", "pass": "", "cores": "" } ]``
            outputs_path: optional directory for ``UpdateBuildOutput``.
        """
        super().__init__()
        self.gerrit_server = gerrit_server
        self.gerrit_user = gerrit_user
        self.gerrit_port = gerrit_port
        self.nix_workers_secret_name = nix_workers_secret_name
        self.nix_eval_max_memory_size = nix_eval_max_memory_size
        self.nix_eval_worker_count = nix_eval_worker_count
        self.nix_supported_systems = nix_supported_systems
        self.gerrit_change_source = GerritChangeSource(
            gerrit_server,
            gerrit_user,
            gerritport=gerrit_port,
            identity_file=gerrit_sshkey_path,
        )
        self.url = url
        self.outputs_path = None if outputs_path is None else Path(outputs_path)

    def configure(self, config: dict[str, Any]) -> None:
        """Populate *config* with workers, builders, schedulers, reporter and auth."""
        worker_config = json.loads(read_secret_file(self.nix_workers_secret_name))
        worker_names = []

        config.setdefault("projects", [])
        config.setdefault("secretsProviders", [])
        config.setdefault("www", {})
        # These lists are appended to below; make sure they exist.
        for key in ("workers", "schedulers", "builders", "services"):
            config.setdefault(key, [])

        for item in worker_config:
            # One Buildbot worker per core; tolerate the count being stored as a string.
            cores = int(item.get("cores", 0) or 0)
            for i in range(cores):
                worker_name = f"{item['name']}-{i:03}"
                config["workers"].append(worker.Worker(worker_name, item["pass"]))
                worker_names.append(worker_name)

        eval_lock = util.MasterLock("nix-eval")

        # Configure the Lix project.
        config_for_project(
            config,
            GerritProject(name="lix"),
            worker_names,
            self.nix_supported_systems,
            self.nix_eval_worker_count or multiprocessing.cpu_count(),
            self.nix_eval_max_memory_size,
            eval_lock,
            self.outputs_path,
        )

        config["change_source"] = self.gerrit_change_source
        # Only the review callback is enabled; the start and summary callbacks
        # (gerritStartCB / gerritSummaryCB) are deliberately left off.
        # NOTE(review): the identity file is hard-coded here while the change source
        # uses `gerrit_sshkey_path` — confirm whether they should match.
        config["services"].append(
            reporters.GerritStatusPush(
                self.gerrit_server,
                self.gerrit_user,
                port=self.gerrit_port,
                identity_file=_GERRIT_IDENTITY_FILE,
                summaryCB=None,
                startCB=None,
                wantSteps=True,
                reviewCB=gerritReviewCB,
                reviewArg=self.url,
            ),
        )

        systemd_secrets = secrets.SecretInAFile(
            dirname=os.environ["CREDENTIALS_DIRECTORY"],
        )
        config["secretsProviders"].append(systemd_secrets)

        config["www"].setdefault("plugins", {})
        config["www"]["plugins"].update(dict(base_react={}))

        if "auth" not in config["www"]:
            config["www"]["auth"] = LixSystemsOAuth2(
                'buildbot',
                read_secret_file('buildbot-oauth2-secret'),
                autologin=True,
            )