gerrit-linkbot/gerrit_linkbot/__init__.py
2024-08-06 23:54:15 -07:00

231 lines
7 KiB
Python

import requests
import re
import time
import textwrap
import json
import dataclasses
import logging
import importlib.resources
import importlib.resources.abc
from typing import Literal, Optional
from . import gerrit, forgejo
from .constants import FORGEJO_ACCOUNT, FORGEJO_SERVICE_ACCOUNT, FORGEJO_BASE
# Package logger. Its level is pinned to INFO here, so records below INFO that
# are emitted through it are dropped unless the level is changed afterwards.
log = logging.getLogger(__name__)
log.setLevel(logging.INFO)

# Line layout: "<timestamp> <level> <logger name>: <message>", where the
# timestamp looks like "Aug 06 23:54:15".
fmt = logging.Formatter(
    fmt='{asctime} {levelname} {name}: {message}',
    datefmt='%b %d %H:%M:%S',
    style='{',
)

# Attach our own stream handler only when the logger does not have one yet, so
# running this module-level code again does not stack duplicate handlers.
if all(not isinstance(h, logging.StreamHandler) for h in log.handlers):
    hand = logging.StreamHandler()
    hand.setFormatter(fmt)
    log.addHandler(hand)
# Resource root of the `gerrit_linkbot.testdata` package, resolved when this
# module is imported. NOTE(review): not referenced by the code visible here --
# presumably read by the test suite; confirm.
testdata = importlib.resources.files('gerrit_linkbot.testdata')
@dataclasses.dataclass
class IssueMatch:
    """Result of applying a RegexRule: one issue reference found in a text."""

    # Issue number, converted to int from the rule's number capture group.
    issue_num: int
    # Project named by the reference, or None when the rule captures no
    # project or the project group did not take part in the match.
    project_name: Optional[str]


@dataclasses.dataclass
class RegexRule:
    """Rule applied on Gerrit CL comments and commit messages to extract issue references."""

    # Compiled pattern; each non-overlapping match produces one IssueMatch.
    regex: re.Pattern
    # Index of the capture group that holds the issue number (must be digits).
    capture_num: int
    # Index of the capture group that holds the project name, or None when
    # the pattern does not capture one.
    project_name_capture_num: Optional[int] = None

    def apply(self, s: str) -> list[IssueMatch]:
        """Return one IssueMatch per match of ``regex`` in *s*, in text order.

        Returns an empty list when nothing matches. Raises ValueError if the
        number group captured something ``int()`` cannot parse.
        """
        project_group = self.project_name_capture_num
        return [
            IssueMatch(
                issue_num=int(m.group(self.capture_num)),
                # Compare against None rather than relying on truthiness:
                # 0 is a valid group index (the whole match) and must not be
                # mistaken for "this rule has no project group".
                project_name=(m.group(project_group)
                              if project_group is not None else None),
            )
            for m in self.regex.finditer(s)
        ]
# Patterns that handle_comment tries, in this order, against each message.
RULES = [
    # Bare references: "#123", "fj#123" or "lix#123". The reference must be
    # delimited on both sides by the start/end of the text, whitespace or
    # punctuation. Only the number is extracted; no project name is captured.
    RegexRule(
        regex=re.compile(
            r"(?:^|[\s\]\[(){},.;:!?])(fj|lix)?#(\d+)(?=$|[\s\]\[(){},.;:!?])"),
        capture_num=2,
    ),
    # URL to some issue under the Forgejo account: group 1 is the project
    # name, group 2 the issue number.
    RegexRule(
        regex=re.compile(
            re.escape(FORGEJO_BASE) + "/" + re.escape(FORGEJO_ACCOUNT) +
            r"/([a-zA-Z0-9-]+)/issues/([0-9]+)"),
        capture_num=2,
        project_name_capture_num=1,
    ),
]
@dataclasses.dataclass
class CLMention:
    """A single mention of an issue by a CL, as stored in the bot's Forgejo comment.

    Only facts that stay fixed once the mention exists live here; information
    about the CL that may change later is kept out of this class.
    """

    # Link back to the place of the mention (taken from MessageIsh.backlink).
    backlink: str
    # Gerrit CL number (taken from MessageIsh.cl_number).
    number: int
    # Which part of the CL contained the mention.
    kind: Literal['comment'] | Literal['commit message']

    @classmethod
    def from_messageish(cls, message_ish: gerrit.MessageIsh) -> 'CLMention':
        """Build a mention from the backlink, CL number and kind of *message_ish*."""
        return cls(message_ish.backlink, message_ish.cl_number,
                   message_ish.kind)

    @classmethod
    def parse(cls, obj) -> 'CLMention':
        """Rebuild a mention from a mapping with 'backlink', 'number' and 'kind' keys.

        Raises KeyError when one of the keys is missing; extra keys are ignored.
        """
        backlink, number, kind = (obj[key]
                                  for key in ('backlink', 'number', 'kind'))
        return cls(backlink, number, kind)
