231 lines
7 KiB
Python
231 lines
7 KiB
Python
import requests
|
|
import re
|
|
import time
|
|
import textwrap
|
|
import json
|
|
import dataclasses
|
|
import logging
|
|
import importlib.resources
|
|
import importlib.resources.abc
|
|
from typing import Literal, Optional
|
|
from . import gerrit, forgejo
|
|
from .constants import FORGEJO_ACCOUNT, FORGEJO_SERVICE_ACCOUNT, FORGEJO_BASE
|
|
|
|
log = logging.getLogger(__name__)
|
|
log.setLevel(logging.INFO)
|
|
|
|
fmt = logging.Formatter('{asctime} {levelname} {name}: {message}',
|
|
datefmt='%b %d %H:%M:%S',
|
|
style='{')
|
|
|
|
if not any(isinstance(h, logging.StreamHandler) for h in log.handlers):
|
|
hand = logging.StreamHandler()
|
|
hand.setFormatter(fmt)
|
|
log.addHandler(hand)
|
|
|
|
testdata = importlib.resources.files('gerrit_linkbot.testdata')
|
|
|
|
|
|
@dataclasses.dataclass
|
|
class IssueMatch:
|
|
"""Result of applying a RegexRule."""
|
|
issue_num: int
|
|
project_name: Optional[str]
|
|
|
|
|
|
@dataclasses.dataclass
|
|
class RegexRule:
|
|
"""Rule applied on Gerrit CL comments and commit messages to extract issue references."""
|
|
regex: re.Pattern
|
|
capture_num: int
|
|
project_name_capture_num: Optional[int] = None
|
|
|
|
def apply(self, s: str) -> list[IssueMatch]:
|
|
return [
|
|
IssueMatch(
|
|
int(m.group(self.capture_num)),
|
|
m.group(self.project_name_capture_num)
|
|
if self.project_name_capture_num else None)
|
|
for m in self.regex.finditer(s)
|
|
]
|
|
|
|
|
|
RULES = [
|
|
# #123, fj#123
|
|
RegexRule(
|
|
re.compile(
|
|
r"(?:^|[\s\]\[(){},.;:!?])(fj|lix)?#(\d+)(?=$|[\s\]\[(){},.;:!?])"
|
|
), 2),
|
|
# URL to some issue
|
|
RegexRule(re.compile(
|
|
re.escape(FORGEJO_BASE) + "/" + re.escape(FORGEJO_ACCOUNT) +
|
|
r"/([a-zA-Z0-9-]+)/issues/([0-9]+)"),
|
|
2,
|
|
project_name_capture_num=1)
|
|
]
|
|
|
|
|
|
@dataclasses.dataclass
|
|
class CLMention:
|
|
"""One mention of a CL in a Forgejo comment, without mutable information about the CL itself."""
|
|
|
|
backlink: str
|
|
number: int
|
|
kind: Literal['comment'] | Literal['commit message']
|
|
|
|
@classmethod
|
|
def from_messageish(cls, message_ish: gerrit.MessageIsh) -> 'CLMention':
|
|
return cls(backlink=message_ish.backlink,
|
|
number=message_ish.cl_number,
|
|
kind=message_ish.kind)
|
|
|
|
@classmethod
|
|
def parse(cls, obj) -> 'CLMention':
|
|
return cls(backlink=obj['backlink'],
|
|
number=obj['number'],
|
|
kind=obj['kind'])
|
|
|
|
|
|
@dataclasses.dataclass
|
|
class CLMeta:
|
|
"""Mutable metadata on a Gerrit CL."""
|
|
change_title: str
|
|
|
|
@classmethod
|
|
def parse(cls, obj) -> 'CLMeta':
|
|
return cls(change_title=obj['change_title'])
|
|
|
|
|
|
@dataclasses.dataclass
|
|
class CommentMeta:
|
|
"""
|
|
The complete set of information that is put into a Forgejo comment.
|
|
"""
|
|
|
|
cls: list[CLMention]
|
|
"""CLs mentioned, with unchanging metadata"""
|
|
cl_meta: dict[int, CLMeta]
|
|
"""CL metadata that might change (e.g. title)"""
|
|
|
|
def serialize(self):
|
|
return {
|
|
'cls': [dataclasses.asdict(x) for x in self.cls],
|
|
'cl_meta': {
|
|
num: dataclasses.asdict(x)
|
|
for num, x in self.cl_meta.items()
|
|
}
|
|
}
|
|
|
|
@classmethod
|
|
def parse(cls, obj) -> 'CommentMeta':
|
|
return cls(cls=[CLMention.parse(x) for x in obj['cls']],
|
|
cl_meta={
|
|
int(num): CLMeta.parse(x)
|
|
for num, x in obj['cl_meta'].items()
|
|
})
|
|
|
|
|
|
def parse_comment(comment: str):
|
|
LEADER = '<!-- GERRIT_LINKBOT:'
|
|
|
|
for line in comment.splitlines():
|
|
if line.startswith(LEADER):
|
|
data = json.loads(line.removeprefix(LEADER))
|
|
return CommentMeta.parse(data)
|
|
|
|
|
|
def make_comment(meta: CommentMeta):
|
|
data_json = json.dumps(meta.serialize())
|
|
cls = '\n'.join('* {kind} in [cl/{cl}]({backlink}) ("{cl_desc}")'.format(
|
|
kind=cl.kind,
|
|
cl=cl.number,
|
|
backlink=cl.backlink,
|
|
cl_desc=meta.cl_meta[cl.number].change_title) for cl in meta.cls)
|
|
|
|
return textwrap.dedent("""\
|
|
<!-- GERRIT_LINKBOT: {data_json}
|
|
-->
|
|
|
|
This issue was mentioned on Gerrit on the following CLs:
|
|
{cls}
|
|
""").format(data_json=data_json, cls=cls)
|
|
|
|
|
|
class Service:
|
|
"""
|
|
The top level job in gerrit-linkbot: composes together the GerritWatcher
|
|
and taking actual actions with the Forgejo API.
|
|
"""
|
|
|
|
def __init__(self,
|
|
gerrit_api: gerrit.API,
|
|
forgejo_api: forgejo.API,
|
|
poll_interval=30,
|
|
gerrit_per_page=25):
|
|
self.gerrit_api = gerrit_api
|
|
self.forgejo_api = forgejo_api
|
|
self.watcher = gerrit.GerritWatcher(gerrit_api,
|
|
self.handle_comment,
|
|
per_page=gerrit_per_page)
|
|
self.poll_interval = poll_interval
|
|
|
|
def handle_hit(self, matched: IssueMatch, comment: gerrit.MessageIsh):
|
|
target = [
|
|
FORGEJO_ACCOUNT, matched.project_name or comment.project,
|
|
matched.issue_num
|
|
]
|
|
try:
|
|
comments = list(self.forgejo_api.all_comments(*target))
|
|
except requests.HTTPError as e:
|
|
if e.response.status_code in (404, 500):
|
|
# The issue doesn't exist, and it may be 500 because of a forgejo bug:
|
|
# https://codeberg.org/forgejo/forgejo/issues/4005
|
|
log.warn('Issue %s/%s#%d does not exist', *target)
|
|
return
|
|
raise
|
|
|
|
existing_comment = None
|
|
comment_meta = CommentMeta([], {})
|
|
for fj_comment in comments:
|
|
if fj_comment.author == FORGEJO_SERVICE_ACCOUNT:
|
|
data = parse_comment(fj_comment.text)
|
|
if data:
|
|
comment_meta = data
|
|
existing_comment = (fj_comment.id, fj_comment.text)
|
|
break
|
|
|
|
# construct the new comment on top of the existing one
|
|
comment_meta.cl_meta[comment.cl_number] = CLMeta(comment.change_title)
|
|
if not any(cl_mention.number == comment.cl_number
|
|
and cl_mention.kind == comment.kind
|
|
for cl_mention in comment_meta.cls):
|
|
comment_meta.cls.append(CLMention.from_messageish(comment))
|
|
|
|
text = make_comment(comment_meta)
|
|
|
|
if existing_comment:
|
|
# no change to the comment, no point posting it
|
|
if existing_comment[1] == text:
|
|
return
|
|
|
|
self.forgejo_api.edit_comment(
|
|
*target,
|
|
existing_comment[0], # type: ignore
|
|
text) # type: ignore
|
|
else:
|
|
self.forgejo_api.post_comment(*target, text) # type: ignore
|
|
|
|
def handle_comment(self, comment: gerrit.MessageIsh):
|
|
log.debug('Comment: %s', comment)
|
|
for rule in RULES:
|
|
for res in rule.apply(comment.message):
|
|
self.handle_hit(res, comment)
|
|
|
|
def step(self):
|
|
self.watcher.poll()
|
|
|
|
def run(self):
|
|
while True:
|
|
self.step()
|
|
time.sleep(self.poll_interval)
|