1
0
Fork 0
mirror of synced 2024-06-18 18:54:55 +12:00

Start work on porting webhooks

This commit is contained in:
Bradley Schofield 2022-12-20 11:07:49 +00:00
parent 90e93f3255
commit a09e23d0b9
12 changed files with 158 additions and 89 deletions

View file

@ -3,6 +3,7 @@
require_once __DIR__ . '/init.php';
require_once __DIR__ . '/controllers/general.php';
use Appwrite\Event\Delete;
use Appwrite\Event\Func;
use Appwrite\Platform\Appwrite;
use Utopia\CLI\CLI;
@ -17,6 +18,7 @@ use Utopia\Database\Database;
use Utopia\Database\Document;
use Utopia\Logger\Log;
use Utopia\Pools\Group;
use Utopia\Queue\Connection;
use Utopia\Registry\Registry;
Authorization::disable();
@ -140,10 +142,18 @@ CLI::setResource('influxdb', function (Registry $register) {
return $database;
}, ['register']);
CLI::setResource('queueForFunctions', function (Group $pools) {
return new Func($pools->get('queue')->pop()->getResource());
CLI::setResource('queue', function (Group $pools) {
return $pools->get('queue')->pop()->getResource();
}, ['pools']);
CLI::setResource('deletes', function (Connection $queue) {
return new Delete($queue);
}, ['queue']);
CLI::setResource('queueForFunctions', function (Connection $queue) {
return new Func($queue);
}, ['queue']);
CLI::setResource('logError', function (Registry $register) {
return function (Throwable $error, string $namespace, string $action) use ($register) {
$logger = $register->get('logger');

View file

@ -848,15 +848,19 @@ App::setResource('register', fn() => $register);
App::setResource('locale', fn() => new Locale(App::getEnv('_APP_LOCALE', 'en')));
// Queues
App::setResource('events', fn() => new Event('', ''));
App::setResource('audits', fn() => new Audit());
App::setResource('mails', fn() => new Mail());
App::setResource('deletes', fn() => new Delete());
App::setResource('deletes', function (Connection $queue) {
return new Delete($queue);
}, ['queue']);
App::setResource('database', fn() => new EventDatabase());
App::setResource('messaging', fn() => new Phone());
App::setResource('queue', function (Group $pools) {
return $pools->get('queue')->pop()->getResource();
}, ['pools']);
App::setResource('events', function (Connection $queue) {
return new Event('', '', $queue);
}, ['queue']);
App::setResource('queueForFunctions', function (Connection $queue) {
return new Func($queue);
}, ['queue']);

View file

@ -2,6 +2,7 @@
require_once __DIR__ . '/init.php';
use Appwrite\Event\Event;
use Appwrite\Event\Func;
use Swoole\Runtime;
use Utopia\App;
@ -85,6 +86,18 @@ Server::setResource('queueForFunctions', function (Registry $register) {
);
}, ['register']);
Server::setResource('events', function (Registry $register) {
$pools = $register->get('pools');
return new Event(
'',
'',
$pools
->get('queue')
->pop()
->getResource()
);
}, ['register']);
Server::setResource('logger', function ($register) {
return $register->get('logger');
}, ['register']);

View file

@ -121,7 +121,10 @@ class BuildsV1 extends Worker
/** Trigger Webhook */
$deploymentModel = new Deployment();
$deploymentUpdate = new Event(Event::WEBHOOK_QUEUE_NAME, Event::WEBHOOK_CLASS_NAME);
$pools = $register->get('pools');
$connection = $pools->get('queue')->pop()->getResource();
$deploymentUpdate = new Event(Event::WEBHOOK_QUEUE_NAME, Event::WEBHOOK_CLASS_NAME, $connection);
$deploymentUpdate
->setProject($project)
->setEvent('functions.[functionId].deployments.[deploymentId].update')

View file

@ -20,6 +20,7 @@ use Utopia\Database\Permission;
use Utopia\Database\Query;
use Utopia\Database\Role;
use Utopia\Database\Validator\Authorization;
use Utopia\Queue\Connection;
use Utopia\Queue\Server;
Authorization::disable();
@ -39,6 +40,7 @@ Server::setResource('execute', function () {
string $event = null,
string $eventData = null,
string $executionId = null,
Connection $queueConnection,
) {
$user ??= new Document();
$functionId = $function->getId();
@ -161,9 +163,11 @@ Server::setResource('execute', function () {
$execution = $dbForProject->updateDocument('executions', $executionId, $execution);
$connection = $
/** Trigger Webhook */
$executionModel = new Execution();
$executionUpdate = new Event(Event::WEBHOOK_QUEUE_NAME, Event::WEBHOOK_CLASS_NAME);
$executionUpdate = new Event(Event::WEBHOOK_QUEUE_NAME, Event::WEBHOOK_CLASS_NAME, $queueConnection);
$executionUpdate
->setProject($project)
->setUser($user)
@ -225,8 +229,9 @@ $server->job()
->inject('dbForProject')
->inject('queueForFunctions')
->inject('statsd')
->inject('queue')
->inject('execute')
->action(function (Message $message, Database $dbForProject, Func $queueForFunctions, Client $statsd, callable $execute) {
->action(function (Message $message, Database $dbForProject, Func $queueForFunctions, Client $statsd, Connection $queue, callable $execute) {
$payload = $message->getPayload() ?? [];
if (empty($payload)) {
@ -280,7 +285,8 @@ $server->job()
user: $user,
data: null,
executionId: null,
jwt: null
jwt: null,
queueConnection: $queue
);
Console::success('Triggered function: ' . $events[0]);
}

View file

@ -1,48 +1,23 @@
<?php
require_once __DIR__ . '/../worker.php';
use Appwrite\Resque\Worker;
use Utopia\App;
use Utopia\CLI\Console;
use Utopia\Database\Document;
use Utopia\Database\Validator\Authorization;
use Utopia\Queue\Message;
use Utopia\Queue\Server;
require_once __DIR__ . '/../init.php';
Authorization::disable();
Authorization::setDefaultStatus(false);
Console::title('Webhooks V1 Worker');
Console::success(APP_NAME . ' webhooks worker v1 has started');
global $errors;
$errors = [];
class WebhooksV1 extends Worker
{
protected array $errors = [];
public function getName(): string
{
return "webhooks";
}
public function init(): void
{
}
public function run(): void
{
$events = $this->args['events'];
$payload = json_encode($this->args['payload']);
$project = new Document($this->args['project']);
$user = new Document($this->args['user'] ?? []);
foreach ($project->getAttribute('webhooks', []) as $webhook) {
if (array_intersect($webhook->getAttribute('events', []), $events)) {
$this->execute($events, $payload, $webhook, $user, $project);
}
}
if (!empty($this->errors)) {
throw new Exception(\implode(" / \n\n", $this->errors));
}
}
protected function execute(array $events, string $payload, Document $webhook, Document $user, Document $project): void
{
Server::setResource('execute', function () {
return function (array $events, string $payload, Document $webhook, Document $user, Document $project): void {
$url = \rawurldecode($webhook->getAttribute('url'));
$signatureKey = $webhook->getAttribute('signatureKey');
$signature = base64_encode(hash_hmac('sha1', $url . $payload, $signatureKey, true));
@ -85,14 +60,40 @@ class WebhooksV1 extends Worker
}
if (false === \curl_exec($ch)) {
$this->errors[] = \curl_error($ch) . ' in events ' . implode(', ', $events) . ' for webhook ' . $webhook->getAttribute('name');
$errors[] = \curl_error($ch) . ' in events ' . implode(', ', $events) . ' for webhook ' . $webhook->getAttribute('name');
}
\curl_close($ch);
}
};
});
public function shutdown(): void
{
$this->errors = [];
}
}
$server->job()
->inject('message')
->inject('execute')
->action(function (Message $message, callable $execute) {
$payload = $message->getPayload() ?? [];
if (empty($payload)) {
throw new Exception('Missing payload');
}
$events = $payload['events'];
$webhookPayload = json_encode($payload['payload']);
$project = new Document($payload['project']);
$user = new Document($payload['user'] ?? []);
foreach ($project->getAttribute('webhooks', []) as $webhook) {
if (array_intersect($webhook->getAttribute('events', []), $events)) {
$execute($events, $webhookPayload, $webhook, $user, $project);
}
}
if (!empty($errors)) {
throw new Exception(\implode(" / \n\n", $errors));
}
});
$server->workerStart();
$server->start();

View file

@ -580,6 +580,7 @@ services:
- _APP_MAINTENANCE_RETENTION_AUDIT
- _APP_MAINTENANCE_RETENTION_USAGE_HOURLY
- _APP_MAINTENANCE_RETENTION_SCHEDULES
- _APP_CONNECTIONS_QUEUE
appwrite-usage:
entrypoint: usage

View file

@ -4,6 +4,7 @@ namespace Appwrite\Event;
use Resque;
use Utopia\Database\Document;
use Utopia\Queue\Connection;
class Delete extends Event
{
@ -14,9 +15,9 @@ class Delete extends Event
protected ?string $hourlyUsageRetentionDatetime = null;
public function __construct()
public function __construct(protected Connection $connection)
{
parent::__construct(Event::DELETE_QUEUE_NAME, Event::DELETE_CLASS_NAME);
parent::__construct(Event::DELETE_QUEUE_NAME, Event::DELETE_CLASS_NAME, $connection);
}
/**

View file

@ -3,8 +3,9 @@
namespace Appwrite\Event;
use InvalidArgumentException;
use Resque;
use Utopia\Database\Document;
use Utopia\Queue\Client;
use Utopia\Queue\Connection;
class Event
{
@ -43,16 +44,18 @@ class Event
protected array $context = [];
protected ?Document $project = null;
protected ?Document $user = null;
protected Connection $connection;
/**
* @param string $queue
* @param string $class
* @return void
*/
public function __construct(string $queue, string $class)
public function __construct(string $queue, string $class, Connection $connection)
{
$this->queue = $queue;
$this->class = $class;
$this->connection = $connection;
}
/**
@ -260,7 +263,11 @@ class Event
*/
public function trigger(): string|bool
{
return Resque::enqueue($this->queue, $this->class, [
$client = new Client($this->queue, $this->connection);
$events = $this->getEvent() ? Event::generateEvents($this->getEvent(), $this->getParams()) : null;
return $client->enqueue([
'project' => $this->project,
'user' => $this->user,
'payload' => $this->payload,
@ -437,9 +444,9 @@ class Event
if ($subCurrent === $current || $subCurrent === $key) {
continue;
}
$filtered1 = \array_filter($paramKeys, fn(string $k) => $k === $subCurrent);
$filtered1 = \array_filter($paramKeys, fn (string $k) => $k === $subCurrent);
$events[] = \str_replace($paramKeys, $paramValues, \str_replace($filtered1, '*', $eventPattern));
$filtered2 = \array_filter($paramKeys, fn(string $k) => $k === $current);
$filtered2 = \array_filter($paramKeys, fn (string $k) => $k === $current);
$events[] = \str_replace($paramKeys, $paramValues, \str_replace($filtered2, '*', \str_replace($filtered1, '*', $eventPattern)));
$events[] = \str_replace($paramKeys, $paramValues, \str_replace($filtered2, '*', $eventPattern));
}
@ -447,7 +454,7 @@ class Event
if ($current === $key) {
continue;
}
$filtered = \array_filter($paramKeys, fn(string $k) => $k === $current);
$filtered = \array_filter($paramKeys, fn (string $k) => $k === $current);
$events[] = \str_replace($paramKeys, $paramValues, \str_replace($filtered, '*', $eventPattern));
}
}

View file

@ -16,7 +16,7 @@ class Func extends Event
public function __construct(protected Connection $connection)
{
parent::__construct(Event::FUNCTIONS_QUEUE_NAME, Event::FUNCTIONS_CLASS_NAME);
parent::__construct(Event::FUNCTIONS_QUEUE_NAME, Event::FUNCTIONS_CLASS_NAME, $connection);
}
/**

View file

@ -5,6 +5,7 @@ namespace Appwrite\Platform\Tasks;
use Appwrite\Auth\Auth;
use Appwrite\Event\Certificate;
use Appwrite\Event\Delete;
use Appwrite\Event\Func;
use Utopia\App;
use Utopia\CLI\Console;
use Utopia\Database\Database;
@ -12,6 +13,8 @@ use Utopia\Database\Document;
use Utopia\Database\DateTime;
use Utopia\Database\Query;
use Utopia\Platform\Action;
use Utopia\Queue\Connection;
use Utopia\Registry\Registry;
class Maintenance extends Action
{
@ -25,57 +28,58 @@ class Maintenance extends Action
$this
->desc('Schedules maintenance tasks and publishes them to resque')
->inject('dbForConsole')
->callback(fn (Database $dbForConsole) => $this->action($dbForConsole));
->inject('deletes')
->callback(fn (Database $dbForConsole, Delete $deletes) => $this->action($dbForConsole, $deletes));
}
public function action(Database $dbForConsole): void
public function action(Database $dbForConsole, Delete $queue): void
{
Console::title('Maintenance V1');
Console::success(APP_NAME . ' maintenance process v1 has started');
function notifyDeleteExecutionLogs(int $interval)
function notifyDeleteExecutionLogs(int $interval, Delete $deletes)
{
(new Delete())
($deletes)
->setType(DELETE_TYPE_EXECUTIONS)
->setDatetime(DateTime::addSeconds(new \DateTime(), -1 * $interval))
->trigger();
}
function notifyDeleteAbuseLogs(int $interval)
function notifyDeleteAbuseLogs(int $interval, Delete $deletes)
{
(new Delete())
($deletes)
->setType(DELETE_TYPE_ABUSE)
->setDatetime(DateTime::addSeconds(new \DateTime(), -1 * $interval))
->trigger();
}
function notifyDeleteAuditLogs(int $interval)
function notifyDeleteAuditLogs(int $interval, Delete $deletes)
{
(new Delete())
($deletes)
->setType(DELETE_TYPE_AUDIT)
->setDatetime(DateTime::addSeconds(new \DateTime(), -1 * $interval))
->trigger();
}
function notifyDeleteUsageStats(int $usageStatsRetentionHourly)
function notifyDeleteUsageStats(int $usageStatsRetentionHourly, Delete $deletes)
{
(new Delete())
($deletes)
->setType(DELETE_TYPE_USAGE)
->setUsageRetentionHourlyDateTime(DateTime::addSeconds(new \DateTime(), -1 * $usageStatsRetentionHourly))
->trigger();
}
function notifyDeleteConnections()
function notifyDeleteConnections(Delete $deletes)
{
(new Delete())
($deletes)
->setType(DELETE_TYPE_REALTIME)
->setDatetime(DateTime::addSeconds(new \DateTime(), -60))
->trigger();
}
function notifyDeleteExpiredSessions()
function notifyDeleteExpiredSessions(Delete $deletes)
{
(new Delete())
($deletes)
->setType(DELETE_TYPE_SESSIONS)
->trigger();
}
@ -107,19 +111,19 @@ class Maintenance extends Action
}
}
function notifyDeleteCache($interval)
function notifyDeleteCache($interval, Delete $deletes)
{
(new Delete())
($deletes)
->setType(DELETE_TYPE_CACHE_BY_TIMESTAMP)
->setDatetime(DateTime::addSeconds(new \DateTime(), -1 * $interval))
->trigger();
}
function notifyDeleteSchedules($interval)
function notifyDeleteSchedules($interval, Delete $deletes)
{
(new Delete())
($deletes)
->setType(DELETE_TYPE_SCHEDULES)
->setDatetime(DateTime::addSeconds(new \DateTime(), -1 * $interval))
->trigger();
@ -135,19 +139,19 @@ class Maintenance extends Action
$cacheRetention = (int) App::getEnv('_APP_MAINTENANCE_RETENTION_CACHE', '2592000'); // 30 days
$schedulesDeletionRetention = (int) App::getEnv('_APP_MAINTENANCE_RETENTION_SCHEDULES', '86400'); // 1 Day
Console::loop(function () use ($interval, $executionLogsRetention, $abuseLogsRetention, $auditLogRetention, $cacheRetention, $schedulesDeletionRetention, $usageStatsRetentionHourly, $dbForConsole) {
Console::loop(function () use ($interval, $executionLogsRetention, $abuseLogsRetention, $auditLogRetention, $cacheRetention, $schedulesDeletionRetention, $usageStatsRetentionHourly, $dbForConsole, $queue) {
$time = DateTime::now();
Console::info("[{$time}] Notifying workers with maintenance tasks every {$interval} seconds");
notifyDeleteExecutionLogs($executionLogsRetention);
notifyDeleteAbuseLogs($abuseLogsRetention);
notifyDeleteAuditLogs($auditLogRetention);
notifyDeleteUsageStats($usageStatsRetentionHourly);
notifyDeleteConnections();
notifyDeleteExpiredSessions();
renewCertificates($dbForConsole);
notifyDeleteCache($cacheRetention);
notifyDeleteSchedules($schedulesDeletionRetention);
notifyDeleteExecutionLogs($executionLogsRetention, $queue);
notifyDeleteAbuseLogs($abuseLogsRetention, $queue);
notifyDeleteAuditLogs($auditLogRetention, $queue);
notifyDeleteUsageStats($usageStatsRetentionHourly, $queue);
notifyDeleteConnections($queue);
notifyDeleteExpiredSessions($queue);
renewCertificates($dbForConsole, $queue);
notifyDeleteCache($cacheRetention, $queue);
notifyDeleteSchedules($schedulesDeletionRetention, $queue);
}, $interval);
}
}

View file

@ -3,9 +3,12 @@
namespace Tests\Unit\Event;
use Appwrite\Event\Event;
use Appwrite\URL\URL;
use InvalidArgumentException;
use PHPUnit\Framework\TestCase;
use Utopia\App;
use Utopia\DSN\DSN;
use Utopia\Queue;
class EventTest extends TestCase
{
@ -18,8 +21,24 @@ class EventTest extends TestCase
$redisPort = App::getEnv('_APP_REDIS_PORT', '');
\Resque::setBackend($redisHost . ':' . $redisPort);
$fallbackForRedis = URL::unparse([
'scheme' => 'redis',
'host' => App::getEnv('_APP_REDIS_HOST', 'redis'),
'port' => App::getEnv('_APP_REDIS_PORT', '6379'),
'user' => App::getEnv('_APP_REDIS_USER', ''),
'pass' => App::getEnv('_APP_REDIS_PASS', ''),
]);
$dsn = App::getEnv('_APP_CONNECTIONS_QUEUE', $fallbackForRedis);
$dsn = explode('=', $dsn);
$dsn = $dsn[1] ?? '';
$dsn = new DSN($dsn);
$connection = new Queue\Connection\Redis($dsn->getHost(), $dsn->getPort());
$this->queue = 'v1-tests' . uniqid();
$this->object = new Event($this->queue, 'TestsV1');
$this->object = new Event($this->queue, 'TestsV1', $connection);
}
public function testQueue(): void