1
0
Fork 0
mirror of synced 2024-07-08 16:06:02 +12:00
appwrite/app/workers/functions.php

398 lines
15 KiB
PHP
Raw Normal View History

2020-05-05 01:34:31 +12:00
<?php
2020-07-17 00:04:06 +12:00
2022-11-10 06:01:43 +13:00
require_once __DIR__ . '/../worker.php';
use Utopia\Queue;
use Utopia\Queue\Message;
2022-02-06 08:49:57 +13:00
use Appwrite\Event\Event;
use Appwrite\Messaging\Adapter\Realtime;
2022-08-09 18:28:38 +12:00
use Appwrite\Usage\Stats;
2022-02-06 08:49:57 +13:00
use Appwrite\Utopia\Response\Model\Execution;
use Executor\Executor;
2020-07-20 02:43:59 +12:00
use Utopia\App;
2020-05-10 10:12:00 +12:00
use Utopia\CLI\Console;
2020-05-05 01:34:31 +12:00
use Utopia\Config\Config;
2021-05-05 09:45:41 +12:00
use Utopia\Database\Database;
use Utopia\Database\Document;
2022-08-15 02:22:38 +12:00
use Utopia\Database\ID;
2022-08-15 23:24:31 +12:00
use Utopia\Database\Permission;
2022-08-11 01:43:05 +12:00
use Utopia\Database\Query;
2022-08-15 23:24:31 +12:00
use Utopia\Database\Role;
2022-11-10 06:01:43 +13:00
use Utopia\Database\Validator\Authorization;
use Utopia\Logger\Log;
// Worker-wide authorization switches: call disable() and set the default
// status to false. NOTE(review): presumably this turns off permission checks
// for every database call made by this worker — confirm against
// Utopia\Database\Validator\Authorization.
Authorization::disable();
Authorization::setDefaultStatus(false);

// Bind the queue connection and the worker count from the global scope
// (both are consumed below when the Swoole queue adapter is created).
global $connection, $workerNumber;

// Client used to run function executions; the host comes from the
// _APP_EXECUTOR_HOST environment variable.
$executor = new Executor(App::getEnv('_APP_EXECUTOR_HOST'));
2022-11-10 06:01:43 +13:00
/**
 * Run a single function execution and publish its outcome.
 *
 * Steps, in order:
 *  1. Validate that the function's deployment and build exist, that the build
 *     is ready and that the runtime is listed in the `runtimes` config.
 *  2. Load the execution document for `$executionId`, or create a new one
 *     (status `waiting`) when none is found, then mark it `processing`.
 *  3. Assemble the environment variables and call the executor.
 *  4. Persist the result (or the failure details) on the execution document.
 *  5. Trigger the webhook and functions events, send realtime messages to the
 *     `console` project and to the owning project, and submit usage stats when
 *     `_APP_USAGE_STATS` is `enabled`.
 *
 * Executor failures are caught and recorded on the execution (status `failed`);
 * validation failures from step 1 and step 2 are thrown to the caller.
 *
 * @param Document      $project      Project owning the function.
 * @param Document      $function     Function to execute.
 * @param Database      $dbForProject Database handle used for all reads and writes.
 * @param string        $trigger      Trigger label stored on the execution (callers pass `event`, `http` or `schedule`).
 * @param string|null   $executionId  Existing execution to update; a new ID is generated when it is null or unknown.
 * @param string|null   $event        Event name exposed as APPWRITE_FUNCTION_EVENT.
 * @param string|null   $eventData    Event payload exposed as APPWRITE_FUNCTION_EVENT_DATA.
 * @param string|null   $data         Custom data exposed as APPWRITE_FUNCTION_DATA and sent as the executor payload.
 * @param Document|null $user         User on whose behalf the function runs; an empty Document is used when null.
 * @param string|null   $jwt          Token exposed as APPWRITE_FUNCTION_JWT.
 *
 * @throws Exception When the deployment or build is missing, the build is not ready,
 *                   the runtime is unsupported, or the execution document cannot be created.
 */
