1
0
Fork 0
mirror of synced 2024-07-07 23:46:11 +12:00
appwrite/app/workers/functions.php

427 lines
16 KiB
PHP
Raw Normal View History

2020-05-05 01:34:31 +12:00
<?php
2020-07-17 00:04:06 +12:00
2022-11-10 06:01:43 +13:00
require_once __DIR__ . '/../worker.php';
2023-08-21 00:29:43 +12:00
use Domnikl\Statsd\Client;
2022-11-10 06:01:43 +13:00
use Utopia\Queue\Message;
2022-02-06 08:49:57 +13:00
use Appwrite\Event\Event;
2022-11-16 07:13:17 +13:00
use Appwrite\Event\Func;
2022-02-06 08:49:57 +13:00
use Appwrite\Messaging\Adapter\Realtime;
2023-08-21 00:29:43 +12:00
use Appwrite\Usage\Stats;
2022-02-06 08:49:57 +13:00
use Appwrite\Utopia\Response\Model\Execution;
use Executor\Executor;
2020-07-20 02:43:59 +12:00
use Utopia\App;
2020-05-10 10:12:00 +12:00
use Utopia\CLI\Console;
2020-05-05 01:34:31 +12:00
use Utopia\Config\Config;
2021-05-05 09:45:41 +12:00
use Utopia\Database\Database;
use Utopia\Database\Document;
2022-12-15 04:42:25 +13:00
use Utopia\Database\Helpers\ID;
2022-12-15 05:04:06 +13:00
use Utopia\Database\Helpers\Permission;
2022-08-11 01:43:05 +12:00
use Utopia\Database\Query;
2022-11-10 06:01:43 +13:00
use Utopia\Database\Validator\Authorization;
2023-05-25 06:14:58 +12:00
use Utopia\Logger\Log;
2022-11-16 17:17:46 +13:00
use Utopia\Queue\Server;
2022-12-15 05:04:06 +13:00
use Utopia\Database\Helpers\Role;
2022-11-10 06:01:43 +13:00
Authorization::disable();
Authorization::setDefaultStatus(false);
Server::setResource('execute', function () {
return function (
2023-05-25 06:14:58 +12:00
Log $log,
2022-11-17 02:50:12 +13:00
Func $queueForFunctions,
Database $dbForProject,
2023-08-21 00:29:43 +12:00
Client $statsd,
Document $project,
Document $function,
string $trigger,
string $data = null,
2023-02-15 00:01:38 +13:00
string $path,
string $method,
array $headers,
?Document $user = null,
string $jwt = null,
2022-11-17 02:50:12 +13:00
string $event = null,
string $eventData = null,
string $executionId = null,
) {
$user ??= new Document();
$functionId = $function->getId();
$deploymentId = $function->getAttribute('deployment', '');
2022-11-16 18:30:57 +13:00
2023-05-27 23:56:26 +12:00
$log->addTag('functionId', $functionId);
$log->addTag('projectId', $project->getId());
2023-05-25 06:14:58 +12:00
/** Check if deployment exists */
$deployment = $dbForProject->getDocument('deployments', $deploymentId);
2022-11-16 18:30:57 +13:00
if ($deployment->getAttribute('resourceId') !== $functionId) {
throw new Exception('Deployment not found. Create deployment before trying to execute a function');
}
2022-11-16 18:30:57 +13:00
if ($deployment->isEmpty()) {
throw new Exception('Deployment not found. Create deployment before trying to execute a function');
}
2022-11-16 18:30:57 +13:00
/** Check if build has exists */
$build = $dbForProject->getDocument('builds', $deployment->getAttribute('buildId', ''));
if ($build->isEmpty()) {
throw new Exception('Build not found');
}
2022-11-16 18:30:57 +13:00
if ($build->getAttribute('status') !== 'ready') {
throw new Exception('Build not ready');
}
2022-11-16 18:30:57 +13:00
/** Check if runtime is supported */
2023-09-05 05:53:25 +12:00
$version = $function->getAttribute('version', 'v2');
$runtimes = Config::getParam($version === 'v2' ? 'runtimes-v2' : 'runtimes', []);
2022-11-16 18:30:57 +13:00
if (!\array_key_exists($function->getAttribute('runtime'), $runtimes)) {
throw new Exception('Runtime "' . $function->getAttribute('runtime', '') . '" is not supported');
}
2022-11-16 18:30:57 +13:00
$runtime = $runtimes[$function->getAttribute('runtime')];
2022-11-16 18:30:57 +13:00
2023-07-30 04:20:20 +12:00
$headers['x-appwrite-trigger'] = $trigger;
$headers['x-appwrite-event'] = $event ?? '';
$headers['x-appwrite-user-id'] = $user->getId() ?? '';
$headers['x-appwrite-user-jwt'] = $jwt ?? '';
2023-07-31 01:30:30 +12:00
/** Create execution or update execution status */
$execution = $dbForProject->getDocument('executions', $executionId ?? '');
2022-11-10 06:01:43 +13:00
if ($execution->isEmpty()) {
$headersFiltered = [];
foreach ($headers as $key => $value) {
2023-08-12 01:34:57 +12:00
if (\in_array(\strtolower($key), FUNCTION_ALLOWLIST_HEADERS_REQUEST)) {
2023-08-08 21:28:25 +12:00
$headersFiltered[] = [ 'name' => $key, 'value' => $value ];
}
}
2023-03-15 08:31:23 +13:00
$executionId = ID::unique();
2023-02-24 22:24:20 +13:00
$execution = new Document([
'$id' => $executionId,
'$permissions' => $user->isEmpty() ? [] : [Permission::read(Role::user($user->getId()))],
2023-07-30 04:20:20 +12:00
'functionInternalId' => $function->getInternalId(),
'functionId' => $function->getId(),
'deploymentInternalId' => $deployment->getInternalId(),
'deploymentId' => $deployment->getId(),
'trigger' => $trigger,
2023-02-24 22:24:20 +13:00
'status' => 'processing',
2023-07-30 04:20:20 +12:00
'responseStatusCode' => 0,
'responseHeaders' => [],
'requestPath' => $path,
'requestMethod' => $method,
'requestHeaders' => $headersFiltered,
2023-02-15 00:01:38 +13:00
'errors' => '',
'logs' => '',
'duration' => 0.0,
'search' => implode(' ', [$functionId, $executionId]),
2023-02-24 22:24:20 +13:00
]);
2023-03-15 08:31:23 +13:00
if ($function->getAttribute('logging')) {
2023-02-24 22:24:20 +13:00
$execution = $dbForProject->createDocument('executions', $execution);
}
2022-11-16 18:30:57 +13:00
2022-11-17 02:11:00 +13:00
// TODO: @Meldiron Trigger executions.create event here
2022-11-16 18:30:57 +13:00
if ($execution->isEmpty()) {
throw new Exception('Failed to create or read execution');
}
2022-11-10 06:01:43 +13:00
}
2022-11-17 02:11:00 +13:00
2023-03-15 08:31:23 +13:00
if ($execution->getAttribute('status') !== 'processing') {
2023-02-24 22:24:20 +13:00
$execution->setAttribute('status', 'processing');
2023-03-15 08:31:23 +13:00
if ($function->getAttribute('logging')) {
2023-02-24 22:24:20 +13:00
$execution = $dbForProject->updateDocument('executions', $executionId, $execution);
}
}
2022-11-16 18:30:57 +13:00
2023-03-29 02:21:42 +13:00
$durationStart = \microtime(true);
2023-09-11 22:22:16 +12:00
$body = $eventData ?? '';
if (empty($body)) {
$body = $data ?? '';
}
2023-03-12 05:06:02 +13:00
$vars = [];
2023-09-11 22:22:16 +12:00
// V2 vars
if ($version === 'v2') {
$vars = \array_merge($vars, [
'APPWRITE_FUNCTION_TRIGGER' => $headers['x-appwrite-trigger'] ?? '',
'APPWRITE_FUNCTION_DATA' => $body ?? '',
'APPWRITE_FUNCTION_EVENT_DATA' => $body ?? '',
'APPWRITE_FUNCTION_EVENT' => $headers['x-appwrite-event'] ?? '',
'APPWRITE_FUNCTION_USER_ID' => $headers['x-appwrite-user-id'] ?? '',
'APPWRITE_FUNCTION_JWT' => $headers['x-appwrite-user-jwt'] ?? ''
]);
}
2023-07-28 19:56:07 +12:00
// Shared vars
2023-09-05 20:21:36 +12:00
foreach ($function->getAttribute('varsProject', []) as $var) {
$vars[$var->getAttribute('key')] = $var->getAttribute('value', '');
}
2023-03-12 05:06:02 +13:00
// Function vars
2023-09-05 20:21:36 +12:00
foreach ($function->getAttribute('vars', []) as $var) {
$vars[$var->getAttribute('key')] = $var->getAttribute('value', '');
}
2022-11-16 18:30:57 +13:00
2023-03-12 05:06:02 +13:00
// Appwrite vars
$vars = \array_merge($vars, [
'APPWRITE_FUNCTION_ID' => $functionId,
'APPWRITE_FUNCTION_NAME' => $function->getAttribute('name'),
'APPWRITE_FUNCTION_DEPLOYMENT' => $deploymentId,
'APPWRITE_FUNCTION_PROJECT_ID' => $project->getId(),
'APPWRITE_FUNCTION_RUNTIME_NAME' => $runtime['name'] ?? '',
'APPWRITE_FUNCTION_RUNTIME_VERSION' => $runtime['version'] ?? '',
]);
2022-11-16 18:30:57 +13:00
/** Execute function */
try {
2023-09-05 05:53:25 +12:00
$version = $function->getAttribute('version', 'v2');
$command = $runtime['startCommand'];
$executor = new Executor(App::getEnv('_APP_EXECUTOR_HOST'));
2023-09-05 05:53:25 +12:00
$command = $version === 'v2' ? '' : 'cp /tmp/code.tar.gz /mnt/code/code.tar.gz && nohup helpers/start.sh "' . $command . '"';
$executionResponse = $executor->createExecution(
projectId: $project->getId(),
2022-11-17 02:50:12 +13:00
deploymentId: $deploymentId,
body: \strlen($body) > 0 ? $body : null,
variables: $vars,
timeout: $function->getAttribute('timeout', 0),
image: $runtime['image'],
2023-03-15 19:08:43 +13:00
source: $build->getAttribute('path', ''),
entrypoint: $deployment->getAttribute('entrypoint', ''),
2023-09-05 05:53:25 +12:00
version: $version,
2023-02-15 00:01:38 +13:00
path: $path,
method: $method,
headers: $headers,
2023-09-05 05:53:25 +12:00
runtimeEntrypoint: $command
);
2022-11-16 18:30:57 +13:00
2023-03-07 00:16:34 +13:00
$status = $executionResponse['statusCode'] >= 400 ? 'failed' : 'completed';
2023-02-03 08:21:00 +13:00
$headersFiltered = [];
foreach ($executionResponse['headers'] as $key => $value) {
2023-08-12 01:34:57 +12:00
if (\in_array(\strtolower($key), FUNCTION_ALLOWLIST_HEADERS_RESPONSE)) {
2023-08-07 01:11:30 +12:00
$headersFiltered[] = [ 'name' => $key, 'value' => $value ];
}
}
2023-07-30 04:20:20 +12:00
/** Update execution status */
$execution
2023-02-03 08:21:00 +13:00
->setAttribute('status', $status)
2023-07-30 04:20:20 +12:00
->setAttribute('responseStatusCode', $executionResponse['statusCode'])
->setAttribute('responseHeaders', $headersFiltered)
2023-02-15 00:01:38 +13:00
->setAttribute('logs', $executionResponse['logs'])
->setAttribute('errors', $executionResponse['errors'])
->setAttribute('duration', $executionResponse['duration']);
} catch (\Throwable $th) {
2023-03-29 02:21:42 +13:00
$durationEnd = \microtime(true);
$execution
2023-03-29 02:21:42 +13:00
->setAttribute('duration', $durationEnd - $durationStart)
->setAttribute('status', 'failed')
2023-07-30 04:20:20 +12:00
->setAttribute('responseStatusCode', 500)
2023-02-15 21:36:20 +13:00
->setAttribute('errors', $th->getMessage() . '\nError Code: ' . $th->getCode());
2022-11-23 04:36:15 +13:00
2023-05-25 04:29:23 +12:00
$error = $th->getMessage();
2023-05-25 07:07:41 +12:00
$errorCode = $th->getCode();
}
2022-11-16 18:30:57 +13:00
2023-03-15 08:31:23 +13:00
if ($function->getAttribute('logging')) {
2023-02-24 22:24:20 +13:00
$execution = $dbForProject->updateDocument('executions', $executionId, $execution);
}
2022-11-16 18:30:57 +13:00
/** Trigger Webhook */
$executionModel = new Execution();
$executionUpdate = new Event(Event::WEBHOOK_QUEUE_NAME, Event::WEBHOOK_CLASS_NAME);
$executionUpdate
->setProject($project)
->setUser($user)
->setEvent('functions.[functionId].executions.[executionId].update')
->setParam('functionId', $function->getId())
->setParam('executionId', $execution->getId())
->setPayload($execution->getArrayCopy(array_keys($executionModel->getRules())))
->trigger();
2022-11-16 18:30:57 +13:00
/** Trigger Functions */
2022-11-16 19:08:01 +13:00
$queueForFunctions
2022-11-17 02:50:12 +13:00
->from($executionUpdate)
->trigger();
2022-11-16 18:30:57 +13:00
/** Trigger realtime event */
$allEvents = Event::generateEvents('functions.[functionId].executions.[executionId].update', [
'functionId' => $function->getId(),
'executionId' => $execution->getId()
]);
$target = Realtime::fromPayload(
// Pass first, most verbose event pattern
event: $allEvents[0],
payload: $execution
);
Realtime::send(
projectId: 'console',
payload: $execution->getArrayCopy(),
events: $allEvents,
channels: $target['channels'],
roles: $target['roles']
);
Realtime::send(
2022-11-10 06:01:43 +13:00
projectId: $project->getId(),
payload: $execution->getArrayCopy(),
events: $allEvents,
channels: $target['channels'],
roles: $target['roles']
2022-11-10 06:01:43 +13:00
);
2022-11-16 18:30:57 +13:00
2023-08-21 00:29:43 +12:00
/** Update usage stats */
if (App::getEnv('_APP_USAGE_STATS', 'enabled') === 'enabled') {
$usage = new Stats($statsd);
$usage
->setParam('projectId', $project->getId())
->setParam('projectInternalId', $project->getInternalId())
->setParam('functionId', $function->getId()) // TODO: We should use functionInternalId in usage stats
->setParam('executions.{scope}.compute', 1)
->setParam('executionStatus', $execution->getAttribute('status', ''))
->setParam('executionTime', $execution->getAttribute('duration'))
->setParam('networkRequestSize', 0)
->setParam('networkResponseSize', 0)
->submit();
}
if (!empty($error)) {
throw new Exception($error, $errorCode);
}
};
});
2022-11-10 06:01:43 +13:00
$server->job()
->inject('message')
->inject('dbForProject')
2022-11-16 19:08:01 +13:00
->inject('queueForFunctions')
2023-08-21 00:29:43 +12:00
->inject('statsd')
->inject('execute')
2023-07-12 19:23:04 +12:00
->inject('log')
2023-08-21 00:29:43 +12:00
->action(function (Message $message, Database $dbForProject, Func $queueForFunctions, Client $statsd, callable $execute, Log $log) {
2022-11-16 17:17:46 +13:00
$payload = $message->getPayload() ?? [];
if (empty($payload)) {
throw new Exception('Missing payload');
}
$type = $payload['type'] ?? '';
$events = $payload['events'] ?? [];
2023-02-20 23:45:50 +13:00
$data = $payload['body'] ?? '';
2022-11-16 18:30:57 +13:00
$eventData = $payload['payload'] ?? '';
2022-11-16 17:19:35 +13:00
$project = new Document($payload['project'] ?? []);
$function = new Document($payload['function'] ?? []);
2022-11-16 17:17:46 +13:00
$user = new Document($payload['user'] ?? []);
2022-06-20 21:22:53 +12:00
if ($project->getId() === 'console') {
return;
}
2022-11-17 01:44:14 +13:00
if (!empty($events)) {
$limit = 30;
$sum = 30;
$offset = 0;
$functions = [];
/** @var Document[] $functions */
while ($sum >= $limit) {
$functions = $dbForProject->find('functions', [
Query::limit($limit),
Query::offset($offset)
2022-11-17 01:44:14 +13:00
]);
$sum = \count($functions);
$offset = $offset + $limit;
Console::log('Fetched ' . $sum . ' functions...');
foreach ($functions as $function) {
if (!array_intersect($events, $function->getAttribute('events', []))) {
continue;
}
Console::success('Iterating function: ' . $function->getAttribute('name'));
$execute(
log: $log,
2023-08-21 00:29:43 +12:00
statsd: $statsd,
2022-11-17 01:44:14 +13:00
dbForProject: $dbForProject,
project: $project,
function: $function,
queueForFunctions: $queueForFunctions,
2022-11-17 02:11:00 +13:00
trigger: 'event',
2022-11-17 01:44:14 +13:00
event: $events[0],
eventData: \is_string($eventData) ? $eventData : \json_encode($eventData),
2022-11-17 01:44:14 +13:00
user: $user,
data: null,
executionId: null,
2023-02-15 00:01:38 +13:00
jwt: null,
2023-03-05 21:12:01 +13:00
path: '/',
method: 'POST',
headers: [
'user-agent' => 'Appwrite/' . APP_VERSION_STABLE
],
2022-11-17 01:44:14 +13:00
);
Console::success('Triggered function: ' . $events[0]);
}
}
2022-11-17 02:11:00 +13:00
return;
2022-11-17 01:44:14 +13:00
}
2022-04-18 08:34:32 +12:00
/**
* Handle Schedule and HTTP execution.
*/
switch ($type) {
case 'http':
2022-11-16 17:17:46 +13:00
$jwt = $payload['jwt'] ?? '';
$execution = new Document($payload['execution'] ?? []);
$user = new Document($payload['user'] ?? []);
$execute(
2023-05-25 06:14:58 +12:00
log: $log,
2022-11-16 18:30:57 +13:00
project: $project,
function: $function,
dbForProject: $dbForProject,
2022-11-16 19:08:01 +13:00
queueForFunctions: $queueForFunctions,
2022-11-17 02:11:00 +13:00
trigger: 'http',
2022-11-16 18:30:57 +13:00
executionId: $execution->getId(),
event: null,
eventData: null,
2022-11-16 18:30:57 +13:00
data: $data,
user: $user,
jwt: $jwt,
2023-09-06 20:10:03 +12:00
path: $payload['path'] ?? '',
method: $payload['method'] ?? 'POST',
headers: $payload['headers'] ?? [],
2023-08-21 00:29:43 +12:00
statsd: $statsd,
);
break;
case 'schedule':
$execute(
2023-05-25 06:14:58 +12:00
log: $log,
2022-11-16 18:30:57 +13:00
project: $project,
function: $function,
dbForProject: $dbForProject,
2022-11-16 19:08:01 +13:00
queueForFunctions: $queueForFunctions,
2022-11-17 02:11:00 +13:00
trigger: 'schedule',
2022-11-16 18:30:57 +13:00
executionId: null,
event: null,
eventData: null,
2022-11-16 18:30:57 +13:00
data: null,
user: null,
jwt: null,
2023-09-06 20:10:03 +12:00
path: $payload['path'] ?? '/',
method: $payload['method'] ?? 'POST',
headers: $payload['headers'] ?? [],
2023-08-21 00:29:43 +12:00
statsd: $statsd,
);
break;
2020-07-17 00:04:06 +12:00
}
2022-11-10 06:01:43 +13:00
});
2022-11-10 06:01:43 +13:00
$server->workerStart();
$server->start();