@dataclasses.dataclass
class CLMeta:
    """Details of a Gerrit CL that are allowed to change after it was first seen."""

    # Title of the change as of the most recent update.
    change_title: str

    @classmethod
    def parse(cls, obj) -> 'CLMeta':
        """Rebuild the metadata from a mapping with a 'change_title' key.

        Raises KeyError when the key is missing.
        """
        title = obj['change_title']
        return cls(title)
@dataclasses.dataclass
class CommentMeta:
    """
    Everything gerrit-linkbot stores in one Forgejo comment, and from which
    that comment is rendered.
    """

    # CLs mentioned, with unchanging metadata
    cls: list[CLMention]
    # CL metadata that might change (e.g. title), keyed by CL number
    cl_meta: dict[int, CLMeta]

    def serialize(self):
        """Convert to plain dicts and lists, ready to be passed to json.dumps.

        The CL numbers stay ints here; a JSON round trip turns them into
        strings, which ``parse`` converts back.
        """
        mentions = [dataclasses.asdict(mention) for mention in self.cls]
        per_cl = {}
        for number, details in self.cl_meta.items():
            per_cl[number] = dataclasses.asdict(details)
        return {'cls': mentions, 'cl_meta': per_cl}

    @classmethod
    def parse(cls, obj) -> 'CommentMeta':
        """Rebuild the metadata from the structure produced by ``serialize``.

        Keys of ``obj['cl_meta']`` are passed through ``int()``, so both int
        keys and the string keys of decoded JSON are accepted.
        """
        mentions = [CLMention.parse(raw) for raw in obj['cls']]
        per_cl = {
            int(number): CLMeta.parse(raw)
            for number, raw in obj['cl_meta'].items()
        }
        return cls(mentions, per_cl)
def parse_comment(comment: str) -> 'Optional[CommentMeta]':
    """Extract the metadata that gerrit-linkbot embedded in a comment body.

    Only the first line starting with ``<!-- GERRIT_LINKBOT:`` is considered;
    whatever follows the marker on that line is decoded as JSON and handed to
    ``CommentMeta.parse``.

    Returns None when no line starts with the marker. Raises
    json.JSONDecodeError when the text after the marker is not valid JSON.
    """
    LEADER = '<!-- GERRIT_LINKBOT:'
    marked_line = next(
        (line for line in comment.splitlines() if line.startswith(LEADER)),
        None)
    if marked_line is None:
        return None
    payload = json.loads(marked_line.removeprefix(LEADER))
    return CommentMeta.parse(payload)
def make_comment(meta: CommentMeta):
    """Render the text of the Forgejo comment that represents *meta*.

    The text opens with an HTML comment whose first line carries the
    JSON-serialized metadata after the ``<!-- GERRIT_LINKBOT:`` marker (the
    line parse_comment reads back), followed by a Markdown bullet list with
    one entry per mention in ``meta.cls``.

    Raises KeyError when a mention's CL number has no entry in
    ``meta.cl_meta``.
    """
    payload = json.dumps(meta.serialize())

    entry = '* {kind} in [cl/{cl}]({backlink}) ("{cl_desc}")'
    bullets = [
        entry.format(kind=mention.kind,
                     cl=mention.number,
                     backlink=mention.backlink,
                     cl_desc=meta.cl_meta[mention.number].change_title)
        for mention in meta.cls
    ]

    # Dedent before formatting so the multi-line bullet list cannot influence
    # how much indentation is removed from the template.
    template = textwrap.dedent("""\
        <!-- GERRIT_LINKBOT: {data_json}
        -->
        This issue was mentioned on Gerrit on the following CLs:
        {cls}
        """)
    return template.format(data_json=payload, cls='\n'.join(bullets))