$execute = function (
    Document $project,
    Document $function,
    Database $dbForProject,
    string $trigger,
    ?string $executionId = null,
    ?string $event = null,
    ?string $eventData = null,
    ?string $data = null,
    ?Document $user = null,
    ?string $jwt = null
) use ($executor) {
    $user ??= new Document();
    $functionId = $function->getId();
    $deploymentId = $function->getAttribute('deployment', '');

    /** Check if deployment exists and belongs to this function */
    $deployment = $dbForProject->getDocument('deployments', $deploymentId);

    if ($deployment->isEmpty() || $deployment->getAttribute('resourceId') !== $functionId) {
        throw new Exception('Deployment not found. Create deployment before trying to execute a function');
    }

    /** Check if build exists and is ready */
    $build = $dbForProject->getDocument('builds', $deployment->getAttribute('buildId', ''));

    if ($build->isEmpty()) {
        throw new Exception('Build not found');
    }

    if ($build->getAttribute('status') !== 'ready') {
        throw new Exception('Build not ready');
    }

    /** Check if runtime is supported */
    $runtimes = Config::getParam('runtimes', []);

    if (!\array_key_exists($function->getAttribute('runtime'), $runtimes)) {
        throw new Exception('Runtime "' . $function->getAttribute('runtime', '') . '" is not supported');
    }

    $runtime = $runtimes[$function->getAttribute('runtime')];

    /** Create execution or update execution status */
    $execution = $dbForProject->getDocument('executions', $executionId ?? '');

    if ($execution->isEmpty()) {
        // No usable execution was supplied: create one under a fresh ID.
        $executionId = ID::unique();
        $execution = $dbForProject->createDocument('executions', new Document([
            '$id' => $executionId,
            '$permissions' => $user->isEmpty() ? [] : [Permission::read(Role::user($user->getId()))],
            'functionId' => $functionId,
            'deploymentId' => $deploymentId,
            'trigger' => $trigger,
            'status' => 'waiting',
            'statusCode' => 0,
            'response' => '',
            'stderr' => '',
            'duration' => 0.0,
            'search' => implode(' ', [$functionId, $executionId]),
        ]));

        if ($execution->isEmpty()) {
            throw new Exception('Failed to create or read execution');
        }
    }

    $execution->setAttribute('status', 'processing');
    $execution = $dbForProject->updateDocument('executions', $executionId, $execution);

    /** Collect environment variables */
    // Start from the function's own variables (key => value) ...
    $vars = array_reduce($function['vars'] ?? [], function (array $carry, Document $var) {
        $carry[$var->getAttribute('key')] = $var->getAttribute('value');
        return $carry;
    }, []);

    // ... then merge the APPWRITE_FUNCTION_* values on top; array_merge lets
    // these win over a user-defined variable with the same key.
    $vars = \array_merge($vars, [
        'APPWRITE_FUNCTION_ID' => $functionId,
        'APPWRITE_FUNCTION_NAME' => $function->getAttribute('name'),
        'APPWRITE_FUNCTION_DEPLOYMENT' => $deploymentId,
        'APPWRITE_FUNCTION_TRIGGER' => $trigger,
        'APPWRITE_FUNCTION_PROJECT_ID' => $project->getId(),
        'APPWRITE_FUNCTION_RUNTIME_NAME' => $runtime['name'] ?? '',
        'APPWRITE_FUNCTION_RUNTIME_VERSION' => $runtime['version'] ?? '',
        'APPWRITE_FUNCTION_EVENT' => $event ?? '',
        'APPWRITE_FUNCTION_EVENT_DATA' => $eventData ?? '',
        'APPWRITE_FUNCTION_DATA' => $data ?? '',
        'APPWRITE_FUNCTION_USER_ID' => $user->getId() ?? '',
        'APPWRITE_FUNCTION_JWT' => $jwt ?? '',
    ]);

    /** Execute function */
    try {
        $executionResponse = $executor->createExecution(
            projectId: $project->getId(),
            deploymentId: $deployment->getId(),
            payload: $vars['APPWRITE_FUNCTION_DATA'] ?? '',
            variables: $vars,
            timeout: $function->getAttribute('timeout', 0),
            image: $runtime['image'],
            source: $build->getAttribute('outputPath', ''),
            entrypoint: $deployment->getAttribute('entrypoint', ''),
        );

        /** Update execution status */
        $execution
            ->setAttribute('status', $executionResponse['status'])
            ->setAttribute('statusCode', $executionResponse['statusCode'])
            ->setAttribute('response', $executionResponse['response'])
            ->setAttribute('stdout', $executionResponse['stdout'])
            ->setAttribute('stderr', $executionResponse['stderr'])
            ->setAttribute('duration', $executionResponse['duration']);
    } catch (\Throwable $th) {
        // Elapsed wall-clock seconds since the execution document was created.
        // Computed from epoch timestamps with microseconds ('U.u') rather than
        // DateInterval::format('%s.%f'): '%s' is only the seconds component
        // (minutes and hours are dropped) and '%f' is not zero-padded, so
        // 5 s + 1234 µs would have been recorded as 5.1234.
        $createdAt = new \DateTime($execution->getCreatedAt());
        $duration = \abs((float) (new \DateTime())->format('U.u') - (float) $createdAt->format('U.u'));

        $execution
            ->setAttribute('duration', $duration)
            ->setAttribute('status', 'failed')
            ->setAttribute('statusCode', $th->getCode())
            ->setAttribute('stderr', $th->getMessage());

        Console::error($th->getMessage());
    }

    $execution = $dbForProject->updateDocument('executions', $executionId, $execution);

    /** Trigger Webhook */
    $executionModel = new Execution();
    $executionUpdate = new Event(Event::WEBHOOK_QUEUE_NAME, Event::WEBHOOK_CLASS_NAME);
    $executionUpdate
        ->setProject($project)
        ->setUser($user)
        ->setEvent('functions.[functionId].executions.[executionId].update')
        ->setParam('functionId', $function->getId())
        ->setParam('executionId', $execution->getId())
        ->setPayload($execution->getArrayCopy(array_keys($executionModel->getRules())))
        ->trigger();

    /** Trigger Functions */
    // Same event object, re-targeted at the functions queue.
    $executionUpdate
        ->setClass(Event::FUNCTIONS_CLASS_NAME)
        ->setQueue(Event::FUNCTIONS_QUEUE_NAME)
        ->trigger();

    /** Trigger realtime event */
    $allEvents = Event::generateEvents('functions.[functionId].executions.[executionId].update', [
        'functionId' => $function->getId(),
        'executionId' => $execution->getId()
    ]);

    $target = Realtime::fromPayload(
        // Pass first, most verbose event pattern
        event: $allEvents[0],
        payload: $execution
    );

    // Sent twice: once to the 'console' project and once to the owning project.
    Realtime::send(
        projectId: 'console',
        payload: $execution->getArrayCopy(),
        events: $allEvents,
        channels: $target['channels'],
        roles: $target['roles']
    );

    Realtime::send(
        projectId: $project->getId(),
        payload: $execution->getArrayCopy(),
        events: $allEvents,
        channels: $target['channels'],
        roles: $target['roles']
    );

    /** Update usage stats */
    // The registry is read from the global scope at call time.
    global $register;

    if (App::getEnv('_APP_USAGE_STATS', 'enabled') === 'enabled') {
        $statsd = $register->get('statsd');
        $usage = new Stats($statsd);
        $usage
            ->setParam('projectId', $project->getId())
            ->setParam('functionId', $function->getId())
            ->setParam('executions.{scope}.compute', 1)
            ->setParam('executionStatus', $execution->getAttribute('status', ''))
            ->setParam('executionTime', $execution->getAttribute('duration'))
            ->setParam('networkRequestSize', 0)
            ->setParam('networkResponseSize', 0)
            ->submit();
    }
};
2022-11-16 05:03:42 +13:00
// Queue server consuming the functions queue through the Swoole adapter.
$adapter = new Queue\Adapter\Swoole($connection, $workerNumber, Event::FUNCTIONS_QUEUE_NAME);
$server = new Queue\Server($adapter);

