1
0
Fork 0
mirror of synced 2024-07-01 12:40:34 +12:00
appwrite/app/workers/functions.php

362 lines
12 KiB
PHP
Raw Normal View History

2020-05-05 01:34:31 +12:00
<?php
2020-07-17 00:04:06 +12:00
2020-12-08 06:43:39 +13:00
use Appwrite\Event\Event;
2021-06-30 23:36:58 +12:00
use Appwrite\Messaging\Adapter\Realtime;
use Appwrite\Resque\Worker;
2021-08-08 18:31:20 +12:00
use Appwrite\Stats\Stats;
2021-06-28 19:56:03 +12:00
use Appwrite\Utopia\Response\Model\Execution;
2021-01-17 12:38:13 +13:00
use Cron\CronExpression;
2020-11-03 19:53:32 +13:00
use Swoole\Runtime;
2020-07-20 02:43:59 +12:00
use Utopia\App;
2020-05-10 10:12:00 +12:00
use Utopia\CLI\Console;
2020-05-05 01:34:31 +12:00
use Utopia\Config\Config;
2021-05-05 09:45:41 +12:00
use Utopia\Database\Database;
use Utopia\Database\Document;
use Utopia\Database\Validator\Authorization;
2021-07-23 00:49:52 +12:00
use Utopia\Orchestration\Orchestration;
use Utopia\Orchestration\Adapter\DockerAPI;
use Utopia\Orchestration\Container;
use Utopia\Orchestration\Exception\Orchestration as OrchestrationException;
2021-08-13 21:00:20 +12:00
use Utopia\Orchestration\Exception\Timeout as TimeoutException;
2020-05-09 18:26:18 +12:00
require_once __DIR__.'/../init.php';
2020-05-10 04:39:50 +12:00
2021-04-17 04:50:16 +12:00
// NOTE(review): a flags value of 0 presumably switches Swoole's runtime hooks off for this
// worker - confirm against the Swoole version in use.
Runtime::enableCoroutine(0);

Console::title('Functions V1 Worker');
Console::success(APP_NAME . ' functions worker v1 has started');

// Reference point for the warm-up duration reported below. Nothing in this file assigned
// $warmupStart before it was read, so the log line printed a raw Unix timestamp instead of
// an elapsed time. `??=` keeps any value an included bootstrap file may already have set.
$warmupStart ??= \microtime(true);

$runtimes = Config::getParam('runtimes');

// Optional Docker Hub credentials; each falls back to null when the variable is not set.
$dockerUser = App::getEnv('DOCKERHUB_PULL_USERNAME', null);
$dockerPass = App::getEnv('DOCKERHUB_PULL_PASSWORD', null);
$dockerEmail = App::getEnv('DOCKERHUB_PULL_EMAIL', null);

// Kept as a script-level variable: FunctionsV1::cleanup() reads it through `global`.
$orchestration = new Orchestration(new DockerAPI($dockerUser, $dockerPass, $dockerEmail));

$warmupEnd = \microtime(true);
$warmupTime = $warmupEnd - $warmupStart;

Console::success('Finished warmup in ' . $warmupTime . ' seconds');
2020-11-03 19:53:32 +13:00
/**
 * Index the containers labelled `appwrite-type=function` by container name.
 *
 * The resulting $list stays a script-level variable because FunctionsV1::cleanup()
 * reads it through `global`.
 */
$stdout = '';
$stderr = '';

$executionStart = \microtime(true);

/** @var Container[] $list */
$list = [];

$response = $orchestration->list(['label' => 'appwrite-type=function']);

foreach ($response as $container) {
    $list[$container->getName()] = $container;
}

$executionEnd = \microtime(true);

Console::info(count($list) . ' functions listed in ' . ($executionEnd - $executionStart) . ' seconds');
2020-11-03 19:53:32 +13:00
/**
* 1. Get event args - DONE
* 2. Unpackage code in the isolated container - DONE
* 3. Execute in container with timeout
* + measure execution time - DONE
* + pass env vars - DONE
* + pass one-time api key
2020-07-20 18:43:25 +12:00
* 4. Update execution status - DONE
* 5. Update execution stdout & stderr - DONE
2020-07-23 17:52:03 +12:00
* 6. Trigger audit log - DONE
* 7. Trigger usage log - DONE
*/
2021-07-28 02:16:12 +12:00
// TODO avoid scheduled execution if delay is bigger than X offset
/**
 * Resque worker that dispatches function executions to the executor service.
 *
 * A job is handled according to its `trigger` argument:
 *  - `event`:    every function subscribed to the event (and having a tag) is executed;
 *  - `schedule`: the function's next run is computed and enqueued, then it is executed;
 *  - `http`:     the requested function is executed once.
 */
class FunctionsV1 extends Worker
{
    /**
     * Number of function documents fetched per page while looking for event subscribers.
     */
    private const FUNCTIONS_PAGE_SIZE = 30;

