diff --git a/app/workers/audits.php b/app/workers/audits.php index 152934a6f..b352e2ce9 100644 --- a/app/workers/audits.php +++ b/app/workers/audits.php @@ -12,8 +12,6 @@ Console::success(APP_NAME.' audits worker v1 has started'); class AuditsV1 extends Worker { - public $args = []; - public function init(): void { } @@ -30,7 +28,7 @@ class AuditsV1 extends Worker $ip = $this->args['ip']; $data = $this->args['data']; $db = $register->get('db', true); - + $adapter = new AuditAdapter($db); $adapter->setNamespace('app_'.$projectId); diff --git a/app/workers/certificates.php b/app/workers/certificates.php index bc746775c..73675abd2 100644 --- a/app/workers/certificates.php +++ b/app/workers/certificates.php @@ -18,8 +18,6 @@ Console::success(APP_NAME.' certificates worker v1 has started'); class CertificatesV1 extends Worker { - public $args = []; - public function init(): void { } diff --git a/app/workers/deletes.php b/app/workers/deletes.php index a0a5708b6..8dd9f1bf7 100644 --- a/app/workers/deletes.php +++ b/app/workers/deletes.php @@ -21,9 +21,7 @@ Console::success(APP_NAME.' deletes worker v1 has started'."\n"); class DeletesV1 extends Worker { - public $args = []; - - protected $consoleDB = null; + protected Database $consoleDB = null; public function init(): void { @@ -31,9 +29,9 @@ class DeletesV1 extends Worker public function run(): void { - $projectId = isset($this->args['projectId']) ? $this->args['projectId'] : ''; + $projectId = isset($this->args['projectId']) ? $this->args['projectId'] : ''; $type = $this->args['type']; - + switch (strval($type)) { case DELETE_TYPE_DOCUMENT: $document = $this->args['document']; diff --git a/app/workers/functions.php b/app/workers/functions.php index 55387422e..fbe658bd7 100644 --- a/app/workers/functions.php +++ b/app/workers/functions.php @@ -19,12 +19,12 @@ use Utopia\Orchestration\Container; use Utopia\Orchestration\Exception\Orchestration as OrchestrationException; use Utopia\Orchestration\Exception\Timeout as TimeoutException; -require_once __DIR__.'/../workers.php'; +require_once __DIR__ . '/../workers.php'; Runtime::enableCoroutine(0); Console::title('Functions V1 Worker'); -Console::success(APP_NAME.' functions worker v1 has started'); +Console::success(APP_NAME . ' functions worker v1 has started'); $runtimes = Config::getParam('runtimes'); @@ -38,11 +38,11 @@ $orchestration = new Orchestration(new DockerAPI($dockerUser, $dockerPass, $dock */ $warmupStart = \microtime(true); -Co\run(function() use ($runtimes, $orchestration) { // Warmup: make sure images are ready to run fast 🚀 - foreach($runtimes as $runtime) { - go(function() use ($runtime, $orchestration) { - Console::info('Warming up '.$runtime['name'].' '.$runtime['version'].' environment...'); - +Co\run(function () use ($runtimes, $orchestration) { // Warmup: make sure images are ready to run fast 🚀 + foreach ($runtimes as $runtime) { + go(function () use ($runtime, $orchestration) { + Console::info('Warming up ' . $runtime['name'] . ' ' . $runtime['version'] . ' environment...'); + $response = $orchestration->pull($runtime['image']); if ($response) { @@ -57,7 +57,7 @@ Co\run(function() use ($runtimes, $orchestration) { // Warmup: make sure images $warmupEnd = \microtime(true); $warmupTime = $warmupEnd - $warmupStart; -Console::success('Finished warmup in '.$warmupTime.' seconds'); +Console::success('Finished warmup in ' . $warmupTime . ' seconds'); /** * List function servers @@ -68,7 +68,7 @@ $stderr = ''; $executionStart = \microtime(true); $response = $orchestration->list(['label' => 'appwrite-type=function']); - +/** @var Container[] $list */ $list = []; foreach ($response as $value) { @@ -77,7 +77,7 @@ foreach ($response as $value) { $executionEnd = \microtime(true); -Console::info(count($list).' functions listed in ' . ($executionEnd - $executionStart) . ' seconds'); +Console::info(count($list) . ' functions listed in ' . ($executionEnd - $executionStart) . ' seconds'); /** * 1. Get event args - DONE @@ -96,9 +96,9 @@ Console::info(count($list).' functions listed in ' . ($executionEnd - $execution class FunctionsV1 extends Worker { - public $args = []; + public array $args = []; - public $allowed = []; + public array $allowed = []; public function init(): void { @@ -125,7 +125,7 @@ class FunctionsV1 extends Worker $database = new Database(); $database->setAdapter(new RedisAdapter(new MySQLAdapter($db, $cache), $cache)); - $database->setNamespace('app_'.$projectId); + $database->setNamespace('app_' . $projectId); $database->setMocks(Config::getParam('collections', [])); switch ($trigger) { @@ -133,7 +133,8 @@ class FunctionsV1 extends Worker $limit = 30; $sum = 30; $offset = 0; - $functions = []; /** @var Document[] $functions */ + $functions = []; + /** @var Document[] $functions */ while ($sum >= $limit) { @@ -146,7 +147,7 @@ class FunctionsV1 extends Worker 'orderType' => 'ASC', 'orderCast' => 'string', 'filters' => [ - '$collection='.Database::SYSTEM_COLLECTION_FUNCTIONS, + '$collection=' . Database::SYSTEM_COLLECTION_FUNCTIONS, ], ]); @@ -155,21 +156,33 @@ class FunctionsV1 extends Worker $sum = \count($functions); $offset = $offset + $limit; - Console::log('Fetched '.$sum.' functions...'); + Console::log('Fetched ' . $sum . ' functions...'); - foreach($functions as $function) { + foreach ($functions as $function) { $events = $function->getAttribute('events', []); $tag = $function->getAttribute('tag', []); - Console::success('Itterating function: '.$function->getAttribute('name')); + Console::success('Itterating function: ' . $function->getAttribute('name')); - if(!\in_array($event, $events) || empty($tag)) { + if (!\in_array($event, $events) || empty($tag)) { continue; } - Console::success('Triggered function: '.$event); + Console::success('Triggered function: ' . $event); - $this->execute('event', $projectId, '', $database, $function, $event, $eventData, $data, $webhooks, $userId, $jwt); + $this->execute( + trigger: 'event', + projectId: $projectId, + executionId: '', + database: $database, + function: $function, + event: $event, + eventData: $eventData, + data: $data, + webhooks: $webhooks, + userId: $userId, + jwt: $jwt + ); } } break; @@ -194,10 +207,10 @@ class FunctionsV1 extends Worker Authorization::reset(); if (empty($function->getId()) || Database::SYSTEM_COLLECTION_FUNCTIONS != $function->getCollection()) { - throw new Exception('Function not found ('.$functionId.')'); + throw new Exception('Function not found (' . $functionId . ')'); } - if($scheduleOriginal && $scheduleOriginal !== $function->getAttribute('schedule')) { // Schedule has changed from previous run, ignore this run. + if ($scheduleOriginal && $scheduleOriginal !== $function->getAttribute('schedule')) { // Schedule has changed from previous run, ignore this run. return; } @@ -206,8 +219,7 @@ class FunctionsV1 extends Worker $function ->setAttribute('scheduleNext', $next) - ->setAttribute('schedulePrevious', \time()) - ; + ->setAttribute('schedulePrevious', \time()); Authorization::disable(); @@ -215,6 +227,10 @@ class FunctionsV1 extends Worker 'scheduleNext' => $next, ])); + if ($function === false) { + throw new Exception('Function update failed (' . $functionId . ')'); + } + Authorization::reset(); ResqueScheduler::enqueueAt($next, 'v1-functions', 'FunctionsV1', [ @@ -226,7 +242,17 @@ class FunctionsV1 extends Worker 'scheduleOriginal' => $function->getAttribute('schedule', ''), ]); // Async task rescheduale - $this->execute($trigger, $projectId, $executionId, $database, $function, /*$event*/'', /*$eventData*/'', $data, $webhooks, $userId, $jwt); + $this->execute( + trigger: $trigger, + projectId: $projectId, + executionId: $executionId, + database: $database, + function: $function, + data: $data, + webhooks: $webhooks, + userId: $userId, + jwt: $jwt + ); break; case 'http': @@ -235,14 +261,20 @@ class FunctionsV1 extends Worker Authorization::reset(); if (empty($function->getId()) || Database::SYSTEM_COLLECTION_FUNCTIONS != $function->getCollection()) { - throw new Exception('Function not found ('.$functionId.')'); + throw new Exception('Function not found (' . $functionId . ')'); } - $this->execute($trigger, $projectId, $executionId, $database, $function, /*$event*/'', /*$eventData*/'', $data, $webhooks, $userId, $jwt); - break; - - default: - # code... + $this->execute( + trigger: $trigger, + projectId: $projectId, + executionId: $executionId, + database: $database, + function: $function, + data: $data, + webhooks: $webhooks, + userId: $userId, + jwt: $jwt + ); break; } } @@ -254,7 +286,7 @@ class FunctionsV1 extends Worker * @param string $projectId * @param string $executionId * @param Database $database - * @param Database $function + * @param Document $function * @param string $event * @param string $eventData * @param string $data @@ -275,7 +307,7 @@ class FunctionsV1 extends Worker $tag = $database->getDocument($function->getAttribute('tag', '')); Authorization::reset(); - if($tag->getAttribute('functionId') !== $function->getId()) { + if ($tag->getAttribute('functionId') !== $function->getId()) { throw new Exception('Tag not found', 404); } @@ -297,18 +329,18 @@ class FunctionsV1 extends Worker 'time' => 0, ]); - if(false === $execution || ($execution instanceof Document && $execution->isEmpty())) { + if ($execution->isEmpty()) { throw new Exception('Failed to create or read execution'); } - + Authorization::reset(); $runtime = (isset($runtimes[$function->getAttribute('runtime', '')])) ? $runtimes[$function->getAttribute('runtime', '')] : null; - if(\is_null($runtime)) { - throw new Exception('Runtime "'.$function->getAttribute('runtime', '').'" is not supported'); + if (\is_null($runtime)) { + throw new Exception('Runtime "' . $function->getAttribute('runtime', '') . '" is not supported'); } $vars = \array_merge($function->getAttribute('vars', []), [ @@ -325,37 +357,37 @@ class FunctionsV1 extends Worker 'APPWRITE_FUNCTION_JWT' => $jwt, 'APPWRITE_FUNCTION_PROJECT_ID' => $projectId, ]); - + $tagId = $tag->getId() ?? ''; $tagPath = $tag->getAttribute('path', ''); - $tagPathTarget = '/tmp/project-'.$projectId.'/'.$tag->getId().'/code.tar.gz'; + $tagPathTarget = '/tmp/project-' . $projectId . '/' . $tagId . '/code.tar.gz'; $tagPathTargetDir = \pathinfo($tagPathTarget, PATHINFO_DIRNAME); - $container = 'appwrite-function-'.$tag->getId(); + $container = 'appwrite-function-' . $tagId; $command = \escapeshellcmd($tag->getAttribute('command', '')); - if(!\is_readable($tagPath)) { - throw new Exception('Code is not readable: '.$tag->getAttribute('path', '')); + if (!\is_readable($tagPath)) { + throw new Exception('Code is not readable: ' . $tag->getAttribute('path', '')); } if (!\file_exists($tagPathTargetDir)) { if (!\mkdir($tagPathTargetDir, 0755, true)) { - throw new Exception('Can\'t create directory '.$tagPathTargetDir); - } - } - - if (!\file_exists($tagPathTarget)) { - if(!\copy($tagPath, $tagPathTarget)) { - throw new Exception('Can\'t create temporary code file '.$tagPathTarget); + throw new Exception('Can\'t create directory ' . $tagPathTargetDir); } } - if(isset($list[$container]) && !(\substr($list[$container]->getStatus(), 0, 2) === 'Up')) { // Remove conatiner if not online + if (!\file_exists($tagPathTarget)) { + if (!\copy($tagPath, $tagPathTarget)) { + throw new Exception('Can\'t create temporary code file ' . $tagPathTarget); + } + } + + if (isset($list[$container]) && !(\substr($list[$container]->getStatus(), 0, 2) === 'Up')) { // Remove conatiner if not online $stdout = ''; $stderr = ''; try { $orchestration->remove($container); } catch (Exception $e) { - Console::warning('Failed to remove container: '.$e->getMessage()); + Console::warning('Failed to remove container: ' . $e->getMessage()); } unset($list[$container]); @@ -370,10 +402,10 @@ class FunctionsV1 extends Worker * Make sure no access to NFS server / storage volumes * Access Appwrite REST from internal network for improved performance */ - if(!isset($list[$container])) { // Create contianer if not ready + if (!isset($list[$container])) { // Create contianer if not ready $stdout = ''; $stderr = ''; - + $executionStart = \microtime(true); $executionTime = \time(); @@ -381,16 +413,17 @@ class FunctionsV1 extends Worker $orchestration->setMemory(App::getEnv('_APP_FUNCTIONS_MEMORY', '256')); $orchestration->setSwap(App::getEnv('_APP_FUNCTIONS_MEMORY_SWAP', '256')); - foreach($vars as &$value) { + foreach ($vars as &$value) { $value = strval($value); } $id = $orchestration->run( image: $runtime['image'], name: $container, - command: ['tail', - '-f', - '/dev/null' + command: [ + 'tail', + '-f', + '/dev/null' ], entrypoint: '', workdir: '/usr/local/src', @@ -400,38 +433,43 @@ class FunctionsV1 extends Worker labels: [ 'appwrite-type' => 'function', 'appwrite-created' => strval($executionTime) - ]); + ] + ); $untarStdout = ''; $untarStderr = ''; $untarSuccess = $orchestration->execute( - name: $container, + name: $container, command: [ 'sh', '-c', 'mv /tmp/code.tar.gz /usr/local/src/code.tar.gz && tar -zxf /usr/local/src/code.tar.gz --strip 1 && rm /usr/local/src/code.tar.gz' ], - stdout: $untarStdout, + stdout: $untarStdout, stderr: $untarStderr, vars: $vars, - timeout: 60); + timeout: 60 + ); if (!$untarSuccess) { - throw new Exception('Failed to extract tar: '.$untarStderr); + throw new Exception('Failed to extract tar: ' . $untarStderr); } $executionEnd = \microtime(true); - $list[$container] = new Container($container, $id, 'Up', + $list[$container] = new Container( + $container, + $id, + 'Up', [ 'appwrite-type' => 'function', 'appwrite-created' => strval($executionTime), - ]); + ] + ); Console::info('Function created in ' . ($executionEnd - $executionStart) . ' seconds'); - } - else { + } else { Console::info('Container is ready to run'); } @@ -444,12 +482,13 @@ class FunctionsV1 extends Worker try { $exitCode = (int)!$orchestration->execute( - name: $container, - command: $orchestration->parseCommandString($command), - stdout: $stdout, - stderr: $stderr, - vars: $vars, - timeout: $function->getAttribute('timeout', (int) App::getEnv('_APP_FUNCTIONS_TIMEOUT', 900))); + name: $container, + command: $orchestration->parseCommandString($command), + stdout: $stdout, + stderr: $stderr, + vars: $vars, + timeout: $function->getAttribute('timeout', (int) App::getEnv('_APP_FUNCTIONS_TIMEOUT', 900)) + ); } catch (TimeoutException $e) { $exitCode = 124; } catch (OrchestrationException $e) { @@ -473,10 +512,10 @@ class FunctionsV1 extends Worker 'stderr' => \mb_substr($stderr, -4000), // log last 4000 chars output 'time' => $executionTime ])); - + Authorization::reset(); - if (false === $function) { + if ($execution === false) { throw new Exception('Failed saving execution to DB', 500); } @@ -501,10 +540,9 @@ class FunctionsV1 extends Worker ->setParam('functionStatus', $functionStatus) ->setParam('functionExecutionTime', $executionTime * 1000) // ms ->setParam('networkRequestSize', 0) - ->setParam('networkResponseSize', 0) - ; - - if(App::getEnv('_APP_USAGE_STATS', 'enabled') == 'enabled') { + ->setParam('networkResponseSize', 0); + + if (App::getEnv('_APP_USAGE_STATS', 'enabled') == 'enabled') { $usage->trigger(); } @@ -518,28 +556,30 @@ class FunctionsV1 extends Worker */ public function cleanup(): void { + /** @var Container[] $list */ global $list; + /** @var Orchestration $orchestration */ global $orchestration; - Console::success(count($list).' running containers counted'); + Console::success(count($list) . ' running containers counted'); $max = (int) App::getEnv('_APP_FUNCTIONS_CONTAINERS'); - if(\count($list) > $max) { + if (\count($list) > $max) { Console::info('Starting containers cleanup'); - \uasort($list, function ($item1, $item2) { + \uasort($list, function (Container $item1, Container $item2) { return (int)($item1->getLabels['appwrite-created'] ?? 0) <=> (int)($item2->getLabels['appwrite-created'] ?? 0); }); - while(\count($list) > $max) { + while (\count($list) > $max) { $first = \array_shift($list); try { $orchestration->remove($first->getName(), true); - Console::info('Removed container: '.$first->getName()); + Console::info('Removed container: ' . $first->getName()); } catch (Exception $e) { - Console::error('Failed to remove container: '.$e); + Console::error('Failed to remove container: ' . $e); } } } @@ -554,7 +594,7 @@ class FunctionsV1 extends Worker */ public function filterEnvKey(string $string): string { - if(empty($this->allowed)) { + if (empty($this->allowed)) { $this->allowed = array_fill_keys(\str_split('0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz_'), true); } @@ -562,7 +602,7 @@ class FunctionsV1 extends Worker $output = ''; foreach ($string as $char) { - if(\array_key_exists($char, $this->allowed)) { + if (\array_key_exists($char, $this->allowed)) { $output .= $char; } } @@ -573,4 +613,4 @@ class FunctionsV1 extends Worker public function shutdown(): void { } -} \ No newline at end of file +} diff --git a/app/workers/mails.php b/app/workers/mails.php index 25abe54aa..d72ca3572 100644 --- a/app/workers/mails.php +++ b/app/workers/mails.php @@ -13,11 +13,6 @@ Console::success(APP_NAME . ' mails worker v1 has started' . "\n"); class MailsV1 extends Worker { - /** - * @var array - */ - public $args = []; - public function init(): void { } diff --git a/app/workers/tasks.php b/app/workers/tasks.php index 0b1b33e23..8e47d7bec 100644 --- a/app/workers/tasks.php +++ b/app/workers/tasks.php @@ -17,11 +17,6 @@ Console::success(APP_NAME.' tasks worker v1 has started'); class TasksV1 extends Worker { - /** - * @var array - */ - public $args = []; - public function init(): void { } diff --git a/app/workers/usage.php b/app/workers/usage.php index b5a3f885a..8e8f5337c 100644 --- a/app/workers/usage.php +++ b/app/workers/usage.php @@ -11,11 +11,6 @@ Console::success(APP_NAME.' usage worker v1 has started'); class UsageV1 extends Worker { - /** - * @var array - */ - public $args = []; - public function init(): void { } @@ -33,7 +28,7 @@ class UsageV1 extends Worker $networkRequestSize = $this->args['networkRequestSize'] ?? 0; $networkResponseSize = $this->args['networkResponseSize'] ?? 0; - + $httpMethod = $this->args['httpMethod'] ?? ''; $httpRequest = $this->args['httpRequest'] ?? 0; diff --git a/app/workers/webhooks.php b/app/workers/webhooks.php index 00307bdda..287ab61a2 100644 --- a/app/workers/webhooks.php +++ b/app/workers/webhooks.php @@ -11,8 +11,6 @@ Console::success(APP_NAME.' webhooks worker v1 has started'); class WebhooksV1 extends Worker { - public $args = []; - public function init(): void { } @@ -37,7 +35,7 @@ class WebhooksV1 extends Worker $name = $webhook['name'] ?? ''; $signature = $webhook['signature'] ?? 'not-yet-implemented'; $url = $webhook['url'] ?? ''; - $security = (bool) $webhook['security'] ?? true; + $security = (bool) ($webhook['security'] ?? true); $httpUser = $webhook['httpUser'] ?? null; $httpPass = $webhook['httpPass'] ?? null; diff --git a/src/Appwrite/Resque/Worker.php b/src/Appwrite/Resque/Worker.php index db8dc91ce..f6f99cbaf 100644 --- a/src/Appwrite/Resque/Worker.php +++ b/src/Appwrite/Resque/Worker.php @@ -4,7 +4,7 @@ namespace Appwrite\Resque; abstract class Worker { - public $args = []; + public array $args = []; abstract public function init(): void; @@ -17,7 +17,7 @@ abstract class Worker $this->init(); } - public function perform() + public function perform(): void { $this->run(); }