diff --git a/app/controllers/api/databases.php b/app/controllers/api/databases.php index ffe42920b..d29f41f6c 100644 --- a/app/controllers/api/databases.php +++ b/app/controllers/api/databases.php @@ -54,7 +54,7 @@ use MaxMind\Db\Reader; * @return Document Newly created attribute document * @throws Exception */ -function createAttribute(string $databaseId, string $collectionId, Document $attribute, Response $response, Database $dbForProject, EventDatabase $database, Event $queueForEvents): Document +function createAttribute(string $databaseId, string $collectionId, Document $attribute, Response $response, Database $dbForProject, EventDatabase $queueForDatabase, Event $queueForEvents): Document { $key = $attribute->getAttribute('key'); $type = $attribute->getAttribute('type', ''); @@ -125,7 +125,7 @@ function createAttribute(string $databaseId, string $collectionId, Document $att $dbForProject->deleteCachedDocument('database_' . $db->getInternalId(), $collectionId); $dbForProject->deleteCachedCollection('database_' . $db->getInternalId() . '_collection_' . $collection->getInternalId()); - $database + $queueForDatabase ->setType(DATABASE_TYPE_CREATE_ATTRIBUTE) ->setDatabase($db) ->setCollection($collection) diff --git a/app/worker.php b/app/worker.php index f523defd8..b76d59912 100644 --- a/app/worker.php +++ b/app/worker.php @@ -172,27 +172,6 @@ function getCache(): Cache return new Cache(new Sharding($adapters)); } -Server::setResource('getProjectDB', function (Registry $register, Database $dbForConsole) { - return function (Document $project) use ($register, $dbForConsole) { - /** @var Group $pools */ - $pools = $register->get('pools'); - - if ($project->isEmpty() || $project->getId() === 'console') { - return $dbForConsole; - } - - $dbAdapter = $pools - ->get($project->getAttribute('database')) - ->pop() - ->getResource(); - - $database = new Database($dbAdapter, getCache()); - $database->setNamespace('_' . $project->getInternalId()); - - return $database; - }; -}, ['register']); - /** * Get Functions Storage Device * @param string $projectId of the project @@ -234,7 +213,7 @@ try { $platform->init(Service::TYPE_WORKER, [ 'workersNum' => swoole_cpu_num() * intval(App::getEnv('_APP_WORKER_PER_CORE', 6)), 'connection' => $pools->get('queue')->pop()->getResource(), - 'workerName' => $workerName ?? null, + 'workerName' => strtolower($workerName) ?? null, ]); } catch (\Exception $e) { Console::error($e->getMessage() . ', File: ' . $e->getFile() . ', Line: ' . $e->getLine()); @@ -255,17 +234,13 @@ $worker ->inject('logger') ->action(function (Throwable $error, Logger|null $logger) { - if ($logger === null) { - return; - } - $version = App::getEnv('_APP_VERSION', 'UNKNOWN'); if ($error instanceof PDOException) { throw $error; } - if ($error->getCode() >= 500 || $error->getCode() === 0) { + if (($error->getCode() >= 500 || $error->getCode() === 0) && !empty($logger)) { $log = new Log(); $log->setNamespace("appwrite-worker"); @@ -294,5 +269,8 @@ $worker Console::error('[Error] Line: ' . $error->getLine()); }); -$worker->workerStart(); +$worker->workerStart() + ->action(function () use ($workerName) { + Console::info("Worker $workerName started"); + }); $worker->start(); diff --git a/bin/worker-certificates b/bin/worker-certificates index 679885fa4..901688c4c 100755 --- a/bin/worker-certificates +++ b/bin/worker-certificates @@ -1,10 +1,3 @@ #!/bin/sh -if [ -z "$_APP_REDIS_USER" ] && [ -z "$_APP_REDIS_PASS" ] -then - REDIS_BACKEND="${_APP_REDIS_HOST}:${_APP_REDIS_PORT}" -else - REDIS_BACKEND="redis://${_APP_REDIS_USER}:${_APP_REDIS_PASS}@${_APP_REDIS_HOST}:${_APP_REDIS_PORT}" -fi - -INTERVAL=1 QUEUE='v1-certificates' APP_INCLUDE='/usr/src/code/app/workers/certificates.php' php /usr/src/code/vendor/bin/resque -dopcache.preload=opcache.preload=/usr/src/code/app/preload.php \ No newline at end of file +php /usr/src/code/app/worker.php certificates $@ \ No newline at end of file diff --git a/bin/worker-databases b/bin/worker-databases index bbec58268..502075bc5 100644 --- a/bin/worker-databases +++ b/bin/worker-databases @@ -1,10 +1,3 @@ #!/bin/sh -if [ -z "$_APP_REDIS_USER" ] && [ -z "$_APP_REDIS_PASS" ] -then - REDIS_BACKEND="${_APP_REDIS_HOST}:${_APP_REDIS_PORT}" -else - REDIS_BACKEND="redis://${_APP_REDIS_USER}:${_APP_REDIS_PASS}@${_APP_REDIS_HOST}:${_APP_REDIS_PORT}" -fi - -INTERVAL=0.1 QUEUE='v1-database' APP_INCLUDE='/usr/src/code/app/workers/databases.php' php /usr/src/code/vendor/bin/resque -dopcache.preload=opcache.preload=/usr/src/code/app/preload.php \ No newline at end of file +php /usr/src/code/app/worker.php databases $@ \ No newline at end of file diff --git a/src/Appwrite/Event/Event.php b/src/Appwrite/Event/Event.php index e49ff6e8b..8f3bee26e 100644 --- a/src/Appwrite/Event/Event.php +++ b/src/Appwrite/Event/Event.php @@ -9,7 +9,7 @@ use Utopia\Queue\Connection; class Event { - public const DATABASE_QUEUE_NAME = 'v1-database'; + public const DATABASE_QUEUE_NAME = 'v1-databases'; public const DATABASE_CLASS_NAME = 'DatabaseV1'; public const DELETE_QUEUE_NAME = 'v1-deletes'; diff --git a/src/Appwrite/Platform/Services/Workers.php b/src/Appwrite/Platform/Services/Workers.php index 597bb8fae..8bd7522cc 100644 --- a/src/Appwrite/Platform/Services/Workers.php +++ b/src/Appwrite/Platform/Services/Workers.php @@ -7,6 +7,9 @@ use Appwrite\Platform\Workers\Audits; use Appwrite\Platform\Workers\Webhooks; use Appwrite\Platform\Workers\Mails; use Appwrite\Platform\Workers\Messaging; +use Appwrite\Platform\Workers\Certificates; +use Appwrite\Platform\Workers\Databases; +use Appwrite\Platform\Workers\Usage; class Workers extends Service { @@ -18,6 +21,9 @@ class Workers extends Service ->addAction(Webhooks::getName(), new Webhooks()) ->addAction(Mails::getName(), new Mails()) ->addAction(Messaging::getName(), new Messaging()) + ->addAction(Certificates::getName(), new Certificates()) + ->addAction(Databases::getName(), new Databases()) + //->addAction(Usage::getName(), new Usage()) ; } } diff --git a/src/Appwrite/Platform/Workers/Certificates.php b/src/Appwrite/Platform/Workers/Certificates.php new file mode 100644 index 000000000..6e1d1010c --- /dev/null +++ b/src/Appwrite/Platform/Workers/Certificates.php @@ -0,0 +1,440 @@ +desc('Certificates worker') + ->inject('message') + ->inject('dbForConsole') + ->inject('queueForMail') + ->callback(fn($message, $dbForConsole, $queueForMail) => $this->action($message, $dbForConsole, $queueForMail)); + } + + /** + * @throws Exception|Throwable + */ + public function action(Message $message, Database $dbForConsole, Mail $queueForMail): void + { + $payload = $message->getPayload() ?? []; + + if (empty($payload)) { + throw new Exception('Missing payload'); + } + + $document = new Document($payload['domain'] ?? []); + $domain = new Domain($document->getAttribute('domain', '')); + $skipRenewCheck = $payload['skipRenewCheck'] ?? false; + + $this->execute($domain, $dbForConsole, $queueForMail, $skipRenewCheck); + } + + /** + * @throws Exception|Throwable + */ + private function execute(Domain $domain, Database $dbForConsole, Mail $queueForMail, bool $skipRenewCheck = false): void + { + /** + * 1. Read arguments and validate domain + * 2. Get main domain + * 3. Validate CNAME DNS if parameter is not main domain (meaning it's custom domain) + * 4. Validate security email. Cannot be empty, required by LetsEncrypt + * 5. Validate renew date with certificate file, unless requested to skip by parameter + * 6. Issue a certificate using certbot CLI + * 7. Update 'log' attribute on certificate document with Certbot message + * 8. Create storage folder for certificate, if not ready already + * 9. Move certificates from Certbot location to our Storage + * 10. Create/Update our Storage with new Traefik config with new certificate paths + * 11. Read certificate file and update 'renewDate' on certificate document + * 12. Update 'issueDate' and 'attempts' on certificate + * + * If at any point unexpected error occurs, program stops without applying changes to document, and error is thrown into worker + * + * If code stops with expected error: + * 1. 'log' attribute on document is updated with error message + * 2. 'attempts' amount is increased + * 3. Console log is shown + * 4. Email is sent to security email + * + * Unless unexpected error occurs, at the end, we: + * 1. Update 'updated' attribute on document + * 2. Save document to database + * 3. Update all domains documents with current certificate ID + * + * Note: Renewals are checked and scheduled from maintenence worker + */ + + // Get current certificate + $certificate = $dbForConsole->findOne('certificates', [Query::equal('domain', [$domain->get()])]); + + // If we don't have certificate for domain yet, let's create new document. At the end we save it + if (!$certificate) { + $certificate = new Document(); + $certificate->setAttribute('domain', $domain->get()); + } + + try { + // Email for alerts is required by LetsEncrypt + $email = App::getEnv('_APP_SYSTEM_SECURITY_EMAIL_ADDRESS'); + if (empty($email)) { + throw new Exception('You must set a valid security email address (_APP_SYSTEM_SECURITY_EMAIL_ADDRESS) to issue an SSL certificate.'); + } + + // Validate domain and DNS records. Skip if job is forced + if (!$skipRenewCheck) { + $mainDomain = $this->getMainDomain($dbForConsole); + $isMainDomain = !isset($mainDomain) || $domain->get() === $mainDomain; + $this->validateDomain($domain, $isMainDomain); + } + + // If certificate exists already, double-check expiry date. Skip if job is forced + if (!$skipRenewCheck && !$this->isRenewRequired($domain->get())) { + throw new Exception('Renew isn\'t required.'); + } + + // Prepare folder name for certbot. Using this helps prevent miss-match in LetsEncrypt configuration when renewing certificate + $folder = ID::unique(); + + // Generate certificate files using Let's Encrypt + $letsEncryptData = $this->issueCertificate($folder, $domain->get(), $email); + + // Command succeeded, store all data into document + // We store stderr too, because it may include warnings + $certificate->setAttribute('log', \json_encode([ + 'stdout' => $letsEncryptData['stdout'], + 'stderr' => $letsEncryptData['stderr'], + ])); + + // Give certificates to Traefik + $this->applyCertificateFiles($folder, $domain->get(), $letsEncryptData); + + // Update certificate info stored in database + $certificate->setAttribute('renewDate', $this->getRenewDate($domain->get())); + $certificate->setAttribute('attempts', 0); + $certificate->setAttribute('issueDate', DateTime::now()); + } catch (Throwable $e) { + // Set exception as log in certificate document + $certificate->setAttribute('log', $e->getMessage()); + + // Increase attempts count + $attempts = $certificate->getAttribute('attempts', 0) + 1; + $certificate->setAttribute('attempts', $attempts); + + // Store cuttent time as renew date to ensure another attempt in next maintenance cycle + $certificate->setAttribute('renewDate', DateTime::now()); + + // Send email to security email + $this->notifyError($domain->get(), $e->getMessage(), $attempts, $queueForMail); + } finally { + // All actions result in new updatedAt date + $certificate->setAttribute('updated', DateTime::now()); + + // Save all changes we made to certificate document into database + $this->saveCertificateDocument($domain->get(), $certificate, $dbForConsole); + } + } + + /** + * Get main domain. Needed as we do different checks for main and non-main domains. + * + * @return null|string Returns main domain. If null, there is no main domain yet. + * @throws Exception + */ + private function getMainDomain(Database $dbForConsole): ?string + { + $envDomain = App::getEnv('_APP_DOMAIN', ''); + if (!empty($envDomain) && $envDomain !== 'localhost') { + return $envDomain; + } else { + $domainDocument = $dbForConsole->findOne('domains', [Query::orderAsc('_id')]); + if ($domainDocument) { + return $domainDocument->getAttribute('domain'); + } + } + + return null; + } + + /** + * Internal domain validation functionality to prevent unnecessary attempts failed from Let's Encrypt side. We check: + * - Domain needs to be public and valid (prevents NFT domains that are not supported by Let's Encrypt) + * - Domain must have proper DNS record + * + * @param Domain $domain Domain which we validate + * @param bool $isMainDomain In case of master domain, we look for different DNS configurations + * @return void + * @throws Exception + */ + private function validateDomain(Domain $domain, bool $isMainDomain): void + { + if (empty($domain->get())) { + throw new Exception('Missing certificate domain.'); + } + + if (!$domain->isKnown() || $domain->isTest()) { + throw new Exception('Unknown public suffix for domain.'); + } + + if (!$isMainDomain) { + // TODO: Would be awesome to also support A/AAAA records here. Maybe dry run? + // Validate if domain target is properly configured + $target = new Domain(App::getEnv('_APP_DOMAIN_TARGET', '')); + + if (!$target->isKnown() || $target->isTest()) { + throw new Exception('Unreachable CNAME target (' . $target->get() . '), please use a domain with a public suffix.'); + } + + // Verify domain with DNS records + $validator = new CNAME($target->get()); + if (!$validator->isValid($domain->get())) { + throw new Exception('Failed to verify domain DNS records.'); + } + } else { + // Main domain validation + // TODO: Would be awesome to check A/AAAA record here. Maybe dry run? + } + } + + /** + * Reads expiry date of certificate from file and decides if renewal is required or not. + * + * @param string $domain Domain for which we check certificate file + * @return bool True, if certificate needs to be renewed + * @throws Exception + */ + private function isRenewRequired(string $domain): bool + { + $certPath = APP_STORAGE_CERTIFICATES . '/' . $domain . '/cert.pem'; + if (\file_exists($certPath)) { + $validTo = null; + + $certData = openssl_x509_parse(file_get_contents($certPath)); + $validTo = $certData['validTo_time_t'] ?? 0; + + if (empty($validTo)) { + throw new Exception('Unable to read certificate file (cert.pem).'); + } + + // LetsEncrypt allows renewal 30 days before expiry + $expiryInAdvance = (60 * 60 * 24 * 30); + if ($validTo - $expiryInAdvance > \time()) { + return false; + } + } + + return true; + } + + /** + * LetsEncrypt communication to issue certificate (using certbot CLI) + * + * @param string $folder Folder into which certificates should be generated + * @param string $domain Domain to generate certificate for + * @return array Named array with keys 'stdout' and 'stderr', both string + * @throws Exception + */ + private function issueCertificate(string $folder, string $domain, string $email): array + { + $stdout = ''; + $stderr = ''; + + $staging = (App::isProduction()) ? '' : ' --dry-run'; + $exit = Console::execute("certbot certonly --webroot --noninteractive --agree-tos{$staging}" + . " --email " . $email + . " --cert-name " . $folder + . " -w " . APP_STORAGE_CERTIFICATES + . " -d {$domain}", '', $stdout, $stderr); + + // Unexpected error, usually 5XX, API limits, ... + if ($exit !== 0) { + throw new Exception('Failed to issue a certificate with message: ' . $stderr); + } + + return [ + 'stdout' => $stdout, + 'stderr' => $stderr + ]; + } + + /** + * Method to take files from Let's Encrypt, and put it into Traefik. + * + * @param string $domain Domain which certificate was generated for + * @param string $folder Folder in which certificates were generated + * @param array $letsEncryptData Let's Encrypt logs to use for additional info when throwing error + * @return void + * @throws Exception + */ + private function applyCertificateFiles(string $folder, string $domain, array $letsEncryptData): void + { + + // Prepare folder in storage for domain + $path = APP_STORAGE_CERTIFICATES . '/' . $domain; + if (!\is_readable($path)) { + if (!\mkdir($path, 0755, true)) { + throw new Exception('Failed to create path for certificate.'); + } + } + + // Move generated files + if (!@\rename('/etc/letsencrypt/live/' . $folder . '/cert.pem', APP_STORAGE_CERTIFICATES . '/' . $domain . '/cert.pem')) { + throw new Exception('Failed to rename certificate cert.pem. Let\'s Encrypt log: ' . $letsEncryptData['stderr'] . ' ; ' . $letsEncryptData['stdout']); + } + + if (!@\rename('/etc/letsencrypt/live/' . $folder . '/chain.pem', APP_STORAGE_CERTIFICATES . '/' . $domain . '/chain.pem')) { + throw new Exception('Failed to rename certificate chain.pem. Let\'s Encrypt log: ' . $letsEncryptData['stderr'] . ' ; ' . $letsEncryptData['stdout']); + } + + if (!@\rename('/etc/letsencrypt/live/' . $folder . '/fullchain.pem', APP_STORAGE_CERTIFICATES . '/' . $domain . '/fullchain.pem')) { + throw new Exception('Failed to rename certificate fullchain.pem. Let\'s Encrypt log: ' . $letsEncryptData['stderr'] . ' ; ' . $letsEncryptData['stdout']); + } + + if (!@\rename('/etc/letsencrypt/live/' . $folder . '/privkey.pem', APP_STORAGE_CERTIFICATES . '/' . $domain . '/privkey.pem')) { + throw new Exception('Failed to rename certificate privkey.pem. Let\'s Encrypt log: ' . $letsEncryptData['stderr'] . ' ; ' . $letsEncryptData['stdout']); + } + + $config = \implode(PHP_EOL, [ + "tls:", + " certificates:", + " - certFile: /storage/certificates/{$domain}/fullchain.pem", + " keyFile: /storage/certificates/{$domain}/privkey.pem" + ]); + + // Save configuration into Traefik using our new cert files + if (!\file_put_contents(APP_STORAGE_CONFIG . '/' . $domain . '.yml', $config)) { + throw new Exception('Failed to save Traefik configuration.'); + } + } + + /** + * Method to make sure information about error is delivered to admnistrator. + * + * @param string $domain Domain that caused the error + * @param string $errorMessage Verbose error message + * @param int $attempt How many times it failed already + * @param Mail $queueForMail + * @return void + * @throws Exception + */ + private function notifyError(string $domain, string $errorMessage, int $attempt, Mail $queueForMail): void + { + // Log error into console + Console::warning('Cannot renew domain (' . $domain . ') on attempt no. ' . $attempt . ' certificate: ' . $errorMessage); + + // Send mail to administrator mail + $locale = new Locale(App::getEnv('_APP_LOCALE', 'en')); + if (!$locale->getText('emails.sender') || !$locale->getText("emails.certificate.hello") || !$locale->getText("emails.certificate.subject") || !$locale->getText("emails.certificate.body") || !$locale->getText("emails.certificate.footer") || !$locale->getText("emails.certificate.thanks") || !$locale->getText("emails.certificate.signature")) { + $locale->setDefault('en'); + } + + $body = Template::fromFile(__DIR__ . '/../../config/locale/templates/email-base.tpl'); + + $subject = \sprintf($locale->getText("emails.certificate.subject"), $domain); + $body->setParam('{{domain}}', $domain); + $body->setParam('{{error}}', $errorMessage); + $body->setParam('{{attempt}}', $attempt); + $body + ->setParam('{{subject}}', $subject) + ->setParam('{{hello}}', $locale->getText("emails.certificate.hello")) + ->setParam('{{body}}', $locale->getText("emails.certificate.body")) + ->setParam('{{redirect}}', 'https://' . $domain) + ->setParam('{{footer}}', $locale->getText("emails.certificate.footer")) + ->setParam('{{thanks}}', $locale->getText("emails.certificate.thanks")) + ->setParam('{{signature}}', $locale->getText("emails.certificate.signature")) + ->setParam('{{project}}', 'Console') + ->setParam('{{direction}}', $locale->getText('settings.direction')) + ->setParam('{{bg-body}}', '#f7f7f7') + ->setParam('{{bg-content}}', '#ffffff') + ->setParam('{{text-content}}', '#000000'); + + $queueForMail + ->setRecipient(App::getEnv('_APP_SYSTEM_SECURITY_EMAIL_ADDRESS')) + ->setBody($body->render()) + ->setName('Appwrite Administrator') + ->trigger(); + } + + /** + * Update all existing domain documents so they have relation to correct certificate document. + * This solved issues: + * - when adding a domain for which there is already a certificate + * - when renew creates new document? It might? + * - overall makes it more reliable + * + * @param string $certificateId ID of a new or updated certificate document + * @param string $domain Domain that is affected by new certificate + * @param Database $dbForConsole Database instance for console + * @return void + * @throws Exception + */ + private function updateDomainDocuments(string $certificateId, string $domain, Database $dbForConsole): void + { + $domains = $dbForConsole->find('domains', [ + Query::equal('domain', [$domain]), + Query::limit(1000), + ]); + + foreach ($domains as $domainDocument) { + $domainDocument->setAttribute('updated', DateTime::now()); + $domainDocument->setAttribute('certificateId', $certificateId); + $dbForConsole->updateDocument('domains', $domainDocument->getId(), $domainDocument); + + if ($domainDocument->getAttribute('projectId')) { + $dbForConsole->deleteCachedDocument('projects', $domainDocument->getAttribute('projectId')); + } + } + } + + /** + * Save certificate data into database. + * + * @param string $domain Domain name that certificate is for + * @param Document $certificate Certificate document that we need to save + * @param Database $dbForConsole Database connection for console + * @return void + * @throws Exception|\Throwable + */ + private function saveCertificateDocument(string $domain, Document $certificate, Database $dbForConsole): void + { + // Check if update or insert required + $certificateDocument = $dbForConsole->findOne('certificates', [Query::equal('domain', [$domain])]); + if (!empty($certificateDocument) && !$certificateDocument->isEmpty()) { + // Merge new data with current data + $certificate = new Document(\array_merge($certificateDocument->getArrayCopy(), $certificate->getArrayCopy())); + $certificate = $dbForConsole->updateDocument('certificates', $certificate->getId(), $certificate); + } else { + $certificate = $dbForConsole->createDocument('certificates', $certificate); + } + + $certificateId = $certificate->getId(); + $this->updateDomainDocuments($certificateId, $domain, $dbForConsole); + } +} diff --git a/src/Appwrite/Platform/Workers/Databases.php b/src/Appwrite/Platform/Workers/Databases.php new file mode 100644 index 000000000..dfa22988f --- /dev/null +++ b/src/Appwrite/Platform/Workers/Databases.php @@ -0,0 +1,321 @@ +desc('Databases worker') + ->inject('message') + ->inject('dbForConsole') + ->inject('dbForProject') + ->callback(fn($message, $dbForConsole, $dbForProject) => $this->action($message, $dbForConsole, $dbForProject)); + } + + /** + * @throws Exception + */ + public function action(Message $message, Database $dbForConsole, Database $dbForProject): void + { + $payload = $message->getPayload() ?? []; + + if (empty($payload)) { + throw new Exception('Missing payload'); + } + + $type = $payload['type']; + $project = new Document($payload['project']); + $collection = new Document($payload['collection'] ?? []); + $document = new Document($payload['document'] ?? []); + $database = new Document($payload['database'] ?? []); + + if ($collection->isEmpty()) { + throw new Exception('Missing collection'); + } + + if ($document->isEmpty()) { + throw new Exception('Missing document'); + } + + match (strval($type)) { + DATABASE_TYPE_CREATE_ATTRIBUTE => $this->createAttribute($database, $collection, $document, $project, $dbForProject), + DATABASE_TYPE_DELETE_ATTRIBUTE => $this->deleteAttribute($database, $collection, $document, $project, $dbForConsole, $dbForProject), + DATABASE_TYPE_CREATE_INDEX => $this->createIndex($database, $collection, $document, $project, $dbForProject), + DATABASE_TYPE_DELETE_INDEX => $this->deleteIndex($database, $collection, $document, $project, $dbForProject), + default => Console::error('No database operation for type: ' . $type), + }; + } + + private function createAttribute(Document $database, Document $collection, Document $attribute, Document $project, $dbForProject): void + { + + $events = Event::generateEvents('databases.[databaseId].collections.[collectionId].attributes.[attributeId].update', [ + 'databaseId' => $database->getId(), + 'collectionId' => $collection->getId(), + 'attributeId' => $attribute->getId() + ]); + /** + * Fetch attribute from the database, since with Resque float values are loosing informations. + */ + $attribute = $dbForProject->getDocument('attributes', $attribute->getId()); + $collectionId = $collection->getId(); + $key = $attribute->getAttribute('key', ''); + $type = $attribute->getAttribute('type', ''); + $size = $attribute->getAttribute('size', 0); + $required = $attribute->getAttribute('required', false); + $default = $attribute->getAttribute('default', null); + $signed = $attribute->getAttribute('signed', true); + $array = $attribute->getAttribute('array', false); + $format = $attribute->getAttribute('format', ''); + $formatOptions = $attribute->getAttribute('formatOptions', []); + $filters = $attribute->getAttribute('filters', []); + + try { + if (!$dbForProject->createAttribute('database_' . $database->getInternalId() . '_collection_' . $collection->getInternalId(), $key, $type, $size, $required, $default, $signed, $array, $format, $formatOptions, $filters)) { + throw new Exception('Failed to create Attribute'); + } + $dbForProject->updateDocument('attributes', $attribute->getId(), $attribute->setAttribute('status', 'available')); + } catch (\Throwable $th) { + Console::error($th->getMessage()); + $dbForProject->updateDocument('attributes', $attribute->getId(), $attribute->setAttribute('status', 'failed')); + } finally { + $target = Realtime::fromPayload( + // Pass first, most verbose event pattern + event: $events[0], + payload: $attribute, + project: $project, + ); + + Realtime::send( + projectId: 'console', + payload: $attribute->getArrayCopy(), + events: $events, + channels: $target['channels'], + roles: $target['roles'], + options: [ + 'projectId' => $project->getId(), + 'databaseId' => $database->getId(), + 'collectionId' => $collection->getId() + ] + ); + } + + $dbForProject->deleteCachedDocument('database_' . $database->getInternalId(), $collectionId); + } + + /** + * @throws Authorization + */ + private function deleteAttribute(Document $database, Document $collection, Document $attribute, Document $project, Database $dbForConsole, Database $dbForProject) + { + $events = Event::generateEvents('databases.[databaseId].collections.[collectionId].attributes.[attributeId].delete', [ + 'databaseId' => $database->getId(), + 'collectionId' => $collection->getId(), + 'attributeId' => $attribute->getId() + ]); + + $collectionId = $collection->getId(); + $key = $attribute->getAttribute('key', ''); + $status = $attribute->getAttribute('status', ''); + + // possible states at this point: + // - available: should not land in queue; controller flips these to 'deleting' + // - processing: hasn't finished creating + // - deleting: was available, in deletion queue for first time + // - failed: attribute was never created + // - stuck: attribute was available but cannot be removed + try { + if ($status !== 'failed' && !$dbForProject->deleteAttribute('database_' . $database->getInternalId() . '_collection_' . $collection->getInternalId(), $key)) { + throw new Exception('Failed to delete Attribute'); + } + $dbForProject->deleteDocument('attributes', $attribute->getId()); + } catch (\Throwable $th) { + Console::error($th->getMessage()); + $dbForProject->updateDocument('attributes', $attribute->getId(), $attribute->setAttribute('status', 'stuck')); + } finally { + $target = Realtime::fromPayload( + // Pass first, most verbose event pattern + event: $events[0], + payload: $attribute, + project: $project + ); + + Realtime::send( + projectId: 'console', + payload: $attribute->getArrayCopy(), + events: $events, + channels: $target['channels'], + roles: $target['roles'], + options: [ + 'projectId' => $project->getId(), + 'databaseId' => $database->getId(), + 'collectionId' => $collection->getId() + ] + ); + } + + // The underlying database removes/rebuilds indexes when attribute is removed + // Update indexes table with changes + /** @var Document[] $indexes */ + $indexes = $collection->getAttribute('indexes', []); + + foreach ($indexes as $index) { + /** @var string[] $attributes */ + $attributes = $index->getAttribute('attributes'); + $lengths = $index->getAttribute('lengths'); + $orders = $index->getAttribute('orders'); + + $found = \array_search($key, $attributes); + + if ($found !== false) { + // If found, remove entry from attributes, lengths, and orders + // array_values wraps array_diff to reindex array keys + // when found attribute is removed from array + $attributes = \array_values(\array_diff($attributes, [$attributes[$found]])); + $lengths = \array_values(\array_diff($lengths, [$lengths[$found]])); + $orders = \array_values(\array_diff($orders, [$orders[$found]])); + + if (empty($attributes)) { + $dbForProject->deleteDocument('indexes', $index->getId()); + } else { + $index + ->setAttribute('attributes', $attributes, Document::SET_TYPE_ASSIGN) + ->setAttribute('lengths', $lengths, Document::SET_TYPE_ASSIGN) + ->setAttribute('orders', $orders, Document::SET_TYPE_ASSIGN); + + // Check if an index exists with the same attributes and orders + $exists = false; + foreach ($indexes as $existing) { + if ( + $existing->getAttribute('key') !== $index->getAttribute('key') // Ignore itself + && $existing->getAttribute('attributes') === $index->getAttribute('attributes') + && $existing->getAttribute('orders') === $index->getAttribute('orders') + ) { + $exists = true; + break; + } + } + + if ($exists) { // Delete the duplicate if created, else update in db + $this->deleteIndex($database, $collection, $index, $project, $dbForConsole); + } else { + $dbForProject->updateDocument('indexes', $index->getId(), $index); + } + } + } + } + + $dbForProject->deleteCachedDocument('database_' . $database->getInternalId(), $collectionId); + $dbForProject->deleteCachedCollection('database_' . $database->getInternalId() . '_collection_' . $collection->getInternalId()); + } + + private function createIndex(Document $database, Document $collection, Document $index, Document $project, Database $dbForProject) + { + $events = Event::generateEvents('databases.[databaseId].collections.[collectionId].indexes.[indexId].update', [ + 'databaseId' => $database->getId(), + 'collectionId' => $collection->getId(), + 'indexId' => $index->getId() + ]); + $collectionId = $collection->getId(); + $key = $index->getAttribute('key', ''); + $type = $index->getAttribute('type', ''); + $attributes = $index->getAttribute('attributes', []); + $lengths = $index->getAttribute('lengths', []); + $orders = $index->getAttribute('orders', []); + + try { + if (!$dbForProject->createIndex('database_' . $database->getInternalId() . '_collection_' . $collection->getInternalId(), $key, $type, $attributes, $lengths, $orders)) { + throw new Exception('Failed to create Index'); + } + $dbForProject->updateDocument('indexes', $index->getId(), $index->setAttribute('status', 'available')); + } catch (\Throwable $th) { + Console::error($th->getMessage()); + $dbForProject->updateDocument('indexes', $index->getId(), $index->setAttribute('status', 'failed')); + } finally { + $target = Realtime::fromPayload( + // Pass first, most verbose event pattern + event: $events[0], + payload: $index, + project: $project + ); + + Realtime::send( + projectId: 'console', + payload: $index->getArrayCopy(), + events: $events, + channels: $target['channels'], + roles: $target['roles'], + options: [ + 'projectId' => $project->getId(), + 'databaseId' => $database->getId(), + 'collectionId' => $collection->getId() + ] + ); + } + + $dbForProject->deleteCachedDocument('database_' . $database->getInternalId(), $collectionId); + } + + private function deleteIndex(Document $database, Document $collection, Document $index, Document $project, Database $dbForProject) + { + $events = Event::generateEvents('databases.[databaseId].collections.[collectionId].indexes.[indexId].delete', [ + 'databaseId' => $database->getId(), + 'collectionId' => $collection->getId(), + 'indexId' => $index->getId() + ]); + $key = $index->getAttribute('key'); + $status = $index->getAttribute('status', ''); + + try { + if ($status !== 'failed' && !$dbForProject->deleteIndex('database_' . $database->getInternalId() . '_collection_' . $collection->getInternalId(), $key)) { + throw new Exception('Failed to delete index'); + } + $dbForProject->deleteDocument('indexes', $index->getId()); + } catch (\Throwable $th) { + Console::error($th->getMessage()); + $dbForProject->updateDocument('indexes', $index->getId(), $index->setAttribute('status', 'stuck')); + } finally { + $target = Realtime::fromPayload( + // Pass first, most verbose event pattern + event: $events[0], + payload: $index, + project: $project + ); + + Realtime::send( + projectId: 'console', + payload: $index->getArrayCopy(), + events: $events, + channels: $target['channels'], + roles: $target['roles'], + options: [ + 'projectId' => $project->getId(), + 'databaseId' => $database->getId(), + 'collectionId' => $collection->getId() + ] + ); + } + + $dbForProject->deleteCachedDocument('database_' . $database->getInternalId(), $collection->getId()); + } +} diff --git a/src/Appwrite/Platform/Workers/Usage.php b/src/Appwrite/Platform/Workers/Usage.php new file mode 100644 index 000000000..7546cdf9b --- /dev/null +++ b/src/Appwrite/Platform/Workers/Usage.php @@ -0,0 +1,233 @@ + 'Y-m-d H:00', + '1d' => 'Y-m-d 00:00', + 'inf' => '0000-00-00 00:00' + ]; + + + const INFINITY_PERIOD = '_inf_'; + + + public static function getName(): string + { + return 'usage'; + } + + /** + * @throws Exception + */ + public function __construct() + { + $this + ->desc('Usage worker') + ->inject('message') + ->inject('pools') + ->inject('cache') + ->callback(function ($message, $pools, $cache) use (&$stats) { + $this->action($message, $pools, $cache); + }); + } + + /** + * @throws Exception + */ + public function action(Message $message, $pools, $cache): void + { + $payload = $message->getPayload() ?? []; + + if (empty($payload)) { + throw new Exception('Missing payload'); + } + + $payload = $message->getPayload() ?? []; + $project = new Document($payload['project'] ?? []); + $projectId = $project->getInternalId(); + foreach ($payload['reduce'] ?? [] as $document) { + if (empty($document)) { + continue; + } + + $this->reduce( + database: $project->getAttribute('database'), + projectInternalId: $project->getInternalId(), + document: new Document($document), + metrics: $payload['metrics'], + pools: $pools, + cache: $cache + ); + } + + $stats[$projectId]['database'] = $project->getAttribute('database'); + foreach ($payload['metrics'] ?? [] as $metric) { + if (!isset($stats[$projectId]['keys'][$metric['key']])) { + $stats[$projectId]['keys'][$metric['key']] = $metric['value']; + continue; + } + $stats[$projectId]['keys'][$metric['key']] += $metric['value']; + } + } + + +/** +* On Documents that tied by relations like functions>deployments>build || documents>collection>database || buckets>files. +* When we remove a parent document we need to deduct his children aggregation from the project scope. +*/ + private function reduce($database, $projectInternalId, Document $document, array &$metrics, $pools, $cache) + { + try { + $dbForProject = new Database( + $pools + ->get($database) + ->pop() + ->getResource(), + $cache + ); + + $dbForProject->setNamespace('_' . $projectInternalId); + + switch (true) { + case $document->getCollection() === 'users': // users + $sessions = count($document->getAttribute(METRIC_SESSIONS, 0)); + if (!empty($sessions)) { + $metrics[] = [ + 'key' => METRIC_SESSIONS, + 'value' => ($sessions * -1), + ]; + } + break; + case $document->getCollection() === 'databases': // databases + $collections = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{databaseInternalId}', $document->getInternalId(), METRIC_DATABASE_ID_COLLECTIONS))); + $documents = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{databaseInternalId}', $document->getInternalId(), METRIC_DATABASE_ID_DOCUMENTS))); + if (!empty($collections['value'])) { + $metrics[] = [ + 'key' => METRIC_COLLECTIONS, + 'value' => ($collections['value'] * -1), + ]; + } + + if (!empty($documents['value'])) { + $metrics[] = [ + 'key' => METRIC_DOCUMENTS, + 'value' => ($documents['value'] * -1), + ]; + } + break; + case str_starts_with($document->getCollection(), 'database_') && !str_contains($document->getCollection(), 'collection'): //collections + $parts = explode('_', $document->getCollection()); + $databaseInternalId = $parts[1] ?? 0; + $documents = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace(['{databaseInternalId}', '{collectionInternalId}'], [$databaseInternalId, $document->getInternalId()], METRIC_DATABASE_ID_COLLECTION_ID_DOCUMENTS))); + + if (!empty($documents['value'])) { + $metrics[] = [ + 'key' => METRIC_DOCUMENTS, + 'value' => ($documents['value'] * -1), + ]; + $metrics[] = [ + 'key' => str_replace('{databaseInternalId}', $databaseInternalId, METRIC_DATABASE_ID_DOCUMENTS), + 'value' => ($documents['value'] * -1), + ]; + } + break; + + case $document->getCollection() === 'buckets': + $files = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{bucketInternalId}', $document->getInternalId(), METRIC_BUCKET_ID_FILES))); + $storage = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{bucketInternalId}', $document->getInternalId(), METRIC_BUCKET_ID_FILES_STORAGE))); + + if (!empty($files['value'])) { + $metrics[] = [ + 'key' => METRIC_FILES, + 'value' => ($files['value'] * -1), + ]; + } + + if (!empty($storage['value'])) { + $metrics[] = [ + 'key' => METRIC_FILES_STORAGE, + 'value' => ($storage['value'] * -1), + ]; + } + break; + + case $document->getCollection() === 'functions': + $deployments = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace(['{resourceType}', '{resourceInternalId}'], ['functions', $document->getInternalId()], METRIC_FUNCTION_ID_DEPLOYMENTS))); + $deploymentsStorage = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace(['{resourceType}', '{resourceInternalId}'], ['functions', $document->getInternalId()], METRIC_FUNCTION_ID_DEPLOYMENTS_STORAGE))); + $builds = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{functionInternalId}', $document->getInternalId(), METRIC_FUNCTION_ID_BUILDS))); + $buildsStorage = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{functionInternalId}', $document->getInternalId(), METRIC_FUNCTION_ID_BUILDS_STORAGE))); + $buildsCompute = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{functionInternalId}', $document->getInternalId(), METRIC_FUNCTION_ID_BUILDS_COMPUTE))); + $executions = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{functionInternalId}', $document->getInternalId(), METRIC_FUNCTION_ID_EXECUTIONS))); + $executionsCompute = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{functionInternalId}', $document->getInternalId(), METRIC_FUNCTION_ID_EXECUTIONS_COMPUTE))); + + if (!empty($deployments['value'])) { + $metrics[] = [ + 'key' => METRIC_DEPLOYMENTS, + 'value' => ($deployments['value'] * -1), + ]; + } + + if (!empty($deploymentsStorage['value'])) { + $metrics[] = [ + 'key' => METRIC_DEPLOYMENTS_STORAGE, + 'value' => ($deploymentsStorage['value'] * -1), + ]; + } + + if (!empty($builds['value'])) { + $metrics[] = [ + 'key' => METRIC_BUILDS, + 'value' => ($builds['value'] * -1), + ]; + } + + if (!empty($buildsStorage['value'])) { + $metrics[] = [ + 'key' => METRIC_BUILDS_STORAGE, + 'value' => ($buildsStorage['value'] * -1), + ]; + } + + if (!empty($buildsCompute['value'])) { + $metrics[] = [ + 'key' => METRIC_BUILDS_COMPUTE, + 'value' => ($buildsCompute['value'] * -1), + ]; + } + + if (!empty($executions['value'])) { + $metrics[] = [ + 'key' => METRIC_EXECUTIONS, + 'value' => ($executions['value'] * -1), + ]; + } + + if (!empty($executionsCompute['value'])) { + $metrics[] = [ + 'key' => METRIC_EXECUTIONS_COMPUTE, + 'value' => ($executionsCompute['value'] * -1), + ]; + } + break; + default: + break; + } + } catch (\Exception $e) { + console::error("[reducer] " . " {DateTime::now()} " . " {$projectInternalId} " . " {$e->getMessage()}"); + } catch (\Throwable $e) { + } finally { + $pools->reclaim(); + } + } +} diff --git a/src/Appwrite/Platform/Workers/Webhooks.php b/src/Appwrite/Platform/Workers/Webhooks.php index c9622dc9f..fb40ffb8f 100644 --- a/src/Appwrite/Platform/Workers/Webhooks.php +++ b/src/Appwrite/Platform/Workers/Webhooks.php @@ -17,6 +17,9 @@ class Webhooks extends Action return 'webhooks'; } + /** + * @throws Exception + */ public function __construct() { $this @@ -34,26 +37,27 @@ class Webhooks extends Action if (empty($payload)) { throw new Exception('Missing payload'); + } - $events = $payload['events']; - $webhookPayload = json_encode($payload['payload']); - $project = new Document($payload['project']); - $user = new Document($payload['user'] ?? []); + $events = $payload['events']; + $webhookPayload = json_encode($payload['payload']); + $project = new Document($payload['project']); + $user = new Document($payload['user'] ?? []); - foreach ($project->getAttribute('webhooks', []) as $webhook) { - if (array_intersect($webhook->getAttribute('events', []), $events)) { + foreach ($project->getAttribute('webhooks', []) as $webhook) { + if (array_intersect($webhook->getAttribute('events', []), $events)) { $this->execute($events, $webhookPayload, $webhook, $user, $project); - } } + } - if (!empty($errors)) { - throw new Exception(\implode(" / \n\n", $errors)); - } + if (!empty($errors)) { + throw new Exception(\implode(" / \n\n", $errors)); + } $this->errors = []; - } } + private function execute(array $events, string $payload, Document $webhook, Document $user, Document $project): void { $url = \rawurldecode($webhook->getAttribute('url'));