    /**
     * Executor endpoint that receives execution requests.
     */
    private const EXECUTOR_ENDPOINT = 'http://appwrite-executor:8080/v1/execute';

    /**
     * Job arguments read by run() (projectId, functionId, trigger, event, ...).
     */
    public array $args = [];

    /**
     * Retained for backward compatibility only; filterEnvKey() no longer populates it.
     */
    public array $allowed = [];

    public function init(): void
    {
    }

    /**
     * Handle one job according to the `trigger` argument.
     *
     * Unknown triggers are ignored.
     *
     * @return void
     *
     * @throws Exception When the requested function does not exist or cannot be updated.
     */
    public function run(): void
    {
        $projectId = $this->args['projectId'] ?? '';
        $functionId = $this->args['functionId'] ?? '';
        $webhooks = $this->args['webhooks'] ?? [];
        $executionId = $this->args['executionId'] ?? '';
        $trigger = $this->args['trigger'] ?? '';
        $event = $this->args['event'] ?? '';
        $scheduleOriginal = $this->args['scheduleOriginal'] ?? '';
        $eventData = (!empty($this->args['eventData'])) ? json_encode($this->args['eventData']) : '';
        $data = $this->args['data'] ?? '';
        $userId = $this->args['userId'] ?? '';
        $jwt = $this->args['jwt'] ?? '';

        $database = $this->getInternalDB($projectId);

        switch ($trigger) {
            case 'event':
                $offset = 0;

                // Page through all functions; a page shorter than the page size is the last one.
                do {
                    Authorization::disable();

                    try {
                        /** @var Document[] $functions */
                        $functions = $database->find('functions', [], self::FUNCTIONS_PAGE_SIZE, $offset, ['name'], [Database::ORDER_ASC]);
                    } finally {
                        // Restore authorization even when the query throws.
                        Authorization::reset();
                    }

                    $sum = \count($functions);
                    $offset += self::FUNCTIONS_PAGE_SIZE;

                    Console::log('Fetched ' . $sum . ' functions...');

                    foreach ($functions as $function) {
                        $events = $function->getAttribute('events', []);
                        $tag = $function->getAttribute('tag', []);

                        Console::success('Itterating function: ' . $function->getAttribute('name'));

                        // Skip functions not subscribed to this event or without a tag.
                        if (!\in_array($event, $events, true) || empty($tag)) {
                            continue;
                        }

                        Console::success('Triggered function: ' . $event);

                        $this->execute(
                            trigger: 'event',
                            projectId: $projectId,
                            executionId: '',
                            database: $database,
                            function: $function,
                            event: $event,
                            eventData: $eventData,
                            data: $data,
                            webhooks: $webhooks,
                            userId: $userId,
                            jwt: $jwt
                        );
                    }
                } while ($sum >= self::FUNCTIONS_PAGE_SIZE);

                break;

            case 'schedule':
                $function = $this->fetchFunctionOrFail($database, $functionId);

                // Schedule has changed from previous run, ignore this run.
                if ($scheduleOriginal && $scheduleOriginal !== $function->getAttribute('schedule')) {
                    return;
                }

                $cron = new CronExpression($function->getAttribute('schedule'));
                $next = (int) $cron->getNextRunDate()->format('U');

                $function
                    ->setAttribute('scheduleNext', $next)
                    ->setAttribute('schedulePrevious', \time());

                Authorization::disable();

                try {
                    // 'scheduleNext' was already set on the document above, so the copy carries it.
                    $function = $database->updateDocument('functions', $function->getId(), new Document($function->getArrayCopy()));
                } finally {
                    // Previously skipped when the update failed, leaving authorization disabled.
                    Authorization::reset();
                }

                if ($function === false) {
                    throw new Exception('Function update failed (' . $functionId . ')');
                }

                // Async task reschedule
                ResqueScheduler::enqueueAt($next, 'v1-functions', 'FunctionsV1', [
                    'projectId' => $projectId,
                    'webhooks' => $webhooks,
                    'functionId' => $function->getId(),
                    'executionId' => null,
                    'trigger' => 'schedule',
                    'scheduleOriginal' => $function->getAttribute('schedule', ''),
                ]);

                $this->execute(
                    trigger: $trigger,
                    projectId: $projectId,
                    executionId: $executionId,
                    database: $database,
                    function: $function,
                    data: $data,
                    webhooks: $webhooks,
                    userId: $userId,
                    jwt: $jwt
                );

                break;

            case 'http':
                $function = $this->fetchFunctionOrFail($database, $functionId);

                $this->execute(
                    trigger: $trigger,
                    projectId: $projectId,
                    executionId: $executionId,
                    database: $database,
                    function: $function,
                    data: $data,
                    webhooks: $webhooks,
                    userId: $userId,
                    jwt: $jwt
                );

                break;
        }
    }

