import base64
import graphlib
import json
import multiprocessing
import os
import sys
from collections.abc import Generator
from dataclasses import dataclass, field
from pathlib import Path
from typing import TYPE_CHECKING, Any

from buildbot.changes.gerritchangesource import GerritChangeSource
from buildbot.configurators import ConfiguratorBase
from buildbot.plugins import reporters, schedulers, secrets, steps, util, worker
from buildbot.process import buildstep, logobserver, remotecommand
from buildbot.process.buildstep import EXCEPTION
from buildbot.process.buildstep import SUCCESS
from buildbot.process.buildstep import BuildStepFailed
from buildbot.process.project import Project
from buildbot.process.properties import Properties
from buildbot.process.results import ALL_RESULTS, statusToString
from buildbot.process.results import worst_status
from buildbot.reporters.generators.build import BuildStatusGenerator
from buildbot.reporters.message import MessageFormatterFunction
from buildbot.reporters.utils import getURLForBuildrequest
from buildbot.www.auth import AuthBase
from buildbot.www.oauth2 import OAuth2Auth

if TYPE_CHECKING:
    from buildbot.process.log import Log

from twisted.internet import defer
from twisted.logger import Logger

from .binary_cache import S3BinaryCacheConfig

log = Logger()

FLAKE_TARGET_ATTRIBUTE_FOR_JOBS = "buildbotJobs"


@dataclass
class EvaluatorSettings:
    """Settings forwarded to the `nix-eval-jobs` invocation (see `make_job_evaluator`)."""

    supported_systems: list[str]
    worker_count: int
    max_memory_size: int
    # `nix_eval_config` passes a `util.Interpolate` here, not a plain string.
    gc_roots_dir: str
    lock: util.MasterLock


@dataclass
class NixBuilder:
    """One remote builder entry, rendered by `to_nix_line` for `--builders`.

    Field names are camelCase because instances are created with
    `NixBuilder(**builder_cfg)` from configuration dictionaries.
    """

    protocol: str
    hostName: str
    maxJobs: int
    speedFactor: int = 1
    # without base64
    publicHostKey: str | None = None
    sshUser: str | None = None
    sshKey: str | None = None
    systems: list[str] = field(default_factory=lambda: ["-"])
    supportedFeatures: list[str] = field(default_factory=lambda: ["-"])
    mandatoryFeatures: list[str] = field(default_factory=lambda: ["-"])

    def to_nix_line(self) -> str:
        """Return the space-separated builder description for this machine.

        Missing optional values are rendered as "-"; the public host key is
        base64-encoded.
        """
        if self.publicHostKey is not None:
            encoded_public_key = base64.b64encode(
                self.publicHostKey.encode("ascii")
            ).decode("ascii")
        else:
            encoded_public_key = "-"

        if self.sshUser is not None:
            full_connection = f"{self.protocol}://{self.sshUser}@{self.hostName}"
        else:
            full_connection = self.hostName

        # Joined explicitly instead of nesting same-quote expressions inside an
        # f-string, which is only valid syntax on Python >= 3.12.
        return " ".join(
            [
                full_connection,
                ",".join(self.systems),
                self.sshKey or "-",
                str(self.maxJobs),
                str(self.speedFactor),
                ",".join(self.supportedFeatures),
                ",".join(self.mandatoryFeatures),
                encoded_public_key,
            ]
        )


@dataclass
class OAuth2Config:
    """Attributes used to build a dynamic `OAuth2Auth` subclass."""

    name: str
    faIcon: str
    resourceEndpoint: str
    authUri: str
    tokenUri: str


def make_oauth2_method(oauth2_config: OAuth2Config):
    """
    This constructs dynamically a class inheriting
    an OAuth2 base configured using a dataclass.
    """
    return type(
        f'{oauth2_config.name}DynamicOAuth2',
        (OAuth2Auth,),
        oauth2_config.__dict__,
    )


class BuildbotNixError(Exception):
    """Raised when output of a Nix tool cannot be parsed."""


@dataclass
class GerritProject:
    # `project` field.
    name: str
    # Private SSH key path to access Gerrit API
    private_sshkey_path: str


@dataclass
class GerritConfig:
    # Gerrit server domain
    domain: str
    port: int
    username: str

    @property
    def repourl_template(self) -> str:
        """
        Returns the prefix to build a repourl
        using that gerrit configuration.
        """
        return f'ssh://{self.username}@{self.domain}:{self.port}/'


