split mass-rebuild and mass-rebuild-filter

This commit is contained in:
Graham Christensen 2017-10-31 04:09:41 -04:00
parent 544a32776e
commit 2395c403d6
No known key found for this signature in database
GPG key ID: ACA1C1D120C83D5C
3 changed files with 170 additions and 71 deletions

View file

@ -13,8 +13,7 @@ $channel->exchange_declare('build-jobs', 'fanout', false, true, false);
list($queueName, , ) = $channel->queue_declare('build-inputs', list($queueName, , ) = $channel->queue_declare('build-inputs',
false, true, false, false); false, true, false, false);
$channel->queue_bind($queueName, 'nixos/nixpkgs'); $channel->queue_bind($queueName, 'github-events', 'issue_comment.nixos/nixpkgs');
$channel->queue_bind($queueName, 'grahamc/elm-stuff');
function runner($msg) { function runner($msg) {
$in = json_decode($msg->body); $in = json_decode($msg->body);

130
mass-rebuild-filter.php Normal file
View file

@ -0,0 +1,130 @@
<?php
require __DIR__ . '/config.php';
use PhpAmqpLib\Message\AMQPMessage;
# define('AMQP_DEBUG', true);
$connection = rabbitmq_conn();
$channel = $connection->channel();
$channel->basic_qos(null, 1, true);
$channel->queue_declare('mass-rebuild-check-jobs',
false, true, false, false);
list($queueName, , ) = $channel->queue_declare('mass-rebuild-check-inputs',
false, true, false, false);
$channel->queue_bind($queueName, 'github-events', 'pull_request.nixos/nixpkgs');
echo "hi\n";
function outrunner($msg) {
try {
runner($msg);
} catch (\GHE\ExecException $e) {
var_dump($msg);
var_dump($e->getMessage());
var_dump($e->getCode());
var_dump($e->getOutput());
} catch (\PhpAmqpLib\Exception\AMQPProtocolChannelException $e) {
echo "Channel exception:\n";
var_dump($e);
}
}
function runner($msg) {
$in = json_decode($msg->body);
try {
$etype = \GHE\EventClassifier::classifyEvent($in);
if ($etype != "pull_request") {
echo "Skipping event type: $etype\n";
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
return true;
}
} catch (\GHE\EventClassifierUnknownException $e) {
echo "Skipping unknown event type\n";
print_r($in);
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
return true;
}
if (!\GHE\ACL::isRepoEligible($in->repository->full_name)) {
echo "Repo not authorized (" . $in->repository->full_name . ")\n";
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
return true;
}
if ($in->pull_request->state != "open") {
echo "PR isn't open in the event\n";
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
return true;
}
$client = gh_client();
$status = $client->api('pull_request')->show(
$in->repository->owner->login,
$in->repository->name,
$in->number);
if ($status['mergeable'] === false) {
echo "github says the PR isn't able to be merged\n";
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
return true;
}
if ($status['state'] !== 'open') {
echo "github says the PR isn't open\n";
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
return true;
}
$ok_events = [
'opened',
'created',
'edited',
'synchronize',
'reopened',
];
if (!in_array($in->action, $ok_events)) {
echo "Uninteresting event " . $in->action . "\n";
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
return true;
}
$forward = [
'original_payload' => $in,
'repo' => [
'owner' => $in->repository->owner->login,
'name' => $in->repository->name,
'full_name' => $in->repository->full_name,
'clone_url' => $in->repository->clone_url,
],
'pr' => [
'number' => $in->number,
'target_branch' => $in->pull_request->base->ref,
'patch_url' => $in->pull_request->patch_url,
],
];
echo "forwarding to mass-rebuild-check-jobs :)\n";
$message = new AMQPMessage(json_encode($forward),
array(
'content_type' => 'application/json',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
));
$msg->delivery_info['channel']->basic_publish($message, '', 'mass-rebuild-check-jobs');
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
return true;
}
$consumerTag = 'massrebuildcheckfilter' . getmypid();
$channel->basic_consume($queueName, $consumerTag, false, false, false, false, 'outrunner');
while(count($channel->callbacks)) {
$channel->wait();
}
echo "Bye\n";

View file

@ -10,81 +10,50 @@ $channel = $connection->channel();
$channel->basic_qos(null, 1, true); $channel->basic_qos(null, 1, true);
list($queueName, , ) = $channel->queue_declare('mass-rebuild-checks', list($queueName, , ) = $channel->queue_declare('mass-rebuild-check-jobs',
false, true, false, false); false, true, false, false);
$channel->queue_bind($queueName, 'nixos/nixpkgs');
echo "hi\n"; echo "hi\n";
function outrunner($msg) { function outrunner($msg) {
try { try {
$ret = runner($msg); runner($msg);
var_dump($ret);
if ($ret === true) {
echo "cool\n";
echo "acking\n";
$r = $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
var_dump($r);
echo "acked\n";
} else {
echo "Not acking?\n";
}
} catch (\GHE\ExecException $e) { } catch (\GHE\ExecException $e) {
var_dump($msg); var_dump($msg);
var_dump($e->getMessage()); var_dump($e->getMessage());
var_dump($e->getCode()); var_dump($e->getCode());
var_dump($e->getOutput()); var_dump($e->getOutput());
} catch (\PhpAmqpLib\Exception\AMQPProtocolChannelException $e) {
echo "Channel exception:\n";
var_dump($e);
} }
} }
function runner($msg) { function runner($msg) {
$in = json_decode($msg->body); $in = json_decode($msg->body);
try { $client = gh_client();
$etype = \GHE\EventClassifier::classifyEvent($in); $status = $client->api('pull_request')->show(
$in->repo->owner,
if ($etype != "pull_request") { $in->repo->name,
echo "Skipping event type: $etype\n"; $in->pr->number);
if ($status['mergeable'] === false) {
echo "github says the PR isn't able to be merged\n";
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
return true; return true;
} }
} catch (\GHE\EventClassifierUnknownException $e) { if ($status['state'] !== 'open') {
echo "Skipping unknown event type\n"; echo "github says the PR isn't open\n";
print_r($in); $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
return true; return true;
} }
if (!\GHE\ACL::isRepoEligible($in->repository->full_name)) { $against_name = "origin/" . $in->pr->target_branch;
echo "Repo not authorized (" . $in->repository->full_name . ")\n";
return true;
}
if ($in->pull_request->state != "open") {
echo "PR isn't open\n";
return true;
}
$ok_events = [
'created',
'edited',
'synchronize',
];
if (!in_array($in->action, $ok_events)) {
echo "Uninteresting event " . $in->action . "\n";
return true;
}
$r = $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
var_dump($r);
echo "acked\n";
$against_name = "origin/" . $in->pull_request->base->ref;
echo "Building against $against_name\n"; echo "Building against $against_name\n";
$co = new GHE\Checkout(WORKING_DIR, "mr-est"); $co = new GHE\Checkout(WORKING_DIR, "mr-est");
$pname = $co->checkOutRef($in->repository->full_name, $pname = $co->checkOutRef($in->repo->full_name,
$in->repository->clone_url, $in->repo->clone_url,
$in->number, $in->pr->number,
$against_name $against_name
); );
@ -92,13 +61,14 @@ function runner($msg) {
echo " $against_name is $against[0]\n"; echo " $against_name is $against[0]\n";
try { try {
$co->applyPatches($pname, $in->pull_request->patch_url); $co->applyPatches($pname, $in->pr->patch_url);
} catch (GHE\ExecException $e) { } catch (GHE\ExecException $e) {
echo "Received ExecException applying patches, likely due to conflicts:\n"; echo "Received ExecException applying patches, likely due to conflicts:\n";
var_dump($e->getCode()); var_dump($e->getCode());
var_dump($e->getMessage()); var_dump($e->getMessage());
var_dump($e->getArgs()); var_dump($e->getArgs());
var_dump($e->getOutput()); var_dump($e->getOutput());
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
return false; return false;
} }
@ -106,19 +76,19 @@ function runner($msg) {
echo " currently at ${current[0]}\n"; echo " currently at ${current[0]}\n";
reply_to_issue($in, $against[0], $current[0]); reply_to_issue($in->repo, $in->pr, $against[0], $current[0]);
$msg->delivery_info['channel']->basic_cancel($msg->delivery_info['consumer_tag']); $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
return false; return true;
} }
function reply_to_issue($issue, $prev, $current) { function reply_to_issue($repo, $pr, $prev, $current) {
$client = gh_client(); $client = gh_client();
echo "current labels:\n"; echo "current labels:\n";
$already_there = $client->api('issue')->labels()->all( $already_there = $client->api('issue')->labels()->all(
$issue->repository->owner->login, $repo->owner,
$issue->repository->name, $repo->name,
$issue->number); $pr->number);
$already_there = array_map(function($val) { return $val['name']; }, $already_there); $already_there = array_map(function($val) { return $val['name']; }, $already_there);
var_dump($already_there); var_dump($already_there);
@ -175,15 +145,15 @@ function reply_to_issue($issue, $prev, $current) {
} }
$client->api('issue')->labels()->add( $client->api('issue')->labels()->add(
$issue->repository->owner->login, $repo->owner,
$issue->repository->name, $repo->name,
$issue->number, $pr->number,
$label); $label);
} }
} }
$consumerTag = 'consumer' . getmypid(); $consumerTag = 'consumer' . getmypid();
$channel->basic_consume($queueName, $consumerTag, false, true, false, false, 'outrunner'); $channel->basic_consume($queueName, $consumerTag, false, false, false, false, 'outrunner');
while(count($channel->callbacks)) { while(count($channel->callbacks)) {
$channel->wait(); $channel->wait();
} }