From a09e23d0b99c822d881f194702f8501a96be451a Mon Sep 17 00:00:00 2001 From: Bradley Schofield Date: Tue, 20 Dec 2022 11:07:49 +0000 Subject: [PATCH] Start work on porting webhooks --- app/cli.php | 14 +++- app/init.php | 8 +- app/worker.php | 13 +++ app/workers/builds.php | 5 +- app/workers/functions.php | 12 ++- app/workers/webhooks.php | 87 +++++++++++---------- docker-compose.yml | 1 + src/Appwrite/Event/Delete.php | 5 +- src/Appwrite/Event/Event.php | 19 +++-- src/Appwrite/Event/Func.php | 2 +- src/Appwrite/Platform/Tasks/Maintenance.php | 60 +++++++------- tests/unit/Event/EventTest.php | 21 ++++- 12 files changed, 158 insertions(+), 89 deletions(-) diff --git a/app/cli.php b/app/cli.php index bfe7bfcef..74c3a6061 100644 --- a/app/cli.php +++ b/app/cli.php @@ -3,6 +3,7 @@ require_once __DIR__ . '/init.php'; require_once __DIR__ . '/controllers/general.php'; +use Appwrite\Event\Delete; use Appwrite\Event\Func; use Appwrite\Platform\Appwrite; use Utopia\CLI\CLI; @@ -17,6 +18,7 @@ use Utopia\Database\Database; use Utopia\Database\Document; use Utopia\Logger\Log; use Utopia\Pools\Group; +use Utopia\Queue\Connection; use Utopia\Registry\Registry; Authorization::disable(); @@ -140,10 +142,18 @@ CLI::setResource('influxdb', function (Registry $register) { return $database; }, ['register']); -CLI::setResource('queueForFunctions', function (Group $pools) { - return new Func($pools->get('queue')->pop()->getResource()); +CLI::setResource('queue', function (Group $pools) { + return $pools->get('queue')->pop()->getResource(); }, ['pools']); +CLI::setResource('deletes', function (Connection $queue) { + return new Delete($queue); +}, ['queue']); + +CLI::setResource('queueForFunctions', function (Connection $queue) { + return new Func($queue); +}, ['queue']); + CLI::setResource('logError', function (Registry $register) { return function (Throwable $error, string $namespace, string $action) use ($register) { $logger = $register->get('logger'); diff --git a/app/init.php b/app/init.php index 2f71a60ce..aaca91835 100644 --- a/app/init.php +++ b/app/init.php @@ -848,15 +848,19 @@ App::setResource('register', fn() => $register); App::setResource('locale', fn() => new Locale(App::getEnv('_APP_LOCALE', 'en'))); // Queues -App::setResource('events', fn() => new Event('', '')); App::setResource('audits', fn() => new Audit()); App::setResource('mails', fn() => new Mail()); -App::setResource('deletes', fn() => new Delete()); +App::setResource('deletes', function (Connection $queue) { + return new Delete($queue); +}, ['queue']); App::setResource('database', fn() => new EventDatabase()); App::setResource('messaging', fn() => new Phone()); App::setResource('queue', function (Group $pools) { return $pools->get('queue')->pop()->getResource(); }, ['pools']); +App::setResource('events', function (Connection $queue) { + return new Event('', '', $queue); +}, ['queue']); App::setResource('queueForFunctions', function (Connection $queue) { return new Func($queue); }, ['queue']); diff --git a/app/worker.php b/app/worker.php index 8151381d4..752f5df31 100644 --- a/app/worker.php +++ b/app/worker.php @@ -2,6 +2,7 @@ require_once __DIR__ . '/init.php'; +use Appwrite\Event\Event; use Appwrite\Event\Func; use Swoole\Runtime; use Utopia\App; @@ -85,6 +86,18 @@ Server::setResource('queueForFunctions', function (Registry $register) { ); }, ['register']); +Server::setResource('events', function (Registry $register) { + $pools = $register->get('pools'); + return new Event( + '', + '', + $pools + ->get('queue') + ->pop() + ->getResource() + ); +}, ['register']); + Server::setResource('logger', function ($register) { return $register->get('logger'); }, ['register']); diff --git a/app/workers/builds.php b/app/workers/builds.php index d26f07ab7..586663f18 100644 --- a/app/workers/builds.php +++ b/app/workers/builds.php @@ -121,7 +121,10 @@ class BuildsV1 extends Worker /** Trigger Webhook */ $deploymentModel = new Deployment(); - $deploymentUpdate = new Event(Event::WEBHOOK_QUEUE_NAME, Event::WEBHOOK_CLASS_NAME); + $pools = $register->get('pools'); + $connection = $pools->get('queue')->pop()->getResource(); + + $deploymentUpdate = new Event(Event::WEBHOOK_QUEUE_NAME, Event::WEBHOOK_CLASS_NAME, $connection); $deploymentUpdate ->setProject($project) ->setEvent('functions.[functionId].deployments.[deploymentId].update') diff --git a/app/workers/functions.php b/app/workers/functions.php index 31e64a2bb..adb434848 100644 --- a/app/workers/functions.php +++ b/app/workers/functions.php @@ -20,6 +20,7 @@ use Utopia\Database\Permission; use Utopia\Database\Query; use Utopia\Database\Role; use Utopia\Database\Validator\Authorization; +use Utopia\Queue\Connection; use Utopia\Queue\Server; Authorization::disable(); @@ -39,6 +40,7 @@ Server::setResource('execute', function () { string $event = null, string $eventData = null, string $executionId = null, + Connection $queueConnection, ) { $user ??= new Document(); $functionId = $function->getId(); @@ -161,9 +163,11 @@ Server::setResource('execute', function () { $execution = $dbForProject->updateDocument('executions', $executionId, $execution); + $connection = $ + /** Trigger Webhook */ $executionModel = new Execution(); - $executionUpdate = new Event(Event::WEBHOOK_QUEUE_NAME, Event::WEBHOOK_CLASS_NAME); + $executionUpdate = new Event(Event::WEBHOOK_QUEUE_NAME, Event::WEBHOOK_CLASS_NAME, $queueConnection); $executionUpdate ->setProject($project) ->setUser($user) @@ -225,8 +229,9 @@ $server->job() ->inject('dbForProject') ->inject('queueForFunctions') ->inject('statsd') + ->inject('queue') ->inject('execute') - ->action(function (Message $message, Database $dbForProject, Func $queueForFunctions, Client $statsd, callable $execute) { + ->action(function (Message $message, Database $dbForProject, Func $queueForFunctions, Client $statsd, Connection $queue, callable $execute) { $payload = $message->getPayload() ?? []; if (empty($payload)) { @@ -280,7 +285,8 @@ $server->job() user: $user, data: null, executionId: null, - jwt: null + jwt: null, + queueConnection: $queue ); Console::success('Triggered function: ' . $events[0]); } diff --git a/app/workers/webhooks.php b/app/workers/webhooks.php index 4048581c0..75eeec19d 100644 --- a/app/workers/webhooks.php +++ b/app/workers/webhooks.php @@ -1,48 +1,23 @@ args['events']; - $payload = json_encode($this->args['payload']); - $project = new Document($this->args['project']); - $user = new Document($this->args['user'] ?? []); - - foreach ($project->getAttribute('webhooks', []) as $webhook) { - if (array_intersect($webhook->getAttribute('events', []), $events)) { - $this->execute($events, $payload, $webhook, $user, $project); - } - } - - if (!empty($this->errors)) { - throw new Exception(\implode(" / \n\n", $this->errors)); - } - } - - protected function execute(array $events, string $payload, Document $webhook, Document $user, Document $project): void - { +Server::setResource('execute', function () { + return function (array $events, string $payload, Document $webhook, Document $user, Document $project): void { $url = \rawurldecode($webhook->getAttribute('url')); $signatureKey = $webhook->getAttribute('signatureKey'); $signature = base64_encode(hash_hmac('sha1', $url . $payload, $signatureKey, true)); @@ -85,14 +60,40 @@ class WebhooksV1 extends Worker } if (false === \curl_exec($ch)) { - $this->errors[] = \curl_error($ch) . ' in events ' . implode(', ', $events) . ' for webhook ' . $webhook->getAttribute('name'); + $errors[] = \curl_error($ch) . ' in events ' . implode(', ', $events) . ' for webhook ' . $webhook->getAttribute('name'); } \curl_close($ch); - } + }; +}); - public function shutdown(): void - { - $this->errors = []; - } -} + +$server->job() + ->inject('message') + ->inject('execute') + ->action(function (Message $message, callable $execute) { + $payload = $message->getPayload() ?? []; + + if (empty($payload)) { + throw new Exception('Missing payload'); + } + + $events = $payload['events']; + $webhookPayload = json_encode($payload['payload']); + $project = new Document($payload['project']); + $user = new Document($payload['user'] ?? []); + + foreach ($project->getAttribute('webhooks', []) as $webhook) { + if (array_intersect($webhook->getAttribute('events', []), $events)) { + $execute($events, $webhookPayload, $webhook, $user, $project); + } + } + + if (!empty($errors)) { + throw new Exception(\implode(" / \n\n", $errors)); + } + }); + + +$server->workerStart(); +$server->start(); diff --git a/docker-compose.yml b/docker-compose.yml index 7620039b6..35c6975c6 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -580,6 +580,7 @@ services: - _APP_MAINTENANCE_RETENTION_AUDIT - _APP_MAINTENANCE_RETENTION_USAGE_HOURLY - _APP_MAINTENANCE_RETENTION_SCHEDULES + - _APP_CONNECTIONS_QUEUE appwrite-usage: entrypoint: usage diff --git a/src/Appwrite/Event/Delete.php b/src/Appwrite/Event/Delete.php index d1519121a..ad6917206 100644 --- a/src/Appwrite/Event/Delete.php +++ b/src/Appwrite/Event/Delete.php @@ -4,6 +4,7 @@ namespace Appwrite\Event; use Resque; use Utopia\Database\Document; +use Utopia\Queue\Connection; class Delete extends Event { @@ -14,9 +15,9 @@ class Delete extends Event protected ?string $hourlyUsageRetentionDatetime = null; - public function __construct() + public function __construct(protected Connection $connection) { - parent::__construct(Event::DELETE_QUEUE_NAME, Event::DELETE_CLASS_NAME); + parent::__construct(Event::DELETE_QUEUE_NAME, Event::DELETE_CLASS_NAME, $connection); } /** diff --git a/src/Appwrite/Event/Event.php b/src/Appwrite/Event/Event.php index f88d2e94a..bb269dcda 100644 --- a/src/Appwrite/Event/Event.php +++ b/src/Appwrite/Event/Event.php @@ -3,8 +3,9 @@ namespace Appwrite\Event; use InvalidArgumentException; -use Resque; use Utopia\Database\Document; +use Utopia\Queue\Client; +use Utopia\Queue\Connection; class Event { @@ -43,16 +44,18 @@ class Event protected array $context = []; protected ?Document $project = null; protected ?Document $user = null; + protected Connection $connection; /** * @param string $queue * @param string $class * @return void */ - public function __construct(string $queue, string $class) + public function __construct(string $queue, string $class, Connection $connection) { $this->queue = $queue; $this->class = $class; + $this->connection = $connection; } /** @@ -260,7 +263,11 @@ class Event */ public function trigger(): string|bool { - return Resque::enqueue($this->queue, $this->class, [ + $client = new Client($this->queue, $this->connection); + + $events = $this->getEvent() ? Event::generateEvents($this->getEvent(), $this->getParams()) : null; + + return $client->enqueue([ 'project' => $this->project, 'user' => $this->user, 'payload' => $this->payload, @@ -437,9 +444,9 @@ class Event if ($subCurrent === $current || $subCurrent === $key) { continue; } - $filtered1 = \array_filter($paramKeys, fn(string $k) => $k === $subCurrent); + $filtered1 = \array_filter($paramKeys, fn (string $k) => $k === $subCurrent); $events[] = \str_replace($paramKeys, $paramValues, \str_replace($filtered1, '*', $eventPattern)); - $filtered2 = \array_filter($paramKeys, fn(string $k) => $k === $current); + $filtered2 = \array_filter($paramKeys, fn (string $k) => $k === $current); $events[] = \str_replace($paramKeys, $paramValues, \str_replace($filtered2, '*', \str_replace($filtered1, '*', $eventPattern))); $events[] = \str_replace($paramKeys, $paramValues, \str_replace($filtered2, '*', $eventPattern)); } @@ -447,7 +454,7 @@ class Event if ($current === $key) { continue; } - $filtered = \array_filter($paramKeys, fn(string $k) => $k === $current); + $filtered = \array_filter($paramKeys, fn (string $k) => $k === $current); $events[] = \str_replace($paramKeys, $paramValues, \str_replace($filtered, '*', $eventPattern)); } } diff --git a/src/Appwrite/Event/Func.php b/src/Appwrite/Event/Func.php index 5f8b4c80c..a632e3bfb 100644 --- a/src/Appwrite/Event/Func.php +++ b/src/Appwrite/Event/Func.php @@ -16,7 +16,7 @@ class Func extends Event public function __construct(protected Connection $connection) { - parent::__construct(Event::FUNCTIONS_QUEUE_NAME, Event::FUNCTIONS_CLASS_NAME); + parent::__construct(Event::FUNCTIONS_QUEUE_NAME, Event::FUNCTIONS_CLASS_NAME, $connection); } /** diff --git a/src/Appwrite/Platform/Tasks/Maintenance.php b/src/Appwrite/Platform/Tasks/Maintenance.php index 0739923e3..2b58aec51 100644 --- a/src/Appwrite/Platform/Tasks/Maintenance.php +++ b/src/Appwrite/Platform/Tasks/Maintenance.php @@ -5,6 +5,7 @@ namespace Appwrite\Platform\Tasks; use Appwrite\Auth\Auth; use Appwrite\Event\Certificate; use Appwrite\Event\Delete; +use Appwrite\Event\Func; use Utopia\App; use Utopia\CLI\Console; use Utopia\Database\Database; @@ -12,6 +13,8 @@ use Utopia\Database\Document; use Utopia\Database\DateTime; use Utopia\Database\Query; use Utopia\Platform\Action; +use Utopia\Queue\Connection; +use Utopia\Registry\Registry; class Maintenance extends Action { @@ -25,57 +28,58 @@ class Maintenance extends Action $this ->desc('Schedules maintenance tasks and publishes them to resque') ->inject('dbForConsole') - ->callback(fn (Database $dbForConsole) => $this->action($dbForConsole)); + ->inject('deletes') + ->callback(fn (Database $dbForConsole, Delete $deletes) => $this->action($dbForConsole, $deletes)); } - public function action(Database $dbForConsole): void + public function action(Database $dbForConsole, Delete $queue): void { Console::title('Maintenance V1'); Console::success(APP_NAME . ' maintenance process v1 has started'); - function notifyDeleteExecutionLogs(int $interval) + function notifyDeleteExecutionLogs(int $interval, Delete $deletes) { - (new Delete()) + ($deletes) ->setType(DELETE_TYPE_EXECUTIONS) ->setDatetime(DateTime::addSeconds(new \DateTime(), -1 * $interval)) ->trigger(); } - function notifyDeleteAbuseLogs(int $interval) + function notifyDeleteAbuseLogs(int $interval, Delete $deletes) { - (new Delete()) + ($deletes) ->setType(DELETE_TYPE_ABUSE) ->setDatetime(DateTime::addSeconds(new \DateTime(), -1 * $interval)) ->trigger(); } - function notifyDeleteAuditLogs(int $interval) + function notifyDeleteAuditLogs(int $interval, Delete $deletes) { - (new Delete()) + ($deletes) ->setType(DELETE_TYPE_AUDIT) ->setDatetime(DateTime::addSeconds(new \DateTime(), -1 * $interval)) ->trigger(); } - function notifyDeleteUsageStats(int $usageStatsRetentionHourly) + function notifyDeleteUsageStats(int $usageStatsRetentionHourly, Delete $deletes) { - (new Delete()) + ($deletes) ->setType(DELETE_TYPE_USAGE) ->setUsageRetentionHourlyDateTime(DateTime::addSeconds(new \DateTime(), -1 * $usageStatsRetentionHourly)) ->trigger(); } - function notifyDeleteConnections() + function notifyDeleteConnections(Delete $deletes) { - (new Delete()) + ($deletes) ->setType(DELETE_TYPE_REALTIME) ->setDatetime(DateTime::addSeconds(new \DateTime(), -60)) ->trigger(); } - function notifyDeleteExpiredSessions() + function notifyDeleteExpiredSessions(Delete $deletes) { - (new Delete()) + ($deletes) ->setType(DELETE_TYPE_SESSIONS) ->trigger(); } @@ -107,19 +111,19 @@ class Maintenance extends Action } } - function notifyDeleteCache($interval) + function notifyDeleteCache($interval, Delete $deletes) { - (new Delete()) + ($deletes) ->setType(DELETE_TYPE_CACHE_BY_TIMESTAMP) ->setDatetime(DateTime::addSeconds(new \DateTime(), -1 * $interval)) ->trigger(); } - function notifyDeleteSchedules($interval) + function notifyDeleteSchedules($interval, Delete $deletes) { - (new Delete()) + ($deletes) ->setType(DELETE_TYPE_SCHEDULES) ->setDatetime(DateTime::addSeconds(new \DateTime(), -1 * $interval)) ->trigger(); @@ -135,19 +139,19 @@ class Maintenance extends Action $cacheRetention = (int) App::getEnv('_APP_MAINTENANCE_RETENTION_CACHE', '2592000'); // 30 days $schedulesDeletionRetention = (int) App::getEnv('_APP_MAINTENANCE_RETENTION_SCHEDULES', '86400'); // 1 Day - Console::loop(function () use ($interval, $executionLogsRetention, $abuseLogsRetention, $auditLogRetention, $cacheRetention, $schedulesDeletionRetention, $usageStatsRetentionHourly, $dbForConsole) { + Console::loop(function () use ($interval, $executionLogsRetention, $abuseLogsRetention, $auditLogRetention, $cacheRetention, $schedulesDeletionRetention, $usageStatsRetentionHourly, $dbForConsole, $queue) { $time = DateTime::now(); Console::info("[{$time}] Notifying workers with maintenance tasks every {$interval} seconds"); - notifyDeleteExecutionLogs($executionLogsRetention); - notifyDeleteAbuseLogs($abuseLogsRetention); - notifyDeleteAuditLogs($auditLogRetention); - notifyDeleteUsageStats($usageStatsRetentionHourly); - notifyDeleteConnections(); - notifyDeleteExpiredSessions(); - renewCertificates($dbForConsole); - notifyDeleteCache($cacheRetention); - notifyDeleteSchedules($schedulesDeletionRetention); + notifyDeleteExecutionLogs($executionLogsRetention, $queue); + notifyDeleteAbuseLogs($abuseLogsRetention, $queue); + notifyDeleteAuditLogs($auditLogRetention, $queue); + notifyDeleteUsageStats($usageStatsRetentionHourly, $queue); + notifyDeleteConnections($queue); + notifyDeleteExpiredSessions($queue); + renewCertificates($dbForConsole, $queue); + notifyDeleteCache($cacheRetention, $queue); + notifyDeleteSchedules($schedulesDeletionRetention, $queue); }, $interval); } } diff --git a/tests/unit/Event/EventTest.php b/tests/unit/Event/EventTest.php index dee905638..6a001fac1 100644 --- a/tests/unit/Event/EventTest.php +++ b/tests/unit/Event/EventTest.php @@ -3,9 +3,12 @@ namespace Tests\Unit\Event; use Appwrite\Event\Event; +use Appwrite\URL\URL; use InvalidArgumentException; use PHPUnit\Framework\TestCase; use Utopia\App; +use Utopia\DSN\DSN; +use Utopia\Queue; class EventTest extends TestCase { @@ -18,8 +21,24 @@ class EventTest extends TestCase $redisPort = App::getEnv('_APP_REDIS_PORT', ''); \Resque::setBackend($redisHost . ':' . $redisPort); + $fallbackForRedis = URL::unparse([ + 'scheme' => 'redis', + 'host' => App::getEnv('_APP_REDIS_HOST', 'redis'), + 'port' => App::getEnv('_APP_REDIS_PORT', '6379'), + 'user' => App::getEnv('_APP_REDIS_USER', ''), + 'pass' => App::getEnv('_APP_REDIS_PASS', ''), + ]); + + $dsn = App::getEnv('_APP_CONNECTIONS_QUEUE', $fallbackForRedis); + $dsn = explode('=', $dsn); + $dsn = $dsn[1] ?? ''; + + $dsn = new DSN($dsn); + + $connection = new Queue\Connection\Redis($dsn->getHost(), $dsn->getPort()); + $this->queue = 'v1-tests' . uniqid(); - $this->object = new Event($this->queue, 'TestsV1'); + $this->object = new Event($this->queue, 'TestsV1', $connection); } public function testQueue(): void