class BuildTrigger(steps.BuildStep):
    """Trigger one build per evaluated job, respecting inter-job dependencies.

    `jobs` is the list of job dictionaries produced by the evaluation step and
    `all_deps` maps a derivation path to the derivation paths (among the jobs)
    it depends on. A job is scheduled once its dependency list is empty.
    """

    def __init__(
        self,
        builds_scheduler_group: str,
        jobs: list[dict[str, Any]],
        all_deps: dict[str, Any],
        **kwargs: Any,
    ) -> None:
        self.jobs = jobs
        self.all_deps = all_deps
        self.config = None
        self.builds_scheduler_group = builds_scheduler_group
        self._result_list = []
        self.ended = False
        self.waitForFinishDeferred = None
        self.brids = []
        self.description = f"building {len(jobs)} jobs"
        super().__init__(**kwargs)

    def interrupt(self, reason):
        """Cancel every triggered build request and stop the scheduling loop."""
        # We cancel the buildrequests, as the data api handles
        # both cases:
        # - build started: stop is sent,
        # - build not created yet: related buildrequests are set to CANCELLED.
        # Note that there is an identified race condition though (more details
        # are available at buildbot.data.buildrequests).
        for brid in self.brids:
            self.master.data.control(
                "cancel",
                {'reason': 'parent build was interrupted'},
                ("buildrequests", brid),
            )
        if self.running and not self.ended:
            self.ended = True
            # if we are interrupted because of a connection lost, we interrupt synchronously
            if self.build.conn is None and self.waitForFinishDeferred is not None:
                self.waitForFinishDeferred.cancel()

    def getSchedulerByName(self, name):
        """Return the scheduler service called `name`, or raise `ValueError`."""
        # Local is not called `schedulers` to avoid shadowing the plugin module.
        named_schedulers = self.master.scheduler_manager.namedServices
        if name not in named_schedulers:
            raise ValueError(f"unknown triggered scheduler: {repr(name)}")
        # todo: check ITriggerableScheduler
        return named_schedulers[name]

    def schedule_one(self, build_props: Properties, job):
        """Return `(scheduler_name, properties)` for triggering `job`.

        Jobs carrying an `error` are routed to the "-other" scheduler; the
        remaining ones go to the scheduler of their `system`. The out path and
        drv path of the job are also recorded on `build_props`.
        """
        project_name = build_props.getProperty(
            "event.refUpdate.project"
        ) or build_props.getProperty("event.change.project")
        source = f"{project_name}-eval"

        attr = job.get("attr", "eval-error")
        name = f"buildbotJobs.{attr}"
        error = job.get("error")

        props = Properties()
        props.setProperty("virtual_builder_name", name, source)
        props.setProperty("status_name", f"building buildbotJobs.{attr}", source)
        props.setProperty("virtual_builder_tags", "", source)

        if error is not None:
            props.setProperty("error", error, source)
            return (f"{self.builds_scheduler_group}-other", props)

        drv_path = job.get("drvPath")
        system = job.get("system")
        out_path = job.get("outputs", {}).get("out")

        build_props.setProperty(f"{attr}-out_path", out_path, source)
        build_props.setProperty(f"{attr}-drv_path", drv_path, source)

        props.setProperty("attr", attr, source)
        props.setProperty("system", system, source)
        props.setProperty("drv_path", drv_path, source)
        props.setProperty("out_path", out_path, source)
        props.setProperty("isCached", job.get("isCached"), source)

        return (f"{self.builds_scheduler_group}-{system}", props)

    @defer.inlineCallbacks
    def _add_results(self, brid):
        """Wait for build request `brid` to complete and record its build results."""

        @defer.inlineCallbacks
        def _is_buildrequest_complete(brid):
            buildrequest = yield self.master.db.buildrequests.getBuildRequest(brid)
            return buildrequest['complete']

        event = ('buildrequests', str(brid), 'complete')
        yield self.master.mq.waitUntilEvent(
            event, lambda: _is_buildrequest_complete(brid)
        )
        builds = yield self.master.db.builds.getBuilds(buildrequestid=brid)
        for build in builds:
            self._result_list.append(build["results"])
        self.updateSummary()

    def prepareSourcestampListForTrigger(self):
        """Return the build's source stamps as dicts, ordered by codebase."""
        ss_for_trigger = {}
        for ss in self.build.getAllSourceStamps():
            ss_for_trigger[ss.codebase] = ss.asDict()
        return [ss_for_trigger[k] for k in sorted(ss_for_trigger)]

    def _release_dependents(self, drv_path) -> None:
        """Remove `drv_path` from every dependency list so dependents can be scheduled."""
        for deps in self.all_deps.values():
            if drv_path in deps:
                deps.remove(drv_path)

    @defer.inlineCallbacks
    def run(self):
        """Schedule all jobs in dependency order and return the worst result.

        Also sets the `failed_builds` build property to a list of
        `(attr, reason, urls)` tuples. Returns `util.CANCELLED` when the step
        was interrupted.
        """
        self.running = True
        build_props = self.build.getProperties()
        logs: Log = yield self.addLog("build info")

        # Order the jobs topologically by derivation path. Jobs sharing a
        # derivation path keep their relative order.
        # NOTE(review): jobs without a `drvPath` (e.g. eval errors) never match a
        # node of the dependency graph and thus appear to never be scheduled —
        # confirm whether that is intended.
        jobs_by_drv: dict[Any, list[dict[str, Any]]] = {}
        for job in self.jobs:
            jobs_by_drv.setdefault(job.get("drvPath"), []).append(job)
        build_schedule_order = []
        sorter = graphlib.TopologicalSorter(self.all_deps)
        for item in sorter.static_order():
            build_schedule_order.extend(jobs_by_drv.pop(item, []))

        done = []
        scheduled = []
        failed = []
        all_results = SUCCESS
        ss_for_trigger = self.prepareSourcestampListForTrigger()

        while not self.ended and (
            len(build_schedule_order) > 0 or len(scheduled) > 0
        ):
            # Everything whose dependencies are all satisfied can start now.
            schedule_now = []
            for build in list(build_schedule_order):
                if self.all_deps.get(build.get("drvPath"), []) == []:
                    build_schedule_order.remove(build)
                    schedule_now.append(build)

            for job in schedule_now:
                if job.get('isCached'):
                    logs.addStdout(
                        f"Cached {job.get('attr')} ({job.get('drvPath')}) - skipping\n"
                    )
                    self._release_dependents(job.get("drvPath"))
                    continue

                logs.addStdout(
                    f"Scheduling {job.get('attr')} ({job.get('drvPath')})\n"
                )
                (scheduler, props) = self.schedule_one(build_props, job)
                scheduler = self.getSchedulerByName(scheduler)

                idsDeferred, resultsDeferred = scheduler.trigger(
                    waited_for=True,
                    sourcestamps=ss_for_trigger,
                    set_props=props,
                    parent_buildid=self.build.buildid,
                    parent_relationship="Triggered from",
                )

                brids = {}
                try:
                    _, brids = yield idsDeferred
                except Exception as e:
                    # Best effort: log the failure; the job is still tracked via
                    # its results deferred below.
                    yield self.addLogWithException(e)
                scheduled.append((job, brids, resultsDeferred))

                for brid in brids.values():
                    url = getURLForBuildrequest(self.master, brid)
                    yield self.addURL(f"{scheduler.name} #{brid}", url)
                    # Deliberately not yielded: results are collected in the background.
                    self._add_results(brid)
                    self.brids.append(brid)

            if len(scheduled) == 0:
                if len(build_schedule_order) == 0:
                    logs.addStderr('Ran out of builds\n')
                    break
                # Only cached jobs were handled; their dependents may be ready now.
                continue

            wait_for_next = defer.DeferredList(
                [results for _, _, results in scheduled],
                fireOnOneCallback=True,
                fireOnOneErrback=True,
            )
            self.waitForFinishDeferred = wait_for_next
            results, index = yield wait_for_next

            job, brids, _ = scheduled[index]
            done.append((job, brids, results))
            del scheduled[index]
            result = results[0]
            logs.addStdout(
                f'Build {job.get("attr")} ({job.get("drvPath")}) finished, result {util.Results[result].upper()}\n'
            )

            if result != SUCCESS:
                failed_checks = []
                failed_paths = [job.get('drvPath')]
                removed = []
                failed.append(
                    (
                        job.get("attr"),
                        "failed",
                        [
                            getURLForBuildrequest(self.master, brid)
                            for brid in brids.values()
                        ],
                    )
                )
                # Drop every pending job that (transitively) depends on the
                # failed one; iterate until no new job gets removed.
                while True:
                    old_paths = list(failed_paths)
                    for build in list(build_schedule_order):
                        deps = self.all_deps.get(build.get("drvPath"), [])
                        for path in old_paths:
                            if path in deps:
                                failed_checks.append(build)
                                failed_paths.append(build.get("drvPath"))
                                build_schedule_order.remove(build)
                                removed.append(build.get("attr"))
                                failed.append(
                                    (
                                        build.get("attr"),
                                        f"dependency {job.get('attr')} failed",
                                        [],
                                    )
                                )
                                break
                    if old_paths == failed_paths:
                        break

                if len(removed) > 3:
                    yield logs.addStdout(
                        ' Skipping jobs: '
                        + ', '.join(removed[:3])
                        + f', ... ({len(removed) - 3} more)\n'
                    )
                else:
                    yield logs.addStdout(
                        ' Skipping jobs: ' + ', '.join(removed) + '\n'
                    )
                all_results = worst_status(result, all_results)

            self._release_dependents(job.get("drvPath"))

        yield logs.addHeader('Done!\n')
        yield logs.finish()
        build_props.setProperty("failed_builds", failed, "nix-eval")
        if self.ended:
            return util.CANCELLED
        return all_results

    def getCurrentSummary(self) -> dict[str, str]:  # noqa: N802
        """Return a step summary counting the recorded results per status."""
        summary = []
        if self._result_list:
            for status in ALL_RESULTS:
                count = self._result_list.count(status)
                if count:
                    summary.append(f"{count} {statusToString(status, count)}")
        return {"step": f"({', '.join(summary)})"}


