1
0
Fork 0
mirror of synced 2024-06-26 18:20:43 +12:00
appwrite/app/workers/functions.php

375 lines
14 KiB
PHP
Raw Normal View History

2020-05-05 01:34:31 +12:00
<?php
2020-07-17 00:04:06 +12:00
2022-02-06 08:49:57 +13:00
use Appwrite\Event\Event;
2022-04-18 08:34:32 +12:00
use Appwrite\Event\Func;
2022-02-06 08:49:57 +13:00
use Appwrite\Messaging\Adapter\Realtime;
use Appwrite\Resque\Worker;
2022-02-06 08:49:57 +13:00
use Appwrite\Stats\Stats;
use Appwrite\Utopia\Response\Model\Execution;
2021-01-17 12:38:13 +13:00
use Cron\CronExpression;
2022-02-06 08:49:57 +13:00
use Executor\Executor;
2020-07-20 02:43:59 +12:00
use Utopia\App;
2020-05-10 10:12:00 +12:00
use Utopia\CLI\Console;
2020-05-05 01:34:31 +12:00
use Utopia\Config\Config;
2021-05-05 09:45:41 +12:00
use Utopia\Database\Database;
use Utopia\Database\Document;
use Utopia\Database\Validator\Authorization;
2020-05-09 18:26:18 +12:00
require_once __DIR__ . '/../init.php';
2020-05-10 04:39:50 +12:00
2021-04-15 01:07:26 +12:00
Console::title('Functions V1 Worker');
Console::success(APP_NAME . ' functions worker v1 has started');
2020-05-10 04:39:50 +12:00
class FunctionsV1 extends Worker
2020-05-05 01:34:31 +12:00
{
private ?Executor $executor = null;
public array $args = [];
public array $allowed = [];
2020-07-22 08:10:31 +12:00
public function getName(): string
{
return "functions";
}
public function init(): void
2020-05-05 01:34:31 +12:00
{
$this->executor = new Executor(App::getEnv('_APP_EXECUTOR_HOST'));
2020-05-05 01:34:31 +12:00
}
public function run(): void
2020-05-05 01:34:31 +12:00
{
2022-04-18 08:34:32 +12:00
$type = $this->args['type'] ?? '';
$events = $this->args['events'] ?? [];
$project = new Document($this->args['project'] ?? []);
2022-04-18 08:34:32 +12:00
$user = new Document($this->args['user'] ?? []);
$payload = json_encode($this->args['payload'] ?? []);
2022-06-20 21:22:53 +12:00
if ($project->getId() === 'console') {
return;
}
$database = $this->getProjectDB($project->getId());
2020-07-17 00:04:06 +12:00
2022-04-18 08:34:32 +12:00
/**
* Handle Event execution.
*/
if (!empty($events)) {
$limit = 30;
$sum = 30;
$offset = 0;
$functions = [];
/** @var Document[] $functions */
while ($sum >= $limit) {
2022-06-20 21:22:53 +12:00
$functions = $database->find('functions', [], $limit, $offset, ['name'], [Database::ORDER_ASC]);
2022-04-18 08:34:32 +12:00
$sum = \count($functions);
$offset = $offset + $limit;
Console::log('Fetched ' . $sum . ' functions...');
foreach ($functions as $function) {
if (!array_intersect($events, $function->getAttribute('events', []))) {
continue;
2020-08-05 17:18:45 +12:00
}
2022-04-18 08:34:32 +12:00
Console::success('Iterating function: ' . $function->getAttribute('name'));
$this->execute(
2022-04-19 04:21:45 +12:00
project: $project,
2022-04-18 08:34:32 +12:00
function: $function,
dbForProject: $database,
trigger: 'event',
2022-05-10 00:36:29 +12:00
// Pass first, most verbose event pattern
2022-04-18 08:34:32 +12:00
event: $events[0],
eventData: $payload,
2022-04-19 04:21:45 +12:00
user: $user
2022-04-18 08:34:32 +12:00
);
Console::success('Triggered function: ' . $events[0]);
}
2022-04-18 08:34:32 +12:00
}
2022-02-06 08:49:57 +13:00
2022-04-18 08:34:32 +12:00
return;
}
/**
* Handle Schedule and HTTP execution.
*/
$user = new Document($this->args['user'] ?? []);
$project = new Document($this->args['project'] ?? []);
$execution = new Document($this->args['execution'] ?? []);
2022-05-19 02:31:29 +12:00
$function = new Document($this->args['function'] ?? []);
2022-04-18 08:34:32 +12:00
switch ($type) {
case 'http':
2022-04-18 08:34:32 +12:00
$jwt = $this->args['jwt'] ?? '';
$data = $this->args['data'] ?? '';
2022-06-20 21:22:53 +12:00
$function = $database->getDocument('functions', $execution->getAttribute('functionId'));
$this->execute(
2022-04-19 04:21:45 +12:00
project: $project,
function: $function,
dbForProject: $database,
executionId: $execution->getId(),
2022-04-18 08:34:32 +12:00
trigger: 'http',
data: $data,
2022-04-19 04:21:45 +12:00
user: $user,
jwt: $jwt
);
2022-02-06 08:49:57 +13:00
break;
case 'schedule':
2022-04-18 08:34:32 +12:00
$scheduleOriginal = $execution->getAttribute('scheduleOriginal', '');
2020-10-02 21:25:57 +13:00
/*
2020-11-03 19:53:32 +13:00
* 1. Get Original Task
* 2. Check for updates
* If has updates skip task and don't reschedule
* If status not equal to play skip task
* 3. Check next run date, update task and add new job at the given date
* 4. Execute task (set optional timeout)
* 5. Update task response to log
* On success reset error count
* On failure add error count
* If error count bigger than allowed change status to pause
*/
2021-01-17 12:38:13 +13:00
// Reschedule
2022-06-20 21:22:53 +12:00
$function = $database->getDocument('functions', $function->getId());
2022-05-19 02:31:29 +12:00
2021-05-05 09:45:41 +12:00
if (empty($function->getId())) {
throw new Exception('Function not found (' . $function->getId() . ')');
2021-01-17 12:38:13 +13:00
}
if ($scheduleOriginal && $scheduleOriginal !== $function->getAttribute('schedule')) { // Schedule has changed from previous run, ignore this run.
2021-01-17 12:38:13 +13:00
return;
}
2021-02-22 10:37:22 +13:00
$cron = new CronExpression($function->getAttribute('schedule'));
2021-01-17 12:38:13 +13:00
$next = (int) $cron->getNextRunDate()->format('U');
$function
->setAttribute('scheduleNext', $next)
->setAttribute('schedulePrevious', \time());
2021-01-17 12:38:13 +13:00
2022-06-20 21:22:53 +12:00
$function = $database->updateDocument(
'functions',
$function->getId(),
2022-05-11 00:20:51 +12:00
$function->setAttribute('scheduleNext', (int) $next)
2022-06-20 21:22:53 +12:00
);
2021-01-17 12:38:13 +13:00
if ($function === false) {
throw new Exception('Function update failed.');
}
2022-04-18 08:34:32 +12:00
$reschedule = new Func();
$reschedule
->setFunction($function)
->setType('schedule')
->setUser($user)
->setProject($project);
2022-04-18 08:34:32 +12:00
// Async task reschedule
$reschedule->schedule($next);
2020-07-17 00:04:06 +12:00
$this->execute(
2022-04-19 04:21:45 +12:00
project: $project,
function: $function,
2022-02-06 08:49:57 +13:00
dbForProject: $database,
2022-04-18 08:34:32 +12:00
trigger: 'schedule'
);
2022-02-06 08:49:57 +13:00
break;
2020-07-17 00:04:06 +12:00
}
}
2022-02-06 08:49:57 +13:00
private function execute(
2022-04-19 04:21:45 +12:00
Document $project,
2022-02-06 08:49:57 +13:00
Document $function,
Database $dbForProject,
string $trigger,
2022-04-18 08:34:32 +12:00
string $executionId = null,
string $event = null,
string $eventData = null,
string $data = null,
2022-04-19 04:21:45 +12:00
?Document $user = null,
2022-04-18 08:34:32 +12:00
string $jwt = null
2022-02-06 08:49:57 +13:00
) {
2022-05-19 02:47:44 +12:00
$user ??= new Document();
2022-02-06 08:49:57 +13:00
$functionId = $function->getId();
$deploymentId = $function->getAttribute('deployment', '');
/** Check if deployment exists */
2022-06-20 21:22:53 +12:00
$deployment = $dbForProject->getDocument('deployments', $deploymentId);
2022-02-06 08:49:57 +13:00
if ($deployment->getAttribute('resourceId') !== $functionId) {
throw new Exception('Deployment not found. Create deployment before trying to execute a function', 404);
}
if ($deployment->isEmpty()) {
throw new Exception('Deployment not found. Create deployment before trying to execute a function', 404);
}
/** Check if build has exists */
2022-06-20 21:22:53 +12:00
$build = $dbForProject->getDocument('builds', $deployment->getAttribute('buildId', ''));
2022-02-06 08:49:57 +13:00
if ($build->isEmpty()) {
throw new Exception('Build not found', 404);
}
if ($build->getAttribute('status') !== 'ready') {
throw new Exception('Build not ready', 400);
}
/** Check if runtime is supported */
$runtimes = Config::getParam('runtimes', []);
if (!\array_key_exists($function->getAttribute('runtime'), $runtimes)) {
2022-02-06 08:49:57 +13:00
throw new Exception('Runtime "' . $function->getAttribute('runtime', '') . '" is not supported', 400);
}
2020-07-21 22:33:23 +12:00
$runtime = $runtimes[$function->getAttribute('runtime')];
2022-02-06 08:49:57 +13:00
/** Create execution or update execution status */
2022-06-20 21:22:53 +12:00
$execution = $dbForProject->getDocument('executions', $executionId ?? '');
if ($execution->isEmpty()) {
$executionId = $dbForProject->getId();
$execution = $dbForProject->createDocument('executions', new Document([
'$id' => $executionId,
'$read' => $user->isEmpty() ? [] : ['user:' . $user->getId()],
'$write' => [],
'functionId' => $functionId,
'deploymentId' => $deploymentId,
'trigger' => $trigger,
'status' => 'waiting',
'statusCode' => 0,
'response' => '',
'stderr' => '',
'time' => 0.0,
'search' => implode(' ', [$functionId, $executionId]),
]));
2022-02-06 08:49:57 +13:00
if ($execution->isEmpty()) {
2022-06-20 21:22:53 +12:00
throw new Exception('Failed to create or read execution');
2022-02-06 08:49:57 +13:00
}
2022-06-20 21:22:53 +12:00
}
$execution->setAttribute('status', 'processing');
$execution = $dbForProject->updateDocument('executions', $executionId, $execution);
2022-02-06 08:49:57 +13:00
/** Collect environment variables */
$vars = [
'APPWRITE_FUNCTION_ID' => $functionId,
'APPWRITE_FUNCTION_NAME' => $function->getAttribute('name', ''),
'APPWRITE_FUNCTION_DEPLOYMENT' => $deploymentId,
'APPWRITE_FUNCTION_RUNTIME_NAME' => $runtime['name'],
'APPWRITE_FUNCTION_RUNTIME_VERSION' => $runtime['version'],
'APPWRITE_FUNCTION_TRIGGER' => $trigger,
'APPWRITE_FUNCTION_EVENT' => $event,
'APPWRITE_FUNCTION_EVENT_DATA' => $eventData,
'APPWRITE_FUNCTION_DATA' => $data,
2022-04-19 04:21:45 +12:00
'APPWRITE_FUNCTION_PROJECT_ID' => $project->getId(),
'APPWRITE_FUNCTION_USER_ID' => $user->getId(),
2022-02-06 08:49:57 +13:00
'APPWRITE_FUNCTION_JWT' => $jwt,
];
$vars = \array_merge($function->getAttribute('vars', []), $vars);
/** Execute function */
try {
$executionResponse = $this->executor->createExecution(
2022-04-19 04:21:45 +12:00
projectId: $project->getId(),
deploymentId: $deploymentId,
path: $build->getAttribute('outputPath', ''),
vars: $vars,
entrypoint: $deployment->getAttribute('entrypoint', ''),
2022-05-19 02:47:44 +12:00
data: $vars['APPWRITE_FUNCTION_DATA'] ?? '',
runtime: $function->getAttribute('runtime', ''),
timeout: $function->getAttribute('timeout', 0),
baseImage: $runtime['image']
);
/** Update execution status */
$execution
->setAttribute('status', $executionResponse['status'])
->setAttribute('statusCode', $executionResponse['statusCode'])
->setAttribute('response', $executionResponse['response'])
->setAttribute('stderr', $executionResponse['stderr'])
->setAttribute('time', $executionResponse['time']);
} catch (\Throwable $th) {
2022-03-16 03:51:32 +13:00
$endtime = \microtime(true);
$time = $endtime - $execution->getCreatedAt();
$execution
->setAttribute('time', $time)
->setAttribute('status', 'failed')
->setAttribute('statusCode', $th->getCode())
->setAttribute('stderr', $th->getMessage());
Console::error($th->getMessage());
}
2022-06-20 21:22:53 +12:00
$execution = $dbForProject->updateDocument('executions', $executionId, $execution);
2022-02-06 08:49:57 +13:00
/** Trigger Webhook */
$executionModel = new Execution();
$executionUpdate = new Event(Event::WEBHOOK_QUEUE_NAME, Event::WEBHOOK_CLASS_NAME);
$executionUpdate
2022-04-19 04:21:45 +12:00
->setProject($project)
->setUser($user)
->setEvent('functions.[functionId].executions.[executionId].update')
->setParam('functionId', $function->getId())
->setParam('executionId', $execution->getId())
->setPayload($execution->getArrayCopy(array_keys($executionModel->getRules())))
->trigger();
/** Trigger Functions */
$executionUpdate
->setClass(Event::FUNCTIONS_CLASS_NAME)
->setQueue(Event::FUNCTIONS_QUEUE_NAME)
->trigger();
2022-02-06 08:49:57 +13:00
/** Trigger realtime event */
2022-04-19 04:21:45 +12:00
$allEvents = Event::generateEvents('functions.[functionId].executions.[executionId].update', [
'functionId' => $function->getId(),
'executionId' => $execution->getId()
]);
2022-05-10 00:36:29 +12:00
$target = Realtime::fromPayload(
// Pass first, most verbose event pattern
event: $allEvents[0],
payload: $execution
);
2022-03-01 01:24:35 +13:00
Realtime::send(
projectId: 'console',
payload: $execution->getArrayCopy(),
2022-04-19 04:21:45 +12:00
events: $allEvents,
2022-03-01 01:24:35 +13:00
channels: $target['channels'],
roles: $target['roles']
);
2022-02-06 08:49:57 +13:00
Realtime::send(
2022-04-19 04:21:45 +12:00
projectId: $project->getId(),
2022-02-06 08:49:57 +13:00
payload: $execution->getArrayCopy(),
2022-04-19 04:21:45 +12:00
events: $allEvents,
2022-02-06 08:49:57 +13:00
channels: $target['channels'],
roles: $target['roles']
);
/** Update usage stats */
global $register;
if (App::getEnv('_APP_USAGE_STATS', 'enabled') === 'enabled') {
$statsd = $register->get('statsd');
$usage = new Stats($statsd);
$usage
2022-04-19 04:21:45 +12:00
->setParam('projectId', $project->getId())
2022-02-06 08:49:57 +13:00
->setParam('functionId', $function->getId())
->setParam('functionExecution', 1)
->setParam('functionStatus', $execution->getAttribute('status', ''))
->setParam('functionExecutionTime', $execution->getAttribute('time') * 1000) // ms
->setParam('networkRequestSize', 0)
->setParam('networkResponseSize', 0)
->submit();
}
2020-11-03 19:53:32 +13:00
}
public function shutdown(): void
2020-05-05 01:34:31 +12:00
{
}
}