1
0
Fork 0
mirror of synced 2024-09-30 09:18:14 +13:00

fix: function events and linter

This commit is contained in:
Christy Jacob 2022-11-16 11:00:57 +05:30
parent 5abe9ad73c
commit 8b0a78bb36
6 changed files with 78 additions and 60 deletions

View file

@ -1157,7 +1157,8 @@ App::post('/v1/functions/:functionId/executions')
->setData($data) ->setData($data)
->setJWT($jwt) ->setJWT($jwt)
->setProject($project) ->setProject($project)
->setUser($user); ->setUser($user)
->trigger();
return $response return $response
->setStatusCode(Response::STATUS_CODE_ACCEPTED) ->setStatusCode(Response::STATUS_CODE_ACCEPTED)

View file

@ -129,9 +129,9 @@ App::init()
} }
} }
/* /*
* Background Jobs * Background Jobs
*/ */
$events $events
->setEvent($route->getLabel('event', '')) ->setEvent($route->getLabel('event', ''))
->setProject($project) ->setProject($project)
@ -265,12 +265,9 @@ App::shutdown()
* Trigger functions. * Trigger functions.
*/ */
$functions $functions
->setData(\json_encode($events->getPayload())) ->from($events)
->setProject($events->getProject()) ->setQueue(Event::FUNCTIONS_QUEUE_NAME)
->setUser($events->getUser()) ->setClass(Event::FUNCTIONS_CLASS_NAME)
->setEvent($events->getEvent())
->setParam('functionId', $events->getParam('functionId'))
->setParam('executionId', $events->getParam('executionId'))
->trigger(); ->trigger();
/** /**

View file

@ -76,7 +76,7 @@ Server::setResource('functions', function (Registry $register) {
->get('queue') ->get('queue')
->pop() ->pop()
->getResource() ->getResource()
); );
}, ['register']); }, ['register']);
Server::setResource('logger', function ($register) { Server::setResource('logger', function ($register) {
@ -93,4 +93,4 @@ $connection = $pools->get('queue')->pop()->getResource();
$workerNumber = swoole_cpu_num() * intval(App::getEnv('_APP_WORKER_PER_CORE', 6)); $workerNumber = swoole_cpu_num() * intval(App::getEnv('_APP_WORKER_PER_CORE', 6));
$workerNumber = 1; $workerNumber = 1;
Runtime::enableCoroutine(SWOOLE_HOOK_ALL); Runtime::enableCoroutine(SWOOLE_HOOK_ALL);

View file

@ -52,37 +52,37 @@ Server::setResource('execute', function () {
$functionId = $function->getId(); $functionId = $function->getId();
$deploymentId = $function->getAttribute('deployment', ''); $deploymentId = $function->getAttribute('deployment', '');
var_dump("Deployment ID : ", $deploymentId); var_dump("Deployment ID : ", $deploymentId);
/** Check if deployment exists */ /** Check if deployment exists */
$deployment = $dbForProject->getDocument('deployments', $deploymentId); $deployment = $dbForProject->getDocument('deployments', $deploymentId);
if ($deployment->getAttribute('resourceId') !== $functionId) { if ($deployment->getAttribute('resourceId') !== $functionId) {
throw new Exception('Deployment not found. Create deployment before trying to execute a function'); throw new Exception('Deployment not found. Create deployment before trying to execute a function');
} }
if ($deployment->isEmpty()) { if ($deployment->isEmpty()) {
throw new Exception('Deployment not found. Create deployment before trying to execute a function'); throw new Exception('Deployment not found. Create deployment before trying to execute a function');
} }
/** Check if build has exists */ /** Check if build has exists */
$build = $dbForProject->getDocument('builds', $deployment->getAttribute('buildId', '')); $build = $dbForProject->getDocument('builds', $deployment->getAttribute('buildId', ''));
if ($build->isEmpty()) { if ($build->isEmpty()) {
throw new Exception('Build not found'); throw new Exception('Build not found');
} }
if ($build->getAttribute('status') !== 'ready') { if ($build->getAttribute('status') !== 'ready') {
throw new Exception('Build not ready'); throw new Exception('Build not ready');
} }
/** Check if runtime is supported */ /** Check if runtime is supported */
$runtimes = Config::getParam('runtimes', []); $runtimes = Config::getParam('runtimes', []);
if (!\array_key_exists($function->getAttribute('runtime'), $runtimes)) { if (!\array_key_exists($function->getAttribute('runtime'), $runtimes)) {
throw new Exception('Runtime "' . $function->getAttribute('runtime', '') . '" is not supported'); throw new Exception('Runtime "' . $function->getAttribute('runtime', '') . '" is not supported');
} }
$runtime = $runtimes[$function->getAttribute('runtime')]; $runtime = $runtimes[$function->getAttribute('runtime')];
/** Create execution or update execution status */ /** Create execution or update execution status */
$execution = $dbForProject->getDocument('executions', $executionId ?? ''); $execution = $dbForProject->getDocument('executions', $executionId ?? '');
if ($execution->isEmpty()) { if ($execution->isEmpty()) {
@ -100,27 +100,27 @@ Server::setResource('execute', function () {
'duration' => 0.0, 'duration' => 0.0,
'search' => implode(' ', [$functionId, $executionId]), 'search' => implode(' ', [$functionId, $executionId]),
])); ]));
if ($execution->isEmpty()) { if ($execution->isEmpty()) {
throw new Exception('Failed to create or read execution'); throw new Exception('Failed to create or read execution');
} }
} }
$execution->setAttribute('status', 'processing'); $execution->setAttribute('status', 'processing');
$execution = $dbForProject->updateDocument('executions', $executionId, $execution); $execution = $dbForProject->updateDocument('executions', $executionId, $execution);
if ($build->getAttribute('status') !== 'ready') { if ($build->getAttribute('status') !== 'ready') {
throw new Exception('Build not ready'); throw new Exception('Build not ready');
} }
/** Check if runtime is supported */ /** Check if runtime is supported */
$runtimes = Config::getParam('runtimes', []); $runtimes = Config::getParam('runtimes', []);
if (!\array_key_exists($function->getAttribute('runtime'), $runtimes)) { if (!\array_key_exists($function->getAttribute('runtime'), $runtimes)) {
throw new Exception('Runtime "' . $function->getAttribute('runtime', '') . '" is not supported'); throw new Exception('Runtime "' . $function->getAttribute('runtime', '') . '" is not supported');
} }
$runtime = $runtimes[$function->getAttribute('runtime')]; $runtime = $runtimes[$function->getAttribute('runtime')];
/** Create execution or update execution status */ /** Create execution or update execution status */
$execution = $dbForProject->getDocument('executions', $executionId ?? ''); $execution = $dbForProject->getDocument('executions', $executionId ?? '');
if ($execution->isEmpty()) { if ($execution->isEmpty()) {
@ -138,19 +138,19 @@ Server::setResource('execute', function () {
'duration' => 0.0, 'duration' => 0.0,
'search' => implode(' ', [$functionId, $executionId]), 'search' => implode(' ', [$functionId, $executionId]),
])); ]));
if ($execution->isEmpty()) { if ($execution->isEmpty()) {
throw new Exception('Failed to create or read execution'); throw new Exception('Failed to create or read execution');
} }
} }
$execution->setAttribute('status', 'processing'); $execution->setAttribute('status', 'processing');
$execution = $dbForProject->updateDocument('executions', $executionId, $execution); $execution = $dbForProject->updateDocument('executions', $executionId, $execution);
$vars = array_reduce($function['vars'] ?? [], function (array $carry, Document $var) { $vars = array_reduce($function['vars'] ?? [], function (array $carry, Document $var) {
$carry[$var->getAttribute('key')] = $var->getAttribute('value'); $carry[$var->getAttribute('key')] = $var->getAttribute('value');
return $carry; return $carry;
}, []); }, []);
/** Collect environment variables */ /** Collect environment variables */
$vars = \array_merge($vars, [ $vars = \array_merge($vars, [
'APPWRITE_FUNCTION_ID' => $functionId, 'APPWRITE_FUNCTION_ID' => $functionId,
@ -166,7 +166,7 @@ Server::setResource('execute', function () {
'APPWRITE_FUNCTION_USER_ID' => $user->getId() ?? '', 'APPWRITE_FUNCTION_USER_ID' => $user->getId() ?? '',
'APPWRITE_FUNCTION_JWT' => $jwt ?? '', 'APPWRITE_FUNCTION_JWT' => $jwt ?? '',
]); ]);
/** Execute function */ /** Execute function */
$executor = new Executor(App::getEnv('_APP_EXECUTOR_HOST')); $executor = new Executor(App::getEnv('_APP_EXECUTOR_HOST'));
try { try {
@ -180,7 +180,7 @@ Server::setResource('execute', function () {
source: $build->getAttribute('outputPath', ''), source: $build->getAttribute('outputPath', ''),
entrypoint: $deployment->getAttribute('entrypoint', ''), entrypoint: $deployment->getAttribute('entrypoint', ''),
); );
/** Update execution status */ /** Update execution status */
$execution $execution
->setAttribute('status', $executionResponse['status']) ->setAttribute('status', $executionResponse['status'])
@ -198,9 +198,9 @@ Server::setResource('execute', function () {
->setAttribute('stderr', $th->getMessage()); ->setAttribute('stderr', $th->getMessage());
Console::error($th->getMessage()); Console::error($th->getMessage());
} }
$execution = $dbForProject->updateDocument('executions', $executionId, $execution); $execution = $dbForProject->updateDocument('executions', $executionId, $execution);
/** Trigger Webhook */ /** Trigger Webhook */
$executionModel = new Execution(); $executionModel = new Execution();
$executionUpdate = new Event(Event::WEBHOOK_QUEUE_NAME, Event::WEBHOOK_CLASS_NAME); $executionUpdate = new Event(Event::WEBHOOK_QUEUE_NAME, Event::WEBHOOK_CLASS_NAME);
@ -212,7 +212,7 @@ Server::setResource('execute', function () {
->setParam('executionId', $execution->getId()) ->setParam('executionId', $execution->getId())
->setPayload($execution->getArrayCopy(array_keys($executionModel->getRules()))) ->setPayload($execution->getArrayCopy(array_keys($executionModel->getRules())))
->trigger(); ->trigger();
/** Trigger Functions */ /** Trigger Functions */
$functions $functions
->setData($data ?? '') ->setData($data ?? '')
@ -222,7 +222,7 @@ Server::setResource('execute', function () {
->setParam('functionId', $function->getId()) ->setParam('functionId', $function->getId())
->setParam('executionId', $execution->getId()) ->setParam('executionId', $execution->getId())
->trigger(); ->trigger();
/** Trigger realtime event */ /** Trigger realtime event */
$allEvents = Event::generateEvents('functions.[functionId].executions.[executionId].update', [ $allEvents = Event::generateEvents('functions.[functionId].executions.[executionId].update', [
'functionId' => $function->getId(), 'functionId' => $function->getId(),
@ -247,7 +247,7 @@ Server::setResource('execute', function () {
channels: $target['channels'], channels: $target['channels'],
roles: $target['roles'] roles: $target['roles']
); );
/** Update usage stats */ /** Update usage stats */
if (App::getEnv('_APP_USAGE_STATS', 'enabled') === 'enabled') { if (App::getEnv('_APP_USAGE_STATS', 'enabled') === 'enabled') {
$usage = new Stats($statsd); $usage = new Stats($statsd);
@ -282,6 +282,7 @@ $server->job()
$type = $payload['type'] ?? ''; $type = $payload['type'] ?? '';
$events = $payload['events'] ?? []; $events = $payload['events'] ?? [];
$data = $payload['data'] ?? ''; $data = $payload['data'] ?? '';
$eventData = $payload['payload'] ?? '';
$project = new Document($payload['project'] ?? []); $project = new Document($payload['project'] ?? []);
$function = new Document($payload['function'] ?? []); $function = new Document($payload['function'] ?? []);
$user = new Document($payload['user'] ?? []); $user = new Document($payload['user'] ?? []);
@ -320,14 +321,14 @@ $server->job()
$execute( $execute(
statsd: $statsd, statsd: $statsd,
dbForProject: $dbForProject, dbForProject: $dbForProject,
project: $project, project: $project,
function: $function, function: $function,
trigger: 'event', trigger: 'event',
event: $events[0], event: $events[0],
eventData: $payload, eventData: $eventData,
user: $user, user: $user,
data: null, data: null,
executionId: null, executionId: null,
jwt: null jwt: null
); );
Console::success('Triggered function: ' . $events[0]); Console::success('Triggered function: ' . $events[0]);
@ -346,15 +347,15 @@ $server->job()
$execution = new Document($payload['execution'] ?? []); $execution = new Document($payload['execution'] ?? []);
$user = new Document($payload['user'] ?? []); $user = new Document($payload['user'] ?? []);
$execute( $execute(
project: $project, project: $project,
function: $function, function: $function,
dbForProject: $dbForProject, dbForProject: $dbForProject,
functions: $functions, functions: $functions,
trigger: 'http', trigger: 'http',
executionId: $execution->getId(), executionId: $execution->getId(),
event: null, event: null,
eventData: null, eventData: null,
data: $data, data: $data,
user: $user, user: $user,
jwt: $jwt, jwt: $jwt,
statsd: $statsd, statsd: $statsd,
@ -362,15 +363,15 @@ $server->job()
break; break;
case 'schedule': case 'schedule':
$execute( $execute(
project: $project, project: $project,
function: $function, function: $function,
dbForProject: $dbForProject, dbForProject: $dbForProject,
functions: $functions, functions: $functions,
trigger: 'http', trigger: 'http',
executionId: null, executionId: null,
event: null, event: null,
eventData: null, eventData: null,
data: null, data: null,
user: null, user: null,
jwt: null, jwt: null,
statsd: $statsd, statsd: $statsd,

View file

@ -145,16 +145,36 @@ class Func extends Event
*/ */
public function trigger(): string|bool public function trigger(): string|bool
{ {
$queue = new Client(Event::FUNCTIONS_QUEUE_NAME, $this->connection); $client = new Client($this->queue, $this->connection);
return $queue->enqueue([ return $client->enqueue([
'type' => $this->type,
'execution' => $this->execution,
'function' => $this->function,
'data' => $this->data,
'jwt' => $this->jwt,
'project' => $this->project, 'project' => $this->project,
'user' => $this->user 'user' => $this->user,
'function' => $this->function,
'execution' => $this->execution,
'type' => $this->type,
'jwt' => $this->jwt,
'payload' => '',
'events' => Event::generateEvents($this->getEvent(), $this->getParams()),
'data' => $this->data,
]); ]);
} }
/**
* Generate a function event from a base event
*
* @param Event $event
*
* @return self
*
*/
public function from(Event $event): self
{
$this->project = $event->getProject();
$this->user = $event->getUser();
$this->payload = $event->getPayload();
$this->event = $event->getEvent();
$this->params = $event->getParams();
return $this;
}
} }

View file

@ -6,7 +6,6 @@ use Appwrite\Utopia\Response;
use Appwrite\Utopia\Response\Model; use Appwrite\Utopia\Response\Model;
use Utopia\Database\Role; use Utopia\Database\Role;
class Execution extends Model class Execution extends Model
{ {
public function __construct() public function __construct()