class NixEvalCommand(buildstep.ShellMixin, steps.BuildStep):
    """Parses the output of `nix-eval-jobs` and triggers a `nix-build` build for
    every attribute.
    """

    def __init__(self, supported_systems: list[str], **kwargs: Any) -> None:
        kwargs = self.setupShellMixin(kwargs)
        super().__init__(**kwargs)
        self.observer = logobserver.BufferLogObserver()
        self.addLogObserver("stdio", self.observer)
        self.supported_systems = supported_systems

    @defer.inlineCallbacks
    def run(self) -> Generator[Any, object, Any]:
        """Run the evaluation command and queue a `BuildTrigger` step on success.

        Raises `BuildbotNixError` when a line of the evaluator output or the
        output of `nix derivation show` is not valid JSON.
        """
        # run nix-eval-jobs --flake .#$FLAKE_TARGET_ATTRIBUTE_FOR_JOBS to generate the dict of stages
        cmd: remotecommand.RemoteCommand = yield self.makeRemoteShellCommand()
        build_props = self.build.getProperties()
        project_name = build_props.getProperty(
            "event.refUpdate.project"
        ) or build_props.getProperty("event.change.project")
        assert project_name is not None, "`event.refUpdate.project` or `event.change.project` is not available on the build properties, unexpected build type!"

        yield self.runCommand(cmd)

        # if the command passes extract the list of stages
        result = cmd.results()
        if result == util.SUCCESS:
            # create a ShellCommand for each stage and add them to the build
            jobs = []
            for line in self.observer.getStdout().split("\n"):
                if line != "":
                    try:
                        job = json.loads(line)
                    except json.JSONDecodeError as e:
                        msg = f"Failed to parse line: {line}"
                        raise BuildbotNixError(msg) from e
                    jobs.append(job)

            filtered_jobs = []
            for job in jobs:
                system = job.get("system")
                # Jobs without a system are kept as well: report eval errors
                if not system or system in self.supported_systems:
                    filtered_jobs.append(job)

            # Filter out failed evaluations
            succeeded_jobs = [
                job for job in filtered_jobs if job.get('error') is None
            ]

            drv_show_log: Log = yield self.getLog("stdio")

            all_deps = dict()

            def closure_of(key, deps):
                """Return the transitive dependencies of `key` (excluding `key`)."""
                r, size = set([key]), 0
                while len(r) != size:
                    size = len(r)
                    # Unknown derivations contribute no dependencies instead of
                    # raising KeyError.
                    r.update(*[deps.get(k, ()) for k in r])
                return r.difference([key])

            if succeeded_jobs:
                drv_show_log.addStdout(
                    "getting derivation infos for valid derivations\n"
                )
                cmd = yield self.makeRemoteShellCommand(
                    stdioLogName=None,
                    collectStdout=True,
                    command=(
                        ["nix", "derivation", "show", "--recursive"]
                        + [
                            drv
                            for drv in (
                                job.get("drvPath") for job in succeeded_jobs
                            )
                            if drv
                        ]
                    ),
                )
                yield self.runCommand(cmd)
                drv_show_log.addStdout("done\n")
                try:
                    drv_info = json.loads(cmd.stdout)
                except json.JSONDecodeError as e:
                    msg = f"Failed to parse `nix derivation show` output for {cmd.command}"
                    raise BuildbotNixError(msg) from e

                for drv, info in drv_info.items():
                    # A missing/empty `inputDrvs` means "no dependencies".
                    all_deps[drv] = set((info.get("inputDrvs") or {}).keys())

            # Restrict the dependency graph to derivations that are jobs themselves.
            job_set = set(
                drv
                for drv in (job.get("drvPath") for job in filtered_jobs)
                if drv
            )
            all_deps = {
                k: list(closure_of(k, all_deps).intersection(job_set))
                for k in job_set
            }

            self.build.addStepsAfterCurrentStep(
                [
                    BuildTrigger(
                        builds_scheduler_group=f"{project_name}-nix-build",
                        name="build derivations: ",
                        jobs=filtered_jobs,
                        all_deps=all_deps,
                    ),
                ],
            )

        return result


