From 44a82de09bfa23b9eda9a7dc03448d6a8a9b5424 Mon Sep 17 00:00:00 2001 From: Matej Baco Date: Tue, 15 Nov 2022 19:13:17 +0100 Subject: [PATCH] Refactor func event triggering --- app/cli.php | 5 +++ app/controllers/api/functions.php | 26 +++++++-------- app/controllers/shared/api.php | 14 +++++--- app/init.php | 15 +++++---- app/worker.php | 8 ++++- app/workers/builds.php | 19 ++++++++--- app/workers/functions.php | 41 ++++++++++-------------- composer.json | 2 +- composer.lock | 21 ++++-------- src/Appwrite/Event/Func.php | 40 +++++++---------------- src/Appwrite/Platform/Tasks/Schedule.php | 24 +++++++------- 11 files changed, 106 insertions(+), 109 deletions(-) diff --git a/app/cli.php b/app/cli.php index 502ee77b75..544148c8d5 100644 --- a/app/cli.php +++ b/app/cli.php @@ -3,6 +3,7 @@ require_once __DIR__ . '/init.php'; require_once __DIR__ . '/controllers/general.php'; +use Appwrite\Event\Func; use Appwrite\Platform\Appwrite; use Utopia\CLI\CLI; use Utopia\Database\Validator\Authorization; @@ -109,6 +110,10 @@ CLI::setResource('influxdb', function (Registry $register) { return $database; }, ['register']); +CLI::setResource('functions', function (Group $pools) { + return new Func($pools->get('queue')->pop()->getResource()); +}, ['pools']); + CLI::setResource('logError', function (Registry $register) { return function (Throwable $error, string $namespace, string $action) use ($register) { $logger = $register->get('logger'); diff --git a/app/controllers/api/functions.php b/app/controllers/api/functions.php index d9f63a93be..02c3fce885 100644 --- a/app/controllers/api/functions.php +++ b/app/controllers/api/functions.php @@ -5,6 +5,7 @@ use Appwrite\Auth\Auth; use Appwrite\Event\Build; use Appwrite\Event\Delete; use Appwrite\Event\Event; +use Appwrite\Event\Func; use Appwrite\Event\Validator\Event as ValidatorEvent; use Appwrite\Extend\Exception; use Appwrite\Utopia\Database\Validator\CustomId; @@ -41,7 +42,6 @@ use Utopia\CLI\Console; use Utopia\Database\Validator\Roles; use Utopia\Validator\Boolean; use Utopia\Database\Exception\Duplicate as DuplicateException; -use Utopia\Queue\Client as QueueClient; include_once __DIR__ . '/../shared/api.php'; @@ -1061,7 +1061,8 @@ App::post('/v1/functions/:functionId/executions') ->inject('usage') ->inject('mode') ->inject('pools') - ->action(function (string $functionId, string $data, bool $async, Response $response, Document $project, Database $dbForProject, Document $user, Event $events, Stats $usage, string $mode, Group $pools) { + ->inject('functions') + ->action(function (string $functionId, string $data, bool $async, Response $response, Document $project, Database $dbForProject, Document $user, Event $events, Stats $usage, string $mode, Group $pools, Func $functions) { $function = Authorization::skip(fn () => $dbForProject->getDocument('functions', $functionId)); @@ -1149,18 +1150,15 @@ App::post('/v1/functions/:functionId/executions') ->setContext('function', $function); if ($async) { - $queueForFunctions = new QueueClient(Event::FUNCTIONS_QUEUE_NAME, $pools->get('queue')->pop()->getResource()); - $queueForFunctions->enqueue([ - 'type' => 'http', - 'value' => [ - 'type' => 'http', - 'execution' => $execution, - 'function' => $function, - 'data' => $data, - 'jwt' => $jwt, - 'project' => $project, - 'user' => $user - ]]); + $functions + ->setType('http') + ->setExecution($execution) + ->setFunction($function) + ->setData($data) + ->setJWT($jwt) + ->setProject($project) + ->setUser($user) + ->trigger(); return $response ->setStatusCode(Response::STATUS_CODE_ACCEPTED) diff --git a/app/controllers/shared/api.php b/app/controllers/shared/api.php index e07f405140..58dd2f16ef 100644 --- a/app/controllers/shared/api.php +++ b/app/controllers/shared/api.php @@ -5,6 +5,7 @@ use Appwrite\Event\Audit; use Appwrite\Event\Database as EventDatabase; use Appwrite\Event\Delete; use Appwrite\Event\Event; +use Appwrite\Event\Func; use Appwrite\Event\Mail; use Appwrite\Messaging\Adapter\Realtime; use Appwrite\Usage\Stats; @@ -251,7 +252,8 @@ App::shutdown() ->inject('database') ->inject('mode') ->inject('dbForProject') - ->action(function (App $utopia, Request $request, Response $response, Document $project, Event $events, Audit $audits, Stats $usage, Delete $deletes, EventDatabase $database, string $mode, Database $dbForProject) use ($parseLabel) { + ->inject('functions') + ->action(function (App $utopia, Request $request, Response $response, Document $project, Event $events, Audit $audits, Stats $usage, Delete $deletes, EventDatabase $database, string $mode, Database $dbForProject, Func $functions) use ($parseLabel) { $responsePayload = $response->getPayload(); @@ -262,9 +264,13 @@ App::shutdown() /** * Trigger functions. */ - $events - ->setClass(Event::FUNCTIONS_CLASS_NAME) - ->setQueue(Event::FUNCTIONS_QUEUE_NAME) + $functions + ->setData(\json_encode($events->getPayload())) + ->setProject($events->getProject()) + ->setUser($events->getUser()) + ->setEvent($events->getEvent()) + ->setParam('functionId', $events->getParam('functionId')) + ->setParam('executionId', $events->getParam('executionId')) ->trigger(); /** diff --git a/app/init.php b/app/init.php index b861297cef..19cea371df 100644 --- a/app/init.php +++ b/app/init.php @@ -71,6 +71,7 @@ use Utopia\Pools\Group; use Utopia\Pools\Pool; use Ahc\Jwt\JWT; use Ahc\Jwt\JWTException; +use Appwrite\Event\Func; use MaxMind\Db\Reader; use PHPMailer\PHPMailer\PHPMailer; use Swoole\Database\PDOProxy; @@ -843,6 +844,9 @@ App::setResource('mails', fn() => new Mail()); App::setResource('deletes', fn() => new Delete()); App::setResource('database', fn() => new EventDatabase()); App::setResource('messaging', fn() => new Phone()); +App::setResource('functions', function (Group $pools) { + return new Func($pools->get('queue')->pop()->getResource()); +}, ['pools']); App::setResource('usage', function ($register) { return new Stats($register->get('statsd')); }, ['register']); @@ -1023,13 +1027,12 @@ App::setResource('console', function () { }, []); App::setResource('queue', function () { - $fallbackForRedis = AppwriteURL::unparse([ - 'scheme' => 'redis', - 'host' => App::getEnv('_APP_REDIS_HOST', 'redis'), - 'port' => App::getEnv('_APP_REDIS_PORT', '6379'), - 'user' => App::getEnv('_APP_REDIS_USER', ''), - 'pass' => App::getEnv('_APP_REDIS_PASS', ''), + 'scheme' => 'redis', + 'host' => App::getEnv('_APP_REDIS_HOST', 'redis'), + 'port' => App::getEnv('_APP_REDIS_PORT', '6379'), + 'user' => App::getEnv('_APP_REDIS_USER', ''), + 'pass' => App::getEnv('_APP_REDIS_PASS', ''), ]); $connection = App::getEnv('_APP_CONNECTIONS_QUEUE', $fallbackForRedis); diff --git a/app/worker.php b/app/worker.php index 4e484e1545..dabb91f1d8 100644 --- a/app/worker.php +++ b/app/worker.php @@ -2,6 +2,7 @@ require_once __DIR__ . '/init.php'; +use Appwrite\Event\Func; use Swoole\Runtime; use Utopia\App; use Utopia\Cache\Adapter\Sharding; @@ -68,7 +69,12 @@ Server::setResource('cache', function (Registry $register) { return new Cache(new Sharding($adapters)); }, ['register']); -App::setResource('logger', function ($register) { +Server::setResource('functions', function (Registry $register) { + $pools = $register->get('pools'); + return new Func($pools->get('queue')->pop()->getResource()); +}, ['register']); + +Server::setResource('logger', function ($register) { return $register->get('logger'); }, ['register']); diff --git a/app/workers/builds.php b/app/workers/builds.php index babf2874d6..d52329b107 100644 --- a/app/workers/builds.php +++ b/app/workers/builds.php @@ -1,6 +1,7 @@ setAttribute('status', 'building'); $build = $dbForProject->updateDocument('builds', $buildId, $build); + $data = $deployment->getArrayCopy(array_keys($deploymentModel->getRules())); + /** Trigger Webhook */ $deploymentModel = new Deployment(); @@ -114,14 +117,22 @@ class BuildsV1 extends Worker ->setEvent('functions.[functionId].deployments.[deploymentId].update') ->setParam('functionId', $function->getId()) ->setParam('deploymentId', $deployment->getId()) - ->setPayload($deployment->getArrayCopy(array_keys($deploymentModel->getRules()))) + ->setPayload($data) ->trigger(); /** Trigger Functions */ - $deploymentUpdate - ->setClass(Event::FUNCTIONS_CLASS_NAME) - ->setQueue(Event::FUNCTIONS_QUEUE_NAME) + global $register; + $pools = $register->get('pools'); + $connection = $pools->get('queue')->pop(); + $functions = new Func($connection->getResource()); + $functions + ->setData(\json_encode($data)) + ->setProject($project) + ->setEvent('functions.[functionId].deployments.[deploymentId].update') + ->setParam('functionId', $function->getId()) + ->setParam('deploymentId', $deployment->getId()) ->trigger(); + $connection->reclaim(); /** Trigger Realtime */ $allEvents = Event::generateEvents('functions.[functionId].deployments.[deploymentId].update', [ diff --git a/app/workers/functions.php b/app/workers/functions.php index 2d7f4565c4..bad6e7d640 100644 --- a/app/workers/functions.php +++ b/app/workers/functions.php @@ -5,6 +5,7 @@ require_once __DIR__ . '/../worker.php'; use Utopia\Queue; use Utopia\Queue\Message; use Appwrite\Event\Event; +use Appwrite\Event\Func; use Appwrite\Messaging\Adapter\Realtime; use Appwrite\Usage\Stats; use Appwrite\Utopia\Response\Model\Execution; @@ -33,6 +34,7 @@ $execute = function ( Document $project, Document $function, Database $dbForProject, + Func $functions, string $trigger, string $executionId = null, string $event = null, @@ -206,9 +208,13 @@ $execute = function ( ->trigger(); /** Trigger Functions */ - $executionUpdate - ->setClass(Event::FUNCTIONS_CLASS_NAME) - ->setQueue(Event::FUNCTIONS_QUEUE_NAME) + $functions + ->setData($data) + ->setProject($project) + ->setUser($user) + ->setEvent('functions.[functionId].executions.[executionId].update') + ->setParam('functionId', $function->getId()) + ->setParam('executionId', $execution->getId()) ->trigger(); /** Trigger realtime event */ @@ -259,9 +265,10 @@ $server = new Queue\Server($adapter); $server->job() ->inject('message') ->inject('dbForProject') - ->action(function (Message $message, Database $dbForProject) use ($execute) { - $args = $message->getPayload()['value'] ?? []; - $type = $message->getPayload()['type'] ?? ''; + ->inject('functions') + ->action(function (Message $message, Database $dbForProject, Func $functions) use ($execute) { + $args = $message->getPayload() ?? []; + $type = $args['type'] ?? ''; $events = $args['events'] ?? []; $project = new Document($args['project'] ?? []); $user = new Document($args['user'] ?? []); @@ -317,34 +324,20 @@ $server->job() /** * Handle Schedule and HTTP execution. */ - $user = new Document($args['user'] ?? []); $project = new Document($args['project'] ?? []); - $execution = new Document($args['execution'] ?? []); $function = new Document($args['function'] ?? []); switch ($type) { case 'http': $jwt = $args['jwt'] ?? ''; $data = $args['data'] ?? ''; + $execution = new Document($args['execution'] ?? []); + $user = new Document($args['user'] ?? []); $function = $dbForProject->getDocument('functions', $execution->getAttribute('functionId')); - call_user_func($execute, $project, $function, $dbForProject, 'http', $execution->getId(), null, null, $data, $user, $jwt); + call_user_func($execute, $project, $function, $dbForProject, $functions, 'http', $execution->getId(), null, null, $data, $user, $jwt); break; - case 'schedule': - /* - * 1. Get Original Task - * 2. Check for updates - * If has updates skip task and don't reschedule - * If status not equal to play skip task - * 3. Check next run date, update task and add new job at the given date - * 4. Execute task (set optional timeout) - * 5. Update task response to log - * On success reset error count - * On failure add error count - * If error count bigger than allowed change status to pause - */ - - call_user_func($execute, $project, $function, $dbForProject, 'schedule', null, null, null, null, null, null); + call_user_func($execute, $project, $function, $dbForProject, $functions, 'schedule', null, null, null, null, null, null); break; } }); diff --git a/composer.json b/composer.json index d4c06d7614..08ede65dcf 100644 --- a/composer.json +++ b/composer.json @@ -53,7 +53,7 @@ "utopia-php/domains": "1.1.*", "utopia-php/framework": "0.25.*", "utopia-php/image": "0.5.*", - "utopia-php/queue": "dev-upgrade-libs as 0.4.1", + "utopia-php/queue": "0.4.*", "utopia-php/locale": "0.4.*", "utopia-php/logger": "0.3.*", "utopia-php/orchestration": "0.9.*", diff --git a/composer.lock b/composer.lock index 3912636eea..674e42a5ca 100644 --- a/composer.lock +++ b/composer.lock @@ -4,7 +4,7 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "3e24f0f02ec826898d50e4119f9eb226", + "content-hash": "f2faa670abaa356f9b14e4f1c3542b33", "packages": [ { "name": "adhocore/jwt", @@ -2359,16 +2359,16 @@ }, { "name": "utopia-php/queue", - "version": "dev-upgrade-libs", + "version": "0.4.1", "source": { "type": "git", "url": "https://github.com/utopia-php/queue.git", - "reference": "310aaac74d2287d3d9450a532658247cdfe5e72c" + "reference": "0b69ede484a04c567cbb202f592d8e5e3cd2433e" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/utopia-php/queue/zipball/310aaac74d2287d3d9450a532658247cdfe5e72c", - "reference": "310aaac74d2287d3d9450a532658247cdfe5e72c", + "url": "https://api.github.com/repos/utopia-php/queue/zipball/0b69ede484a04c567cbb202f592d8e5e3cd2433e", + "reference": "0b69ede484a04c567cbb202f592d8e5e3cd2433e", "shasum": "" }, "require": { @@ -2414,9 +2414,9 @@ ], "support": { "issues": "https://github.com/utopia-php/queue/issues", - "source": "https://github.com/utopia-php/queue/tree/upgrade-libs" + "source": "https://github.com/utopia-php/queue/tree/0.4.1" }, - "time": "2022-11-15T16:35:56+00:00" + "time": "2022-11-15T16:56:37+00:00" }, { "name": "utopia-php/registry", @@ -5277,12 +5277,6 @@ } ], "aliases": [ - { - "package": "utopia-php/queue", - "version": "dev-upgrade-libs", - "alias": "0.4.1", - "alias_normalized": "0.4.1.0" - }, { "package": "utopia-php/registry", "version": "dev-feat-allow-params", @@ -5292,7 +5286,6 @@ ], "minimum-stability": "stable", "stability-flags": { - "utopia-php/queue": 20, "utopia-php/registry": 20 }, "prefer-stable": false, diff --git a/src/Appwrite/Event/Func.php b/src/Appwrite/Event/Func.php index b7531cf475..514eb5df74 100644 --- a/src/Appwrite/Event/Func.php +++ b/src/Appwrite/Event/Func.php @@ -6,6 +6,8 @@ use DateTime; use Resque; use ResqueScheduler; use Utopia\Database\Document; +use Utopia\Queue\Client; +use Utopia\Queue\Connection; class Func extends Event { @@ -15,7 +17,7 @@ class Func extends Event protected ?Document $function = null; protected ?Document $execution = null; - public function __construct() + public function __construct(protected Connection $connection) { parent::__construct(Event::FUNCTIONS_QUEUE_NAME, Event::FUNCTIONS_CLASS_NAME); } @@ -143,36 +145,16 @@ class Func extends Event */ public function trigger(): string|bool { - return Resque::enqueue($this->queue, $this->class, [ - 'project' => $this->project, - 'user' => $this->user, - 'function' => $this->function, - 'execution' => $this->execution, - 'type' => $this->type, - 'jwt' => $this->jwt, - 'payload' => $this->payload, - 'data' => $this->data - ]); - } + $queue = new Client(Event::FUNCTIONS_QUEUE_NAME, $this->connection); - /** - * Schedules the function event and schedules it in the functions worker queue. - * - * @param \DateTime|int $at - * @return void - * @throws \Resque_Exception - * @throws \ResqueScheduler_InvalidTimestampException - */ - public function schedule(DateTime|int $at): void - { - ResqueScheduler::enqueueAt($at, $this->queue, $this->class, [ - 'project' => $this->project, - 'user' => $this->user, - 'function' => $this->function, - 'execution' => $this->execution, + return $queue->enqueue([ 'type' => $this->type, - 'payload' => $this->payload, - 'data' => $this->data + 'execution' => $this->execution, + 'function' => $this->function, + 'data' => $this->data, + 'jwt' => $this->jwt, + 'project' => $this->project, + 'user' => $this->user ]); } } diff --git a/src/Appwrite/Platform/Tasks/Schedule.php b/src/Appwrite/Platform/Tasks/Schedule.php index 39e0686da5..88fd0a9a80 100644 --- a/src/Appwrite/Platform/Tasks/Schedule.php +++ b/src/Appwrite/Platform/Tasks/Schedule.php @@ -14,6 +14,7 @@ use Utopia\Database\Database; use Utopia\Pools\Group; use Utopia\Queue\Client as Worker; use Appwrite\Event\Event; +use Appwrite\Event\Func; use function Swoole\Coroutine\run; @@ -203,6 +204,9 @@ class Schedule extends Action \go(function () use ($delay, $schedules, $scheduleKeys, $pools) { \sleep($delay); // in seconds + $queue = $pools->get('queue')->pop(); + $connection = $queue->getResource(); + foreach ($scheduleKeys as $scheduleKey) { // Ensure schedule was not deleted if (!isset($schedules[$scheduleKey])) { @@ -211,20 +215,16 @@ class Schedule extends Action $schedule = $schedules[$scheduleKey]; - $queue = $pools->get('queue')->pop(); + $functions = new Func($connection); - $worker = new Worker(Event::FUNCTIONS_QUEUE_NAME, $queue->getResource()); - $worker - ->enqueue([ - 'type' => 'schedule', - 'value' => [ - 'project' => $schedule['project'], - 'function' => $schedule['function'], - ] - ]); - - $queue->reclaim(); + $functions + ->setType('schedule') + ->setFunction($schedule['function']) + ->setProject($schedule['project']) + ->trigger(); } + + $queue->reclaim(); }); }