From 8b0a78bb36672ec986df87031e4face4a34ce3ba Mon Sep 17 00:00:00 2001 From: Christy Jacob Date: Wed, 16 Nov 2022 11:00:57 +0530 Subject: [PATCH] fix: function events and linter --- app/controllers/api/functions.php | 3 +- app/controllers/shared/api.php | 15 ++-- app/worker.php | 4 +- app/workers/functions.php | 79 ++++++++++--------- src/Appwrite/Event/Func.php | 36 +++++++-- .../Utopia/Response/Model/Execution.php | 1 - 6 files changed, 78 insertions(+), 60 deletions(-) diff --git a/app/controllers/api/functions.php b/app/controllers/api/functions.php index 04da1234a1..b695d19429 100644 --- a/app/controllers/api/functions.php +++ b/app/controllers/api/functions.php @@ -1157,7 +1157,8 @@ App::post('/v1/functions/:functionId/executions') ->setData($data) ->setJWT($jwt) ->setProject($project) - ->setUser($user); + ->setUser($user) + ->trigger(); return $response ->setStatusCode(Response::STATUS_CODE_ACCEPTED) diff --git a/app/controllers/shared/api.php b/app/controllers/shared/api.php index 58dd2f16ef..a12016606f 100644 --- a/app/controllers/shared/api.php +++ b/app/controllers/shared/api.php @@ -129,9 +129,9 @@ App::init() } } - /* - * Background Jobs - */ + /* + * Background Jobs + */ $events ->setEvent($route->getLabel('event', '')) ->setProject($project) @@ -265,12 +265,9 @@ App::shutdown() * Trigger functions. */ $functions - ->setData(\json_encode($events->getPayload())) - ->setProject($events->getProject()) - ->setUser($events->getUser()) - ->setEvent($events->getEvent()) - ->setParam('functionId', $events->getParam('functionId')) - ->setParam('executionId', $events->getParam('executionId')) + ->from($events) + ->setQueue(Event::FUNCTIONS_QUEUE_NAME) + ->setClass(Event::FUNCTIONS_CLASS_NAME) ->trigger(); /** diff --git a/app/worker.php b/app/worker.php index 110a9d81da..60e93b2518 100644 --- a/app/worker.php +++ b/app/worker.php @@ -76,7 +76,7 @@ Server::setResource('functions', function (Registry $register) { ->get('queue') ->pop() ->getResource() - ); + ); }, ['register']); Server::setResource('logger', function ($register) { @@ -93,4 +93,4 @@ $connection = $pools->get('queue')->pop()->getResource(); $workerNumber = swoole_cpu_num() * intval(App::getEnv('_APP_WORKER_PER_CORE', 6)); $workerNumber = 1; -Runtime::enableCoroutine(SWOOLE_HOOK_ALL); \ No newline at end of file +Runtime::enableCoroutine(SWOOLE_HOOK_ALL); diff --git a/app/workers/functions.php b/app/workers/functions.php index a34f0aec99..9d1dc0372d 100644 --- a/app/workers/functions.php +++ b/app/workers/functions.php @@ -52,37 +52,37 @@ Server::setResource('execute', function () { $functionId = $function->getId(); $deploymentId = $function->getAttribute('deployment', ''); var_dump("Deployment ID : ", $deploymentId); - + /** Check if deployment exists */ $deployment = $dbForProject->getDocument('deployments', $deploymentId); - + if ($deployment->getAttribute('resourceId') !== $functionId) { throw new Exception('Deployment not found. Create deployment before trying to execute a function'); } - + if ($deployment->isEmpty()) { throw new Exception('Deployment not found. Create deployment before trying to execute a function'); } - + /** Check if build has exists */ $build = $dbForProject->getDocument('builds', $deployment->getAttribute('buildId', '')); if ($build->isEmpty()) { throw new Exception('Build not found'); } - + if ($build->getAttribute('status') !== 'ready') { throw new Exception('Build not ready'); } - + /** Check if runtime is supported */ $runtimes = Config::getParam('runtimes', []); - + if (!\array_key_exists($function->getAttribute('runtime'), $runtimes)) { throw new Exception('Runtime "' . $function->getAttribute('runtime', '') . '" is not supported'); } - + $runtime = $runtimes[$function->getAttribute('runtime')]; - + /** Create execution or update execution status */ $execution = $dbForProject->getDocument('executions', $executionId ?? ''); if ($execution->isEmpty()) { @@ -100,27 +100,27 @@ Server::setResource('execute', function () { 'duration' => 0.0, 'search' => implode(' ', [$functionId, $executionId]), ])); - + if ($execution->isEmpty()) { throw new Exception('Failed to create or read execution'); } } $execution->setAttribute('status', 'processing'); $execution = $dbForProject->updateDocument('executions', $executionId, $execution); - + if ($build->getAttribute('status') !== 'ready') { throw new Exception('Build not ready'); } - + /** Check if runtime is supported */ $runtimes = Config::getParam('runtimes', []); - + if (!\array_key_exists($function->getAttribute('runtime'), $runtimes)) { throw new Exception('Runtime "' . $function->getAttribute('runtime', '') . '" is not supported'); } - + $runtime = $runtimes[$function->getAttribute('runtime')]; - + /** Create execution or update execution status */ $execution = $dbForProject->getDocument('executions', $executionId ?? ''); if ($execution->isEmpty()) { @@ -138,19 +138,19 @@ Server::setResource('execute', function () { 'duration' => 0.0, 'search' => implode(' ', [$functionId, $executionId]), ])); - + if ($execution->isEmpty()) { throw new Exception('Failed to create or read execution'); } } $execution->setAttribute('status', 'processing'); $execution = $dbForProject->updateDocument('executions', $executionId, $execution); - + $vars = array_reduce($function['vars'] ?? [], function (array $carry, Document $var) { $carry[$var->getAttribute('key')] = $var->getAttribute('value'); return $carry; }, []); - + /** Collect environment variables */ $vars = \array_merge($vars, [ 'APPWRITE_FUNCTION_ID' => $functionId, @@ -166,7 +166,7 @@ Server::setResource('execute', function () { 'APPWRITE_FUNCTION_USER_ID' => $user->getId() ?? '', 'APPWRITE_FUNCTION_JWT' => $jwt ?? '', ]); - + /** Execute function */ $executor = new Executor(App::getEnv('_APP_EXECUTOR_HOST')); try { @@ -180,7 +180,7 @@ Server::setResource('execute', function () { source: $build->getAttribute('outputPath', ''), entrypoint: $deployment->getAttribute('entrypoint', ''), ); - + /** Update execution status */ $execution ->setAttribute('status', $executionResponse['status']) @@ -198,9 +198,9 @@ Server::setResource('execute', function () { ->setAttribute('stderr', $th->getMessage()); Console::error($th->getMessage()); } - + $execution = $dbForProject->updateDocument('executions', $executionId, $execution); - + /** Trigger Webhook */ $executionModel = new Execution(); $executionUpdate = new Event(Event::WEBHOOK_QUEUE_NAME, Event::WEBHOOK_CLASS_NAME); @@ -212,7 +212,7 @@ Server::setResource('execute', function () { ->setParam('executionId', $execution->getId()) ->setPayload($execution->getArrayCopy(array_keys($executionModel->getRules()))) ->trigger(); - + /** Trigger Functions */ $functions ->setData($data ?? '') @@ -222,7 +222,7 @@ Server::setResource('execute', function () { ->setParam('functionId', $function->getId()) ->setParam('executionId', $execution->getId()) ->trigger(); - + /** Trigger realtime event */ $allEvents = Event::generateEvents('functions.[functionId].executions.[executionId].update', [ 'functionId' => $function->getId(), @@ -247,7 +247,7 @@ Server::setResource('execute', function () { channels: $target['channels'], roles: $target['roles'] ); - + /** Update usage stats */ if (App::getEnv('_APP_USAGE_STATS', 'enabled') === 'enabled') { $usage = new Stats($statsd); @@ -282,6 +282,7 @@ $server->job() $type = $payload['type'] ?? ''; $events = $payload['events'] ?? []; $data = $payload['data'] ?? ''; + $eventData = $payload['payload'] ?? ''; $project = new Document($payload['project'] ?? []); $function = new Document($payload['function'] ?? []); $user = new Document($payload['user'] ?? []); @@ -320,14 +321,14 @@ $server->job() $execute( statsd: $statsd, dbForProject: $dbForProject, - project: $project, + project: $project, function: $function, - trigger: 'event', - event: $events[0], - eventData: $payload, + trigger: 'event', + event: $events[0], + eventData: $eventData, user: $user, - data: null, - executionId: null, + data: null, + executionId: null, jwt: null ); Console::success('Triggered function: ' . $events[0]); @@ -346,15 +347,15 @@ $server->job() $execution = new Document($payload['execution'] ?? []); $user = new Document($payload['user'] ?? []); $execute( - project: $project, + project: $project, function: $function, dbForProject: $dbForProject, functions: $functions, trigger: 'http', - executionId: $execution->getId(), - event: null, + executionId: $execution->getId(), + event: null, eventData: null, - data: $data, + data: $data, user: $user, jwt: $jwt, statsd: $statsd, @@ -362,15 +363,15 @@ $server->job() break; case 'schedule': $execute( - project: $project, + project: $project, function: $function, dbForProject: $dbForProject, functions: $functions, trigger: 'http', - executionId: null, - event: null, + executionId: null, + event: null, eventData: null, - data: null, + data: null, user: null, jwt: null, statsd: $statsd, diff --git a/src/Appwrite/Event/Func.php b/src/Appwrite/Event/Func.php index 514eb5df74..9fdcc4f8f3 100644 --- a/src/Appwrite/Event/Func.php +++ b/src/Appwrite/Event/Func.php @@ -145,16 +145,36 @@ class Func extends Event */ public function trigger(): string|bool { - $queue = new Client(Event::FUNCTIONS_QUEUE_NAME, $this->connection); + $client = new Client($this->queue, $this->connection); - return $queue->enqueue([ - 'type' => $this->type, - 'execution' => $this->execution, - 'function' => $this->function, - 'data' => $this->data, - 'jwt' => $this->jwt, + return $client->enqueue([ 'project' => $this->project, - 'user' => $this->user + 'user' => $this->user, + 'function' => $this->function, + 'execution' => $this->execution, + 'type' => $this->type, + 'jwt' => $this->jwt, + 'payload' => '', + 'events' => Event::generateEvents($this->getEvent(), $this->getParams()), + 'data' => $this->data, ]); } + + /** + * Generate a function event from a base event + * + * @param Event $event + * + * @return self + * + */ + public function from(Event $event): self + { + $this->project = $event->getProject(); + $this->user = $event->getUser(); + $this->payload = $event->getPayload(); + $this->event = $event->getEvent(); + $this->params = $event->getParams(); + return $this; + } } diff --git a/src/Appwrite/Utopia/Response/Model/Execution.php b/src/Appwrite/Utopia/Response/Model/Execution.php index 987a140dfb..13011a24b7 100644 --- a/src/Appwrite/Utopia/Response/Model/Execution.php +++ b/src/Appwrite/Utopia/Response/Model/Execution.php @@ -6,7 +6,6 @@ use Appwrite\Utopia\Response; use Appwrite\Utopia\Response\Model; use Utopia\Database\Role; - class Execution extends Model { public function __construct()