def make_job_evaluator(
    name: str, settings: EvaluatorSettings, flake: bool
) -> NixEvalCommand:
    """Build the `nix-eval-jobs` step, evaluating either the flake attribute
    `FLAKE_TARGET_ATTRIBUTE_FOR_JOBS` or `./.ci/buildbot.nix`.
    """
    actual_command = []

    if flake:
        actual_command += ["--flake", f".#{FLAKE_TARGET_ATTRIBUTE_FOR_JOBS}"]
    else:
        actual_command += ["--expr", "import ./.ci/buildbot.nix"]

    return NixEvalCommand(
        env={},
        name=name,
        supported_systems=settings.supported_systems,
        command=[
            "nix-eval-jobs",
            "--workers",
            str(settings.worker_count),
            "--max-memory-size",
            str(settings.max_memory_size),
            "--gc-roots-dir",
            settings.gc_roots_dir,
            "--force-recurse",
            "--check-cache-status",
        ]
        + actual_command,
        haltOnFailure=True,
        locks=[settings.lock.access("exclusive")],
    )


class NixConfigure(buildstep.CommandMixin, steps.BuildStep):
    """
    Determine what `NixEvalCommand` step should be added after
    based on the existence of:

    - flake.nix
    - .ci/buildbot.nix
    """

    name = "determining jobs"

    def __init__(self, eval_settings: EvaluatorSettings, **kwargs: Any) -> None:
        self.evaluator_settings = eval_settings
        super().__init__(**kwargs)
        self.observer = logobserver.BufferLogObserver()
        self.addLogObserver("stdio", self.observer)

    @defer.inlineCallbacks
    def run(self) -> Generator[Any, object, Any]:
        """Queue the matching evaluation step; always returns `SUCCESS`."""
        try:
            configure_log: Log = yield self.getLog("stdio")
        except Exception:
            # No "stdio" log could be fetched: create it.
            configure_log: Log = yield self.addLog("stdio")

        # Takes precedence.
        configure_log.addStdout("checking if there's a .ci/buildbot.nix...\n")
        ci_buildbot_defn_exists = yield self.pathExists('build/.ci/buildbot.nix')
        if ci_buildbot_defn_exists:
            configure_log.addStdout(
                ".ci/buildbot.nix found, configured for non-flake CI\n"
            )
            self.build.addStepsAfterCurrentStep(
                [
                    make_job_evaluator(
                        "evaluate `.ci/buildbot.nix` jobs",
                        self.evaluator_settings,
                        False,
                    )
                ]
            )
            return SUCCESS

        flake_exists = yield self.pathExists('build/flake.nix')
        if flake_exists:
            configure_log.addStdout("flake.nix found")
            self.build.addStepsAfterCurrentStep(
                [
                    make_job_evaluator(
                        "evaluate `flake.nix` jobs",
                        self.evaluator_settings,
                        True,
                    )
                ]
            )
            return SUCCESS

        configure_log.addStdout(
            "neither flake.nix found neither .ci/buildbot.nix, no CI to run!"
        )
        return SUCCESS


