[WIP] Hydra metrics

This commit is contained in:
Ilya K 2024-09-30 14:36:15 +03:00
parent 4749d204bf
commit 1405be8aca


@@ -0,0 +1,360 @@
#!/usr/bin/env nix-shell
#!nix-shell -i python3 -p "python3.withPackages(ps: [ps.aioprometheus ps.click ps.httpx ps.starlette ps.uvicorn])"
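# Prometheus exporter for the Hydra queue runner: polls Hydra's
# /queue-runner-status endpoint and re-exports the returned JSON fields as
# Prometheus metrics on /metrics.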
import asyncio
from contextlib import asynccontextmanager
import logging
from aioprometheus import Counter, Gauge
from aioprometheus.asgi.starlette import metrics
import click
import httpx
from starlette.applications import Starlette
from starlette.routing import Route
import uvicorn
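

# Top-level queue-runner metrics, mirroring the fields of Hydra's
# /queue-runner-status response.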
up = Gauge("hydra_up", "Is Hydra running")
time = Gauge("hydra_time", "Hydra's current time (unix epoch seconds)")
uptime = Gauge("hydra_uptime", "Hydra's uptime, in seconds")
builds_queued = Gauge("hydra_builds_queued", "Number of jobs in build queue")
steps_active = Gauge("hydra_steps_active", "Number of active steps in build queue")
steps_building = Gauge("hydra_steps_building", "Number of steps currently building")
steps_copying_to = Gauge(
    "hydra_steps_copying_to", "Number of steps copying inputs to a worker"
)
steps_copying_from = Gauge(
    "hydra_steps_copying_from", "Number of steps copying outputs from a worker"
)
steps_waiting = Gauge(
    "hydra_steps_waiting", "Number of steps currently waiting for a worker slot"
)
steps_unsupported = Gauge(
    "hydra_steps_unsupported", "Number of unsupported steps in build queue"
)
bytes_sent = Counter(
    "hydra_build_inputs_sent_bytes_total",
    "Total number of bytes copied to workers as build inputs",
)
bytes_received = Counter(
    "hydra_build_outputs_received_bytes_total",
    "Total number of bytes copied from workers as build outputs",
)
builds_read = Counter(
    "hydra_builds_read_total",
    "Total number of builds whose outputs have been copied from workers",
)
builds_read_seconds = Counter(
    "hydra_builds_read_seconds_total",
    "Total time spent copying build outputs, in seconds",
)
builds_done = Counter("hydra_builds_done_total", "Total number of builds completed")
steps_started = Counter("hydra_steps_started_total", "Total number of steps started")
steps_done = Counter("hydra_steps_done_total", "Total number of steps completed")
retries = Counter("hydra_retries_total", "Total number of retries")
max_retries = Gauge(
    "hydra_max_retries", "Maximum observed number of retries for a single step"
)
queue_wakeups = Counter(
    "hydra_queue_wakeup_total",
    "Number of times the queue runner has been notified of queue changes",
)
dispatcher_wakeups = Counter(
    "hydra_dispatcher_wakeup_total",
    "Number of times the dispatcher woke up due to new runnable or completed builds",
)
dispatch_time = Counter(
    "hydra_dispatch_execution_seconds_total",
    "Total time the dispatcher has spent working, in seconds",
)
db_connections = Gauge("hydra_db_connections", "Number of connections to the database")
active_db_updates = Gauge("hydra_db_updates", "Number of in-progress database updates")
steps_queued = Gauge("hydra_steps_queued", "Number of steps in build queue")
steps_runnable = Gauge(
    "hydra_steps_runnable", "Number of runnable steps in build queue"
)
step_time = Counter(
    "hydra_step_time_total", "Total time spent executing steps, in seconds"
)
step_build_time = Counter(
    "hydra_step_build_time_total", "Total time spent executing build steps, in seconds"
)
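
# Per-machine metrics, labelled by the worker's hostname ("host").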
machine_enabled = Gauge(
    "hydra_machine_enabled", "Whether this worker is enabled (1) or disabled (0)"
)
machine_steps_done = Counter(
    "hydra_machine_steps_done_total", "Total number of steps completed by this worker"
)
machine_current_jobs = Gauge(
    "hydra_machine_current_jobs", "Number of jobs currently running on this worker"
)
machine_disabled_until = Gauge(
    "hydra_machine_disabled_until",
    "Timestamp of when this worker will next become active",
)
machine_last_failure = Gauge(
    "hydra_machine_last_failure", "Timestamp of when a build last failed on this worker"
)
machine_consecutive_failures = Gauge(
    "hydra_machine_consecutive_failures",
    "Number of consecutive failed builds on this worker",
)
machine_idle_since = Gauge(
    "hydra_machine_idle_since", "Timestamp of when this worker last had jobs running"
)
machine_step_time = Counter(
    "hydra_machine_step_time_total",
    "Total time this worker spent executing steps, in seconds",
)
machine_step_build_time = Counter(
    "hydra_machine_step_build_time_total",
    "Total time this worker spent executing build steps, in seconds",
)
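
# Per-jobset metrics, labelled by jobset name ("name").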
jobset_time = Counter(
    "hydra_jobset_seconds_total",
    "Total time this jobset has been building for, in seconds",
)
jobset_shares_used = Gauge(
    "hydra_jobset_shares_used", "Number of shares currently consumed by this jobset"
)
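
# Per-machine-type metrics, labelled by "machineType".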
machine_type_runnable = Gauge(
    "hydra_machine_type_runnable",
    "Number of steps currently runnable on this machine type",
)
machine_type_running = Gauge(
    "hydra_machine_type_running",
    "Number of steps currently running on this machine type",
)
machine_type_wait_time = Counter(
    "hydra_machine_type_wait_time_total",
    "Total time spent waiting for a build slot of this machine type",
)
machine_type_last_active = Gauge(
    "hydra_machine_type_last_active",
    "Timestamp of when a machine of this type was last active",
)
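
# Remote (binary cache) store statistics.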
store_nar_info_read = Counter(
    "hydra_store_nar_info_read_total",
    "Total number of narinfo files read from the remote store",
)
store_nar_info_read_averted = Counter(
    "hydra_store_nar_info_read_averted_total",
    "Total number of narinfo file reads averted (already loaded)",
)
store_nar_info_missing = Counter(
    "hydra_store_nar_info_missing_total",
    "Total number of narinfo files found to be missing",
)
store_nar_info_write = Counter(
    "hydra_store_nar_info_write_total",
    "Total number of narinfo files written to the remote store",
)
store_nar_info_cache_size = Gauge(
    "hydra_store_nar_info_cache_size",
    "Size of the in-memory store path information cache",
)
store_nar_read = Counter(
    "hydra_store_nar_read_total", "Total number of NAR files read from the remote store"
)
store_nar_read_bytes = Counter(
    "hydra_store_nar_read_bytes_total",
    "Total number of NAR file bytes read from the remote store (uncompressed)",
)
store_nar_read_compressed_bytes = Counter(
    "hydra_store_nar_read_compressed_bytes_total",
    "Total number of NAR file bytes read from the remote store (compressed)",
)
store_nar_write = Counter(
    "hydra_store_nar_write_total",
    "Total number of NAR files written to the remote store",
)
store_nar_write_averted = Counter(
    "hydra_store_nar_write_averted_total",
    "Total number of NAR file writes averted (already exists on remote)",
)
store_nar_write_bytes = Counter(
    "hydra_store_nar_write_bytes_total",
    "Total number of NAR file bytes written to the remote store (uncompressed)",
)
store_nar_write_compressed_bytes = Counter(
    "hydra_store_nar_write_compressed_bytes_total",
    "Total number of NAR file bytes written to the remote store (compressed)",
)
store_nar_write_compression_seconds = Counter(
    "hydra_store_nar_write_compression_seconds_total",
    "Total time spent compressing NAR files for writing to the remote store",
)
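
# S3 request statistics; Hydra only reports these when the remote store is
# S3-backed.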
store_s3_put = Counter(
    "hydra_store_s3_put_total", "Total number of PUT requests to S3 store"
)
store_s3_put_bytes = Counter(
    "hydra_store_s3_put_bytes_total", "Total number of bytes written to S3 store"
)
store_s3_put_seconds = Counter(
    "hydra_store_s3_put_seconds_total",
    "Total time spent writing to S3 store, in seconds",
)
store_s3_get = Counter(
    "hydra_store_s3_get_total", "Total number of GET requests to S3 store"
)
store_s3_get_bytes = Counter(
    "hydra_store_s3_get_bytes_total", "Total number of bytes read from S3 store"
)
store_s3_get_seconds = Counter(
    "hydra_store_s3_get_seconds_total",
    "Total time spent reading from S3 store, in seconds",
)
store_s3_head = Counter(
    "hydra_store_s3_head_total", "Total number of HEAD requests to S3 store"
)
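

# Copy one /queue-runner-status JSON document onto the metric objects above.
# Hydra reports cumulative totals itself, so counters are overwritten with
# set() (which aioprometheus exposes on Counter) rather than incremented.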
def update_metrics(status):
    up.set({}, int(status["status"] == "up"))
    time.set({}, status["time"])
    uptime.set({}, status["uptime"])
    builds_queued.set({}, status["nrQueuedBuilds"])
    steps_active.set({}, status["nrActiveSteps"])
    steps_building.set({}, status["nrStepsBuilding"])
    steps_copying_to.set({}, status["nrStepsCopyingTo"])
    steps_copying_from.set({}, status["nrStepsCopyingFrom"])
    steps_waiting.set({}, status["nrStepsWaiting"])
    steps_unsupported.set({}, status["nrUnsupportedSteps"])
    bytes_sent.set({}, status["bytesSent"])
    bytes_received.set({}, status["bytesReceived"])
    builds_read.set({}, status["nrBuildsRead"])
    builds_read_seconds.set({}, status["buildReadTimeMs"] / 1000)
    builds_done.set({}, status["nrBuildsDone"])
    steps_started.set({}, status["nrStepsStarted"])
    steps_done.set({}, status["nrStepsDone"])
    retries.set({}, status["nrRetries"])
    max_retries.set({}, status["maxNrRetries"])
    queue_wakeups.set({}, status["nrQueueWakeups"])
    dispatcher_wakeups.set({}, status["nrDispatcherWakeups"])
    dispatch_time.set({}, status["dispatchTimeMs"] / 1000)
    db_connections.set({}, status["nrDbConnections"])
    active_db_updates.set({}, status["nrActiveDbUpdates"])
    steps_queued.set({}, status["nrUnfinishedSteps"])
    steps_runnable.set({}, status["nrRunnableSteps"])
    if st := status.get("totalStepTime"):
        step_time.set({}, st)
    if sbt := status.get("totalStepBuildTime"):
        step_build_time.set({}, sbt)
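
    # Per-machine metrics. The optional fields are only present once the
    # machine has seen activity, hence the .get() guards.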
    for machine_name, machine_status in status["machines"].items():
        labels = {"host": machine_name}
        machine_enabled.set(labels, int(machine_status["enabled"]))
        machine_steps_done.set(labels, machine_status["nrStepsDone"])
        machine_current_jobs.set(labels, machine_status["currentJobs"])
        machine_disabled_until.set(labels, machine_status["disabledUntil"])
        machine_last_failure.set(labels, machine_status["lastFailure"])
        machine_consecutive_failures.set(labels, machine_status["consecutiveFailures"])
        if isn := machine_status.get("idleSince"):
            machine_idle_since.set(labels, isn)
        if st := machine_status.get("totalStepTime"):
            machine_step_time.set(labels, st)
        if sbt := machine_status.get("totalStepBuildTime"):
            machine_step_build_time.set(labels, sbt)
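
    # Per-jobset build time and scheduling-share usage.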
    for jobset_name, jobset_status in status["jobsets"].items():
        labels = {"name": jobset_name}
        jobset_time.set(labels, jobset_status["seconds"])
        jobset_shares_used.set(labels, jobset_status["shareUsed"])
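
    # Per-machine-type queue statistics.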
    for type_name, type_status in status["machineTypes"].items():
        labels = {"machineType": type_name}
        machine_type_runnable.set(labels, type_status["runnable"])
        machine_type_running.set(labels, type_status["running"])
        if wt := type_status.get("waitTime"):
            machine_type_wait_time.set(labels, wt)
        if la := type_status.get("lastActive"):
            machine_type_last_active.set(labels, la)
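
    # Remote store statistics.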
    store = status["store"]
    store_nar_info_read.set({}, store["narInfoRead"])
    store_nar_info_read_averted.set({}, store["narInfoReadAverted"])
    store_nar_info_missing.set({}, store["narInfoMissing"])
    store_nar_info_write.set({}, store["narInfoWrite"])
    store_nar_info_cache_size.set({}, store["narInfoCacheSize"])
    store_nar_read.set({}, store["narRead"])
    store_nar_read_bytes.set({}, store["narReadBytes"])
    store_nar_read_compressed_bytes.set({}, store["narReadCompressedBytes"])
    store_nar_write.set({}, store["narWrite"])
    store_nar_write_averted.set({}, store["narWriteAverted"])
    store_nar_write_bytes.set({}, store["narWriteBytes"])
    store_nar_write_compressed_bytes.set({}, store["narWriteCompressedBytes"])
    store_nar_write_compression_seconds.set(
        {}, store["narWriteCompressionTimeMs"] / 1000
    )
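
    # S3 statistics only appear in the status JSON for S3-backed stores.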
    if s3 := status.get("s3"):
        store_s3_put.set({}, s3["put"])
        store_s3_put_bytes.set({}, s3["putBytes"])
        store_s3_put_seconds.set({}, s3["putTimeMs"] / 1000)
        store_s3_get.set({}, s3["get"])
        store_s3_get_bytes.set({}, s3["getBytes"])
        store_s3_get_seconds.set({}, s3["getTimeMs"] / 1000)
        store_s3_head.set({}, s3["head"])
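

# Poll Hydra's /queue-runner-status endpoint every scrape_interval seconds
# and refresh the exported metrics from each response.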
async def update_metrics_loop(hydra_url, scrape_interval):
    async with httpx.AsyncClient(base_url=hydra_url) as client:
        while True:
            try:
                response = await client.get(
                    "/queue-runner-status",
                    headers={"Accept": "application/json"},
                )
                response.raise_for_status()
                update_metrics(response.json())
            except Exception:
                # Mark Hydra as down, but keep polling.
                logging.exception("Failed to update metrics")
                up.set({}, 0)
            # Sleep outside the try block so a failing request cannot turn
            # this loop into a busy retry loop.
            await asyncio.sleep(scrape_interval)
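

# CLI entry point: serve /metrics via Starlette/uvicorn while the polling
# task runs in the background for the lifetime of the app.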
@click.command()
@click.option("--hydra-url", default="https://hydra.forkos.org/")
@click.option("--port", default=9200)
@click.option("--scrape-interval", default=15)
def main(hydra_url, port, scrape_interval):
    @asynccontextmanager
    async def lifespan(_):
        # Keep a reference to the task so it is not garbage-collected,
        # and cancel it on shutdown.
        task = asyncio.create_task(update_metrics_loop(hydra_url, scrape_interval))
        yield
        task.cancel()

    app = Starlette(routes=[Route("/metrics", metrics)], lifespan=lifespan)
    uvicorn.run(app, port=port, log_level="info")


if __name__ == "__main__":
    main()
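

# Example usage (hypothetical invocation; the script runs directly thanks to
# the nix-shell shebang, and the file name is illustrative):
#
#   ./hydra-metrics.py --hydra-url https://hydra.forkos.org/ --port 9200
#
# with a matching Prometheus scrape config:
#
#   scrape_configs:
#     - job_name: hydra-queue-runner
#       scrape_interval: 15s
#       static_configs:
#         - targets: ["localhost:9200"]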