class Service:
    """
    The top level job in gerrit-linkbot: ties the GerritWatcher to the Forgejo
    API, turning issue references found on Gerrit into a comment on the
    referenced Forgejo issue.
    """

    def __init__(self,
                 gerrit_api: gerrit.API,
                 forgejo_api: forgejo.API,
                 poll_interval=30,
                 gerrit_per_page=25):
        """
        gerrit_api: client handed to the GerritWatcher.
        forgejo_api: client used to list, post and edit issue comments.
        poll_interval: seconds to sleep between two polls in run().
        gerrit_per_page: forwarded to the GerritWatcher as ``per_page``.
        """
        self.gerrit_api = gerrit_api
        self.forgejo_api = forgejo_api
        # The watcher calls back into handle_comment for what it finds.
        self.watcher = gerrit.GerritWatcher(gerrit_api,
                                            self.handle_comment,
                                            per_page=gerrit_per_page)
        self.poll_interval = poll_interval

    def handle_hit(self, matched: IssueMatch, comment: gerrit.MessageIsh):
        """Make sure the referenced issue's bot comment lists this mention.

        The bot keeps a single comment per issue: the first comment by
        FORGEJO_SERVICE_ACCOUNT that carries linkbot metadata. That comment is
        edited when its rendered text would change and left alone otherwise;
        when no such comment exists a new one is posted.

        A reference to an issue that does not exist is logged and skipped.
        Any other requests.HTTPError from listing the comments propagates.
        """
        target = [
            FORGEJO_ACCOUNT, matched.project_name or comment.project,
            matched.issue_num
        ]
        try:
            # list() inside the try: the fetch may only happen on iteration.
            comments = list(self.forgejo_api.all_comments(*target))
        except requests.HTTPError as e:
            # HTTPError can be raised without a response attached; treat that
            # like any other unexpected failure and re-raise below.
            status = e.response.status_code if e.response is not None else None
            if status in (404, 500):
                # The issue doesn't exist, and it may be 500 because of a forgejo bug:
                # https://codeberg.org/forgejo/forgejo/issues/4005
                log.warning('Issue %s/%s#%d does not exist', *target)
                return
            raise

        existing_comment = None
        comment_meta = CommentMeta([], {})
        for fj_comment in comments:
            if fj_comment.author != FORGEJO_SERVICE_ACCOUNT:
                continue
            data = parse_comment(fj_comment.text)
            if data is not None:
                comment_meta = data
                existing_comment = (fj_comment.id, fj_comment.text)
                break

        # construct the new comment on top of the existing one: the title is
        # always refreshed, the mention is added only if not yet recorded
        comment_meta.cl_meta[comment.cl_number] = CLMeta(comment.change_title)
        already_recorded = any(cl_mention.number == comment.cl_number
                               and cl_mention.kind == comment.kind
                               for cl_mention in comment_meta.cls)
        if not already_recorded:
            comment_meta.cls.append(CLMention.from_messageish(comment))

        text = make_comment(comment_meta)
        if existing_comment is None:
            self.forgejo_api.post_comment(*target, text)  # type: ignore
            return

        comment_id, old_text = existing_comment
        # no change to the comment, no point posting it
        if old_text == text:
            return
        self.forgejo_api.edit_comment(
            *target,
            comment_id,  # type: ignore
            text)  # type: ignore

    def handle_comment(self, comment: gerrit.MessageIsh):
        """Apply every rule in RULES to the message and process each issue hit.

        A message that references the same issue several times (for example
        once as "#12" and once by URL) is processed once for that issue, since
        repeating handle_hit would only repeat the same Forgejo requests.
        """
        log.debug('Comment: %s', comment)
        seen = set()
        for rule in RULES:
            for res in rule.apply(comment.message):
                # Same resolution of the project as handle_hit uses.
                key = (res.project_name or comment.project, res.issue_num)
                if key in seen:
                    continue
                seen.add(key)
                self.handle_hit(res, comment)

    def step(self):
        """Run a single poll of the Gerrit watcher."""
        self.watcher.poll()

    def run(self):
        """Poll forever, sleeping ``poll_interval`` seconds after each poll."""
        while True:
            self.step()
            time.sleep(self.poll_interval)