class NixBuildCommand(buildstep.ShellMixin, steps.BuildStep):
    """Builds a nix derivation."""

    def __init__(self, **kwargs: Any) -> None:
        kwargs = self.setupShellMixin(kwargs)
        super().__init__(**kwargs)

    @defer.inlineCallbacks
    def run(self) -> Generator[Any, object, Any]:
        """Fail on eval errors, skip cached outputs, otherwise run the build command."""
        if error := self.getProperty("error"):
            attr = self.getProperty("attr")
            # show eval error
            error_log: Log = yield self.addLog("nix_error")
            error_log.addStderr(f"{attr} failed to evaluate:\n{error}")
            return util.FAILURE

        if self.getProperty("isCached"):
            yield self.addCompleteLog(
                "cached outpath from previous builds",
                # buildbot apparently hides the first line in the ui?
                f'\n{self.getProperty("out_path")}\n',
            )
            return util.SKIPPED

        # run `nix build`
        cmd: remotecommand.RemoteCommand = yield self.makeRemoteShellCommand()
        yield self.runCommand(cmd)

        return cmd.results()


def nix_eval_config(
    gerrit_config: GerritConfig,
    project: GerritProject,
    worker_names: list[str],
    supported_systems: list[str],
    eval_lock: util.MasterLock,
    worker_count: int,
    max_memory_size: int,
) -> util.BuilderConfig:
    """
    Uses nix-eval-jobs to evaluate the entrypoint of this project.
    For each evaluated attribute a new build pipeline is started.
    """
    factory = util.BuildFactory()

    # The key content (not its path) is handed to the Gerrit checkout step.
    with open(project.private_sshkey_path, 'r') as f:
        gerrit_private_key = f.read()

    # check out the source
    factory.addStep(
        steps.Gerrit(
            repourl=f'{gerrit_config.repourl_template}/{project.name}',
            mode="full",
            retry=[60, 60],
            timeout=3600,
            sshPrivateKey=gerrit_private_key,
        ),
    )
    # use one gcroots directory per worker. this should be scoped to the largest unique resource
    # in charge of builds (ie, buildnumber is too narrow) to not litter the system with permanent
    # gcroots in case of worker restarts.
    # TODO perhaps we should clean the entire /drvs/ directory up too during startup.
    drv_gcroots_dir = util.Interpolate(
        "/nix/var/nix/gcroots/per-user/buildbot-worker/%(prop:project)s/drvs/%(prop:workername)s/",
    )

    eval_settings = EvaluatorSettings(
        supported_systems=supported_systems,
        worker_count=worker_count,
        max_memory_size=max_memory_size,
        gc_roots_dir=drv_gcroots_dir,
        lock=eval_lock,
    )

    # NixConfigure will choose
    # how to add a NixEvalCommand job
    # based on whether there's a flake.nix or
    # a .ci/buildbot.nix.
    factory.addStep(NixConfigure(eval_settings))

    factory.addStep(
        steps.ShellCommand(
            name="Cleanup drv paths",
            command=[
                "rm",
                "-rf",
                drv_gcroots_dir,
            ],
            alwaysRun=True,
        ),
    )

    return util.BuilderConfig(
        name=f"{project.name}/nix-eval",
        workernames=worker_names,
        project=project.name,
        factory=factory,
        properties=dict(status_name="nix-eval"),
    )