/**
 * Job handler for the functions queue.
 *
 * The message payload is read as `['type' => string, 'value' => array]`.
 * - When `value.events` is non-empty, every function subscribed to at least
 *   one of those events is executed with trigger `event`.
 * - Otherwise `type` selects between an `http` execution (function looked up
 *   from the supplied execution's `functionId`) and a `schedule` execution
 *   (function taken from the payload). Any other type is ignored.
 *
 * Messages for the `console` project are dropped.
 */
$server->job()
    ->inject('message')
    ->inject('dbForProject')
    ->action(function (Message $message, Database $dbForProject) use ($execute) {
        $envelope = $message->getPayload();
        $args = $envelope['value'] ?? [];
        $type = $envelope['type'] ?? '';
        $events = $args['events'] ?? [];
        $project = new Document($args['project'] ?? []);
        $user = new Document($args['user'] ?? []);
        // JSON-encoded event payload, handed to the function as its event data.
        $payload = json_encode($args['payload'] ?? []);

        if ($project->getId() === 'console') {
            return;
        }

        /**
         * Handle Event execution.
         */
        if (!empty($events)) {
            $limit = 30;
            $sum = $limit;
            $latestDocument = null;

            // Page through all functions, $limit at a time; a short page
            // (fewer than $limit results) ends the loop.
            while ($sum === $limit) {
                $paginationQueries = [Query::limit($limit)];

                if ($latestDocument !== null) {
                    $paginationQueries[] = Query::cursorAfter($latestDocument);
                }

                $results = $dbForProject->find('functions', \array_merge($paginationQueries, [
                    Query::orderAsc('name')
                ]));

                $sum = count($results);

                Console::log('Fetched ' . $sum . ' functions...');

                foreach ($results as $function) {
                    // Skip functions not subscribed to any of the incoming events.
                    if (!array_intersect($events, $function->getAttribute('events', []))) {
                        continue;
                    }

                    Console::success('Iterating function: ' . $function->getAttribute('name'));

                    // As event, pass first, most verbose event pattern
                    $execute($project, $function, $dbForProject, 'event', null, $events[0], $payload, null, $user, null);

                    Console::success('Triggered function: ' . $events[0]);
                }

                // Cursor for the next page. Test emptiness explicitly:
                // empty(array_key_last(...)) would also treat a last key of 0
                // (single-result page) as "no results".
                $latestDocument = $sum === 0 ? null : $results[\array_key_last($results)];
            }

            return;
        }

        /**
         * Handle Schedule and HTTP execution.
         */
        $execution = new Document($args['execution'] ?? []);

        switch ($type) {
            case 'http':
                $jwt = $args['jwt'] ?? '';
                $data = $args['data'] ?? '';
                // The function is re-read from the database using the execution's functionId.
                $function = $dbForProject->getDocument('functions', $execution->getAttribute('functionId'));
                $execute($project, $function, $dbForProject, 'http', $execution->getId(), null, null, $data, $user, $jwt);
                break;
            case 'schedule':
                /*
                 * NOTE(review): the comment previously here outlined a task
                 * workflow (fetch original task, check for updates/status,
                 * compute next run date and reschedule, execute, log the
                 * response, track an error count and pause on too many
                 * failures). None of those steps are performed by this
                 * handler; it only executes the function taken from the
                 * message payload, with no user, data or JWT.
                 */
                $function = new Document($args['function'] ?? []);
                $execute($project, $function, $dbForProject, 'schedule', null, null, null, null, null, null);
                break;
        }
    });
