2020-05-05 01:34:31 +12:00
|
|
|
<?php
|
2020-07-17 00:04:06 +12:00
|
|
|
|
2022-02-06 08:49:57 +13:00
|
|
|
use Appwrite\Event\Event;
|
|
|
|
use Appwrite\Messaging\Adapter\Realtime;
|
2021-06-12 02:20:18 +12:00
|
|
|
use Appwrite\Resque\Worker;
|
2022-02-06 08:49:57 +13:00
|
|
|
use Appwrite\Stats\Stats;
|
|
|
|
use Appwrite\Utopia\Response\Model\Execution;
|
2021-01-17 12:38:13 +13:00
|
|
|
use Cron\CronExpression;
|
2022-02-06 08:49:57 +13:00
|
|
|
use Executor\Executor;
|
2020-11-03 19:53:32 +13:00
|
|
|
use Swoole\Runtime;
|
2020-07-20 02:43:59 +12:00
|
|
|
use Utopia\App;
|
2020-05-10 10:12:00 +12:00
|
|
|
use Utopia\CLI\Console;
|
2020-05-05 01:34:31 +12:00
|
|
|
use Utopia\Config\Config;
|
2021-05-05 09:45:41 +12:00
|
|
|
use Utopia\Database\Database;
|
|
|
|
use Utopia\Database\Document;
|
|
|
|
use Utopia\Database\Validator\Authorization;
|
2020-05-09 18:26:18 +12:00
|
|
|
|
2021-08-13 20:39:46 +12:00
|
|
|
require_once __DIR__.'/../init.php';
|
2020-05-10 04:39:50 +12:00
|
|
|
|
2021-04-15 01:07:26 +12:00
|
|
|
Console::title('Functions V1 Worker');
|
2021-09-01 21:09:04 +12:00
|
|
|
Console::success(APP_NAME . ' functions worker v1 has started');
|
2020-05-10 04:39:50 +12:00
|
|
|
|
2021-06-12 02:20:18 +12:00
|
|
|
class FunctionsV1 extends Worker
|
2020-05-05 01:34:31 +12:00
|
|
|
{
|
2022-02-06 08:49:57 +13:00
|
|
|
/**
|
|
|
|
* @var Executor
|
|
|
|
*/
|
|
|
|
private $executor = null;
|
|
|
|
|
2021-09-01 21:09:04 +12:00
|
|
|
public array $args = [];
|
2020-05-05 01:34:31 +12:00
|
|
|
|
2021-09-01 21:09:04 +12:00
|
|
|
public array $allowed = [];
|
2020-07-22 08:10:31 +12:00
|
|
|
|
2021-11-24 22:38:32 +13:00
|
|
|
public function getName(): string {
|
2021-11-24 03:24:25 +13:00
|
|
|
return "functions";
|
|
|
|
}
|
|
|
|
|
2021-06-12 02:20:18 +12:00
|
|
|
public function init(): void
|
2020-05-05 01:34:31 +12:00
|
|
|
{
|
2022-02-06 08:49:57 +13:00
|
|
|
$this->executor = new Executor();
|
2020-05-05 01:34:31 +12:00
|
|
|
}
|
|
|
|
|
2021-06-12 02:20:18 +12:00
|
|
|
public function run(): void
|
2020-05-05 01:34:31 +12:00
|
|
|
{
|
2020-11-03 19:53:32 +13:00
|
|
|
$projectId = $this->args['projectId'] ?? '';
|
|
|
|
$functionId = $this->args['functionId'] ?? '';
|
2021-05-17 23:32:37 +12:00
|
|
|
$webhooks = $this->args['webhooks'] ?? [];
|
2020-11-03 19:53:32 +13:00
|
|
|
$executionId = $this->args['executionId'] ?? '';
|
|
|
|
$trigger = $this->args['trigger'] ?? '';
|
|
|
|
$event = $this->args['event'] ?? '';
|
2021-01-17 12:38:13 +13:00
|
|
|
$scheduleOriginal = $this->args['scheduleOriginal'] ?? '';
|
2021-03-31 00:42:58 +13:00
|
|
|
$eventData = (!empty($this->args['eventData'])) ? json_encode($this->args['eventData']) : '';
|
2021-03-10 08:58:03 +13:00
|
|
|
$data = $this->args['data'] ?? '';
|
2021-03-11 05:58:46 +13:00
|
|
|
$userId = $this->args['userId'] ?? '';
|
|
|
|
$jwt = $this->args['jwt'] ?? '';
|
2020-07-16 08:29:55 +12:00
|
|
|
|
2021-12-28 01:45:23 +13:00
|
|
|
$database = $this->getProjectDB($projectId);
|
2020-07-17 00:04:06 +12:00
|
|
|
|
2020-08-04 17:23:38 +12:00
|
|
|
switch ($trigger) {
|
|
|
|
case 'event':
|
2020-08-05 08:09:01 +12:00
|
|
|
$limit = 30;
|
|
|
|
$sum = 30;
|
|
|
|
$offset = 0;
|
2021-09-01 21:09:04 +12:00
|
|
|
$functions = [];
|
|
|
|
/** @var Document[] $functions */
|
2020-08-05 08:09:01 +12:00
|
|
|
|
|
|
|
while ($sum >= $limit) {
|
2022-02-06 08:49:57 +13:00
|
|
|
$functions = Authorization::skip(fn() => $database->find('functions', [], $limit, $offset, ['name'], [Database::ORDER_ASC]));
|
2020-08-05 08:09:01 +12:00
|
|
|
$sum = \count($functions);
|
|
|
|
$offset = $offset + $limit;
|
|
|
|
|
2021-09-01 21:09:04 +12:00
|
|
|
Console::log('Fetched ' . $sum . ' functions...');
|
2020-08-05 17:18:45 +12:00
|
|
|
|
2021-09-01 21:09:04 +12:00
|
|
|
foreach ($functions as $function) {
|
2020-08-05 17:18:45 +12:00
|
|
|
$events = $function->getAttribute('events', []);
|
|
|
|
|
2022-02-06 08:49:57 +13:00
|
|
|
if (!\in_array($event, $events)) {
|
2020-08-05 17:18:45 +12:00
|
|
|
continue;
|
|
|
|
}
|
|
|
|
|
2022-02-06 08:49:57 +13:00
|
|
|
Console::success('Iterating function: ' . $function->getAttribute('name'));
|
2021-09-01 21:09:04 +12:00
|
|
|
|
|
|
|
$this->execute(
|
|
|
|
projectId: $projectId,
|
|
|
|
function: $function,
|
2022-02-06 08:49:57 +13:00
|
|
|
dbForProject: $database,
|
|
|
|
executionId: $executionId,
|
|
|
|
webhooks: $webhooks,
|
|
|
|
trigger: $trigger,
|
2021-09-01 21:09:04 +12:00
|
|
|
event: $event,
|
|
|
|
eventData: $eventData,
|
|
|
|
data: $data,
|
|
|
|
userId: $userId,
|
|
|
|
jwt: $jwt
|
|
|
|
);
|
2022-02-06 08:49:57 +13:00
|
|
|
|
|
|
|
Console::success('Triggered function: ' . $event);
|
2020-08-05 17:18:45 +12:00
|
|
|
}
|
2020-08-05 08:09:01 +12:00
|
|
|
}
|
2022-02-06 08:49:57 +13:00
|
|
|
|
2020-08-04 17:23:38 +12:00
|
|
|
break;
|
|
|
|
|
|
|
|
case 'schedule':
|
2020-10-02 21:25:57 +13:00
|
|
|
/*
|
2020-11-03 19:53:32 +13:00
|
|
|
* 1. Get Original Task
|
|
|
|
* 2. Check for updates
|
|
|
|
* If has updates skip task and don't reschedule
|
|
|
|
* If status not equal to play skip task
|
|
|
|
* 3. Check next run date, update task and add new job at the given date
|
|
|
|
* 4. Execute task (set optional timeout)
|
|
|
|
* 5. Update task response to log
|
|
|
|
* On success reset error count
|
|
|
|
* On failure add error count
|
|
|
|
* If error count bigger than allowed change status to pause
|
|
|
|
*/
|
2021-01-17 12:38:13 +13:00
|
|
|
|
|
|
|
// Reschedule
|
2022-02-06 08:49:57 +13:00
|
|
|
$function = Authorization::skip(fn() => $database->getDocument('functions', $functionId));
|
2021-01-17 12:38:13 +13:00
|
|
|
|
2021-05-05 09:45:41 +12:00
|
|
|
if (empty($function->getId())) {
|
2021-01-17 12:38:13 +13:00
|
|
|
throw new Exception('Function not found ('.$functionId.')');
|
|
|
|
}
|
|
|
|
|
2021-09-01 21:09:04 +12:00
|
|
|
if ($scheduleOriginal && $scheduleOriginal !== $function->getAttribute('schedule')) { // Schedule has changed from previous run, ignore this run.
|
2021-01-17 12:38:13 +13:00
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
2021-02-22 10:37:22 +13:00
|
|
|
$cron = new CronExpression($function->getAttribute('schedule'));
|
2021-01-17 12:38:13 +13:00
|
|
|
$next = (int) $cron->getNextRunDate()->format('U');
|
|
|
|
|
|
|
|
$function
|
|
|
|
->setAttribute('scheduleNext', $next)
|
2021-09-01 21:09:04 +12:00
|
|
|
->setAttribute('schedulePrevious', \time());
|
2021-01-17 12:38:13 +13:00
|
|
|
|
2022-02-06 08:49:57 +13:00
|
|
|
$function = Authorization::skip(function() use ($database, $function, $next, $functionId) {
|
|
|
|
$function = $database->updateDocument('functions', $function->getId(), new Document(array_merge($function->getArrayCopy(), [
|
|
|
|
'scheduleNext' => (int)$next,
|
|
|
|
])));
|
|
|
|
|
|
|
|
if ($function === false) {
|
|
|
|
throw new Exception('Function update failed (' . $functionId . ')');
|
|
|
|
}
|
|
|
|
return $function;
|
|
|
|
});
|
2021-01-17 13:07:43 +13:00
|
|
|
|
2022-02-06 08:49:57 +13:00
|
|
|
ResqueScheduler::enqueueAt($next, Event::FUNCTIONS_QUEUE_NAME, Event::FUNCTIONS_CLASS_NAME, [
|
2021-01-17 12:38:13 +13:00
|
|
|
'projectId' => $projectId,
|
2021-05-18 03:52:22 +12:00
|
|
|
'webhooks' => $webhooks,
|
2021-01-17 12:38:13 +13:00
|
|
|
'functionId' => $function->getId(),
|
2021-12-07 23:42:33 +13:00
|
|
|
'userId' => $userId,
|
2021-01-17 12:38:13 +13:00
|
|
|
'executionId' => null,
|
|
|
|
'trigger' => 'schedule',
|
|
|
|
'scheduleOriginal' => $function->getAttribute('schedule', ''),
|
2021-09-06 12:37:20 +12:00
|
|
|
]); // Async task reschedule
|
2021-01-17 12:38:13 +13:00
|
|
|
|
2021-09-01 21:09:04 +12:00
|
|
|
$this->execute(
|
|
|
|
projectId: $projectId,
|
|
|
|
function: $function,
|
2022-02-06 08:49:57 +13:00
|
|
|
dbForProject: $database,
|
|
|
|
executionId: $executionId,
|
2021-09-01 21:09:04 +12:00
|
|
|
webhooks: $webhooks,
|
2022-02-06 08:49:57 +13:00
|
|
|
trigger: $trigger,
|
|
|
|
event: $event,
|
|
|
|
eventData: $eventData,
|
|
|
|
data: $data,
|
2021-09-01 21:09:04 +12:00
|
|
|
userId: $userId,
|
|
|
|
jwt: $jwt
|
|
|
|
);
|
2020-08-04 17:23:38 +12:00
|
|
|
break;
|
|
|
|
|
|
|
|
case 'http':
|
2022-02-06 08:49:57 +13:00
|
|
|
$function = Authorization::skip(fn() => $database->getDocument('functions', $functionId));
|
2020-08-04 17:23:38 +12:00
|
|
|
|
2021-05-05 09:45:41 +12:00
|
|
|
if (empty($function->getId())) {
|
2020-12-11 03:35:39 +13:00
|
|
|
throw new Exception('Function not found ('.$functionId.')');
|
2020-08-04 17:23:38 +12:00
|
|
|
}
|
2020-07-17 00:04:06 +12:00
|
|
|
|
2021-09-01 21:09:04 +12:00
|
|
|
$this->execute(
|
|
|
|
projectId: $projectId,
|
|
|
|
function: $function,
|
2022-02-06 08:49:57 +13:00
|
|
|
dbForProject: $database,
|
|
|
|
executionId: $executionId,
|
2021-09-01 21:09:04 +12:00
|
|
|
webhooks: $webhooks,
|
2022-02-06 08:49:57 +13:00
|
|
|
trigger: $trigger,
|
|
|
|
event: $event,
|
|
|
|
eventData: $eventData,
|
|
|
|
data: $data,
|
2021-09-01 21:09:04 +12:00
|
|
|
userId: $userId,
|
|
|
|
jwt: $jwt
|
|
|
|
);
|
2022-02-06 08:49:57 +13:00
|
|
|
|
2020-08-04 17:23:38 +12:00
|
|
|
break;
|
2020-07-17 00:04:06 +12:00
|
|
|
}
|
2020-08-04 17:23:38 +12:00
|
|
|
}
|
|
|
|
|
2022-02-06 08:49:57 +13:00
|
|
|
private function execute(
|
|
|
|
string $projectId,
|
|
|
|
Document $function,
|
|
|
|
Database $dbForProject,
|
|
|
|
string $executionId,
|
|
|
|
array $webhooks,
|
|
|
|
string $trigger,
|
|
|
|
string $event,
|
|
|
|
string $eventData,
|
|
|
|
string $data,
|
|
|
|
string $userId,
|
|
|
|
string $jwt
|
|
|
|
) {
|
|
|
|
|
|
|
|
$functionId = $function->getId();
|
|
|
|
$deploymentId = $function->getAttribute('deployment', '');
|
|
|
|
|
|
|
|
/** Check if deployment exists */
|
|
|
|
$deployment = Authorization::skip(fn() => $dbForProject->getDocument('deployments', $deploymentId));
|
|
|
|
|
|
|
|
if ($deployment->getAttribute('resourceId') !== $functionId) {
|
|
|
|
throw new Exception('Deployment not found. Create deployment before trying to execute a function', 404);
|
|
|
|
}
|
|
|
|
|
|
|
|
if ($deployment->isEmpty()) {
|
|
|
|
throw new Exception('Deployment not found. Create deployment before trying to execute a function', 404);
|
|
|
|
}
|
|
|
|
|
|
|
|
/** Check if build has exists */
|
|
|
|
$build = Authorization::skip(fn() => $dbForProject->getDocument('builds', $deployment->getAttribute('buildId', '')));
|
|
|
|
if ($build->isEmpty()) {
|
|
|
|
throw new Exception('Build not found', 404);
|
|
|
|
}
|
|
|
|
|
|
|
|
if ($build->getAttribute('status') !== 'ready') {
|
|
|
|
throw new Exception('Build not ready', 400);
|
|
|
|
}
|
|
|
|
|
|
|
|
/** Check if runtime is supported */
|
|
|
|
$runtimes = Config::getParam('runtimes', []);
|
|
|
|
$runtime = (isset($runtimes[$function->getAttribute('runtime', '')])) ? $runtimes[$function->getAttribute('runtime', '')] : null;
|
|
|
|
|
|
|
|
if (\is_null($runtime)) {
|
|
|
|
throw new Exception('Runtime "' . $function->getAttribute('runtime', '') . '" is not supported', 400);
|
2021-01-13 18:57:15 +13:00
|
|
|
}
|
2020-07-21 22:33:23 +12:00
|
|
|
|
2022-02-06 08:49:57 +13:00
|
|
|
/** Create execution or update execution status */
|
|
|
|
$execution = Authorization::skip(function() use ($dbForProject, &$executionId, $functionId, $deploymentId, $trigger, $userId) {
|
|
|
|
$execution = $dbForProject->getDocument('executions', $executionId);
|
|
|
|
if ($execution->isEmpty()) {
|
|
|
|
$executionId = $dbForProject->getId();
|
|
|
|
$execution = $dbForProject->createDocument('executions', new Document([
|
|
|
|
'$id' => $executionId,
|
|
|
|
'$read' => $userId ? ['user:' . $userId] : [],
|
|
|
|
'$write' => [],
|
|
|
|
'dateCreated' => time(),
|
|
|
|
'functionId' => $functionId,
|
|
|
|
'deploymentId' => $deploymentId,
|
|
|
|
'trigger' => $trigger,
|
|
|
|
'status' => 'waiting',
|
|
|
|
'statusCode' => 0,
|
|
|
|
'stdout' => '',
|
|
|
|
'stderr' => '',
|
|
|
|
'time' => 0.0,
|
|
|
|
'search' => implode(' ', [$functionId, $executionId]),
|
|
|
|
]));
|
|
|
|
|
|
|
|
if ($execution->isEmpty()) {
|
|
|
|
throw new Exception('Failed to create or read execution');
|
|
|
|
}
|
|
|
|
}
|
|
|
|
$execution->setAttribute('status', 'processing');
|
|
|
|
$execution = $dbForProject->updateDocument('executions', $executionId, $execution);
|
|
|
|
return $execution;
|
|
|
|
});
|
|
|
|
|
|
|
|
/** Collect environment variables */
|
|
|
|
$vars = [
|
|
|
|
'APPWRITE_FUNCTION_ID' => $functionId,
|
|
|
|
'APPWRITE_FUNCTION_NAME' => $function->getAttribute('name', ''),
|
|
|
|
'APPWRITE_FUNCTION_DEPLOYMENT' => $deploymentId,
|
|
|
|
'APPWRITE_FUNCTION_RUNTIME_NAME' => $runtime['name'],
|
|
|
|
'APPWRITE_FUNCTION_RUNTIME_VERSION' => $runtime['version'],
|
|
|
|
'APPWRITE_FUNCTION_TRIGGER' => $trigger,
|
|
|
|
'APPWRITE_FUNCTION_EVENT' => $event,
|
|
|
|
'APPWRITE_FUNCTION_EVENT_DATA' => $eventData,
|
|
|
|
'APPWRITE_FUNCTION_DATA' => $data,
|
|
|
|
'APPWRITE_FUNCTION_PROJECT_ID' => $projectId,
|
|
|
|
'APPWRITE_FUNCTION_USER_ID' => $userId,
|
|
|
|
'APPWRITE_FUNCTION_JWT' => $jwt,
|
|
|
|
];
|
|
|
|
$vars = \array_merge($function->getAttribute('vars', []), $vars);
|
|
|
|
|
|
|
|
/** Execute function */
|
2022-02-16 06:39:03 +13:00
|
|
|
try {
|
|
|
|
$executionResponse = $this->executor->createExecution(
|
|
|
|
projectId: $projectId,
|
|
|
|
deploymentId: $deploymentId,
|
|
|
|
path: $build->getAttribute('outputPath', ''),
|
|
|
|
vars: $vars,
|
|
|
|
entrypoint: $deployment->getAttribute('entrypoint', ''),
|
|
|
|
data: $vars['APPWRITE_FUNCTION_DATA'],
|
|
|
|
runtime: $function->getAttribute('runtime', ''),
|
|
|
|
timeout: $function->getAttribute('timeout', 0),
|
|
|
|
baseImage: $runtime['image']
|
|
|
|
);
|
|
|
|
|
|
|
|
/** Update execution status */
|
|
|
|
$execution->setAttribute('status', $executionResponse['status']);
|
|
|
|
$execution->setAttribute('statusCode', $executionResponse['statusCode']);
|
|
|
|
$execution->setAttribute('stdout', $executionResponse['stdout']);
|
|
|
|
$execution->setAttribute('stderr', $executionResponse['stderr']);
|
|
|
|
$execution->setAttribute('time', $executionResponse['time']);
|
|
|
|
} catch (\Throwable $th) {
|
|
|
|
$execution->setAttribute('status', 'failed');
|
|
|
|
$execution->setAttribute('statusCode', $th->getCode());
|
|
|
|
$execution->setAttribute('stderr', $th->getMessage());
|
|
|
|
Console::error($th->getMessage());
|
|
|
|
}
|
|
|
|
|
2022-02-06 08:49:57 +13:00
|
|
|
$execution = Authorization::skip(fn() => $dbForProject->updateDocument('executions', $executionId, $execution));
|
|
|
|
|
|
|
|
/** Trigger Webhook */
|
|
|
|
$executionModel = new Execution();
|
|
|
|
$executionUpdate = new Event(Event::WEBHOOK_QUEUE_NAME, Event::WEBHOOK_CLASS_NAME);
|
|
|
|
$executionUpdate
|
|
|
|
->setParam('projectId', $projectId)
|
|
|
|
->setParam('userId', $userId)
|
|
|
|
->setParam('webhooks', $webhooks)
|
2022-02-10 09:20:28 +13:00
|
|
|
->setParam('event', 'functions.executions.update')
|
2022-02-06 08:49:57 +13:00
|
|
|
->setParam('eventData', $execution->getArrayCopy(array_keys($executionModel->getRules())));
|
|
|
|
$executionUpdate->trigger();
|
|
|
|
|
|
|
|
/** Trigger realtime event */
|
2022-02-10 09:20:28 +13:00
|
|
|
$target = Realtime::fromPayload('functions.executions.update', $execution);
|
2022-03-01 01:24:35 +13:00
|
|
|
Realtime::send(
|
|
|
|
projectId: 'console',
|
|
|
|
payload: $execution->getArrayCopy(),
|
|
|
|
event: 'functions.executions.update',
|
|
|
|
channels: $target['channels'],
|
|
|
|
roles: $target['roles']
|
|
|
|
);
|
2022-02-06 08:49:57 +13:00
|
|
|
Realtime::send(
|
|
|
|
projectId: $projectId,
|
|
|
|
payload: $execution->getArrayCopy(),
|
2022-02-10 09:20:28 +13:00
|
|
|
event: 'functions.executions.update',
|
2022-02-06 08:49:57 +13:00
|
|
|
channels: $target['channels'],
|
|
|
|
roles: $target['roles']
|
|
|
|
);
|
|
|
|
|
|
|
|
/** Update usage stats */
|
|
|
|
global $register;
|
|
|
|
if (App::getEnv('_APP_USAGE_STATS', 'enabled') === 'enabled') {
|
|
|
|
$statsd = $register->get('statsd');
|
|
|
|
$usage = new Stats($statsd);
|
|
|
|
$usage
|
|
|
|
->setParam('projectId', $projectId)
|
|
|
|
->setParam('functionId', $function->getId())
|
|
|
|
->setParam('functionExecution', 1)
|
|
|
|
->setParam('functionStatus', $execution->getAttribute('status', ''))
|
|
|
|
->setParam('functionExecutionTime', $execution->getAttribute('time') * 1000) // ms
|
|
|
|
->setParam('networkRequestSize', 0)
|
|
|
|
->setParam('networkResponseSize', 0)
|
|
|
|
->submit();
|
|
|
|
$usage->submit();
|
|
|
|
}
|
2020-11-03 19:53:32 +13:00
|
|
|
}
|
|
|
|
|
2021-06-12 02:20:18 +12:00
|
|
|
public function shutdown(): void
|
2020-05-05 01:34:31 +12:00
|
|
|
{
|
|
|
|
}
|
2021-09-01 21:09:04 +12:00
|
|
|
}
|