def nix_build_config(
    project: GerritProject,
    worker_arch: str,
    worker_names: list[str],
    builders_spec: str,
    signing_keyfile: str | None = None,
    binary_cache_config: S3BinaryCacheConfig | None = None
) -> util.BuilderConfig:
    """Builds one nix flake attribute."""
    factory = util.BuildFactory()
    factory.addStep(
        steps.ShellCommand(
            name="Copy the derivation to the local worker store",
            command=[
                "nix",
                "copy",
                "--derivation",
                "--to",
                "../store",
                util.Interpolate("%(prop:drv_path)s^*"),
            ],
        )
    )
    factory.addStep(
        NixBuildCommand(
            env={},
            name="Build flake attr",
            command=[
                "nix",
                "build",
                "-L",
                "--option",
                "keep-going",
                "true",
                # do not build directly on the coordinator
                "--max-jobs", "0",
                # NOTE(review): this `--option` is directly followed by
                # `--max-silent-time`, so Nix presumably treats the latter as a
                # setting *name*; the timeout below may not be applied — confirm.
                "--option",
                # stop stuck builds after 20 minutes
                "--max-silent-time",
                str(60 * 20),
                "--builders",
                builders_spec,
                "--store",
                util.Interpolate("%(prop:builddir)s/store"),
                "--out-link",
                util.Interpolate("result-%(prop:attr)s"),
                util.Interpolate("%(prop:drv_path)s^*"),
            ],
            # 3 hours, defaults to 20 minutes
            # We increase this over the default since the build output might end up in a different `nix build`.
            timeout=60 * 60 * 3,
            haltOnFailure=True,
        ),
    )

    if signing_keyfile is not None:
        factory.addStep(
            steps.ShellCommand(
                name="Sign the store path",
                command=[
                    "nix",
                    "store",
                    "sign",
                    "--key-file",
                    signing_keyfile,
                    util.Interpolate("%(prop:drv_path)s^*"),
                ],
            ),
        )

    if binary_cache_config is not None:
        factory.addStep(
            steps.ShellCommand(
                name="Upload the store path to the cache",
                command=[
                    "nix",
                    "copy",
                    "--to",
                    # `&region=` had been corrupted into the "®ion=" HTML-entity
                    # artefact, which dropped the region query parameter.
                    f"s3://{binary_cache_config.bucket}?profile={binary_cache_config.profile}&region={binary_cache_config.region}&endpoint={binary_cache_config.endpoint}",
                    util.Property("out_path"),
                ],
            )
        )

    factory.addStep(
        steps.ShellCommand(
            name="Register gcroot",
            command=[
                "nix-store",
                "--add-root",
                # FIXME: cleanup old build attributes
                util.Interpolate(
                    "/nix/var/nix/gcroots/per-user/buildbot-worker/%(prop:project)s/%(prop:attr)s",
                ),
                "-r",
                util.Property("out_path"),
            ],
            doStepIf=lambda s: s.getProperty("branch")
            == s.getProperty("github.repository.default_branch"),
        ),
    )
    factory.addStep(
        steps.ShellCommand(
            name="Delete temporary gcroots",
            command=["rm", "-f", util.Interpolate("result-%(prop:attr)s")],
        ),
    )

    return util.BuilderConfig(
        name=f"{project.name}/nix-build/{worker_arch}",
        project=project.name,
        workernames=worker_names,
        collapseRequests=False,
        env={},
        factory=factory,
    )


def assemble_secret_file_path(secret_name: str) -> Path:
    """Return the path of `secret_name` inside `$CREDENTIALS_DIRECTORY`.

    Exits the process with status 1 when the environment variable is unset.
    """
    directory = os.environ.get("CREDENTIALS_DIRECTORY")
    if directory is None:
        print("directory not set", file=sys.stderr)
        sys.exit(1)
    return Path(directory).joinpath(secret_name)


def read_secret_file(secret_name: str) -> str:
    """Return the content of the secret file, without trailing whitespace."""
    return assemble_secret_file_path(secret_name).read_text().rstrip()


