$container['name'],
            'online' => (\substr($container['status'], 0, 2) === 'Up'),
            'status' => $container['status'],
            'labels' => $container['labels'],
        ];

        \array_map(function($value) use (&$container) {
            $value = \explode('=', $value);

            if(isset($value[0]) && isset($value[1])) {
                $container[$value[0]] = $value[1];
            }
        }, \explode(',', $container['labels']));

        $list[$container['name']] = $container;
    }
}, $stdout);

Console::info(count($list)." functions listed in " . ($executionEnd - $executionStart) . " seconds with exit code {$exitCode}");

/**
 * 1. Get event args - DONE
 * 2. Unpackage code in the isolated container - DONE
 * 3. Execute in container with timeout
 *      + measure execution time - DONE
 *      + pass env vars - DONE
 *      + pass one-time api key
 * 4. Update execution status - DONE
 * 5. Update execution stdout & stderr - DONE
 * 6. Trigger audit log - DONE
 * 7. Trigger usage log - DONE
 */

//TODO avoid scheduled execution if delay is bigger than X offset

/**
 * Queue worker that resolves which function(s) a job targets and forwards
 * each execution request to the executor service over HTTP.
 *
 * Three triggers are handled by run(): 'event', 'schedule' and 'http'.
 */
class FunctionsV1 extends Worker
{
    /**
     * Job arguments read by run() (projectId, functionId, trigger, ...).
     *
     * @var array
     */
    public $args = [];

    /**
     * Lookup table (character => true) of the characters filterEnvKey() keeps.
     * Filled lazily on the first filterEnvKey() call.
     *
     * @var array
     */
    public $allowed = [];

    public function init(): void
    {
    }