2022-11-10 06:01:43 +13:00
/**
 * Error handler for the queue server.
 *
 * - PDOException instances are rethrown unchanged.
 * - Errors with code 0 or >= 500 are pushed to the injected logger with
 *   file/line/trace details and the current authorization roles.
 * - Every other error that reaches this point is only printed to the console.
 * Finally the connection pools held by the registry are reclaimed.
 */
$server
    ->error()
    ->inject('error')
    ->inject('logger')
    ->inject('register')
    ->action(function ($error, $logger, $register) {
        $version = App::getEnv('_APP_VERSION', 'UNKNOWN');

        if ($error instanceof PDOException) {
            throw $error;
        }

        $code = $error->getCode();
        $shouldReport = $code === 0 || $code >= 500;

        if ($shouldReport) {
            $report = new Log();
            $report->setNamespace("appwrite-worker");
            $report->setServer(\gethostname());
            $report->setVersion($version);
            $report->setType(Log::TYPE_ERROR);
            $report->setMessage($error->getMessage());
            $report->setAction('appwrite-worker-functions');

            $report->addTag('verboseType', get_class($error));
            $report->addTag('code', $error->getCode());

            // Extras are attached in this fixed order.
            $extras = [
                'file' => $error->getFile(),
                'line' => $error->getLine(),
                'trace' => $error->getTraceAsString(),
                'detailedTrace' => $error->getTrace(),
                'roles' => Authorization::$roles,
            ];

            foreach ($extras as $key => $value) {
                $report->addExtra($key, $value);
            }

            // Anything other than _APP_ENV=production is reported as staging.
            if (App::getEnv('_APP_ENV', 'development') === 'production') {
                $report->setEnvironment(Log::ENVIRONMENT_PRODUCTION);
            } else {
                $report->setEnvironment(Log::ENVIRONMENT_STAGING);
            }

            $logger->addLog($report);
        }

        // Console summary, one line per detail.
        $summary = [
            'Type' => get_class($error),
            'Message' => $error->getMessage(),
            'File' => $error->getFile(),
            'Line' => $error->getLine(),
        ];

        foreach ($summary as $label => $detail) {
            Console::error('[Error] ' . $label . ': ' . $detail);
        }

        $register->get('pools')->reclaim();
    });

// Run the server's workerStart() step, then start the server.
$server->workerStart();
$server->start();