def config_for_project(
    config: dict[str, Any],
    gerrit_config: GerritConfig,
    project: GerritProject,
    worker_names: list[str],
    nix_supported_systems: list[str],
    nix_eval_worker_count: int,
    nix_eval_max_memory_size: int,
    eval_lock: util.MasterLock,
    builders_spec: str,
    signing_keyfile: str | None = None,
    binary_cache_config: S3BinaryCacheConfig | None = None
) -> None:
    """Register the project, its schedulers and its builders in `config`.

    `config` is mutated in place: entries are appended to its "projects",
    "schedulers" and "builders" lists, which must already exist.
    """
    # One build flavour per supported system, plus the catch-all "other".
    build_archs = nix_supported_systems + ["other"]

    config["projects"].append(Project(project.name))
    config["schedulers"].extend(
        [
            # build everything pertaining to a project
            # TODO(raito): will this catch also post-merge? we don't really care about that… do we?
            schedulers.SingleBranchScheduler(
                name=f"{project.name}-changes",
                change_filter=util.ChangeFilter(
                    project=project.name,
                ),
                builderNames=[f"{project.name}/nix-eval"],
            ),
            # this is triggered from `nix-eval`
            *(
                schedulers.Triggerable(
                    name=f"{project.name}-nix-build-{arch}",
                    builderNames=[f"{project.name}/nix-build/{arch}"],
                )
                for arch in build_archs
            ),
            # allow to manually trigger a nix-build
            schedulers.ForceScheduler(
                name=f"{project.name}-force",
                builderNames=[f"{project.name}/nix-eval"],
                properties=[
                    util.StringParameter(
                        name="project",
                        label="Name of the Gerrit repository.",
                        default=project.name,
                    ),
                ],
            ),
        ],
    )
    config["builders"].extend(
        [
            # Since all workers run on the same machine, we only assign one of them to do the evaluation.
            # This should prevent excessive memory usage.
            nix_eval_config(
                gerrit_config,
                project,
                [f"{w}-other" for w in worker_names],
                supported_systems=nix_supported_systems,
                worker_count=nix_eval_worker_count,
                max_memory_size=nix_eval_max_memory_size,
                eval_lock=eval_lock,
            ),
            *(
                nix_build_config(
                    project,
                    arch,
                    [f"{w}-{arch}" for w in worker_names],
                    builders_spec,
                    signing_keyfile=signing_keyfile,
                    binary_cache_config=binary_cache_config,
                )
                for arch in build_archs
            ),
        ],
    )


class PeriodicWithStartup(schedulers.Periodic):
    """`Periodic` scheduler that can reset its `last_build` state on activation."""

    def __init__(self, *args: Any, run_on_startup: bool = False, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)
        self.run_on_startup = run_on_startup

    @defer.inlineCallbacks
    def activate(self) -> Generator[Any, object, Any]:
        if self.run_on_startup:
            yield self.setState("last_build", None)
        yield super().activate()


def gerritReviewFmt(url, data):
    """Format the Gerrit review (message and labels) for a finished build.

    Returns an empty dict for retried builds and for builders other than the
    project's `nix-eval` builder. Raises `ValueError` when `data` does not have
    the expected structure.
    """
    if 'build' not in data:
        raise ValueError('`build` is supposed to be present to format a build')

    build = data['build']
    # `or`, not `and`: with `and`, a missing `builder` raised KeyError on the
    # second operand and a missing `name` alone was never detected.
    if 'builder' not in build or 'name' not in build['builder']:
        raise ValueError('either `builder` or `builder.name` is not present in the build dictionary, unexpected format request')

    builderName = build['builder']['name']

    if len(build['results']) != 1:
        raise ValueError('this review request contains more than one build results, unexpected format request')

    result = build['results'][0]
    if result == util.RETRY:
        return dict()

    if builderName != f'{build["properties"].get("event.project")}/nix-eval':
        return dict()

    failed = build['properties'].get('failed_builds', [[]])[0]

    labels = {
        'Verified': -1 if result != util.SUCCESS else 1,
    }

    message = "Buildbot finished compiling your patchset!\n"
    message += "The result is: %s\n" % util.Results[result].upper()
    if result != util.SUCCESS:
        message += "\nFailed checks:\n"
        for check, how, urls in failed:
            if not urls:
                # Entries without URLs get extra indentation.
                message += "  "
            message += f" - {check}: {how}"
            if urls:
                message += f" (see {', '.join(urls)})"
            message += "\n"

    if url:
        message += "\nFor more details visit:\n"
        message += build['url'] + "\n"

    return dict(message=message, labels=labels)