    /**
     * Process one queued job according to its 'trigger' argument.
     *
     * - 'event':    pages through all functions and executes every one that
     *               subscribes to the event and has a tag.
     * - 'schedule': re-enqueues the next scheduled run, then executes.
     * - 'http':     executes the single requested function.
     *
     * Any other trigger is ignored.
     *
     * @return void
     *
     * @throws Exception When the requested function does not exist ('schedule' / 'http').
     */
    public function run(): void
    {
        global $register;

        $db = $register->get('db');
        $cache = $register->get('cache');

        $projectId = $this->args['projectId'] ?? '';
        $functionId = $this->args['functionId'] ?? '';
        $webhooks = $this->args['webhooks'] ?? [];
        $executionId = $this->args['executionId'] ?? '';
        $trigger = $this->args['trigger'] ?? '';
        $event = $this->args['event'] ?? '';
        $scheduleOriginal = $this->args['scheduleOriginal'] ?? '';
        $eventData = (!empty($this->args['eventData'])) ? json_encode($this->args['eventData']) : '';
        $data = $this->args['data'] ?? '';
        $userId = $this->args['userId'] ?? '';
        $jwt = $this->args['jwt'] ?? '';

        $database = new Database();
        $database->setAdapter(new RedisAdapter(new MySQLAdapter($db, $cache), $cache));
        $database->setNamespace('app_'.$projectId);
        $database->setMocks(Config::getParam('collections', []));

        switch ($trigger) {
            case 'event':
                $limit = 30;
                $sum = 30;
                $offset = 0;
                $functions = []; /** @var Document[] $functions */

                // A page shorter than $limit means the collection is exhausted.
                while ($sum >= $limit) {
                    Authorization::disable();

                    try {
                        $functions = $database->getCollection([
                            'limit' => $limit,
                            'offset' => $offset,
                            'orderField' => 'name',
                            'orderType' => 'ASC',
                            'orderCast' => 'string',
                            'filters' => [
                                '$collection='.Database::SYSTEM_COLLECTION_FUNCTIONS,
                            ],
                        ]);
                    } finally {
                        // Always restore authorization, even if the query throws.
                        Authorization::reset();
                    }

                    $sum = \count($functions);
                    $offset = $offset + $limit;

                    Console::log('Fetched '.$sum.' functions...');

                    foreach ($functions as $function) {
                        $events = $function->getAttribute('events', []);
                        $tag = $function->getAttribute('tag', []);

                        Console::success('Itterating function: '.$function->getAttribute('name'));

                        // Skip functions that do not listen to this event or have no tag.
                        if (!\in_array($event, $events, true) || empty($tag)) {
                            continue;
                        }

                        Console::success('Triggered function: '.$event);

                        $this->execute('event', $projectId, '', $database, $function, $event, $eventData, $data, $webhooks, $userId, $jwt);
                    }
                }
                break;

            case 'schedule':
                /*
                 * 1. Get Original Task
                 * 2. Check for updates
                 *      If has updates skip task and don't reschedule
                 *      If status not equal to play skip task
                 * 3. Check next run date, update task and add new job at the given date
                 * 4. Execute task (set optional timeout)
                 * 5. Update task response to log
                 *      On success reset error count
                 *      On failure add error count
                 *      If error count bigger than allowed change status to pause
                 */

                // Reschedule
                $function = $this->getFunctionDocument($database, $functionId);

                if ($scheduleOriginal && $scheduleOriginal !== $function->getAttribute('schedule')) {
                    // Schedule has changed from previous run, ignore this run.
                    return;
                }

                $cron = new CronExpression($function->getAttribute('schedule'));
                $next = (int) $cron->getNextRunDate()->format('U');

                $function
                    ->setAttribute('scheduleNext', $next)
                    ->setAttribute('schedulePrevious', \time())
                ;

                Authorization::disable();

                try {
                    $function = $database->updateDocument(array_merge($function->getArrayCopy(), [
                        'scheduleNext' => $next,
                    ]));
                } finally {
                    // Always restore authorization, even if the update throws.
                    Authorization::reset();
                }

                ResqueScheduler::enqueueAt($next, 'v1-functions', 'FunctionsV1', [
                    'projectId' => $projectId,
                    'webhooks' => $webhooks,
                    'functionId' => $function->getId(),
                    'executionId' => null,
                    'trigger' => 'schedule',
                    'scheduleOriginal' => $function->getAttribute('schedule', ''),
                ]); // Async task reschedule

                $this->execute($trigger, $projectId, $executionId, $database, $function, /*$event*/'', /*$eventData*/'', $data, $webhooks, $userId, $jwt);
                break;

            case 'http':
                $function = $this->getFunctionDocument($database, $functionId);

                $this->execute($trigger, $projectId, $executionId, $database, $function, /*$event*/'', /*$eventData*/'', $data, $webhooks, $userId, $jwt);
                break;

            default:
                // Unknown trigger: nothing to do.
                break;
        }
    }

    /**
     * Fetch a function document by ID with authorization temporarily disabled
     * and verify that it exists and belongs to the functions collection.
     *
     * @param Database $database
     * @param string $functionId
     *
     * @return Document
     *
     * @throws Exception When the document is missing or is not a function.
     */
    private function getFunctionDocument(Database $database, $functionId): Document
    {
        Authorization::disable();

        try {
            $function = $database->getDocument($functionId);
        } finally {
            // Always restore authorization, even if the lookup throws.
            Authorization::reset();
        }

        if (empty($function->getId()) || Database::SYSTEM_COLLECTION_FUNCTIONS != $function->getCollection()) {
            throw new Exception('Function not found ('.$functionId.')');
        }

        return $function;
    }

