Compare commits

...

5 commits

Author SHA1 Message Date
d628ca10d6 Fix up a few loose ends 2024-03-10 23:09:27 +00:00
4057371c76 Add build result tracking, schedule newly available builds 2024-03-10 22:55:38 +00:00
d2b6fd674c WIP: Replace Trigger with custom logic 2024-03-10 21:27:24 +00:00
544a492000 wip: dependency-tracked build triggering 2024-03-10 21:27:45 +01:00
a6fae8d3a0 remove retry logic
retries don't help us very much, in fact they mostly hurt by repeating
builds that failed for non-transient reasons. retries could help with
workers dropping while running a build, but those rare cases are better
to restart manually than to pend at least twice the ci time for commits
that simply do not build cleanly.
2024-03-10 17:34:22 +01:00

View file

@ -3,6 +3,7 @@ import multiprocessing
import os
import sys
import uuid
import graphlib
from collections import defaultdict
from collections.abc import Generator
from dataclasses import dataclass
@ -20,6 +21,12 @@ from buildbot.util import asyncSleep
from buildbot.www.authz.endpointmatchers import EndpointMatcherBase, Match
from buildbot.www.oauth2 import OAuth2Auth
from buildbot.changes.gerritchangesource import GerritChangeSource
from buildbot.reporters.utils import getURLForBuild
from buildbot.reporters.utils import getURLForBuildrequest
from buildbot.process.buildstep import CANCELLED
from buildbot.process.buildstep import EXCEPTION
from buildbot.process.buildstep import SUCCESS
from buildbot.process.results import worst_status
if TYPE_CHECKING:
from buildbot.process.log import Log
@ -52,82 +59,219 @@ class GerritProject:
# `project` field.
name: str
class BuildTrigger(Trigger):
"""Dynamic trigger that creates a build for every attribute."""
class BuildTrigger(steps.BuildStep):
def __init__(
self,
builds_scheduler: str,
skipped_builds_scheduler: str,
jobs: list[dict[str, Any]],
drv_info: dict[str, Any],
**kwargs: Any,
) -> None:
if "name" not in kwargs:
kwargs["name"] = "trigger"
self.jobs = jobs
self.drv_info = drv_info
self.config = None
self.builds_scheduler = builds_scheduler
self.skipped_builds_scheduler = skipped_builds_scheduler
Trigger.__init__(
self,
waitForFinish=True,
schedulerNames=[builds_scheduler, skipped_builds_scheduler],
haltOnFailure=True,
flunkOnFailure=True,
sourceStamps=[],
alwaysUseLatest=False,
updateSourceStamp=False,
**kwargs,
)
self._result_list = []
self.ended = False
self.waitForFinishDeferred = None
super().__init__(**kwargs)
def createTriggerProperties(self, props: Any) -> Any: # noqa: N802
return props
def interrupt(self, reason):
# We cancel the buildrequests, as the data api handles
# both cases:
# - build started: stop is sent,
# - build not created yet: related buildrequests are set to CANCELLED.
# Note that there is an identified race condition though (more details
# are available at buildbot.data.buildrequests).
for brid in self.brids:
self.master.data.control(
"cancel", {'reason': 'parent build was interrupted'}, ("buildrequests", brid)
)
if self.running and not self.ended:
self.ended = True
# if we are interrupted because of a connection lost, we interrupt synchronously
if self.build.conn is None and self.waitForFinishDeferred is not None:
self.waitForFinishDeferred.cancel()
def getSchedulersAndProperties(self) -> list[tuple[str, Properties]]: # noqa: N802
def getSchedulerByName(self, name):
schedulers = self.master.scheduler_manager.namedServices
if name not in schedulers:
raise ValueError(f"unknown triggered scheduler: {repr(name)}")
sch = schedulers[name]
# todo: check ITriggerableScheduler
return sch
def schedule_one(self, build_props, job):
source = f"nix-eval-lix"
attr = job.get("attr", "eval-error")
name = attr
name = f"hydraJobs.{name}"
error = job.get("error")
props = Properties()
props.setProperty("virtual_builder_name", name, source)
props.setProperty("status_name", f"nix-build .#hydraJobs.{attr}", source)
props.setProperty("virtual_builder_tags", "", source)
if error is not None:
props.setProperty("error", error, source)
return (self.skipped_builds_scheduler, props)
if job.get("isCached"):
return (self.skipped_builds_scheduler, props)
drv_path = job.get("drvPath")
system = job.get("system")
out_path = job.get("outputs", {}).get("out")
build_props.setProperty(f"{attr}-out_path", out_path, source)
build_props.setProperty(f"{attr}-drv_path", drv_path, source)
props.setProperty("attr", attr, source)
props.setProperty("system", system, source)
props.setProperty("drv_path", drv_path, source)
props.setProperty("out_path", out_path, source)
# we use this to identify builds when running a retry
props.setProperty("build_uuid", str(uuid.uuid4()), source)
return (self.builds_scheduler, props)
@defer.inlineCallbacks
def _add_results(self, brid):
@defer.inlineCallbacks
def _is_buildrequest_complete(brid):
buildrequest = yield self.master.db.buildrequests.getBuildRequest(brid)
return buildrequest['complete']
event = ('buildrequests', str(brid), 'complete')
yield self.master.mq.waitUntilEvent(event, lambda: _is_buildrequest_complete(brid))
builds = yield self.master.db.builds.getBuilds(buildrequestid=brid)
for build in builds:
self._result_list.append(build["results"])
self.updateSummary()
def prepareSourcestampListForTrigger(self):
ss_for_trigger = {}
objs_from_build = self.build.getAllSourceStamps()
for ss in objs_from_build:
ss_for_trigger[ss.codebase] = ss.asDict()
trigger_values = [ss_for_trigger[k] for k in sorted(ss_for_trigger.keys())]
return trigger_values
@defer.inlineCallbacks
def run(self):
self.running = True
build_props = self.build.getProperties()
source = f"nix-eval-lix"
triggered_schedulers = []
for job in self.jobs:
attr = job.get("attr", "eval-error")
name = attr
name = f"hydraJobs.{name}"
error = job.get("error")
props = Properties()
props.setProperty("virtual_builder_name", name, source)
props.setProperty("status_name", f"nix-build .#hydraJobs.{attr}", source)
props.setProperty("virtual_builder_tags", "", source)
all_deps = dict()
for drv, info in self.drv_info.items():
all_deps[drv] = set(info.get("inputDrvs").keys())
def closure_of(key, deps):
r = set()
r.add(key)
while True:
more = set(r)
more.update(*( deps[k] for k in r ))
if r == more:
break
r = more
r.remove(key)
return r
job_set = set(( drv for drv in ( job.get("drvPath") for job in self.jobs ) if drv ))
all_deps = { k: list(closure_of(k, all_deps).intersection(job_set)) for k in job_set }
builds_to_schedule = list(self.jobs)
build_schedule_order = []
sorter = graphlib.TopologicalSorter(all_deps)
for item in sorter.static_order():
i = 0
while i < len(builds_to_schedule):
if item == builds_to_schedule[i].get("drvPath"):
build_schedule_order.append(builds_to_schedule[i])
del builds_to_schedule[i]
else:
i += 1
if error is not None:
props.setProperty("error", error, source)
triggered_schedulers.append((self.skipped_builds_scheduler, props))
continue
done = []
scheduled = []
failed = []
all_results = SUCCESS
ss_for_trigger = self.prepareSourcestampListForTrigger()
while not self.ended and (len(build_schedule_order) > 0 or len(scheduled) > 0):
print('Scheduling..')
schedule_now = []
for build in list(build_schedule_order):
if all_deps.get(build.get("drvPath"), []) == []:
build_schedule_order.remove(build)
schedule_now.append(build)
if len(schedule_now) == 0:
print(' No builds to schedule found.')
for job in schedule_now:
print(f" - {job.get('attr')}")
(scheduler, props) = self.schedule_one(build_props, job)
scheduler = self.getSchedulerByName(scheduler)
if job.get("isCached"):
triggered_schedulers.append((self.skipped_builds_scheduler, props))
continue
idsDeferred, resultsDeferred = scheduler.trigger(
waited_for = True,
sourcestamps = ss_for_trigger,
set_props = props,
parent_buildid = self.build.buildid,
parent_relationship = "Triggered from",
)
drv_path = job.get("drvPath")
system = job.get("system")
out_path = job.get("outputs", {}).get("out")
brids = {}
try:
_, brids = yield idsDeferred
except Exception as e:
yield self.addLogWithException(e)
results = EXCEPTION
scheduled.append((job, brids, resultsDeferred))
build_props.setProperty(f"{attr}-out_path", out_path, source)
build_props.setProperty(f"{attr}-drv_path", drv_path, source)
for brid in brids.values():
url = getURLForBuildrequest(self.master, brid)
yield self.addURL(f"{scheduler.name} #{brid}", url)
self._add_results(brid)
print('Waiting..')
wait_for_next = defer.DeferredList([results for _, _, results in scheduled], fireOnOneCallback = True, fireOnOneErrback=True)
self.waitForFinishDeferred = wait_for_next
results, index = yield wait_for_next
job, brids, _ = scheduled[index]
done.append((job, brids, results))
del scheduled[index]
result = results[0]
print(f' Found finished build {job.get("attr")}, result {util.Results[result].upper()}')
if result != SUCCESS:
failed_checks = []
failed_paths = [job.get('drvPath')]
removed = []
while True:
old_paths = list(failed_paths)
for build in list(build_schedule_order):
deps = all_deps.get(build.get("drvPath"), [])
for path in old_paths:
if path in deps:
failed_checks.append(build)
failed_paths.append(build.get("drvPath"))
build_schedule_order.remove(build)
removed.append(build.get("attr"))
props.setProperty("attr", attr, source)
props.setProperty("system", system, source)
props.setProperty("drv_path", drv_path, source)
props.setProperty("out_path", out_path, source)
# we use this to identify builds when running a retry
props.setProperty("build_uuid", str(uuid.uuid4()), source)
triggered_schedulers.append((self.builds_scheduler, props))
return triggered_schedulers
break
if old_paths == failed_paths:
break
print(' Removed jobs: ' + ', '.join(removed))
all_results = worst_status(result, all_results)
print(f' New result: {util.Results[all_results].upper()}')
for dep in all_deps:
if job.get("drvPath") in all_deps[dep]:
all_deps[dep].remove(job.get("drvPath"))
print('Done!')
if self.ended:
return util.CANCELLED
return all_results
def getCurrentSummary(self) -> dict[str, str]: # noqa: N802
"""The original build trigger will the generic builder name `nix-build` in this case, which is not helpful"""
if not self.triggeredNames:
return {"step": "running"}
summary = []
if self._result_list:
for status in ALL_RESULTS:
@ -178,6 +322,24 @@ class NixEvalCommand(buildstep.ShellMixin, steps.BuildStep):
if not system or system in self.supported_systems: # report eval errors
filtered_jobs.append(job)
drv_show_log: Log = yield self.getLog("stdio")
drv_show_log.addStdout(f"getting derivation infos\n")
cmd = yield self.makeRemoteShellCommand(
stdioLogName=None,
collectStdout=True,
command=(
["nix", "derivation", "show", "--recursive"]
+ [ drv for drv in (job.get("drvPath") for job in filtered_jobs) if drv ]
),
)
yield self.runCommand(cmd)
drv_show_log.addStdout(f"done\n")
try:
drv_info = json.loads(cmd.stdout)
except json.JSONDecodeError as e:
msg = f"Failed to parse `nix derivation show` output for {cmd.command}"
raise BuildbotNixError(msg) from e
self.build.addStepsAfterCurrentStep(
[
BuildTrigger(
@ -185,6 +347,7 @@ class NixEvalCommand(buildstep.ShellMixin, steps.BuildStep):
skipped_builds_scheduler=f"lix-nix-skipped-build",
name="build flake",
jobs=filtered_jobs,
drv_info=drv_info,
),
],
)
@ -192,24 +355,6 @@ class NixEvalCommand(buildstep.ShellMixin, steps.BuildStep):
return result
# FIXME this leaks memory... but probably not enough that we care
class RetryCounter:
def __init__(self, retries: int) -> None:
self.builds: dict[uuid.UUID, int] = defaultdict(lambda: retries)
def retry_build(self, build_id: uuid.UUID) -> int:
retries = self.builds[build_id]
if retries > 1:
self.builds[build_id] = retries - 1
return retries
return 0
# For now we limit this to two. Often this allows us to make the error log
# shorter because we won't see the logs for all previous succeeded builds
RETRY_COUNTER = RetryCounter(retries=2)
class EvalErrorStep(steps.BuildStep):
"""Shows the error message of a failed evaluation."""
@ -236,12 +381,7 @@ class NixBuildCommand(buildstep.ShellMixin, steps.BuildStep):
cmd: remotecommand.RemoteCommand = yield self.makeRemoteShellCommand()
yield self.runCommand(cmd)
res = cmd.results()
if res == util.FAILURE:
retries = RETRY_COUNTER.retry_build(self.getProperty("build_uuid"))
if retries > 0:
return util.RETRY
return res
return cmd.results()
class UpdateBuildOutput(steps.BuildStep):
@ -775,6 +915,7 @@ class GerritNixConfigurator(ConfiguratorBase):
config["change_source"] = self.gerrit_change_source
config["workers"].append(worker.LocalWorker(SKIPPED_BUILDER_NAME))
"""
config["services"].append(
reporters.GerritStatusPush(self.gerrit_server, self.gerrit_user,
port=2022,
@ -790,6 +931,7 @@ class GerritNixConfigurator(ConfiguratorBase):
# summaryArg=self.url)
)
"""
systemd_secrets = secrets.SecretInAFile(
dirname=os.environ["CREDENTIALS_DIRECTORY"],