class GerritNixConfigurator(ConfiguratorBase):
    """Configurator wiring Gerrit-driven Nix evaluation and builds into buildbot.

    The secret named by `nix_workers_secret_name` is read as JSON; shape of
    this file: [ { "name": "", "pass": "", "cores": "" } ]
    """

    def __init__(
        self,
        gerrit_server: str,
        gerrit_user: str,
        gerrit_port: int,
        gerrit_sshkey_path: str,
        projects: list[str],
        url: str,
        allowed_origins: list[str],
        nix_builders: list[dict[str, Any]],
        nix_supported_systems: list[str],
        nix_eval_worker_count: int | None,
        nix_eval_max_memory_size: int,
        nix_workers_secret_name: str = "buildbot-nix-workers",  # noqa: S107
        signing_keyfile: str | None = None,
        binary_cache_config: dict[str, str] | None = None,
        auth_method: AuthBase | None = None,
        manhole: Any = None,
    ) -> None:
        super().__init__()
        self.manhole = manhole
        self.allowed_origins = allowed_origins
        self.gerrit_server = gerrit_server
        self.gerrit_user = gerrit_user
        self.gerrit_port = gerrit_port
        self.gerrit_sshkey_path = str(gerrit_sshkey_path)
        self.gerrit_config = GerritConfig(
            domain=self.gerrit_server,
            username=self.gerrit_user,
            port=self.gerrit_port,
        )
        self.projects = projects
        self.nix_workers_secret_name = nix_workers_secret_name
        self.nix_eval_max_memory_size = nix_eval_max_memory_size
        self.nix_eval_worker_count = nix_eval_worker_count
        self.nix_supported_systems = nix_supported_systems
        self.nix_builders: list[NixBuilder] = [
            NixBuilder(**builder_cfg) for builder_cfg in nix_builders
        ]

        self.gerrit_change_source = GerritChangeSource(
            gerrit_server,
            gerrit_user,
            gerritport=gerrit_port,
            identity_file=gerrit_sshkey_path,
        )

        self.url = url

        if binary_cache_config is not None:
            self.binary_cache_config = S3BinaryCacheConfig(**binary_cache_config)
        else:
            self.binary_cache_config = None

        self.signing_keyfile = signing_keyfile

        self.auth_method = auth_method

    def configure(self, config: dict[str, Any]) -> None:
        """Populate the buildbot master `config` dictionary in place."""
        worker_config = json.loads(read_secret_file(self.nix_workers_secret_name))
        worker_names = []

        if self.manhole is not None:
            config["manhole"] = self.manhole

        config.setdefault("projects", [])
        config.setdefault("secretsProviders", [])
        # These lists are appended to below; make sure they exist, like the
        # two keys above.
        config.setdefault("workers", [])
        config.setdefault("schedulers", [])
        config.setdefault("builders", [])
        config.setdefault("services", [])
        config.setdefault("www", {
            'allowed_origins': self.allowed_origins
        })

        # One worker per (core, arch) pair; `worker_names` holds the per-core
        # base name, to which the arch suffix gets appended by the builders.
        for item in worker_config:
            cores = item.get("cores", 0)
            for i in range(cores):
                worker_name = f"{item['name']}-{i:03}"
                for arch in self.nix_supported_systems + ["other"]:
                    config["workers"].append(
                        worker.Worker(f"{worker_name}-{arch}", item["pass"])
                    )
                worker_names.append(worker_name)

        eval_lock = util.MasterLock("nix-eval")

        builders_spec = " ; ".join(
            builder.to_nix_line() for builder in self.nix_builders
        )
        for project in self.projects:
            config_for_project(
                config,
                self.gerrit_config,
                GerritProject(
                    name=project,
                    private_sshkey_path=self.gerrit_sshkey_path,
                ),
                worker_names,
                self.nix_supported_systems,
                self.nix_eval_worker_count or multiprocessing.cpu_count(),
                self.nix_eval_max_memory_size,
                eval_lock,
                builders_spec,
                signing_keyfile=self.signing_keyfile,
                binary_cache_config=self.binary_cache_config,
            )

        config["change_source"] = self.gerrit_change_source
        config["services"].append(
            reporters.GerritStatusPush(
                self.gerrit_server,
                self.gerrit_user,
                port=self.gerrit_port,
                identity_file=self.gerrit_sshkey_path,
                generators=[
                    # gerritReviewCB / self.url
                    BuildStatusGenerator(
                        message_formatter=MessageFormatterFunction(
                            lambda data: gerritReviewFmt(self.url, data),
                            "plain",
                            want_properties=True,
                            want_steps=True,
                        ),
                    ),
                ],
            )
            # startCB, summaryCB are too noisy, we won't use them.
        )

        # Upstream defaults pretend they already do something similar
        # but they didn't work, hence the custom function.
        def gerritBranchKey(b):
            ref = b['branch']
            if not ref.startswith('refs/changes/'):
                return ref
            # Strip the last path component of a `refs/changes/...` ref.
            return ref.rsplit('/', 1)[0]

        config["services"].append(
            util.OldBuildCanceller(
                "build_canceller",
                filters=[
                    (
                        [
                            f"{project}/nix-{kind}"
                            for kind in ["eval"]
                            + [
                                f"build/{arch}"
                                for arch in self.nix_supported_systems + ["other"]
                            ]
                        ],
                        util.SourceStampFilter(project_eq=[project]),
                    )
                    for project in self.projects
                ],
                branch_key=gerritBranchKey,
            )
        )

        systemd_secrets = secrets.SecretInAFile(
            dirname=os.environ["CREDENTIALS_DIRECTORY"],
        )
        config["secretsProviders"].append(systemd_secrets)

        config["www"].setdefault("plugins", {})

        if "auth" not in config["www"] and self.auth_method is not None:
            config["www"]["auth"] = self.auth_method