gerrit-linkbot/gerrit_linkbot/gerrit.py
2024-08-06 23:54:15 -07:00

180 lines
5.6 KiB
Python

import dataclasses
from importlib.resources.abc import Traversable
import json
from typing import Callable, Literal
import logging
import abc
from datetime import datetime
from .constants import GERRIT_BASE
from . import http
log = logging.getLogger(__name__)
@dataclasses.dataclass
class Change:
"""Change metadata acquired from the changes query endpoint."""
subject: str
project: str
id: str
number: int
updated: datetime
commit_msg: str
@classmethod
def parse(cls, obj: dict) -> 'Change':
commit = obj['current_revision']
rev_obj = obj['revisions'][commit]
commit_msg = rev_obj['commit']['message']
return cls(subject=obj['subject'],
id=obj['id'],
project=obj['project'],
number=obj['_number'],
commit_msg=commit_msg,
updated=datetime.fromisoformat(obj['updated']))
@dataclasses.dataclass
class Comment:
"""One comment from the change comments endpoint."""
message: str
updated: datetime
@classmethod
def parse(cls, obj) -> 'Comment':
return Comment(message=obj['message'],
updated=datetime.fromisoformat(obj['updated']))
@dataclasses.dataclass
class MessageIsh:
"""
Something kind of like a comment or a commit message. Sufficiently vague
that we can treat both the same.
"""
change_title: str
project: str
message: str
backlink: str
cl_number: int
kind: Literal['comment'] | Literal['commit message']
class API(metaclass=abc.ABCMeta):
"""A Gerrit API, as used by gerrit-linkbot."""
def __init__(self, base: str):
self.base = base
@abc.abstractmethod
def changes(self, per_page=25) -> list[Change]:
pass
@abc.abstractmethod
def change_comments(self, change_id: str) -> dict[str, list[Comment]]:
pass
def change_link(self, change_id: str) -> str:
"""Assumes that change_id is of a format project~id"""
[project, id] = change_id.split('~')
return self.base + f'/c/{project}/+/{id}'
class ConcreteAPI(API):
"""An actual non-fake Gerrit API."""
def __init__(self, base=GERRIT_BASE):
super().__init__(base)
def get(self, endpoint: str, **kwargs):
log.info('GET %s params=%s', endpoint, kwargs.get('params'))
resp = http.session.get(self.base + endpoint,
headers={'Accept': 'application/json'},
**kwargs)
resp.raise_for_status()
return json.loads(resp.content.removeprefix(b")]}'"))
def changes(self, per_page=25) -> list[Change]:
resp = self.get('/changes/',
params={
'n': per_page,
'o': ['CURRENT_REVISION', 'CURRENT_COMMIT']
})
return [Change.parse(c) for c in resp]
def change_comments(self, change_id: str) -> dict[str, list[Comment]]:
resp = self.get(f'/changes/{change_id}/comments')
return {
filename: [Comment.parse(obj) for obj in comments]
for (filename, comments) in resp.items()
}
class FakeAPI(API):
"""Fake Gerrit API that returns fake data."""
def __init__(self, fixtures: Traversable):
super().__init__(base=GERRIT_BASE)
self.fixtures = fixtures
def fixture(self, name: str):
return json.loads(self.fixtures.joinpath(name).read_text())
def changes(self, per_page=25) -> list[Change]:
return [Change.parse(c) for c in self.fixture('changes.json')]
def change_comments(self, change_id: str) -> dict[str, list[Comment]]:
return {
filename: [Comment.parse(obj) for obj in comments]
for filename, comments in self.fixture(
f'change-comments-{change_id}.json').items()
}
class GerritWatcher:
"""Watches Gerrit changes and calls a callback on commit messages and all comments."""
HandleComment = Callable[[MessageIsh], None]
def __init__(self, api: API, callback: HandleComment, per_page=25):
self.api = api
self.per_page = per_page
self.already_processed = datetime(1970, 1, 1)
self.callback = callback
def handle_change(self, change: Change):
self.callback(
MessageIsh(change_title=change.subject,
project=change.project,
message=change.commit_msg,
backlink=self.api.change_link(change.id),
cl_number=change.number,
kind='commit message'))
comments = self.api.change_comments(change.id)
for _, filecomments in comments.items():
for comment in filecomments:
self.callback(
MessageIsh(change_title=change.subject,
project=change.project,
message=comment.message,
backlink=self.api.change_link(change.id),
cl_number=change.number,
kind='comment'))
def poll(self):
new_already_processed = self.already_processed
# ordered by time descending
for change in self.api.changes(per_page=self.per_page):
new_already_processed = max(change.updated, new_already_processed)
# got past the ones we have already seen, we're done
if change.updated <= self.already_processed:
break
self.handle_change(change)
self.already_processed = new_already_processed