diff --git a/src/Appwrite/Platform/Workers/Builds.php b/src/Appwrite/Platform/Workers/Builds.php index 6dbcac637e..84a431f939 100644 --- a/src/Appwrite/Platform/Workers/Builds.php +++ b/src/Appwrite/Platform/Workers/Builds.php @@ -21,9 +21,9 @@ use Utopia\Database\Document; use Utopia\Database\Validator\Authorization; use Utopia\Database\Exception\Structure; use Utopia\Database\Helpers\ID; -use Utopia\DSN\DSN; use Utopia\Platform\Action; use Utopia\Queue\Message; +use Utopia\Storage\Device; use Utopia\Storage\Device\Local; use Utopia\Storage\Storage; use Utopia\VCS\Adapter\Git\GitHub; @@ -44,19 +44,19 @@ class Builds extends Action ->desc('Builds worker') ->inject('message') ->inject('dbForConsole') - ->inject('dbForProject') ->inject('queueForEvents') ->inject('queueForFunctions') ->inject('queueForUsage') ->inject('cache') ->inject('getProjectDB') - ->callback(fn($message, Database $dbForConsole, Database $dbForProject, Event $queueForEvents, Func $queueForFunctions, Usage $queueForUsage, Cache $cache, callable $getProjectDB) => $this->action($message, $dbForConsole, $dbForProject, $queueForEvents, $queueForFunctions, $queueForUsage, $cache, $getProjectDB)); + ->inject('deviceFunctions') + ->callback(fn($message, Database $dbForConsole, Event $queueForEvents, Func $queueForFunctions, Usage $queueForUsage, Cache $cache, callable $getProjectDB, callable $deviceFunctions) => $this->action($message, $dbForConsole, $queueForEvents, $queueForFunctions, $queueForUsage, $cache, $getProjectDB, $deviceFunctions)); } /** * @throws Exception|\Throwable */ - public function action(Message $message, Database $dbForConsole, Database $dbForProject, Event $queueForEvents, Func $queueForFunctions, Usage $queueForUsage, Cache $cache, callable $getProjectDB): void + public function action(Message $message, Database $dbForConsole, Event $queueForEvents, Func $queueForFunctions, Usage $queueForUsage, Cache $cache, callable $getProjectDB, callable $deviceFunctions): void { $payload = $message->getPayload() ?? []; @@ -75,7 +75,7 @@ class Builds extends Action case BUILD_TYPE_RETRY: Console::info('Creating build for deployment: ' . $deployment->getId()); $github = new GitHub($cache); - $this->buildDeployment($queueForEvents, $queueForUsage, $getProjectDB, $dbForConsole, $github, $project, $resource, $deployment, $template); + $this->buildDeployment($deviceFunctions, $queueForFunctions, $queueForEvents, $queueForUsage, $dbForConsole, $getProjectDB, $github, $project, $resource, $deployment, $template); break; default: @@ -88,7 +88,7 @@ class Builds extends Action * @throws \Throwable * @throws Structure */ - protected function buildDeployment(Func $queueForFunctions, Event $queueForEvents, Usage $queueForUsage, Database $dbForConsole, callable $getProjectDB, GitHub $github, Document $project, Document $function, Document $deployment, Document $template) + protected function buildDeployment(callable $deviceFunctions, Func $queueForFunctions, Event $queueForEvents, Usage $queueForUsage, Database $dbForConsole, callable $getProjectDB, GitHub $github, Document $project, Document $function, Document $deployment, Document $template) { $executor = new Executor(App::getEnv('_APP_EXECUTOR_HOST')); @@ -116,7 +116,6 @@ class Builds extends Action throw new Exception('Runtime "' . $function->getAttribute('runtime', '') . '" is not supported'); } - // Realtime preparation $allEvents = Event::generateEvents('functions.[functionId].deployments.[deploymentId].update', [ 'functionId' => $function->getId(), @@ -125,9 +124,7 @@ class Builds extends Action $startTime = DateTime::now(); $durationStart = \microtime(true); - $buildId = $deployment->getAttribute('buildId', ''); - $isNewBuild = empty($buildId); if ($isNewBuild) { @@ -272,7 +269,7 @@ class Builds extends Action Console::execute('tar --exclude code.tar.gz -czf ' . $tmpPathFile . ' -C /tmp/builds/' . \escapeshellcmd($buildId) . '/code' . (empty($rootDirectory) ? '' : '/' . $rootDirectory) . ' .', '', $stdout, $stderr); - $deviceFunctions = $this->getFunctionsDevice($project->getId()); + $deviceFunctions = $deviceFunctions($project->getId()); $localDevice = new Local(); $buffer = $localDevice->read($tmpPathFile); @@ -303,6 +300,7 @@ class Builds extends Action } /** Trigger Webhook */ + $deploymentModel = new Deployment(); $deploymentUpdate = $queueForEvents ->setQueue(Event::WEBHOOK_QUEUE_NAME) @@ -311,11 +309,8 @@ class Builds extends Action ->setEvent('functions.[functionId].deployments.[deploymentId].update') ->setParam('functionId', $function->getId()) ->setParam('deploymentId', $deployment->getId()) - ->setPayload($deployment->getArrayCopy( - array_keys( - (new Deployment())->getRules() - ) - )); + ->setPayload($deployment->getArrayCopy(array_keys($deploymentModel->getRules()))) + ; $deploymentUpdate->trigger(); @@ -593,217 +588,3 @@ class Builds extends Action } } } - -// // Realtime preparation -// $allEvents = Event::generateEvents('functions.[functionId].deployments.[deploymentId].update', [ -// 'functionId' => $function->getId(), -// 'deploymentId' => $deployment->getId() -// ]); -// -// $startTime = DateTime::now(); -// $durationStart = \microtime(true); -// -// -// -// $executor = new Executor(App::getEnv('_APP_EXECUTOR_HOST')); -// $function = $dbForProject->getDocument('functions', $function->getId()); -// -// if ($function->isEmpty()) { -// throw new Exception('Function not found', 404); -// } -// -// $deployment = $dbForProject->getDocument('deployments', $deployment->getId()); -// if ($deployment->isEmpty()) { -// throw new Exception('Deployment not found', 404); -// } -// -// $runtimes = Config::getParam('runtimes', []); -// $key = $function->getAttribute('runtime'); -// $runtime = $runtimes[$key] ?? null; -// if (\is_null($runtime)) { -// throw new Exception('Runtime "' . $function->getAttribute('runtime', '') . '" is not supported'); -// } -// -// $connection = App::getEnv('_APP_CONNECTIONS_STORAGE', ''); -// /** @TODO : move this to the registry or someplace else */ -// $device = Storage::DEVICE_LOCAL; -// try { -// $dsn = new DSN($connection); -// $device = $dsn->getScheme(); -// } catch (\Exception $e) { -// Console::error($e->getMessage() . 'Invalid DSN. Defaulting to Local device.'); -// } -// -// $buildId = $deployment->getAttribute('buildId', ''); -// $startTime = DateTime::now(); -// -// if (empty($buildId)) { -// $buildId = ID::unique(); -// $build = $dbForProject->createDocument('builds', new Document([ -// '$id' => $buildId, -// '$permissions' => [], -// 'startTime' => $startTime, -// 'deploymentInternalId' => $deployment->getInternalId(), -// 'deploymentId' => $deployment->getId(), -// 'status' => 'processing', -// 'outputPath' => '', -// 'runtime' => $function->getAttribute('runtime'), -// 'source' => $deployment->getAttribute('path'), -// 'sourceType' => $device, -// 'stdout' => '', -// 'stderr' => '', -// 'duration' => 0 -// ])); -// -// $deployment->setAttribute('buildId', $buildId); -// $deployment->setAttribute('buildInternalId', $build->getInternalId()); -// $deployment = $dbForProject->updateDocument('deployments', $deployment->getId(), $deployment); -// } else { -// $build = $dbForProject->getDocument('builds', $buildId); -// } -// -// /** Request the executor to build the code... */ -// $build->setAttribute('status', 'building'); -// $build = $dbForProject->updateDocument('builds', $buildId, $build); -// -// /** Trigger Webhook */ -// $deploymentUpdate = $queueForEvents -// ->setQueue(Event::WEBHOOK_QUEUE_NAME) -// ->setClass(Event::WEBHOOK_CLASS_NAME) -// ->setProject($project) -// ->setEvent('functions.[functionId].deployments.[deploymentId].update') -// ->setParam('functionId', $function->getId()) -// ->setParam('deploymentId', $deployment->getId()) -// ->setPayload($deployment->getArrayCopy( -// array_keys( -// (new Deployment())->getRules() -// ) -// )); -// -// $deploymentUpdate->trigger(); -// -// /** Trigger Functions */ -// $queueForFunctions -// ->from($deploymentUpdate) -// ->trigger(); -// -// -// /** Trigger Realtime */ -// $allEvents = Event::generateEvents('functions.[functionId].deployments.[deploymentId].update', [ -// 'functionId' => $function->getId(), -// 'deploymentId' => $deployment->getId() -// ]); -// $target = Realtime::fromPayload( -// // Pass first, most verbose event pattern -// event: $allEvents[0], -// payload: $build, -// project: $project -// ); -// -// Realtime::send( -// projectId: 'console', -// payload: $build->getArrayCopy(), -// events: $allEvents, -// channels: $target['channels'], -// roles: $target['roles'] -// ); -// -// $source = $deployment->getAttribute('path'); -// -// $vars = array_reduce($function->getAttribute('vars', []), function (array $carry, Document $var) { -// $carry[$var->getAttribute('key')] = $var->getAttribute('value'); -// return $carry; -// }, []); -// -// try { -// $response = $executor->createRuntime( -// deploymentId: $deployment->getId(), -// projectId: $project->getId(), -// source: $source, -// image: $runtime['image'], -// remove: true, -// entrypoint: $deployment->getAttribute('entrypoint'), -// workdir: '/usr/code', -// destination: APP_STORAGE_BUILDS . "/app-{$project->getId()}", -// variables: $vars, -// commands: [ -// 'sh', '-c', -// 'tar -zxf /tmp/code.tar.gz -C /usr/code && \ -// cd /usr/local/src/ && ./build.sh' -// ] -// ); -// -// $endTime = new \DateTime(); -// $endTime->setTimestamp($response['endTimeUnix']); -// -// /** Update the build document */ -// $build->setAttribute('endTime', DateTime::format($endTime)); -// $build->setAttribute('duration', \intval($response['duration'])); -// $build->setAttribute('status', $response['status']); -// $build->setAttribute('outputPath', $response['outputPath']); -// $build->setAttribute('stderr', $response['stderr']); -// $build->setAttribute('stdout', $response['stdout']); -// -// /* Also update the deployment buildTime */ -// $deployment->setAttribute('buildTime', $response['duration']); -// -// Console::success("Build id: $buildId created"); -// -// /** Set auto deploy */ -// if ($deployment->getAttribute('activate') === true) { -// $function->setAttribute('deploymentInternalId', $deployment->getInternalId()); -// $function->setAttribute('deployment', $deployment->getId()); -// $function = $dbForProject->updateDocument('functions', $function->getId(), $function); -// } -// -// /** Update function schedule */ -// $schedule = $dbForConsole->getDocument('schedules', $function->getAttribute('scheduleId')); -// $schedule->setAttribute('resourceUpdatedAt', DateTime::now()); -// -// $schedule -// ->setAttribute('schedule', $function->getAttribute('schedule')) -// ->setAttribute('active', !empty($function->getAttribute('schedule')) && !empty($function->getAttribute('deployment'))); -// -// -// \Utopia\Database\Validator\Authorization::skip(fn() => $dbForConsole->updateDocument('schedules', $schedule->getId(), $schedule)); -// } catch (\Throwable $th) { -// $endTime = DateTime::now(); -// $interval = (new \DateTime($endTime))->diff(new \DateTime($startTime)); -// $build->setAttribute('endTime', $endTime); -// $build->setAttribute('duration', $interval->format('%s') + 0); -// $build->setAttribute('status', 'failed'); -// $build->setAttribute('stderr', $th->getMessage()); -// Console::error($th->getMessage()); -// } finally { -// $build = $dbForProject->updateDocument('builds', $buildId, $build); -// -// /** -// * Send realtime Event -// */ -// $target = Realtime::fromPayload( -// // Pass first, most verbose event pattern -// event: $allEvents[0], -// payload: $build, -// project: $project -// ); -// Realtime::send( -// projectId: 'console', -// payload: $build->getArrayCopy(), -// events: $allEvents, -// channels: $target['channels'], -// roles: $target['roles'] -// ); -// -// /** Trigger usage queue */ -// $queueForUsage -// ->setProject($project) -// ->addMetric(METRIC_BUILDS, 1) // per project -// ->addMetric(METRIC_BUILDS_STORAGE, $build->getAttribute('size', 0)) -// ->addMetric(METRIC_BUILDS_COMPUTE, (int)$build->getAttribute('duration', 0) * 1000) -// ->addMetric(str_replace('{functionInternalId}', $function->getInternalId(), METRIC_FUNCTION_ID_BUILDS), 1) // per function -// ->addMetric(str_replace('{functionInternalId}', $function->getInternalId(), METRIC_FUNCTION_ID_BUILDS_STORAGE), $build->getAttribute('size', 0)) -// ->addMetric(str_replace('{functionInternalId}', $function->getInternalId(), METRIC_FUNCTION_ID_BUILDS_COMPUTE), (int)$build->getAttribute('duration', 0) * 1000) -// ->trigger(); -// } -// } -//} diff --git a/src/Appwrite/Platform/Workers/Certificates.php b/src/Appwrite/Platform/Workers/Certificates.php index df60343078..4c9eb0bbaf 100644 --- a/src/Appwrite/Platform/Workers/Certificates.php +++ b/src/Appwrite/Platform/Workers/Certificates.php @@ -2,9 +2,13 @@ namespace Appwrite\Platform\Workers; +use Appwrite\Event\Event; +use Appwrite\Event\Func; use Appwrite\Event\Mail; +use Appwrite\Messaging\Adapter\Realtime; use Appwrite\Network\Validator\CNAME; use Appwrite\Template\Template; +use Appwrite\Utopia\Response\Model\Rule; use Exception; use Throwable; use Utopia\App; @@ -36,13 +40,15 @@ class Certificates extends Action ->inject('message') ->inject('dbForConsole') ->inject('queueForMails') - ->callback(fn($message, $dbForConsole, $queueForMails) => $this->action($message, $dbForConsole, $queueForMails)); + ->inject('queueForEvents') + ->inject('queueForFunctions') + ->callback(fn(Message $message, Database $dbForConsole, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions) => $this->action($message, $dbForConsole, $queueForMails, $queueForEvents, $queueForFunctions)); } /** * @throws Exception|Throwable */ - public function action(Message $message, Database $dbForConsole, Mail $queueForMails): void + public function action(Message $message, Database $dbForConsole, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions): void { $payload = $message->getPayload() ?? []; @@ -54,13 +60,13 @@ class Certificates extends Action $domain = new Domain($document->getAttribute('domain', '')); $skipRenewCheck = $payload['skipRenewCheck'] ?? false; - $this->execute($domain, $dbForConsole, $queueForMails, $skipRenewCheck); + $this->execute($domain, $dbForConsole, $queueForMails, $queueForEvents, $queueForFunctions, $skipRenewCheck); } /** * @throws Exception|Throwable */ - private function execute(Domain $domain, Database $dbForConsole, Mail $queueForMails, bool $skipRenewCheck = false): void + private function execute(Domain $domain, Database $dbForConsole, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions, bool $skipRenewCheck = false): void { /** * 1. Read arguments and validate domain @@ -101,6 +107,8 @@ class Certificates extends Action $certificate->setAttribute('domain', $domain->get()); } + $success = false; + try { // Email for alerts is required by LetsEncrypt $email = App::getEnv('_APP_SYSTEM_SECURITY_EMAIL_ADDRESS'); @@ -127,11 +135,9 @@ class Certificates extends Action $letsEncryptData = $this->issueCertificate($folder, $domain->get(), $email); // Command succeeded, store all data into document - // We store stderr too, because it may include warnings - $certificate->setAttribute('log', \json_encode([ - 'stdout' => $letsEncryptData['stdout'], - 'stderr' => $letsEncryptData['stderr'], - ])); + $logs = 'Certificate successfully generated.'; + $certificate->setAttribute('logs', \mb_strcut($logs, 0, 1000000));// Limit to 1MB + // Give certificates to Traefik $this->applyCertificateFiles($folder, $domain->get(), $letsEncryptData); @@ -140,9 +146,12 @@ class Certificates extends Action $certificate->setAttribute('renewDate', $this->getRenewDate($domain->get())); $certificate->setAttribute('attempts', 0); $certificate->setAttribute('issueDate', DateTime::now()); + $success = true; } catch (Throwable $e) { + $logs = $e->getMessage(); + // Set exception as log in certificate document - $certificate->setAttribute('log', $e->getMessage()); + $certificate->setAttribute('logs', \mb_strcut($logs, 0, 1000000));// Limit to 1MB // Increase attempts count $attempts = $certificate->getAttribute('attempts', 0) + 1; @@ -158,26 +167,45 @@ class Certificates extends Action $certificate->setAttribute('updated', DateTime::now()); // Save all changes we made to certificate document into database - $this->saveCertificateDocument($domain->get(), $certificate, $dbForConsole); + $this->saveCertificateDocument($domain->get(), $certificate, $success, $dbForConsole, $queueForEvents, $queueForFunctions); } } + /** + * Save certificate data into database. + * + * @param string $domain Domain name that certificate is for + * @param Document $certificate Certificate document that we need to save + * @param Database $dbForConsole Database connection for console + * @return void + * @throws Exception|Throwable + */ + private function saveCertificateDocument(string $domain, Document $certificate, bool $success, Database $dbForConsole, Event $queueForEvents, Func $queueForFunctions): void + { + // Check if update or insert required + $certificateDocument = $dbForConsole->findOne('certificates', [Query::equal('domain', [$domain])]); + if (!empty($certificateDocument) && !$certificateDocument->isEmpty()) { + // Merge new data with current data + $certificate = new Document(\array_merge($certificateDocument->getArrayCopy(), $certificate->getArrayCopy())); + $certificate = $dbForConsole->updateDocument('certificates', $certificate->getId(), $certificate); + } else { + $certificate = $dbForConsole->createDocument('certificates', $certificate); + } + + $certificateId = $certificate->getId(); + $this->updateDomainDocuments($certificateId, $domain, $success, $dbForConsole, $queueForEvents, $queueForFunctions); + } + /** * Get main domain. Needed as we do different checks for main and non-main domains. * * @return null|string Returns main domain. If null, there is no main domain yet. - * @throws Exception */ private function getMainDomain(Database $dbForConsole): ?string { $envDomain = App::getEnv('_APP_DOMAIN', ''); if (!empty($envDomain) && $envDomain !== 'localhost') { return $envDomain; - } else { - $domainDocument = $dbForConsole->findOne('domains', [Query::orderAsc('_id')]); - if ($domainDocument) { - return $domainDocument->getAttribute('domain'); - } } return null; @@ -190,6 +218,7 @@ class Certificates extends Action * * @param Domain $domain Domain which we validate * @param bool $isMainDomain In case of master domain, we look for different DNS configurations + * * @return void * @throws Exception */ @@ -267,7 +296,7 @@ class Certificates extends Action $stderr = ''; $staging = (App::isProduction()) ? '' : ' --dry-run'; - $exit = Console::execute("certbot certonly --webroot --noninteractive --agree-tos{$staging}" + $exit = Console::execute("certbot certonly -v --webroot --noninteractive --agree-tos{$staging}" . " --email " . $email . " --cert-name " . $folder . " -w " . APP_STORAGE_CERTIFICATES @@ -284,6 +313,22 @@ class Certificates extends Action ]; } + /** + * Read new renew date from certificate file generated by Let's Encrypt + * + * @param string $domain Domain which certificate was generated for + * @return string + * @throws \Utopia\Database\Exception + */ + private function getRenewDate(string $domain): string + { + $certPath = APP_STORAGE_CERTIFICATES . '/' . $domain . '/cert.pem'; + $certData = openssl_x509_parse(file_get_contents($certPath)); + $validTo = $certData['validTo_time_t'] ?? null; + $dt = (new \DateTime())->setTimestamp($validTo); + return DateTime::addSeconds($dt, -60 * 60 * 24 * 30); // -30 days + } + /** * Method to take files from Let's Encrypt, and put it into Traefik. * @@ -349,19 +394,20 @@ class Certificates extends Action // Log error into console Console::warning('Cannot renew domain (' . $domain . ') on attempt no. ' . $attempt . ' certificate: ' . $errorMessage); - // Send mail to administrator mail + // Send mail to administratore mail + $locale = new Locale(App::getEnv('_APP_LOCALE', 'en')); if (!$locale->getText('emails.sender') || !$locale->getText("emails.certificate.hello") || !$locale->getText("emails.certificate.subject") || !$locale->getText("emails.certificate.body") || !$locale->getText("emails.certificate.footer") || !$locale->getText("emails.certificate.thanks") || !$locale->getText("emails.certificate.signature")) { $locale->setDefault('en'); } - $body = Template::fromFile(__DIR__ . '/../../config/locale/templates/email-base.tpl'); + $body = Template::fromFile(__DIR__ . '/../config/locale/templates/email-base.tpl'); $subject = \sprintf($locale->getText("emails.certificate.subject"), $domain); - $body->setParam('{{domain}}', $domain); - $body->setParam('{{error}}', $errorMessage); - $body->setParam('{{attempt}}', $attempt); $body + ->setParam('{{domain}}', $domain) + ->setParam('{{error}}', $errorMessage) + ->setParam('{{attempt}}', $attempt) ->setParam('{{subject}}', $subject) ->setParam('{{hello}}', $locale->getText("emails.certificate.hello")) ->setParam('{{body}}', $locale->getText("emails.certificate.body")) @@ -391,50 +437,69 @@ class Certificates extends Action * * @param string $certificateId ID of a new or updated certificate document * @param string $domain Domain that is affected by new certificate - * @param Database $dbForConsole Database instance for console + * @param bool $success Was certificate generation successful? + * * @return void - * @throws Exception */ - private function updateDomainDocuments(string $certificateId, string $domain, Database $dbForConsole): void + private function updateDomainDocuments(string $certificateId, string $domain, bool $success, Database $dbForConsole, Event $queueForEvents, Func $queueForFunctions): void { - $domains = $dbForConsole->find('domains', [ + + $rule = $dbForConsole->findOne('rules', [ Query::equal('domain', [$domain]), - Query::limit(1000), ]); - foreach ($domains as $domainDocument) { - $domainDocument->setAttribute('updated', DateTime::now()); - $domainDocument->setAttribute('certificateId', $certificateId); - $dbForConsole->updateDocument('domains', $domainDocument->getId(), $domainDocument); + if ($rule !== false && !$rule->isEmpty()) { + $rule->setAttribute('certificateId', $certificateId); + $rule->setAttribute('status', $success ? 'verified' : 'unverified'); + $dbForConsole->updateDocument('rules', $rule->getId(), $rule); - if ($domainDocument->getAttribute('projectId')) { - $dbForConsole->deleteCachedDocument('projects', $domainDocument->getAttribute('projectId')); + $projectId = $rule->getAttribute('projectId'); + + // Skip events for console project (triggered by auto-ssl generation for 1 click setups) + if ($projectId === 'console') { + return; } - } - } - /** - * Save certificate data into database. - * - * @param string $domain Domain name that certificate is for - * @param Document $certificate Certificate document that we need to save - * @param Database $dbForConsole Database connection for console - * @return void - * @throws Exception|\Throwable - */ - private function saveCertificateDocument(string $domain, Document $certificate, Database $dbForConsole): void - { - // Check if update or insert required - $certificateDocument = $dbForConsole->findOne('certificates', [Query::equal('domain', [$domain])]); - if (!empty($certificateDocument) && !$certificateDocument->isEmpty()) { - // Merge new data with current data - $certificate = new Document(\array_merge($certificateDocument->getArrayCopy(), $certificate->getArrayCopy())); - $certificate = $dbForConsole->updateDocument('certificates', $certificate->getId(), $certificate); - } else { - $certificate = $dbForConsole->createDocument('certificates', $certificate); - } + $project = $dbForConsole->getDocument('projects', $projectId); - $certificateId = $certificate->getId(); - $this->updateDomainDocuments($certificateId, $domain, $dbForConsole); + /** Trigger Webhook */ + $ruleModel = new Rule(); + $queueForEvents + ->setProject($project) + ->setEvent('rules.[ruleId].update') + ->setParam('ruleId', $rule->getId()) + ->setPayload($rule->getArrayCopy(array_keys($ruleModel->getRules()))) + ->trigger(); + + + /** Trigger Functions */ + $queueForFunctions + ->trigger(); + + /** Trigger realtime event */ + $allEvents = Event::generateEvents('rules.[ruleId].update', [ + 'ruleId' => $rule->getId(), + ]); + $target = Realtime::fromPayload( + // Pass first, most verbose event pattern + event: $allEvents[0], + payload: $rule, + project: $project + ); + Realtime::send( + projectId: 'console', + payload: $rule->getArrayCopy(), + events: $allEvents, + channels: $target['channels'], + roles: $target['roles'] + ); + Realtime::send( + projectId: $project->getId(), + payload: $rule->getArrayCopy(), + events: $allEvents, + channels: $target['channels'], + roles: $target['roles'] + ); + } } } diff --git a/src/Appwrite/Platform/Workers/Functions.php b/src/Appwrite/Platform/Workers/Functions.php index d3c4399323..695d1c7486 100644 --- a/src/Appwrite/Platform/Workers/Functions.php +++ b/src/Appwrite/Platform/Workers/Functions.php @@ -9,6 +9,7 @@ use Appwrite\Messaging\Adapter\Realtime; use Appwrite\Utopia\Response\Model\Execution; use Exception; use Executor\Executor; +use Throwable; use Utopia\App; use Utopia\CLI\Console; use Utopia\Config\Config; @@ -20,6 +21,7 @@ use Utopia\Database\Helpers\ID; use Utopia\Database\Helpers\Permission; use Utopia\Database\Helpers\Role; use Utopia\Database\Query; +use Utopia\Logger\Log; use Utopia\Platform\Action; use Utopia\Queue\Message; @@ -42,13 +44,14 @@ class Functions extends Action ->inject('queueForFunctions') ->inject('queueForEvents') ->inject('queueForUsage') - ->callback(fn($message, $dbForProject, $queueForFunctions, $queueForEvents, $queueForUsage) => $this->action($message, $dbForProject, $queueForFunctions, $queueForEvents, $queueForUsage)); + ->inject('log') + ->callback(fn(Message $message, Database $dbForProject, Func $queueForFunctions, Event $queueForEvents, Usage $queueForUsage, Log $log) => $this->action($message, $dbForProject, $queueForFunctions, $queueForEvents, $queueForUsage, $log)); } /** - * @throws Exception|\Throwable + * @throws Exception|Throwable */ - public function action(Message $message, Database $dbForProject, Func $queueForFunctions, Event $queueForEvents, Usage $queueForUsage): void + public function action(Message $message, Database $dbForProject, Func $queueForFunctions, Event $queueForEvents, Usage $queueForUsage, Log $log): void { $payload = $message->getPayload() ?? []; @@ -69,6 +72,9 @@ class Functions extends Action $project = new Document($payload['project'] ?? []); $function = new Document($payload['function'] ?? []); $user = new Document($payload['user'] ?? []); + $method = $payload['method'] ?? 'POST'; + $headers = $payload['headers'] ?? []; + $path = $payload['path'] ?? '/'; if ($project->getId() === 'console') { return; @@ -99,16 +105,25 @@ class Functions extends Action Console::success('Iterating function: ' . $function->getAttribute('name')); $this->execute( + log: $log, dbForProject: $dbForProject, queueForFunctions: $queueForFunctions, - queueForEvents: $queueForEvents, queueForUsage: $queueForUsage, + queueForEvents: $queueForEvents, project: $project, function: $function, trigger: 'event', + path: '/', + method: 'POST', + headers: [ + 'user-agent' => 'Appwrite/' . APP_VERSION_STABLE + ], + data: null, user: $user, + jwt: null, event: $events[0], eventData: \is_string($eventData) ? $eventData : \json_encode($eventData), + executionId: null, ); Console::success('Triggered function: ' . $events[0]); } @@ -125,28 +140,44 @@ class Functions extends Action $execution = new Document($payload['execution'] ?? []); $user = new Document($payload['user'] ?? []); $this->execute( + log: $log, dbForProject: $dbForProject, queueForFunctions: $queueForFunctions, - queueForEvents: $queueForEvents, queueForUsage: $queueForUsage, + queueForEvents: $queueForEvents, project: $project, function: $function, trigger: 'http', + path: $path, + method: $method, + headers: $headers, data: $data, user: $user, jwt: $jwt, - executionId: $execution->getId(), + event: null, + eventData: null, + executionId: $execution->getId() ); break; case 'schedule': $this->execute( + log: $log, dbForProject: $dbForProject, queueForFunctions: $queueForFunctions, - queueForEvents: $queueForEvents, queueForUsage: $queueForUsage, + queueForEvents: $queueForEvents, project: $project, function: $function, trigger: 'schedule', + path: $path, + method: $method, + headers: $headers, + data: null, + user: null, + jwt: null, + event: null, + eventData: null, + executionId: null, ); break; } @@ -154,17 +185,21 @@ class Functions extends Action /** * @throws Authorization - * @throws \Throwable + * @throws Throwable * @throws Structure */ private function execute( + Log $log, Database $dbForProject, Func $queueForFunctions, - Event $queueForEvents, Usage $queueForUsage, + Event $queueForEvents, Document $project, Document $function, string $trigger, + string $path, + string $method, + array $headers, string $data = null, ?Document $user = null, string $jwt = null, @@ -177,6 +212,9 @@ class Functions extends Action $functionInternalId = $function->getInternalId(); $deploymentId = $function->getAttribute('deployment', ''); + $log->addTag('functionId', $functionId); + $log->addTag('projectId', $project->getId()); + /** Check if deployment exists */ $deployment = $dbForProject->getDocument('deployments', $deploymentId); $deploymentInternalId = $deployment->getInternalId(); @@ -200,157 +238,221 @@ class Functions extends Action } /** Check if runtime is supported */ - $runtimes = Config::getParam('runtimes', []); + $version = $function->getAttribute('version', 'v2'); + $runtimes = Config::getParam($version === 'v2' ? 'runtimes-v2' : 'runtimes', []); if (!\array_key_exists($function->getAttribute('runtime'), $runtimes)) { throw new Exception('Runtime "' . $function->getAttribute('runtime', '') . '" is not supported'); } - $runtime = $runtimes[$function->getAttribute('runtime')]; + $runtime = $runtimes[$function->getAttribute('runtime')]; - /** Create execution or update execution status */ - $execution = $dbForProject->getDocument('executions', $executionId ?? ''); + $headers['x-appwrite-trigger'] = $trigger; + $headers['x-appwrite-event'] = $event ?? ''; + $headers['x-appwrite-user-id'] = $user->getId() ?? ''; + $headers['x-appwrite-user-jwt'] = $jwt ?? ''; + + /** Create execution or update execution status */ + /** Create execution or update execution status */ + $execution = $dbForProject->getDocument('executions', $executionId ?? ''); if ($execution->isEmpty()) { + $headersFiltered = []; + foreach ($headers as $key => $value) { + if (\in_array(\strtolower($key), FUNCTION_ALLOWLIST_HEADERS_REQUEST)) { + $headersFiltered[] = [ 'name' => $key, 'value' => $value ]; + } + } + $executionId = ID::unique(); - $execution = $dbForProject->createDocument('executions', new Document([ + $execution = new Document([ '$id' => $executionId, '$permissions' => $user->isEmpty() ? [] : [Permission::read(Role::user($user->getId()))], - 'functionId' => $functionId, - 'functionInternalId' => $functionInternalId, - 'deploymentInternalId' => $deploymentInternalId, - 'deploymentId' => $deploymentId, + 'functionInternalId' => $function->getInternalId(), + 'functionId' => $function->getId(), + 'deploymentInternalId' => $deployment->getInternalId(), + 'deploymentId' => $deployment->getId(), 'trigger' => $trigger, - 'status' => 'waiting', - 'statusCode' => 0, - 'response' => '', - 'stderr' => '', + 'status' => 'processing', + 'responseStatusCode' => 0, + 'responseHeaders' => [], + 'requestPath' => $path, + 'requestMethod' => $method, + 'requestHeaders' => $headersFiltered, + 'errors' => '', + 'logs' => '', 'duration' => 0.0, - 'search' => implode(' ', [$function->getId(), $executionId]), - ])); + 'search' => implode(' ', [$functionId, $executionId]), + ]); + + if ($function->getAttribute('logging')) { + $execution = $dbForProject->createDocument('executions', $execution); + } // TODO: @Meldiron Trigger executions.create event here if ($execution->isEmpty()) { throw new Exception('Failed to create or read execution'); } - - /*** Usage */ - $queueForUsage - ->addMetric(METRIC_EXECUTIONS, 1) // per project - ->addMetric(str_replace('{functionInternalId}', $function->getInternalId(), METRIC_FUNCTION_ID_EXECUTIONS), 1); // per function } + if ($execution->getAttribute('status') !== 'processing') { $execution->setAttribute('status', 'processing'); - $execution = $dbForProject->updateDocument('executions', $executionId, $execution); - $vars = array_reduce($function->getAttribute('vars', []), function (array $carry, Document $var) { - $carry[$var->getAttribute('key')] = $var->getAttribute('value'); - return $carry; - }, []); + if ($function->getAttribute('logging')) { + $execution = $dbForProject->updateDocument('executions', $executionId, $execution); + } + } - /** Collect environment variables */ + $durationStart = \microtime(true); + + $body = $eventData ?? ''; + if (empty($body)) { + $body = $data ?? ''; + } + + $vars = []; + + // V2 vars + if ($version === 'v2') { $vars = \array_merge($vars, [ - 'APPWRITE_FUNCTION_ID' => $functionId, - 'APPWRITE_FUNCTION_NAME' => $function->getAttribute('name'), - 'APPWRITE_FUNCTION_DEPLOYMENT' => $deploymentId, - 'APPWRITE_FUNCTION_TRIGGER' => $trigger, - 'APPWRITE_FUNCTION_PROJECT_ID' => $project->getId(), - 'APPWRITE_FUNCTION_RUNTIME_NAME' => $runtime['name'] ?? '', - 'APPWRITE_FUNCTION_RUNTIME_VERSION' => $runtime['version'] ?? '', - 'APPWRITE_FUNCTION_EVENT' => $event ?? '', - 'APPWRITE_FUNCTION_EVENT_DATA' => $eventData ?? '', - 'APPWRITE_FUNCTION_DATA' => $data ?? '', - 'APPWRITE_FUNCTION_USER_ID' => $user->getId() ?? '', - 'APPWRITE_FUNCTION_JWT' => $jwt ?? '', + 'APPWRITE_FUNCTION_TRIGGER' => $headers['x-appwrite-trigger'] ?? '', + 'APPWRITE_FUNCTION_DATA' => $body ?? '', + 'APPWRITE_FUNCTION_EVENT_DATA' => $body ?? '', + 'APPWRITE_FUNCTION_EVENT' => $headers['x-appwrite-event'] ?? '', + 'APPWRITE_FUNCTION_USER_ID' => $headers['x-appwrite-user-id'] ?? '', + 'APPWRITE_FUNCTION_JWT' => $headers['x-appwrite-user-jwt'] ?? '' ]); + } - /** Execute function */ + // Shared vars + foreach ($function->getAttribute('varsProject', []) as $var) { + $vars[$var->getAttribute('key')] = $var->getAttribute('value', ''); + } + + // Function vars + foreach ($function->getAttribute('vars', []) as $var) { + $vars[$var->getAttribute('key')] = $var->getAttribute('value', ''); + } + + // Appwrite vars + $vars = \array_merge($vars, [ + 'APPWRITE_FUNCTION_ID' => $functionId, + 'APPWRITE_FUNCTION_NAME' => $function->getAttribute('name'), + 'APPWRITE_FUNCTION_DEPLOYMENT' => $deploymentId, + 'APPWRITE_FUNCTION_PROJECT_ID' => $project->getId(), + 'APPWRITE_FUNCTION_RUNTIME_NAME' => $runtime['name'] ?? '', + 'APPWRITE_FUNCTION_RUNTIME_VERSION' => $runtime['version'] ?? '', + ]); + + /** Execute function */ try { - $client = new Executor(App::getEnv('_APP_EXECUTOR_HOST')); - $executionResponse = $client->createExecution( + $version = $function->getAttribute('version', 'v2'); + $command = $runtime['startCommand']; + $executor = new Executor(App::getEnv('_APP_EXECUTOR_HOST')); + $command = $version === 'v2' ? '' : 'cp /tmp/code.tar.gz /mnt/code/code.tar.gz && nohup helpers/start.sh "' . $command . '"'; + $executionResponse = $executor->createExecution( projectId: $project->getId(), deploymentId: $deploymentId, - payload: $vars['APPWRITE_FUNCTION_DATA'] ?? '', + body: \strlen($body) > 0 ? $body : null, variables: $vars, timeout: $function->getAttribute('timeout', 0), image: $runtime['image'], - source: $build->getAttribute('outputPath', ''), + source: $build->getAttribute('path', ''), entrypoint: $deployment->getAttribute('entrypoint', ''), + version: $version, + path: $path, + method: $method, + headers: $headers, + runtimeEntrypoint: $command ); + $status = $executionResponse['statusCode'] >= 400 ? 'failed' : 'completed'; + + $headersFiltered = []; + foreach ($executionResponse['headers'] as $key => $value) { + if (\in_array(\strtolower($key), FUNCTION_ALLOWLIST_HEADERS_RESPONSE)) { + $headersFiltered[] = [ 'name' => $key, 'value' => $value ]; + } + } + /** Update execution status */ $execution - ->setAttribute('status', $executionResponse['status']) - ->setAttribute('statusCode', $executionResponse['statusCode']) - ->setAttribute('response', $executionResponse['response']) - ->setAttribute('stdout', $executionResponse['stdout']) - ->setAttribute('stderr', $executionResponse['stderr']) + ->setAttribute('status', $status) + ->setAttribute('responseStatusCode', $executionResponse['statusCode']) + ->setAttribute('responseHeaders', $headersFiltered) + ->setAttribute('logs', $executionResponse['logs']) + ->setAttribute('errors', $executionResponse['errors']) ->setAttribute('duration', $executionResponse['duration']); } catch (\Throwable $th) { - $interval = (new \DateTime())->diff(new \DateTime($execution->getCreatedAt())); + $durationEnd = \microtime(true); $execution - ->setAttribute('duration', (float)$interval->format('%s.%f')) + ->setAttribute('duration', $durationEnd - $durationStart) ->setAttribute('status', 'failed') - ->setAttribute('statusCode', $th->getCode()) - ->setAttribute('stderr', $th->getMessage()); + ->setAttribute('responseStatusCode', 500) + ->setAttribute('errors', $th->getMessage() . '\nError Code: ' . $th->getCode()); - Console::error($th->getTraceAsString()); - Console::error($th->getFile()); - Console::error($th->getLine()); - Console::error($th->getMessage()); + $error = $th->getMessage(); + $errorCode = $th->getCode(); } + if ($function->getAttribute('logging')) { $execution = $dbForProject->updateDocument('executions', $executionId, $execution); + } + /** Trigger Webhook */ + $executionModel = new Execution(); + $queueForEvents + ->setQueue(Event::WEBHOOK_QUEUE_NAME) + ->setClass(Event::WEBHOOK_CLASS_NAME) + ->setProject($project) + ->setUser($user) + ->setEvent('functions.[functionId].executions.[executionId].update') + ->setParam('functionId', $function->getId()) + ->setParam('executionId', $execution->getId()) + ->setPayload($execution->getArrayCopy(array_keys($executionModel->getRules()))) + ->trigger(); - /** Trigger Webhook */ - $executionModel = new Execution(); - $queueForEvents - ->setQueue(Event::WEBHOOK_QUEUE_NAME) - ->setClass(Event::WEBHOOK_CLASS_NAME) - ->setProject($project) - ->setUser($user) - ->setEvent('functions.[functionId].executions.[executionId].update') - ->setParam('functionId', $function->getId()) - ->setParam('executionId', $execution->getId()) - ->setPayload($execution->getArrayCopy(array_keys($executionModel->getRules()))) - ->trigger(); + /** Trigger Functions */ + $queueForFunctions + ->from($queueForEvents) + ->trigger(); - /** Trigger Functions */ - $queueForFunctions - ->from($queueForEvents) - ->trigger(); + /** Trigger realtime event */ + $allEvents = Event::generateEvents('functions.[functionId].executions.[executionId].update', [ + 'functionId' => $function->getId(), + 'executionId' => $execution->getId() + ]); + $target = Realtime::fromPayload( + // Pass first, most verbose event pattern + event: $allEvents[0], + payload: $execution + ); + Realtime::send( + projectId: 'console', + payload: $execution->getArrayCopy(), + events: $allEvents, + channels: $target['channels'], + roles: $target['roles'] + ); + Realtime::send( + projectId: $project->getId(), + payload: $execution->getArrayCopy(), + events: $allEvents, + channels: $target['channels'], + roles: $target['roles'] + ); - /** Trigger realtime event */ - $allEvents = Event::generateEvents('functions.[functionId].executions.[executionId].update', [ - 'functionId' => $function->getId(), - 'executionId' => $execution->getId() - ]); - $target = Realtime::fromPayload( - // Pass first, most verbose event pattern - event: $allEvents[0], - payload: $execution - ); - Realtime::send( - projectId: 'console', - payload: $execution->getArrayCopy(), - events: $allEvents, - channels: $target['channels'], - roles: $target['roles'] - ); - Realtime::send( - projectId: $project->getId(), - payload: $execution->getArrayCopy(), - events: $allEvents, - channels: $target['channels'], - roles: $target['roles'] - ); + if (!empty($error)) { + throw new Exception($error, $errorCode); + } - /** Trigger usage queue */ - $queueForUsage - ->setProject($project) - ->addMetric(METRIC_EXECUTIONS_COMPUTE, (int)($execution->getAttribute('duration') * 1000))// per project - ->addMetric(str_replace('{functionInternalId}', $function->getInternalId(), METRIC_FUNCTION_ID_EXECUTIONS_COMPUTE), (int)($execution->getAttribute('duration') * 1000)) - ->trigger() - ; + /** Trigger usage queue */ + $queueForUsage + ->setProject($project) + ->addMetric(METRIC_EXECUTIONS, 1) + ->addMetric(str_replace('{functionInternalId}', $function->getInternalId(), METRIC_FUNCTION_ID_EXECUTIONS), 1) + ->addMetric(METRIC_EXECUTIONS_COMPUTE, (int)($execution->getAttribute('duration') * 1000))// per project + ->addMetric(str_replace('{functionInternalId}', $function->getInternalId(), METRIC_FUNCTION_ID_EXECUTIONS_COMPUTE), (int)($execution->getAttribute('duration') * 1000)) + ->trigger() + ; } }