Compare commits
5 commits
main
...
puck/gerri
Author | SHA1 | Date | |
---|---|---|---|
d628ca10d6 | |||
4057371c76 | |||
d2b6fd674c | |||
544a492000 | |||
a6fae8d3a0 |
1 changed files with 219 additions and 77 deletions
|
@ -3,6 +3,7 @@ import multiprocessing
|
|||
import os
|
||||
import sys
|
||||
import uuid
|
||||
import graphlib
|
||||
from collections import defaultdict
|
||||
from collections.abc import Generator
|
||||
from dataclasses import dataclass
|
||||
|
@ -20,6 +21,12 @@ from buildbot.util import asyncSleep
|
|||
from buildbot.www.authz.endpointmatchers import EndpointMatcherBase, Match
|
||||
from buildbot.www.oauth2 import OAuth2Auth
|
||||
from buildbot.changes.gerritchangesource import GerritChangeSource
|
||||
from buildbot.reporters.utils import getURLForBuild
|
||||
from buildbot.reporters.utils import getURLForBuildrequest
|
||||
from buildbot.process.buildstep import CANCELLED
|
||||
from buildbot.process.buildstep import EXCEPTION
|
||||
from buildbot.process.buildstep import SUCCESS
|
||||
from buildbot.process.results import worst_status
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from buildbot.process.log import Log
|
||||
|
@ -52,43 +59,52 @@ class GerritProject:
|
|||
# `project` field.
|
||||
name: str
|
||||
|
||||
class BuildTrigger(Trigger):
|
||||
"""Dynamic trigger that creates a build for every attribute."""
|
||||
|
||||
class BuildTrigger(steps.BuildStep):
|
||||
def __init__(
|
||||
self,
|
||||
builds_scheduler: str,
|
||||
skipped_builds_scheduler: str,
|
||||
jobs: list[dict[str, Any]],
|
||||
drv_info: dict[str, Any],
|
||||
**kwargs: Any,
|
||||
) -> None:
|
||||
if "name" not in kwargs:
|
||||
kwargs["name"] = "trigger"
|
||||
self.jobs = jobs
|
||||
self.drv_info = drv_info
|
||||
self.config = None
|
||||
self.builds_scheduler = builds_scheduler
|
||||
self.skipped_builds_scheduler = skipped_builds_scheduler
|
||||
Trigger.__init__(
|
||||
self,
|
||||
waitForFinish=True,
|
||||
schedulerNames=[builds_scheduler, skipped_builds_scheduler],
|
||||
haltOnFailure=True,
|
||||
flunkOnFailure=True,
|
||||
sourceStamps=[],
|
||||
alwaysUseLatest=False,
|
||||
updateSourceStamp=False,
|
||||
**kwargs,
|
||||
self._result_list = []
|
||||
self.ended = False
|
||||
self.waitForFinishDeferred = None
|
||||
super().__init__(**kwargs)
|
||||
|
||||
def interrupt(self, reason):
|
||||
# We cancel the buildrequests, as the data api handles
|
||||
# both cases:
|
||||
# - build started: stop is sent,
|
||||
# - build not created yet: related buildrequests are set to CANCELLED.
|
||||
# Note that there is an identified race condition though (more details
|
||||
# are available at buildbot.data.buildrequests).
|
||||
for brid in self.brids:
|
||||
self.master.data.control(
|
||||
"cancel", {'reason': 'parent build was interrupted'}, ("buildrequests", brid)
|
||||
)
|
||||
if self.running and not self.ended:
|
||||
self.ended = True
|
||||
# if we are interrupted because of a connection lost, we interrupt synchronously
|
||||
if self.build.conn is None and self.waitForFinishDeferred is not None:
|
||||
self.waitForFinishDeferred.cancel()
|
||||
|
||||
def createTriggerProperties(self, props: Any) -> Any: # noqa: N802
|
||||
return props
|
||||
def getSchedulerByName(self, name):
|
||||
schedulers = self.master.scheduler_manager.namedServices
|
||||
if name not in schedulers:
|
||||
raise ValueError(f"unknown triggered scheduler: {repr(name)}")
|
||||
sch = schedulers[name]
|
||||
# todo: check ITriggerableScheduler
|
||||
return sch
|
||||
|
||||
def getSchedulersAndProperties(self) -> list[tuple[str, Properties]]: # noqa: N802
|
||||
build_props = self.build.getProperties()
|
||||
def schedule_one(self, build_props, job):
|
||||
source = f"nix-eval-lix"
|
||||
|
||||
triggered_schedulers = []
|
||||
for job in self.jobs:
|
||||
attr = job.get("attr", "eval-error")
|
||||
name = attr
|
||||
name = f"hydraJobs.{name}"
|
||||
|
@ -100,12 +116,10 @@ class BuildTrigger(Trigger):
|
|||
|
||||
if error is not None:
|
||||
props.setProperty("error", error, source)
|
||||
triggered_schedulers.append((self.skipped_builds_scheduler, props))
|
||||
continue
|
||||
return (self.skipped_builds_scheduler, props)
|
||||
|
||||
if job.get("isCached"):
|
||||
triggered_schedulers.append((self.skipped_builds_scheduler, props))
|
||||
continue
|
||||
return (self.skipped_builds_scheduler, props)
|
||||
|
||||
drv_path = job.get("drvPath")
|
||||
system = job.get("system")
|
||||
|
@ -121,13 +135,143 @@ class BuildTrigger(Trigger):
|
|||
# we use this to identify builds when running a retry
|
||||
props.setProperty("build_uuid", str(uuid.uuid4()), source)
|
||||
|
||||
triggered_schedulers.append((self.builds_scheduler, props))
|
||||
return triggered_schedulers
|
||||
return (self.builds_scheduler, props)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _add_results(self, brid):
|
||||
@defer.inlineCallbacks
|
||||
def _is_buildrequest_complete(brid):
|
||||
buildrequest = yield self.master.db.buildrequests.getBuildRequest(brid)
|
||||
return buildrequest['complete']
|
||||
|
||||
event = ('buildrequests', str(brid), 'complete')
|
||||
yield self.master.mq.waitUntilEvent(event, lambda: _is_buildrequest_complete(brid))
|
||||
builds = yield self.master.db.builds.getBuilds(buildrequestid=brid)
|
||||
for build in builds:
|
||||
self._result_list.append(build["results"])
|
||||
self.updateSummary()
|
||||
|
||||
def prepareSourcestampListForTrigger(self):
|
||||
ss_for_trigger = {}
|
||||
objs_from_build = self.build.getAllSourceStamps()
|
||||
for ss in objs_from_build:
|
||||
ss_for_trigger[ss.codebase] = ss.asDict()
|
||||
|
||||
trigger_values = [ss_for_trigger[k] for k in sorted(ss_for_trigger.keys())]
|
||||
return trigger_values
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def run(self):
|
||||
self.running = True
|
||||
build_props = self.build.getProperties()
|
||||
source = f"nix-eval-lix"
|
||||
|
||||
all_deps = dict()
|
||||
for drv, info in self.drv_info.items():
|
||||
all_deps[drv] = set(info.get("inputDrvs").keys())
|
||||
def closure_of(key, deps):
|
||||
r = set()
|
||||
r.add(key)
|
||||
while True:
|
||||
more = set(r)
|
||||
more.update(*( deps[k] for k in r ))
|
||||
if r == more:
|
||||
break
|
||||
r = more
|
||||
r.remove(key)
|
||||
return r
|
||||
job_set = set(( drv for drv in ( job.get("drvPath") for job in self.jobs ) if drv ))
|
||||
all_deps = { k: list(closure_of(k, all_deps).intersection(job_set)) for k in job_set }
|
||||
builds_to_schedule = list(self.jobs)
|
||||
build_schedule_order = []
|
||||
sorter = graphlib.TopologicalSorter(all_deps)
|
||||
for item in sorter.static_order():
|
||||
i = 0
|
||||
while i < len(builds_to_schedule):
|
||||
if item == builds_to_schedule[i].get("drvPath"):
|
||||
build_schedule_order.append(builds_to_schedule[i])
|
||||
del builds_to_schedule[i]
|
||||
else:
|
||||
i += 1
|
||||
|
||||
done = []
|
||||
scheduled = []
|
||||
failed = []
|
||||
all_results = SUCCESS
|
||||
ss_for_trigger = self.prepareSourcestampListForTrigger()
|
||||
while not self.ended and (len(build_schedule_order) > 0 or len(scheduled) > 0):
|
||||
print('Scheduling..')
|
||||
schedule_now = []
|
||||
for build in list(build_schedule_order):
|
||||
if all_deps.get(build.get("drvPath"), []) == []:
|
||||
build_schedule_order.remove(build)
|
||||
schedule_now.append(build)
|
||||
if len(schedule_now) == 0:
|
||||
print(' No builds to schedule found.')
|
||||
for job in schedule_now:
|
||||
print(f" - {job.get('attr')}")
|
||||
(scheduler, props) = self.schedule_one(build_props, job)
|
||||
scheduler = self.getSchedulerByName(scheduler)
|
||||
|
||||
idsDeferred, resultsDeferred = scheduler.trigger(
|
||||
waited_for = True,
|
||||
sourcestamps = ss_for_trigger,
|
||||
set_props = props,
|
||||
parent_buildid = self.build.buildid,
|
||||
parent_relationship = "Triggered from",
|
||||
)
|
||||
|
||||
brids = {}
|
||||
try:
|
||||
_, brids = yield idsDeferred
|
||||
except Exception as e:
|
||||
yield self.addLogWithException(e)
|
||||
results = EXCEPTION
|
||||
scheduled.append((job, brids, resultsDeferred))
|
||||
|
||||
for brid in brids.values():
|
||||
url = getURLForBuildrequest(self.master, brid)
|
||||
yield self.addURL(f"{scheduler.name} #{brid}", url)
|
||||
self._add_results(brid)
|
||||
print('Waiting..')
|
||||
wait_for_next = defer.DeferredList([results for _, _, results in scheduled], fireOnOneCallback = True, fireOnOneErrback=True)
|
||||
self.waitForFinishDeferred = wait_for_next
|
||||
results, index = yield wait_for_next
|
||||
job, brids, _ = scheduled[index]
|
||||
done.append((job, brids, results))
|
||||
del scheduled[index]
|
||||
result = results[0]
|
||||
print(f' Found finished build {job.get("attr")}, result {util.Results[result].upper()}')
|
||||
if result != SUCCESS:
|
||||
failed_checks = []
|
||||
failed_paths = [job.get('drvPath')]
|
||||
removed = []
|
||||
while True:
|
||||
old_paths = list(failed_paths)
|
||||
for build in list(build_schedule_order):
|
||||
deps = all_deps.get(build.get("drvPath"), [])
|
||||
for path in old_paths:
|
||||
if path in deps:
|
||||
failed_checks.append(build)
|
||||
failed_paths.append(build.get("drvPath"))
|
||||
build_schedule_order.remove(build)
|
||||
removed.append(build.get("attr"))
|
||||
|
||||
break
|
||||
if old_paths == failed_paths:
|
||||
break
|
||||
print(' Removed jobs: ' + ', '.join(removed))
|
||||
all_results = worst_status(result, all_results)
|
||||
print(f' New result: {util.Results[all_results].upper()}')
|
||||
for dep in all_deps:
|
||||
if job.get("drvPath") in all_deps[dep]:
|
||||
all_deps[dep].remove(job.get("drvPath"))
|
||||
print('Done!')
|
||||
if self.ended:
|
||||
return util.CANCELLED
|
||||
return all_results
|
||||
|
||||
def getCurrentSummary(self) -> dict[str, str]: # noqa: N802
|
||||
"""The original build trigger will the generic builder name `nix-build` in this case, which is not helpful"""
|
||||
if not self.triggeredNames:
|
||||
return {"step": "running"}
|
||||
summary = []
|
||||
if self._result_list:
|
||||
for status in ALL_RESULTS:
|
||||
|
@ -178,6 +322,24 @@ class NixEvalCommand(buildstep.ShellMixin, steps.BuildStep):
|
|||
if not system or system in self.supported_systems: # report eval errors
|
||||
filtered_jobs.append(job)
|
||||
|
||||
drv_show_log: Log = yield self.getLog("stdio")
|
||||
drv_show_log.addStdout(f"getting derivation infos\n")
|
||||
cmd = yield self.makeRemoteShellCommand(
|
||||
stdioLogName=None,
|
||||
collectStdout=True,
|
||||
command=(
|
||||
["nix", "derivation", "show", "--recursive"]
|
||||
+ [ drv for drv in (job.get("drvPath") for job in filtered_jobs) if drv ]
|
||||
),
|
||||
)
|
||||
yield self.runCommand(cmd)
|
||||
drv_show_log.addStdout(f"done\n")
|
||||
try:
|
||||
drv_info = json.loads(cmd.stdout)
|
||||
except json.JSONDecodeError as e:
|
||||
msg = f"Failed to parse `nix derivation show` output for {cmd.command}"
|
||||
raise BuildbotNixError(msg) from e
|
||||
|
||||
self.build.addStepsAfterCurrentStep(
|
||||
[
|
||||
BuildTrigger(
|
||||
|
@ -185,6 +347,7 @@ class NixEvalCommand(buildstep.ShellMixin, steps.BuildStep):
|
|||
skipped_builds_scheduler=f"lix-nix-skipped-build",
|
||||
name="build flake",
|
||||
jobs=filtered_jobs,
|
||||
drv_info=drv_info,
|
||||
),
|
||||
],
|
||||
)
|
||||
|
@ -192,24 +355,6 @@ class NixEvalCommand(buildstep.ShellMixin, steps.BuildStep):
|
|||
return result
|
||||
|
||||
|
||||
# FIXME this leaks memory... but probably not enough that we care
|
||||
class RetryCounter:
|
||||
def __init__(self, retries: int) -> None:
|
||||
self.builds: dict[uuid.UUID, int] = defaultdict(lambda: retries)
|
||||
|
||||
def retry_build(self, build_id: uuid.UUID) -> int:
|
||||
retries = self.builds[build_id]
|
||||
if retries > 1:
|
||||
self.builds[build_id] = retries - 1
|
||||
return retries
|
||||
return 0
|
||||
|
||||
|
||||
# For now we limit this to two. Often this allows us to make the error log
|
||||
# shorter because we won't see the logs for all previous succeeded builds
|
||||
RETRY_COUNTER = RetryCounter(retries=2)
|
||||
|
||||
|
||||
class EvalErrorStep(steps.BuildStep):
|
||||
"""Shows the error message of a failed evaluation."""
|
||||
|
||||
|
@ -236,12 +381,7 @@ class NixBuildCommand(buildstep.ShellMixin, steps.BuildStep):
|
|||
cmd: remotecommand.RemoteCommand = yield self.makeRemoteShellCommand()
|
||||
yield self.runCommand(cmd)
|
||||
|
||||
res = cmd.results()
|
||||
if res == util.FAILURE:
|
||||
retries = RETRY_COUNTER.retry_build(self.getProperty("build_uuid"))
|
||||
if retries > 0:
|
||||
return util.RETRY
|
||||
return res
|
||||
return cmd.results()
|
||||
|
||||
|
||||
class UpdateBuildOutput(steps.BuildStep):
|
||||
|
@ -775,6 +915,7 @@ class GerritNixConfigurator(ConfiguratorBase):
|
|||
|
||||
config["change_source"] = self.gerrit_change_source
|
||||
config["workers"].append(worker.LocalWorker(SKIPPED_BUILDER_NAME))
|
||||
"""
|
||||
config["services"].append(
|
||||
reporters.GerritStatusPush(self.gerrit_server, self.gerrit_user,
|
||||
port=2022,
|
||||
|
@ -790,6 +931,7 @@ class GerritNixConfigurator(ConfiguratorBase):
|
|||
# summaryArg=self.url)
|
||||
|
||||
)
|
||||
"""
|
||||
|
||||
systemd_secrets = secrets.SecretInAFile(
|
||||
dirname=os.environ["CREDENTIALS_DIRECTORY"],
|
||||
|
|
Loading…
Reference in a new issue