WIP: Replace Trigger with custom logic

This commit is contained in:
puck 2024-03-10 21:27:24 +00:00 committed by eldritch horrors
parent e9874c3d98
commit 28ca39af25

View file

@ -2,6 +2,7 @@ import json
import multiprocessing
import os
import sys
import graphlib
from collections import defaultdict
from collections.abc import Generator
from dataclasses import dataclass
@ -19,6 +20,11 @@ from buildbot.util import asyncSleep
from buildbot.www.authz.endpointmatchers import EndpointMatcherBase, Match
from buildbot.www.oauth2 import OAuth2Auth
from buildbot.changes.gerritchangesource import GerritChangeSource
from buildbot.reporters.utils import getURLForBuild
from buildbot.reporters.utils import getURLForBuildrequest
from buildbot.process.buildstep import CANCELLED
from buildbot.process.buildstep import EXCEPTION
from buildbot.process.buildstep import SUCCESS
if TYPE_CHECKING:
from buildbot.process.log import Log
@ -49,9 +55,7 @@ class GerritProject:
# `project` field.
name: str
class BuildTrigger(Trigger):
"""Dynamic trigger that creates a build for every attribute."""
class BuildTrigger(steps.BuildStep):
def __init__(
self,
builds_scheduler: str,
@ -59,47 +63,20 @@ class BuildTrigger(Trigger):
drv_info: dict[str, Any],
**kwargs: Any,
) -> None:
if "name" not in kwargs:
kwargs["name"] = "trigger"
self.jobs = jobs
self.drv_info = drv_info
self.config = None
self.builds_scheduler = builds_scheduler
Trigger.__init__(
self,
waitForFinish=True,
schedulerNames=[builds_scheduler],
haltOnFailure=True,
flunkOnFailure=True,
sourceStamps=[],
alwaysUseLatest=False,
updateSourceStamp=False,
**kwargs,
)
def createTriggerProperties(self, props: Any) -> Any: # noqa: N802
return props
def getSchedulerByName(self, name):
schedulers = self.master.scheduler_manager.namedServices
if name not in schedulers:
raise ValueError(f"unknown triggered scheduler: {repr(name)}")
sch = schedulers[name]
# todo: check ITriggerableScheduler
return sch
def getSchedulersAndProperties(self) -> list[tuple[str, Properties]]: # noqa: N802
build_props = self.build.getProperties()
source = f"nix-eval-lix"
all_deps = dict()
for drv, info in self.drv_info.items():
all_deps[drv] = set(info.get("inputDrvs").keys())
def closure_of(key, deps):
r, size = set([key]), 0
while len(r) != size:
size = len(r)
r.update(*[ deps[k] for k in r ])
return r.difference([key])
job_set = set(( drv for drv in ( job.get("drvPath") for job in self.jobs ) if drv ))
all_deps = { k: list(closure_of(k, all_deps).intersection(job_set)) for k in job_set }
build_props.setProperty("sched_state", all_deps, source, True)
triggered_schedulers = []
for job in self.jobs:
def schedule_one(self, build_props, job):
attr = job.get("attr", "eval-error")
name = attr
name = f"hydraJobs.{name}"
@ -111,8 +88,7 @@ class BuildTrigger(Trigger):
if error is not None:
props.setProperty("error", error, source)
triggered_schedulers.append((self.builds_scheduler, props))
continue
return (self.builds_scheduler, props)
drv_path = job.get("drvPath")
system = job.get("system")
@ -127,8 +103,91 @@ class BuildTrigger(Trigger):
props.setProperty("out_path", out_path, source)
props.setProperty("isCached", job.get("isCached"), source)
triggered_schedulers.append((self.builds_scheduler, props))
return triggered_schedulers
return (self.builds_scheduler, props)
@defer.inlineCallbacks
def _add_results(self, brid, results):
@defer.inlineCallbacks
def _is_buildrequest_complete(brid):
buildrequest = yield self.master.db.buildrequests.getBuildRequest(brid)
return buildrequest['complete']
event = ('buildrequests', str(brid), 'complete')
yield self.master.mq.waitUntilEvent(event, lambda: _is_buildrequest_complete(brid))
builds = yield self.master.db.builds.getBuilds(buildrequestid=brid)
for build in builds:
self._result_list.append(build["results"])
self.updateSummary()
@defer.inlineCallbacks
def run(self):
build_props = self.build.getProperties()
source = f"nix-eval-lix"
all_deps = dict()
for drv, info in self.drv_info.items():
all_deps[drv] = set(info.get("inputDrvs").keys())
def closure_of(key, deps):
r, size = set([key]), 0
while len(r) != size:
size = len(r)
r.update(*[ deps[k] for k in r ])
return r.difference([key])
job_set = set(( drv for drv in ( job.get("drvPath") for job in self.jobs ) if drv ))
all_deps = { k: list(closure_of(k, all_deps).intersection(job_set)) for k in job_set }
builds_to_schedule = list(self.jobs)
build_schedule_order = []
sorter = graphlib.TopologicalSorter(all_deps)
for item in sorter.static_order():
i = 0
while i < builds_to_schedule.len():
if item == builds_to_schedule[i].get("drvPath"):
build_schedule_order.append(builds_to_schedule[i])
del builds_to_schedule[i]
else:
i += 1
done = []
scheduled = []
while len(build_schedule_order) > 0 and len(scheduled) > 0:
schedule_now = []
for build in list(build_schedule_order):
if all_deps.get(build.get("drvPath"), []) == []:
build_schedule_order.remove(build)
schedule_now.append(build)
for job in schedule_now:
(scheduler, props) = self.schedule_one(build_props, job)
scheduler = self.getSchedulerByName(scheduler)
idsDeferred, resultsDeferred = scheduler.trigger(
waited_for = True,
sourcestamps = ss_for_trigger,
set_props = props,
parent_buildid = self.build.buildid,
parent_relationship = "Triggered from",
)
brids = {}
try:
_, brids = yield idsDeferred
except Exception as e:
yield self.addLogWithException(e)
results = EXCEPTION
scheduled.append((job, brids, resultsDeferred))
for brid in brids.values():
url = getURLForBuildrequest(self.master, brid)
yield self.addURL(f"{sch.name} #{brid}", url)
self._add_results(brid)
wait_for_next = defer.DeferredList([results for _, _, results in scheduled], fireOnOneCallback = True, fireOnOneErrback=True)
results, index = yield wait_for_next
job, brids, _ = scheduled[index]
done.append((job, brids, results))
del scheduled[index]
# TODO: remove dep from all_deps
# TODO: calculate final result
def getCurrentSummary(self) -> dict[str, str]: # noqa: N802
"""The original build trigger will the generic builder name `nix-build` in this case, which is not helpful"""