@@ -4,6 +4,7 @@ import os
import sys
import graphlib
import base64
import random
from collections.abc import Generator
from dataclasses import dataclass, field
from pathlib import Path
@@ -24,6 +25,7 @@ from buildbot.reporters.generators.build import BuildStatusGenerator
from buildbot.reporters.message import MessageFormatterFunction
from buildbot.process.buildstep import EXCEPTION
from buildbot.process.buildstep import SUCCESS
from buildbot.process.buildstep import BuildStepFailed
from buildbot.process.results import worst_status
import requests
@@ -39,6 +41,14 @@ log = Logger()
FLAKE_TARGET_ATTRIBUTE_FOR_JOBS = "buildbotJobs"
@dataclass
class EvaluatorSettings:
    supported_systems: list[str]
    worker_count: int
    max_memory_size: int
    gc_roots_dir: str
    lock: util.MasterLock
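# These settings are turned into the nix-eval-jobs invocation by make_job_evaluator() below.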
@dataclass
class NixBuilder:
    protocol: str
@@ -49,14 +59,24 @@ class NixBuilder:
    publicHostKey: str | None = None
    sshUser: str | None = None
    sshKey: str | None = None
    systems: list[str] = field(default_factory=lambda: ["-"])
    supportedFeatures: list[str] = field(default_factory=lambda: ["-"])
    mandatoryFeatures: list[str] = field(default_factory=lambda: ["-"])
    systems: list[str] = field(default_factory=lambda: [])
    supportedFeatures: list[str] = field(default_factory=lambda: [])
    mandatoryFeatures: list[str] = field(default_factory=lambda: [])
    def to_nix_line(self):
        encoded_public_key = base64.b64encode(self.publicHostKey.encode('ascii')).decode('ascii') if self.publicHostKey is not None else "-"
        fullConnection = f"{self.protocol}://{self.sshUser}@{self.hostName}" if self.sshUser is not None else self.hostName
        return f"{fullConnection} {','.join(self.systems)} {self.sshKey or '-'} {self.maxJobs} {self.speedFactor} {','.join(self.supportedFeatures)} {','.join(self.mandatoryFeatures)} {encoded_public_key}"
    def to_nix_store(self):
        fullConnection = f"{self.sshUser}@{self.hostName}" if self.sshUser is not None else self.hostName
        fullConnection = f"{self.protocol}://{fullConnection}"
        params = []
        if self.sshKey is not None:
            params.append(f"ssh-key={self.sshKey}")
        if self.publicHostKey is not None:
            encoded_public_key = base64.b64encode(self.publicHostKey.encode('ascii')).decode('ascii')
            params.append(f"base64-ssh-public-host-key={encoded_public_key}")
        if params != []:
            fullConnection += "?"
            fullConnection += "&".join(params)
        return fullConnection
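    # Illustration (hedged, all values made up): a builder such as
    #   NixBuilder(protocol="ssh-ng", hostName="builder1.example.org", maxJobs=8,
    #              speedFactor=1, sshUser="nix", sshKey="/var/lib/buildbot/ssh-key",
    #              systems=["x86_64-linux"])
    # would yield "ssh-ng://nix@builder1.example.org?ssh-key=/var/lib/buildbot/ssh-key"
    # from to_nix_store(), while to_nix_line() emits the space-separated machine-file
    # format that is later joined with ";" and passed to `nix --builders`.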
@dataclass
@@ -130,7 +150,7 @@ class GerritConfig:
        """
        Returns the prefix to build a repourl using that gerrit configuration.
        """
        return 'ssh://{self.username}@{self.domain}:{self.port}/'
        return f'ssh://{self.username}@{self.domain}:{self.port}/'
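        # e.g. username="buildbot", domain="gerrit.example.org", port=2022
        # would give "ssh://buildbot@gerrit.example.org:2022/" (hypothetical values).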
class BuildTrigger(steps.BuildStep):
    def __init__(
@@ -148,7 +168,7 @@ class BuildTrigger(steps.BuildStep):
        self.ended = False
        self.waitForFinishDeferred = None
        self.brids = []
        self.description = f"building {len(jobs)} hydra jobs"
        self.description = f"building {len(jobs)} jobs"
        super().__init__(**kwargs)
    def interrupt(self, reason):
@@ -177,15 +197,14 @@ class BuildTrigger(steps.BuildStep):
        return sch
    def schedule_one(self, build_props: Properties, job):
        project_name = build_props.getProperty('event.project')
        source = f"{project_name}-eval-lix"
        project_name = build_props.getProperty("event.refUpdate.project") or build_props.getProperty("event.change.project")
        source = f"{project_name}-eval"
        attr = job.get("attr", "eval-error")
        name = attr
        name = f"{FLAKE_TARGET_ATTRIBUTE_FOR_JOBS}.{name}"
        name = f"buildbotJobs.{attr}"
        error = job.get("error")
        props = Properties()
        props.setProperty("virtual_builder_name", name, source)
        props.setProperty("status_name", f"nix-build .#{FLAKE_TARGET_ATTRIBUTE_FOR_JOBS}.{attr}", source)
        props.setProperty("status_name", f"building buildbotJobs.{attr}", source)
        props.setProperty("virtual_builder_tags", "", source)
        if error is not None:
@@ -372,7 +391,8 @@ class NixEvalCommand(buildstep.ShellMixin, steps.BuildStep):
        # run nix-eval-jobs --flake .#$FLAKE_TARGET_ATTRIBUTE_FOR_JOBS to generate the dict of stages
        cmd: remotecommand.RemoteCommand = yield self.makeRemoteShellCommand()
        build_props = self.build.getProperties()
        project_name = build_props.get('event.project')
        project_name = build_props.getProperty("event.refUpdate.project") or build_props.getProperty("event.change.project")
        assert project_name is not None, "`event.refUpdate.project` or `event.change.project` is not available on the build properties, unexpected build type!"
        yield self.runCommand(cmd)
@@ -396,26 +416,11 @@ class NixEvalCommand(buildstep.ShellMixin, steps.BuildStep):
            if not system or system in self.supported_systems:  # report eval errors
                filtered_jobs.append(job)
        # Filter out failed evaluations
        succeeded_jobs = [job for job in filtered_jobs if job.get('error') is None]
        drv_show_log: Log = yield self.getLog("stdio")
        drv_show_log.addStdout(f"getting derivation infos\n")
        cmd = yield self.makeRemoteShellCommand(
            stdioLogName=None,
            collectStdout=True,
            command=(
                ["nix", "derivation", "show", "--recursive"]
                + [drv for drv in (job.get("drvPath") for job in filtered_jobs) if drv]
            ),
        )
        yield self.runCommand(cmd)
        drv_show_log.addStdout(f"done\n")
        try:
            drv_info = json.loads(cmd.stdout)
        except json.JSONDecodeError as e:
            msg = f"Failed to parse `nix derivation show` output for {cmd.command}"
            raise BuildbotNixError(msg) from e
        all_deps = dict()
        for drv, info in drv_info.items():
            all_deps[drv] = set(info.get("inputDrvs").keys())
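        # e.g. closure_of("a", {"a": {"b"}, "b": {"c"}, "c": set()}) -> {"b", "c"}:
        # the helper below iterates `deps` to a fixed point and returns every
        # transitive input derivation of `key`, excluding `key` itself.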
        def closure_of(key, deps):
            r, size = set([key]), 0
@@ -424,14 +429,34 @@ class NixEvalCommand(buildstep.ShellMixin, steps.BuildStep):
                r.update(*[deps[k] for k in r])
            return r.difference([key])
        job_set = set((drv for drv in (job.get("drvPath") for job in filtered_jobs) if drv))
        all_deps = {k: list(closure_of(k, all_deps).intersection(job_set)) for k in job_set}
        if succeeded_jobs:
            drv_show_log.addStdout(f"getting derivation infos for valid derivations\n")
            cmd = yield self.makeRemoteShellCommand(
                stdioLogName=None,
                collectStdout=True,
                command=(
                    ["nix", "derivation", "show", "--recursive"]
                    + [drv for drv in (job.get("drvPath") for job in succeeded_jobs) if drv]
                ),
            )
            yield self.runCommand(cmd)
            drv_show_log.addStdout(f"done\n")
            try:
                drv_info = json.loads(cmd.stdout)
            except json.JSONDecodeError as e:
                msg = f"Failed to parse `nix derivation show` output for {cmd.command}"
                raise BuildbotNixError(msg) from e
            for drv, info in drv_info.items():
                all_deps[drv] = set(info.get("inputDrvs").keys())
        job_set = set((drv for drv in (job.get("drvPath") for job in filtered_jobs) if drv))
        all_deps = {k: list(closure_of(k, all_deps).intersection(job_set)) for k in job_set}
        self.build.addStepsAfterCurrentStep(
            [
                BuildTrigger(
                    builds_scheduler_group=f"{project_name}-nix-build",
                    name="build flake",
                    name="build derivations",
                    jobs=filtered_jobs,
                    all_deps=all_deps,
                ),
@@ -440,6 +465,88 @@ class NixEvalCommand(buildstep.ShellMixin, steps.BuildStep):
        return result
def make_job_evaluator(name: str, settings: EvaluatorSettings, flake: bool) -> NixEvalCommand:
    actual_command = []
    if flake:
        actual_command += ["--flake", f".#{FLAKE_TARGET_ATTRIBUTE_FOR_JOBS}"]
    else:
        actual_command += ["--expr", "import ./.ci/buildbot.nix"]
    return NixEvalCommand(
        env={},
        name=name,
        supported_systems=settings.supported_systems,
        command=[
            "nix-eval-jobs",
            "--workers",
            str(settings.worker_count),
            "--max-memory-size",
            str(settings.max_memory_size),
            "--gc-roots-dir",
            settings.gc_roots_dir,
            "--force-recurse",
            "--check-cache-status",
        ] + actual_command,
        haltOnFailure=True,
        locks=[settings.lock.access("exclusive")]
    )
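# For reference (assembled from the arguments above), the flake mode runs roughly:
#   nix-eval-jobs --workers <N> --max-memory-size <M> --gc-roots-dir <DIR> \
#       --force-recurse --check-cache-status --flake .#buildbotJobs
# while the non-flake mode swaps the trailing `--flake ...` for
#   --expr 'import ./.ci/buildbot.nix'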
class NixConfigure(buildstep.CommandMixin, steps.BuildStep):
    name = "determining jobs"
    """
    Determine what `NixEvalCommand` step should be added after
    based on the existence of:
    - flake.nix
    - .ci/buildbot.nix
    """
    def __init__(self, eval_settings: EvaluatorSettings, **kwargs: Any) -> None:
        self.evaluator_settings = eval_settings
        super().__init__(**kwargs)
        self.observer = logobserver.BufferLogObserver()
        self.addLogObserver("stdio", self.observer)
    @defer.inlineCallbacks
    def run(self) -> Generator[Any, object, Any]:
        try:
            configure_log: Log = yield self.getLog("stdio")
        except Exception:
            configure_log: Log = yield self.addLog("stdio")
        # Takes precedence.
        configure_log.addStdout("checking if there's a .ci/buildbot.nix...\n")
        ci_buildbot_defn_exists = yield self.pathExists('build/.ci/buildbot.nix')
        if ci_buildbot_defn_exists:
            configure_log.addStdout(".ci/buildbot.nix found, configured for non-flake CI\n")
            self.build.addStepsAfterCurrentStep(
                [
                    make_job_evaluator(
                        "evaluate `.ci/buildbot.nix` jobs",
                        self.evaluator_settings,
                        False
                    )
                ]
            )
            return SUCCESS
        flake_exists = yield self.pathExists('build/flake.nix')
        if flake_exists:
            configure_log.addStdout("flake.nix found\n")
            self.build.addStepsAfterCurrentStep([
                make_job_evaluator(
                    "evaluate `flake.nix` jobs",
                    self.evaluator_settings,
                    True
                )
            ])
            return SUCCESS
        configure_log.addStdout("neither flake.nix nor .ci/buildbot.nix found, no CI to run!\n")
        return SUCCESS
class NixBuildCommand(buildstep.ShellMixin, steps.BuildStep):
    """Builds a nix derivation."""
@@ -481,10 +588,19 @@ def nix_eval_config(
    worker_count: int,
    max_memory_size: int,
) -> util.BuilderConfig:
    """Uses nix-eval-jobs to evaluate $FLAKE_TARGET_ATTRIBUTE_FOR_JOBS (`.#hydraJobs` by default) from flake.nix in parallel.
    """
    Uses nix-eval-jobs to evaluate the entrypoint of this project.
    For each evaluated attribute a new build pipeline is started.
    """
    factory = util.BuildFactory()
    gerrit_private_key = None
    with open(project.private_sshkey_path, 'r') as f:
        gerrit_private_key = f.read()
    if gerrit_private_key is None:
        raise RuntimeError('No gerrit private key to fetch the repositories')
    # check out the source
    factory.addStep(
        steps.Gerrit(
@@ -492,9 +608,10 @@ def nix_eval_config(
            mode="full",
            retry=[60, 60],
            timeout=3600,
            sshPrivateKey=project.private_sshkey_path
            sshPrivateKey=gerrit_private_key
        ),
    )
    # use one gcroots directory per worker. this should be scoped to the largest unique resource
    # in charge of builds (ie, buildnumber is too narrow) to not litter the system with permanent
    # gcroots in case of worker restarts.
@@ -503,27 +620,22 @@ def nix_eval_config(
        "/nix/var/nix/gcroots/per-user/buildbot-worker/%(prop:project)s/drvs/%(prop:workername)s/",
    )
    eval_settings = EvaluatorSettings(
        supported_systems=supported_systems,
        worker_count=worker_count,
        max_memory_size=max_memory_size,
        gc_roots_dir=drv_gcroots_dir,
        lock=eval_lock
    )
    # NixConfigure will choose how to add a NixEvalCommand job
    # based on whether there's a flake.nix or a .ci/buildbot.nix.
    factory.addStep(
        NixEvalCommand(
            env={},
            name="evaluate flake",
            supported_systems=supported_systems,
            command=[
                "nix-eval-jobs",
                "--workers",
                str(worker_count),
                "--max-memory-size",
                str(max_memory_size),
                "--gc-roots-dir",
                drv_gcroots_dir,
                "--force-recurse",
                "--check-cache-status",
                "--flake",
                f".#{FLAKE_TARGET_ATTRIBUTE_FOR_JOBS}"
            ],
            haltOnFailure=True,
            locks=[eval_lock.access("exclusive")],
        ),
        NixConfigure(
            eval_settings
        )
    )
    factory.addStep(
@@ -551,12 +663,17 @@ def nix_build_config(
    project: GerritProject,
    worker_arch: str,
    worker_names: list[str],
    builders_spec: str,
    build_stores: list[str],
    signing_keyfile: str | None = None,
    binary_cache_config: S3BinaryCacheConfig | None = None
) -> util.BuilderConfig:
    """Builds one nix flake attribute."""
    factory = util.BuildFactory()
    # pick a store to run the build on
    # TODO proper scheduling instead of picking the first builder
    build_store = build_stores[0]
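    # each entry of build_stores is a NixBuilder.to_nix_store() URI,
    # e.g. "ssh-ng://nix@builder1.example.org?ssh-key=..." (hypothetical host)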
    factory.addStep(
        NixBuildCommand(
            env={},
@@ -578,6 +695,10 @@ def nix_build_config(
                "7200",
                "--builders",
                builders_spec,
                "--store",
                build_store,
                "--eval-store",
                "ssh-ng://localhost",
                "--out-link",
                util.Interpolate("result-%(prop:attr)s"),
                util.Interpolate("%(prop:drv_path)s^*"),
@@ -597,6 +718,8 @@ def nix_build_config(
                "nix",
                "store",
                "sign",
                "--store",
                build_store,
                "--key-file",
                signing_keyfile,
                util.Interpolate(
@@ -613,6 +736,8 @@ def nix_build_config(
            command=[
                "nix",
                "copy",
                "--store",
                build_store,
                "--to",
                f"s3://{binary_cache_config.bucket}?profile={binary_cache_config.profile}&region={binary_cache_config.region}&endpoint={binary_cache_config.endpoint}",
                util.Property(
@@ -674,7 +799,7 @@ def config_for_project(
    nix_eval_worker_count: int,
    nix_eval_max_memory_size: int,
    eval_lock: util.MasterLock,
    builders_spec: str,
    nix_builders: list[NixBuilder],
    signing_keyfile: str | None = None,
    binary_cache_config: S3BinaryCacheConfig | None = None
) -> Project:
@@ -712,12 +837,6 @@ def config_for_project(
            ),
        ],
    )
    gerrit_private_key = None
    with open(project.private_sshkey_path, 'r') as f:
        gerrit_private_key = f.read()
    if gerrit_private_key is None:
        raise RuntimeError('No gerrit private key to fetch the repositories')
    config["builders"].extend(
        [
@@ -737,7 +856,7 @@ def config_for_project(
                project,
                arch,
                [f"{w}-{arch}" for w in worker_names],
                builders_spec,
                [b.to_nix_store() for b in nix_builders if arch in b.systems or arch == "other"],
                signing_keyfile=signing_keyfile,
                binary_cache_config=binary_cache_config
            )
@@ -768,10 +887,7 @@ def gerritReviewFmt(url, data):
    builderName = build['builder']['name']
    if len(build['results']) != 1:
        raise ValueError('this review request contains more than one build results, unexpected format request')
    result = build['results'][0]
    result = build['results']
    if result == util.RETRY:
        return dict()
@@ -824,13 +940,15 @@ class GerritNixConfigurator(ConfiguratorBase):
        prometheus_config: dict[str, int | str] | None = None,
        binary_cache_config: dict[str, str] | None = None,
        auth_method: AuthBase | None = None,
        manhole: Any = None,
    ) -> None:
        super().__init__()
        self.manhole = manhole
        self.allowed_origins = allowed_origins
        self.gerrit_server = gerrit_server
        self.gerrit_user = gerrit_user
        self.gerrit_port = gerrit_port
        self.gerrit_sshkey_path = gerrit_sshkey_path
        self.gerrit_sshkey_path = str(gerrit_sshkey_path)
        self.gerrit_config = GerritConfig(domain=self.gerrit_server,
                                          username=self.gerrit_user,
                                          port=self.gerrit_port)
@@ -860,6 +978,9 @@ class GerritNixConfigurator(ConfiguratorBase):
        worker_config = json.loads(read_secret_file(self.nix_workers_secret_name))
        worker_names = []
        if self.manhole is not None:
            config["manhole"] = self.manhole
        config.setdefault("projects", [])
        config.setdefault("secretsProviders", [])
        config.setdefault("www", {
@@ -876,7 +997,6 @@ class GerritNixConfigurator(ConfiguratorBase):
        eval_lock = util.MasterLock("nix-eval")
        builders_spec = ";".join(builder.to_nix_line() for builder in self.nix_builders)
        for project in self.projects:
            config_for_project(
                config,
@@ -887,7 +1007,7 @@ class GerritNixConfigurator(ConfiguratorBase):
                self.nix_eval_worker_count or multiprocessing.cpu_count(),
                self.nix_eval_max_memory_size,
                eval_lock,
                builders_spec,
                self.nix_builders,
                signing_keyfile=self.signing_keyfile,
                binary_cache_config=self.binary_cache_config
            )