    /**
     * Execute function tag
     *
     * Sends the execution request as a JSON POST to the executor service.
     * Transport errors and HTTP error statuses are logged, never thrown.
     *
     * @param string $trigger
     * @param string $projectId
     * @param string $executionId
     * @param Database $database Not used by this method; kept for interface compatibility.
     * @param Document $function
     * @param string $event
     * @param string $eventData
     * @param string $data
     * @param array $webhooks
     * @param string $userId
     * @param string $jwt
     *
     * @return void
     */
    public function execute(string $trigger, string $projectId, string $executionId, Database $database, Document $function, string $event = '', string $eventData = '', string $data = '', array $webhooks = [], string $userId = '', string $jwt = ''): void
    {
        $ch = \curl_init();

        \curl_setopt($ch, CURLOPT_URL, "http://executor:8080/v1/execute");
        \curl_setopt($ch, CURLOPT_POST, true);
        \curl_setopt($ch, CURLOPT_POSTFIELDS, json_encode([
            'trigger' => $trigger,
            'projectId' => $projectId,
            'executionId' => $executionId,
            'functionId' => $function->getId(),
            'event' => $event,
            'eventData' => $eventData,
            'data' => $data,
            'webhooks' => $webhooks,
            'userId' => $userId,
            'jwt' => $jwt,
        ]));
        \curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
        // The configured value may be a string, so cast before doing arithmetic on it.
        \curl_setopt($ch, CURLOPT_TIMEOUT, (int) App::getEnv('_APP_FUNCTIONS_TIMEOUT', 900) + 200); // + 200 for safety margin
        \curl_setopt($ch, CURLOPT_CONNECTTIMEOUT, 10);
        \curl_setopt($ch, CURLOPT_HTTPHEADER, [
            'Content-Type: application/json',
        ]);

        \curl_exec($ch);

        $error = \curl_error($ch);
        $statusCode = (int) \curl_getinfo($ch, CURLINFO_HTTP_CODE);

        if (!empty($error)) {
            Console::error('Curl error: '.$error);
        } elseif ($statusCode >= 400) {
            // The request was delivered but the executor reported a failure.
            Console::error('Executor responded with HTTP status '.$statusCode);
        }

        \curl_close($ch);
    }

    /**
     * Cleanup any hanging containers above the allowed max containers.
     *
     * Works on the global $list of containers and force-removes entries,
     * lowest 'appwrite-created' value first, until at most
     * _APP_FUNCTIONS_CONTAINERS remain.
     *
     * @return void
     */
    public function cleanup(): void
    {
        global $list;

        Console::success(count($list).' running containers counted');

        $max = (int) App::getEnv('_APP_FUNCTIONS_CONTAINERS');

        if (\count($list) <= $max) {
            return;
        }

        Console::info('Starting containers cleanup');

        // Sort ascending by 'appwrite-created'; entries without it sort as 0.
        // NOTE(review): presumably a creation timestamp taken from a container label - confirm.
        \uasort($list, function ($item1, $item2) {
            return (int)($item1['appwrite-created'] ?? 0) <=> (int)($item2['appwrite-created'] ?? 0);
        });

        while (\count($list) > $max) {
            $first = \array_shift($list);
            $stdout = '';
            $stderr = '';

            // Quote the name so it can never be interpreted as shell syntax.
            $command = 'docker rm -f '.\escapeshellarg($first['name']);

            if (Console::execute($command, '', $stdout, $stderr, 30) !== 0) {
                Console::error('Failed to remove container: '.$stderr);
            } else {
                Console::info('Removed container: '.$first['name']);
            }
        }
    }

    /**
     * Filter ENV vars
     *
     * Returns $string with every character outside [0-9A-Za-z_] removed.
     *
     * @param string $string
     *
     * @return string
     */
    public function filterEnvKey(string $string): string
    {
        if (empty($this->allowed)) {
            // Build the character lookup table once and reuse it on later calls.
            $this->allowed = array_fill_keys(\str_split('0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz_'), true);
        }

        $output = '';

        foreach (\str_split($string) as $char) {
            if (\array_key_exists($char, $this->allowed)) {
                $output .= $char;
            }
        }

        return $output;
    }

    public function shutdown(): void
    {
    }
}