    /**
     * Load a function document with authorization disabled, failing when it does not exist.
     *
     * @param Database $database
     * @param string $functionId
     *
     * @return Document
     *
     * @throws Exception When no function with the given ID is found.
     */
    private function fetchFunctionOrFail(Database $database, string $functionId): Document
    {
        Authorization::disable();

        try {
            $function = $database->getDocument('functions', $functionId);
        } finally {
            Authorization::reset();
        }

        if (empty($function->getId())) {
            throw new Exception('Function not found ('.$functionId.')');
        }

        return $function;
    }

    /**
     * Execute function tag
     *
     * Sends the execution request to the executor service as a JSON POST. Transport
     * failures and HTTP error responses are logged; nothing is thrown to the caller.
     *
     * @param string $trigger
     * @param string $projectId
     * @param string $executionId
     * @param Database $database Not used by this method; kept for interface compatibility.
     * @param Document $function
     * @param string $event
     * @param string $eventData
     * @param string $data
     * @param array $webhooks
     * @param string $userId
     * @param string $jwt
     *
     * @return void
     */
    public function execute(string $trigger, string $projectId, string $executionId, Database $database, Document $function, string $event = '', string $eventData = '', string $data = '', array $webhooks = [], string $userId = '', string $jwt = ''): void
    {
        $payload = \json_encode([
            'trigger' => $trigger,
            'projectId' => $projectId,
            'executionId' => $executionId,
            'functionId' => $function->getId(),
            'event' => $event,
            'eventData' => $eventData,
            'data' => $data,
            'webhooks' => $webhooks,
            'userId' => $userId,
            'jwt' => $jwt,
        ]);

        if ($payload === false) {
            // Do not send an empty body to the executor when the payload cannot be encoded.
            Console::error('Failed to encode execution payload: ' . \json_last_error_msg());

            return;
        }

        $ch = \curl_init();

        if ($ch === false) {
            Console::error('Curl error: failed to initialise handle');

            return;
        }

        try {
            \curl_setopt($ch, CURLOPT_URL, self::EXECUTOR_ENDPOINT);
            \curl_setopt($ch, CURLOPT_POST, true);
            \curl_setopt($ch, CURLOPT_POSTFIELDS, $payload);
            \curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
            // Environment values arrive as strings; cast before the arithmetic. + 200 for safety margin
            \curl_setopt($ch, CURLOPT_TIMEOUT, (int) App::getEnv('_APP_FUNCTIONS_TIMEOUT', 900) + 200);
            \curl_setopt($ch, CURLOPT_CONNECTTIMEOUT, 10);
            \curl_setopt($ch, CURLOPT_HTTPHEADER, [
                'Content-Type: application/json',
                'x-appwrite-project: '.$projectId,
                'x-appwrite-executor-key: '. App::getEnv('_APP_EXECUTOR_SECRET', ''),
            ]);

            $response = \curl_exec($ch);
            $error = \curl_error($ch);

            if ($response === false || !empty($error)) {
                Console::error('Curl error: '.$error);

                return;
            }

            // A completed transfer can still carry an error response from the executor.
            $statusCode = (int) \curl_getinfo($ch, CURLINFO_HTTP_CODE);

            if ($statusCode >= 400) {
                Console::error('Executor error (HTTP ' . $statusCode . '): ' . $response);
            }
        } finally {
            \curl_close($ch);
        }
    }

    /**
     * Cleanup any hanging containers above the allowed max containers.
     *
     * Containers are ordered by their `appwrite-created` label (missing label counts as 0)
     * and removed from the front of that order until at most `_APP_FUNCTIONS_CONTAINERS`
     * remain. Removal failures are logged and the loop moves on to the next container.
     *
     * @return void
     */
    public function cleanup(): void
    {
        /** @var Container[] $list */
        global $list;

        /** @var Orchestration $orchestration */
        global $orchestration;

        Console::success(count($list) . ' running containers counted');

        $max = (int) App::getEnv('_APP_FUNCTIONS_CONTAINERS');

        if (\count($list) <= $max) {
            return;
        }

        Console::info('Starting containers cleanup');

        // getLabels() must be *called*: the previous property fetch (`->getLabels[...]`) always
        // evaluated to null, so every sort key was 0 and the removal order was arbitrary.
        \uasort($list, static function (Container $item1, Container $item2): int {
            return (int) ($item1->getLabels()['appwrite-created'] ?? 0) <=> (int) ($item2->getLabels()['appwrite-created'] ?? 0);
        });

        while (\count($list) > $max) {
            $first = \array_shift($list);

            try {
                $orchestration->remove($first->getName(), true);
                Console::info('Removed container: ' . $first->getName());
            } catch (Exception $e) {
                Console::error('Failed to remove container: ' . $e);
            }
        }
    }

    /**
     * Filter ENV vars
     *
     * Strips every byte that is not an ASCII letter, an ASCII digit or an underscore.
     *
     * @param string $string
     *
     * @return string
     */
    public function filterEnvKey(string $string): string
    {
        // Byte-wise on purpose (no `u` modifier), matching the previous per-byte filter.
        return \preg_replace('/[^0-9A-Za-z_]/', '', $string) ?? '';
    }

    public function shutdown(): void
    {
    }
}