From d2b6fd674cd45788d8144163e41c88aa467aab0e Mon Sep 17 00:00:00 2001 From: Puck Meerburg Date: Sun, 10 Mar 2024 21:27:24 +0000 Subject: [PATCH] WIP: Replace Trigger with custom logic --- buildbot_nix/__init__.py | 162 ++++++++++++++++++++++++++------------- 1 file changed, 110 insertions(+), 52 deletions(-) diff --git a/buildbot_nix/__init__.py b/buildbot_nix/__init__.py index 371f95b..55bc97d 100644 --- a/buildbot_nix/__init__.py +++ b/buildbot_nix/__init__.py @@ -3,6 +3,7 @@ import multiprocessing import os import sys import uuid +import graphlib from collections import defaultdict from collections.abc import Generator from dataclasses import dataclass @@ -20,6 +21,11 @@ from buildbot.util import asyncSleep from buildbot.www.authz.endpointmatchers import EndpointMatcherBase, Match from buildbot.www.oauth2 import OAuth2Auth from buildbot.changes.gerritchangesource import GerritChangeSource +from buildbot.reporters.utils import getURLForBuild +from buildbot.reporters.utils import getURLForBuildrequest +from buildbot.process.buildstep import CANCELLED +from buildbot.process.buildstep import EXCEPTION +from buildbot.process.buildstep import SUCCESS if TYPE_CHECKING: from buildbot.process.log import Log @@ -52,9 +58,7 @@ class GerritProject: # `project` field. name: str -class BuildTrigger(Trigger): - """Dynamic trigger that creates a build for every attribute.""" - +class BuildTrigger(steps.BuildStep): def __init__( self, builds_scheduler: str, @@ -63,29 +67,69 @@ class BuildTrigger(Trigger): drv_info: dict[str, Any], **kwargs: Any, ) -> None: - if "name" not in kwargs: - kwargs["name"] = "trigger" self.jobs = jobs self.drv_info = drv_info self.config = None self.builds_scheduler = builds_scheduler self.skipped_builds_scheduler = skipped_builds_scheduler - Trigger.__init__( - self, - waitForFinish=True, - schedulerNames=[builds_scheduler, skipped_builds_scheduler], - haltOnFailure=True, - flunkOnFailure=True, - sourceStamps=[], - alwaysUseLatest=False, - updateSourceStamp=False, - **kwargs, - ) - def createTriggerProperties(self, props: Any) -> Any: # noqa: N802 - return props + def getSchedulerByName(self, name): + schedulers = self.master.scheduler_manager.namedServices + if name not in schedulers: + raise ValueError(f"unknown triggered scheduler: {repr(name)}") + sch = schedulers[name] + # todo: check ITriggerableScheduler + return sch - def getSchedulersAndProperties(self) -> list[tuple[str, Properties]]: # noqa: N802 + def schedule_one(self, build_props, job): + attr = job.get("attr", "eval-error") + name = attr + name = f"hydraJobs.{name}" + error = job.get("error") + props = Properties() + props.setProperty("virtual_builder_name", name, source) + props.setProperty("status_name", f"nix-build .#hydraJobs.{attr}", source) + props.setProperty("virtual_builder_tags", "", source) + + if error is not None: + props.setProperty("error", error, source) + return (self.skipped_builds_scheduler, props) + + if job.get("isCached"): + return (self.skipped_builds_scheduler, props) + + drv_path = job.get("drvPath") + system = job.get("system") + out_path = job.get("outputs", {}).get("out") + + build_props.setProperty(f"{attr}-out_path", out_path, source) + build_props.setProperty(f"{attr}-drv_path", drv_path, source) + + props.setProperty("attr", attr, source) + props.setProperty("system", system, source) + props.setProperty("drv_path", drv_path, source) + props.setProperty("out_path", out_path, source) + # we use this to identify builds when running a retry + props.setProperty("build_uuid", str(uuid.uuid4()), source) + + return (self.builds_scheduler, props) + + @defer.inlineCallbacks + def _add_results(self, brid, results): + @defer.inlineCallbacks + def _is_buildrequest_complete(brid): + buildrequest = yield self.master.db.buildrequests.getBuildRequest(brid) + return buildrequest['complete'] + + event = ('buildrequests', str(brid), 'complete') + yield self.master.mq.waitUntilEvent(event, lambda: _is_buildrequest_complete(brid)) + builds = yield self.master.db.builds.getBuilds(buildrequestid=brid) + for build in builds: + self._result_list.append(build["results"]) + self.updateSummary() + + @defer.inlineCallbacks + def run(self): build_props = self.build.getProperties() source = f"nix-eval-lix" @@ -105,45 +149,59 @@ class BuildTrigger(Trigger): return r job_set = set(( drv for drv in ( job.get("drvPath") for job in self.jobs ) if drv )) all_deps = { k: list(closure_of(k, all_deps).intersection(job_set)) for k in job_set } + builds_to_schedule = list(self.jobs) + build_schedule_order = [] + sorter = graphlib.TopologicalSorter(all_deps) + for item in sorter.static_order(): + i = 0 + while i < builds_to_schedule.len(): + if item == builds_to_schedule[i].get("drvPath"): + build_schedule_order.append(builds_to_schedule[i]) + del builds_to_schedule[i] + else: + i += 1 - build_props.setProperty("sched_state", all_deps, source, True) + done = [] + scheduled = [] + while len(build_schedule_order) > 0 and len(scheduled) > 0: + schedule_now = [] + for build in list(build_schedule_order): + if all_deps.get(build.get("drvPath"), []) == []: + build_schedule_order.remove(build) + schedule_now.append(build) - triggered_schedulers = [] - for job in self.jobs: - attr = job.get("attr", "eval-error") - name = attr - name = f"hydraJobs.{name}" - error = job.get("error") - props = Properties() - props.setProperty("virtual_builder_name", name, source) - props.setProperty("status_name", f"nix-build .#hydraJobs.{attr}", source) - props.setProperty("virtual_builder_tags", "", source) + for job in schedule_now: + (scheduler, props) = self.schedule_one(build_props, job) + scheduler = self.getSchedulerByName(scheduler) - if error is not None: - props.setProperty("error", error, source) - triggered_schedulers.append((self.skipped_builds_scheduler, props)) - continue + idsDeferred, resultsDeferred = scheduler.trigger( + waited_for = True, + sourcestamps = ss_for_trigger, + set_props = props, + parent_buildid = self.build.buildid, + parent_relationship = "Triggered from", + ) - if job.get("isCached"): - triggered_schedulers.append((self.skipped_builds_scheduler, props)) - continue + brids = {} + try: + _, brids = yield idsDeferred + except Exception as e: + yield self.addLogWithException(e) + results = EXCEPTION + scheduled.append((job, brids, resultsDeferred)) - drv_path = job.get("drvPath") - system = job.get("system") - out_path = job.get("outputs", {}).get("out") + for brid in brids.values(): + url = getURLForBuildrequest(self.master, brid) + yield self.addURL(f"{sch.name} #{brid}", url) + self._add_results(brid) - build_props.setProperty(f"{attr}-out_path", out_path, source) - build_props.setProperty(f"{attr}-drv_path", drv_path, source) - - props.setProperty("attr", attr, source) - props.setProperty("system", system, source) - props.setProperty("drv_path", drv_path, source) - props.setProperty("out_path", out_path, source) - # we use this to identify builds when running a retry - props.setProperty("build_uuid", str(uuid.uuid4()), source) - - triggered_schedulers.append((self.builds_scheduler, props)) - return triggered_schedulers + wait_for_next = defer.DeferredList([results for _, _, results in scheduled], fireOnOneCallback = True, fireOnOneErrback=True) + results, index = yield wait_for_next + job, brids, _ = scheduled[index] + done.append((job, brids, results)) + del scheduled[index] + # TODO: remove dep from all_deps + # TODO: calculate final result def getCurrentSummary(self) -> dict[str, str]: # noqa: N802 """The original build trigger will the generic builder name `nix-build` in this case, which is not helpful"""