databases worker
This commit is contained in:
parent
0d2987620c
commit
d820d93ac7
|
@ -54,7 +54,7 @@ use MaxMind\Db\Reader;
|
||||||
* @return Document Newly created attribute document
|
* @return Document Newly created attribute document
|
||||||
* @throws Exception
|
* @throws Exception
|
||||||
*/
|
*/
|
||||||
function createAttribute(string $databaseId, string $collectionId, Document $attribute, Response $response, Database $dbForProject, EventDatabase $database, Event $queueForEvents): Document
|
function createAttribute(string $databaseId, string $collectionId, Document $attribute, Response $response, Database $dbForProject, EventDatabase $queueForDatabase, Event $queueForEvents): Document
|
||||||
{
|
{
|
||||||
$key = $attribute->getAttribute('key');
|
$key = $attribute->getAttribute('key');
|
||||||
$type = $attribute->getAttribute('type', '');
|
$type = $attribute->getAttribute('type', '');
|
||||||
|
@ -125,7 +125,7 @@ function createAttribute(string $databaseId, string $collectionId, Document $att
|
||||||
$dbForProject->deleteCachedDocument('database_' . $db->getInternalId(), $collectionId);
|
$dbForProject->deleteCachedDocument('database_' . $db->getInternalId(), $collectionId);
|
||||||
$dbForProject->deleteCachedCollection('database_' . $db->getInternalId() . '_collection_' . $collection->getInternalId());
|
$dbForProject->deleteCachedCollection('database_' . $db->getInternalId() . '_collection_' . $collection->getInternalId());
|
||||||
|
|
||||||
$database
|
$queueForDatabase
|
||||||
->setType(DATABASE_TYPE_CREATE_ATTRIBUTE)
|
->setType(DATABASE_TYPE_CREATE_ATTRIBUTE)
|
||||||
->setDatabase($db)
|
->setDatabase($db)
|
||||||
->setCollection($collection)
|
->setCollection($collection)
|
||||||
|
|
|
@ -172,27 +172,6 @@ function getCache(): Cache
|
||||||
return new Cache(new Sharding($adapters));
|
return new Cache(new Sharding($adapters));
|
||||||
}
|
}
|
||||||
|
|
||||||
Server::setResource('getProjectDB', function (Registry $register, Database $dbForConsole) {
|
|
||||||
return function (Document $project) use ($register, $dbForConsole) {
|
|
||||||
/** @var Group $pools */
|
|
||||||
$pools = $register->get('pools');
|
|
||||||
|
|
||||||
if ($project->isEmpty() || $project->getId() === 'console') {
|
|
||||||
return $dbForConsole;
|
|
||||||
}
|
|
||||||
|
|
||||||
$dbAdapter = $pools
|
|
||||||
->get($project->getAttribute('database'))
|
|
||||||
->pop()
|
|
||||||
->getResource();
|
|
||||||
|
|
||||||
$database = new Database($dbAdapter, getCache());
|
|
||||||
$database->setNamespace('_' . $project->getInternalId());
|
|
||||||
|
|
||||||
return $database;
|
|
||||||
};
|
|
||||||
}, ['register']);
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get Functions Storage Device
|
* Get Functions Storage Device
|
||||||
* @param string $projectId of the project
|
* @param string $projectId of the project
|
||||||
|
@ -234,7 +213,7 @@ try {
|
||||||
$platform->init(Service::TYPE_WORKER, [
|
$platform->init(Service::TYPE_WORKER, [
|
||||||
'workersNum' => swoole_cpu_num() * intval(App::getEnv('_APP_WORKER_PER_CORE', 6)),
|
'workersNum' => swoole_cpu_num() * intval(App::getEnv('_APP_WORKER_PER_CORE', 6)),
|
||||||
'connection' => $pools->get('queue')->pop()->getResource(),
|
'connection' => $pools->get('queue')->pop()->getResource(),
|
||||||
'workerName' => $workerName ?? null,
|
'workerName' => strtolower($workerName) ?? null,
|
||||||
]);
|
]);
|
||||||
} catch (\Exception $e) {
|
} catch (\Exception $e) {
|
||||||
Console::error($e->getMessage() . ', File: ' . $e->getFile() . ', Line: ' . $e->getLine());
|
Console::error($e->getMessage() . ', File: ' . $e->getFile() . ', Line: ' . $e->getLine());
|
||||||
|
@ -255,17 +234,13 @@ $worker
|
||||||
->inject('logger')
|
->inject('logger')
|
||||||
->action(function (Throwable $error, Logger|null $logger) {
|
->action(function (Throwable $error, Logger|null $logger) {
|
||||||
|
|
||||||
if ($logger === null) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
$version = App::getEnv('_APP_VERSION', 'UNKNOWN');
|
$version = App::getEnv('_APP_VERSION', 'UNKNOWN');
|
||||||
|
|
||||||
if ($error instanceof PDOException) {
|
if ($error instanceof PDOException) {
|
||||||
throw $error;
|
throw $error;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ($error->getCode() >= 500 || $error->getCode() === 0) {
|
if (($error->getCode() >= 500 || $error->getCode() === 0) && !empty($logger)) {
|
||||||
$log = new Log();
|
$log = new Log();
|
||||||
|
|
||||||
$log->setNamespace("appwrite-worker");
|
$log->setNamespace("appwrite-worker");
|
||||||
|
@ -294,5 +269,8 @@ $worker
|
||||||
Console::error('[Error] Line: ' . $error->getLine());
|
Console::error('[Error] Line: ' . $error->getLine());
|
||||||
});
|
});
|
||||||
|
|
||||||
$worker->workerStart();
|
$worker->workerStart()
|
||||||
|
->action(function () use ($workerName) {
|
||||||
|
Console::info("Worker $workerName started");
|
||||||
|
});
|
||||||
$worker->start();
|
$worker->start();
|
||||||
|
|
|
@ -1,10 +1,3 @@
|
||||||
#!/bin/sh
|
#!/bin/sh
|
||||||
|
|
||||||
if [ -z "$_APP_REDIS_USER" ] && [ -z "$_APP_REDIS_PASS" ]
|
php /usr/src/code/app/worker.php certificates $@
|
||||||
then
|
|
||||||
REDIS_BACKEND="${_APP_REDIS_HOST}:${_APP_REDIS_PORT}"
|
|
||||||
else
|
|
||||||
REDIS_BACKEND="redis://${_APP_REDIS_USER}:${_APP_REDIS_PASS}@${_APP_REDIS_HOST}:${_APP_REDIS_PORT}"
|
|
||||||
fi
|
|
||||||
|
|
||||||
INTERVAL=1 QUEUE='v1-certificates' APP_INCLUDE='/usr/src/code/app/workers/certificates.php' php /usr/src/code/vendor/bin/resque -dopcache.preload=opcache.preload=/usr/src/code/app/preload.php
|
|
|
@ -1,10 +1,3 @@
|
||||||
#!/bin/sh
|
#!/bin/sh
|
||||||
|
|
||||||
if [ -z "$_APP_REDIS_USER" ] && [ -z "$_APP_REDIS_PASS" ]
|
php /usr/src/code/app/worker.php databases $@
|
||||||
then
|
|
||||||
REDIS_BACKEND="${_APP_REDIS_HOST}:${_APP_REDIS_PORT}"
|
|
||||||
else
|
|
||||||
REDIS_BACKEND="redis://${_APP_REDIS_USER}:${_APP_REDIS_PASS}@${_APP_REDIS_HOST}:${_APP_REDIS_PORT}"
|
|
||||||
fi
|
|
||||||
|
|
||||||
INTERVAL=0.1 QUEUE='v1-database' APP_INCLUDE='/usr/src/code/app/workers/databases.php' php /usr/src/code/vendor/bin/resque -dopcache.preload=opcache.preload=/usr/src/code/app/preload.php
|
|
|
@ -9,7 +9,7 @@ use Utopia\Queue\Connection;
|
||||||
|
|
||||||
class Event
|
class Event
|
||||||
{
|
{
|
||||||
public const DATABASE_QUEUE_NAME = 'v1-database';
|
public const DATABASE_QUEUE_NAME = 'v1-databases';
|
||||||
public const DATABASE_CLASS_NAME = 'DatabaseV1';
|
public const DATABASE_CLASS_NAME = 'DatabaseV1';
|
||||||
|
|
||||||
public const DELETE_QUEUE_NAME = 'v1-deletes';
|
public const DELETE_QUEUE_NAME = 'v1-deletes';
|
||||||
|
|
|
@ -7,6 +7,9 @@ use Appwrite\Platform\Workers\Audits;
|
||||||
use Appwrite\Platform\Workers\Webhooks;
|
use Appwrite\Platform\Workers\Webhooks;
|
||||||
use Appwrite\Platform\Workers\Mails;
|
use Appwrite\Platform\Workers\Mails;
|
||||||
use Appwrite\Platform\Workers\Messaging;
|
use Appwrite\Platform\Workers\Messaging;
|
||||||
|
use Appwrite\Platform\Workers\Certificates;
|
||||||
|
use Appwrite\Platform\Workers\Databases;
|
||||||
|
use Appwrite\Platform\Workers\Usage;
|
||||||
|
|
||||||
class Workers extends Service
|
class Workers extends Service
|
||||||
{
|
{
|
||||||
|
@ -18,6 +21,9 @@ class Workers extends Service
|
||||||
->addAction(Webhooks::getName(), new Webhooks())
|
->addAction(Webhooks::getName(), new Webhooks())
|
||||||
->addAction(Mails::getName(), new Mails())
|
->addAction(Mails::getName(), new Mails())
|
||||||
->addAction(Messaging::getName(), new Messaging())
|
->addAction(Messaging::getName(), new Messaging())
|
||||||
|
->addAction(Certificates::getName(), new Certificates())
|
||||||
|
->addAction(Databases::getName(), new Databases())
|
||||||
|
//->addAction(Usage::getName(), new Usage())
|
||||||
;
|
;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
440
src/Appwrite/Platform/Workers/Certificates.php
Normal file
440
src/Appwrite/Platform/Workers/Certificates.php
Normal file
|
@ -0,0 +1,440 @@
|
||||||
|
<?php
|
||||||
|
|
||||||
|
namespace Appwrite\Platform\Workers;
|
||||||
|
|
||||||
|
use Appwrite\Event\Mail;
|
||||||
|
use Appwrite\Network\Validator\CNAME;
|
||||||
|
use Appwrite\Template\Template;
|
||||||
|
use Exception;
|
||||||
|
use Throwable;
|
||||||
|
use Utopia\App;
|
||||||
|
use Utopia\CLI\Console;
|
||||||
|
use Utopia\Database\Database;
|
||||||
|
use Utopia\Database\DateTime;
|
||||||
|
use Utopia\Database\Document;
|
||||||
|
use Utopia\Database\Helpers\ID;
|
||||||
|
use Utopia\Database\Query;
|
||||||
|
use Utopia\Domains\Domain;
|
||||||
|
use Utopia\Locale\Locale;
|
||||||
|
use Utopia\Platform\Action;
|
||||||
|
use Utopia\Queue\Message;
|
||||||
|
|
||||||
|
class Certificates extends Action
|
||||||
|
{
|
||||||
|
public static function getName(): string
|
||||||
|
{
|
||||||
|
return 'certificates';
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
public function __construct()
|
||||||
|
{
|
||||||
|
$this
|
||||||
|
->desc('Certificates worker')
|
||||||
|
->inject('message')
|
||||||
|
->inject('dbForConsole')
|
||||||
|
->inject('queueForMail')
|
||||||
|
->callback(fn($message, $dbForConsole, $queueForMail) => $this->action($message, $dbForConsole, $queueForMail));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @throws Exception|Throwable
|
||||||
|
*/
|
||||||
|
public function action(Message $message, Database $dbForConsole, Mail $queueForMail): void
|
||||||
|
{
|
||||||
|
$payload = $message->getPayload() ?? [];
|
||||||
|
|
||||||
|
if (empty($payload)) {
|
||||||
|
throw new Exception('Missing payload');
|
||||||
|
}
|
||||||
|
|
||||||
|
$document = new Document($payload['domain'] ?? []);
|
||||||
|
$domain = new Domain($document->getAttribute('domain', ''));
|
||||||
|
$skipRenewCheck = $payload['skipRenewCheck'] ?? false;
|
||||||
|
|
||||||
|
$this->execute($domain, $dbForConsole, $queueForMail, $skipRenewCheck);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @throws Exception|Throwable
|
||||||
|
*/
|
||||||
|
private function execute(Domain $domain, Database $dbForConsole, Mail $queueForMail, bool $skipRenewCheck = false): void
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* 1. Read arguments and validate domain
|
||||||
|
* 2. Get main domain
|
||||||
|
* 3. Validate CNAME DNS if parameter is not main domain (meaning it's custom domain)
|
||||||
|
* 4. Validate security email. Cannot be empty, required by LetsEncrypt
|
||||||
|
* 5. Validate renew date with certificate file, unless requested to skip by parameter
|
||||||
|
* 6. Issue a certificate using certbot CLI
|
||||||
|
* 7. Update 'log' attribute on certificate document with Certbot message
|
||||||
|
* 8. Create storage folder for certificate, if not ready already
|
||||||
|
* 9. Move certificates from Certbot location to our Storage
|
||||||
|
* 10. Create/Update our Storage with new Traefik config with new certificate paths
|
||||||
|
* 11. Read certificate file and update 'renewDate' on certificate document
|
||||||
|
* 12. Update 'issueDate' and 'attempts' on certificate
|
||||||
|
*
|
||||||
|
* If at any point unexpected error occurs, program stops without applying changes to document, and error is thrown into worker
|
||||||
|
*
|
||||||
|
* If code stops with expected error:
|
||||||
|
* 1. 'log' attribute on document is updated with error message
|
||||||
|
* 2. 'attempts' amount is increased
|
||||||
|
* 3. Console log is shown
|
||||||
|
* 4. Email is sent to security email
|
||||||
|
*
|
||||||
|
* Unless unexpected error occurs, at the end, we:
|
||||||
|
* 1. Update 'updated' attribute on document
|
||||||
|
* 2. Save document to database
|
||||||
|
* 3. Update all domains documents with current certificate ID
|
||||||
|
*
|
||||||
|
* Note: Renewals are checked and scheduled from maintenence worker
|
||||||
|
*/
|
||||||
|
|
||||||
|
// Get current certificate
|
||||||
|
$certificate = $dbForConsole->findOne('certificates', [Query::equal('domain', [$domain->get()])]);
|
||||||
|
|
||||||
|
// If we don't have certificate for domain yet, let's create new document. At the end we save it
|
||||||
|
if (!$certificate) {
|
||||||
|
$certificate = new Document();
|
||||||
|
$certificate->setAttribute('domain', $domain->get());
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
// Email for alerts is required by LetsEncrypt
|
||||||
|
$email = App::getEnv('_APP_SYSTEM_SECURITY_EMAIL_ADDRESS');
|
||||||
|
if (empty($email)) {
|
||||||
|
throw new Exception('You must set a valid security email address (_APP_SYSTEM_SECURITY_EMAIL_ADDRESS) to issue an SSL certificate.');
|
||||||
|
}
|
||||||
|
|
||||||
|
// Validate domain and DNS records. Skip if job is forced
|
||||||
|
if (!$skipRenewCheck) {
|
||||||
|
$mainDomain = $this->getMainDomain($dbForConsole);
|
||||||
|
$isMainDomain = !isset($mainDomain) || $domain->get() === $mainDomain;
|
||||||
|
$this->validateDomain($domain, $isMainDomain);
|
||||||
|
}
|
||||||
|
|
||||||
|
// If certificate exists already, double-check expiry date. Skip if job is forced
|
||||||
|
if (!$skipRenewCheck && !$this->isRenewRequired($domain->get())) {
|
||||||
|
throw new Exception('Renew isn\'t required.');
|
||||||
|
}
|
||||||
|
|
||||||
|
// Prepare folder name for certbot. Using this helps prevent miss-match in LetsEncrypt configuration when renewing certificate
|
||||||
|
$folder = ID::unique();
|
||||||
|
|
||||||
|
// Generate certificate files using Let's Encrypt
|
||||||
|
$letsEncryptData = $this->issueCertificate($folder, $domain->get(), $email);
|
||||||
|
|
||||||
|
// Command succeeded, store all data into document
|
||||||
|
// We store stderr too, because it may include warnings
|
||||||
|
$certificate->setAttribute('log', \json_encode([
|
||||||
|
'stdout' => $letsEncryptData['stdout'],
|
||||||
|
'stderr' => $letsEncryptData['stderr'],
|
||||||
|
]));
|
||||||
|
|
||||||
|
// Give certificates to Traefik
|
||||||
|
$this->applyCertificateFiles($folder, $domain->get(), $letsEncryptData);
|
||||||
|
|
||||||
|
// Update certificate info stored in database
|
||||||
|
$certificate->setAttribute('renewDate', $this->getRenewDate($domain->get()));
|
||||||
|
$certificate->setAttribute('attempts', 0);
|
||||||
|
$certificate->setAttribute('issueDate', DateTime::now());
|
||||||
|
} catch (Throwable $e) {
|
||||||
|
// Set exception as log in certificate document
|
||||||
|
$certificate->setAttribute('log', $e->getMessage());
|
||||||
|
|
||||||
|
// Increase attempts count
|
||||||
|
$attempts = $certificate->getAttribute('attempts', 0) + 1;
|
||||||
|
$certificate->setAttribute('attempts', $attempts);
|
||||||
|
|
||||||
|
// Store cuttent time as renew date to ensure another attempt in next maintenance cycle
|
||||||
|
$certificate->setAttribute('renewDate', DateTime::now());
|
||||||
|
|
||||||
|
// Send email to security email
|
||||||
|
$this->notifyError($domain->get(), $e->getMessage(), $attempts, $queueForMail);
|
||||||
|
} finally {
|
||||||
|
// All actions result in new updatedAt date
|
||||||
|
$certificate->setAttribute('updated', DateTime::now());
|
||||||
|
|
||||||
|
// Save all changes we made to certificate document into database
|
||||||
|
$this->saveCertificateDocument($domain->get(), $certificate, $dbForConsole);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get main domain. Needed as we do different checks for main and non-main domains.
|
||||||
|
*
|
||||||
|
* @return null|string Returns main domain. If null, there is no main domain yet.
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
private function getMainDomain(Database $dbForConsole): ?string
|
||||||
|
{
|
||||||
|
$envDomain = App::getEnv('_APP_DOMAIN', '');
|
||||||
|
if (!empty($envDomain) && $envDomain !== 'localhost') {
|
||||||
|
return $envDomain;
|
||||||
|
} else {
|
||||||
|
$domainDocument = $dbForConsole->findOne('domains', [Query::orderAsc('_id')]);
|
||||||
|
if ($domainDocument) {
|
||||||
|
return $domainDocument->getAttribute('domain');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Internal domain validation functionality to prevent unnecessary attempts failed from Let's Encrypt side. We check:
|
||||||
|
* - Domain needs to be public and valid (prevents NFT domains that are not supported by Let's Encrypt)
|
||||||
|
* - Domain must have proper DNS record
|
||||||
|
*
|
||||||
|
* @param Domain $domain Domain which we validate
|
||||||
|
* @param bool $isMainDomain In case of master domain, we look for different DNS configurations
|
||||||
|
* @return void
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
private function validateDomain(Domain $domain, bool $isMainDomain): void
|
||||||
|
{
|
||||||
|
if (empty($domain->get())) {
|
||||||
|
throw new Exception('Missing certificate domain.');
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!$domain->isKnown() || $domain->isTest()) {
|
||||||
|
throw new Exception('Unknown public suffix for domain.');
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!$isMainDomain) {
|
||||||
|
// TODO: Would be awesome to also support A/AAAA records here. Maybe dry run?
|
||||||
|
// Validate if domain target is properly configured
|
||||||
|
$target = new Domain(App::getEnv('_APP_DOMAIN_TARGET', ''));
|
||||||
|
|
||||||
|
if (!$target->isKnown() || $target->isTest()) {
|
||||||
|
throw new Exception('Unreachable CNAME target (' . $target->get() . '), please use a domain with a public suffix.');
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify domain with DNS records
|
||||||
|
$validator = new CNAME($target->get());
|
||||||
|
if (!$validator->isValid($domain->get())) {
|
||||||
|
throw new Exception('Failed to verify domain DNS records.');
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Main domain validation
|
||||||
|
// TODO: Would be awesome to check A/AAAA record here. Maybe dry run?
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reads expiry date of certificate from file and decides if renewal is required or not.
|
||||||
|
*
|
||||||
|
* @param string $domain Domain for which we check certificate file
|
||||||
|
* @return bool True, if certificate needs to be renewed
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
private function isRenewRequired(string $domain): bool
|
||||||
|
{
|
||||||
|
$certPath = APP_STORAGE_CERTIFICATES . '/' . $domain . '/cert.pem';
|
||||||
|
if (\file_exists($certPath)) {
|
||||||
|
$validTo = null;
|
||||||
|
|
||||||
|
$certData = openssl_x509_parse(file_get_contents($certPath));
|
||||||
|
$validTo = $certData['validTo_time_t'] ?? 0;
|
||||||
|
|
||||||
|
if (empty($validTo)) {
|
||||||
|
throw new Exception('Unable to read certificate file (cert.pem).');
|
||||||
|
}
|
||||||
|
|
||||||
|
// LetsEncrypt allows renewal 30 days before expiry
|
||||||
|
$expiryInAdvance = (60 * 60 * 24 * 30);
|
||||||
|
if ($validTo - $expiryInAdvance > \time()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* LetsEncrypt communication to issue certificate (using certbot CLI)
|
||||||
|
*
|
||||||
|
* @param string $folder Folder into which certificates should be generated
|
||||||
|
* @param string $domain Domain to generate certificate for
|
||||||
|
* @return array Named array with keys 'stdout' and 'stderr', both string
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
private function issueCertificate(string $folder, string $domain, string $email): array
|
||||||
|
{
|
||||||
|
$stdout = '';
|
||||||
|
$stderr = '';
|
||||||
|
|
||||||
|
$staging = (App::isProduction()) ? '' : ' --dry-run';
|
||||||
|
$exit = Console::execute("certbot certonly --webroot --noninteractive --agree-tos{$staging}"
|
||||||
|
. " --email " . $email
|
||||||
|
. " --cert-name " . $folder
|
||||||
|
. " -w " . APP_STORAGE_CERTIFICATES
|
||||||
|
. " -d {$domain}", '', $stdout, $stderr);
|
||||||
|
|
||||||
|
// Unexpected error, usually 5XX, API limits, ...
|
||||||
|
if ($exit !== 0) {
|
||||||
|
throw new Exception('Failed to issue a certificate with message: ' . $stderr);
|
||||||
|
}
|
||||||
|
|
||||||
|
return [
|
||||||
|
'stdout' => $stdout,
|
||||||
|
'stderr' => $stderr
|
||||||
|
];
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Method to take files from Let's Encrypt, and put it into Traefik.
|
||||||
|
*
|
||||||
|
* @param string $domain Domain which certificate was generated for
|
||||||
|
* @param string $folder Folder in which certificates were generated
|
||||||
|
* @param array $letsEncryptData Let's Encrypt logs to use for additional info when throwing error
|
||||||
|
* @return void
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
private function applyCertificateFiles(string $folder, string $domain, array $letsEncryptData): void
|
||||||
|
{
|
||||||
|
|
||||||
|
// Prepare folder in storage for domain
|
||||||
|
$path = APP_STORAGE_CERTIFICATES . '/' . $domain;
|
||||||
|
if (!\is_readable($path)) {
|
||||||
|
if (!\mkdir($path, 0755, true)) {
|
||||||
|
throw new Exception('Failed to create path for certificate.');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Move generated files
|
||||||
|
if (!@\rename('/etc/letsencrypt/live/' . $folder . '/cert.pem', APP_STORAGE_CERTIFICATES . '/' . $domain . '/cert.pem')) {
|
||||||
|
throw new Exception('Failed to rename certificate cert.pem. Let\'s Encrypt log: ' . $letsEncryptData['stderr'] . ' ; ' . $letsEncryptData['stdout']);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!@\rename('/etc/letsencrypt/live/' . $folder . '/chain.pem', APP_STORAGE_CERTIFICATES . '/' . $domain . '/chain.pem')) {
|
||||||
|
throw new Exception('Failed to rename certificate chain.pem. Let\'s Encrypt log: ' . $letsEncryptData['stderr'] . ' ; ' . $letsEncryptData['stdout']);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!@\rename('/etc/letsencrypt/live/' . $folder . '/fullchain.pem', APP_STORAGE_CERTIFICATES . '/' . $domain . '/fullchain.pem')) {
|
||||||
|
throw new Exception('Failed to rename certificate fullchain.pem. Let\'s Encrypt log: ' . $letsEncryptData['stderr'] . ' ; ' . $letsEncryptData['stdout']);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!@\rename('/etc/letsencrypt/live/' . $folder . '/privkey.pem', APP_STORAGE_CERTIFICATES . '/' . $domain . '/privkey.pem')) {
|
||||||
|
throw new Exception('Failed to rename certificate privkey.pem. Let\'s Encrypt log: ' . $letsEncryptData['stderr'] . ' ; ' . $letsEncryptData['stdout']);
|
||||||
|
}
|
||||||
|
|
||||||
|
$config = \implode(PHP_EOL, [
|
||||||
|
"tls:",
|
||||||
|
" certificates:",
|
||||||
|
" - certFile: /storage/certificates/{$domain}/fullchain.pem",
|
||||||
|
" keyFile: /storage/certificates/{$domain}/privkey.pem"
|
||||||
|
]);
|
||||||
|
|
||||||
|
// Save configuration into Traefik using our new cert files
|
||||||
|
if (!\file_put_contents(APP_STORAGE_CONFIG . '/' . $domain . '.yml', $config)) {
|
||||||
|
throw new Exception('Failed to save Traefik configuration.');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Method to make sure information about error is delivered to admnistrator.
|
||||||
|
*
|
||||||
|
* @param string $domain Domain that caused the error
|
||||||
|
* @param string $errorMessage Verbose error message
|
||||||
|
* @param int $attempt How many times it failed already
|
||||||
|
* @param Mail $queueForMail
|
||||||
|
* @return void
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
private function notifyError(string $domain, string $errorMessage, int $attempt, Mail $queueForMail): void
|
||||||
|
{
|
||||||
|
// Log error into console
|
||||||
|
Console::warning('Cannot renew domain (' . $domain . ') on attempt no. ' . $attempt . ' certificate: ' . $errorMessage);
|
||||||
|
|
||||||
|
// Send mail to administrator mail
|
||||||
|
$locale = new Locale(App::getEnv('_APP_LOCALE', 'en'));
|
||||||
|
if (!$locale->getText('emails.sender') || !$locale->getText("emails.certificate.hello") || !$locale->getText("emails.certificate.subject") || !$locale->getText("emails.certificate.body") || !$locale->getText("emails.certificate.footer") || !$locale->getText("emails.certificate.thanks") || !$locale->getText("emails.certificate.signature")) {
|
||||||
|
$locale->setDefault('en');
|
||||||
|
}
|
||||||
|
|
||||||
|
$body = Template::fromFile(__DIR__ . '/../../config/locale/templates/email-base.tpl');
|
||||||
|
|
||||||
|
$subject = \sprintf($locale->getText("emails.certificate.subject"), $domain);
|
||||||
|
$body->setParam('{{domain}}', $domain);
|
||||||
|
$body->setParam('{{error}}', $errorMessage);
|
||||||
|
$body->setParam('{{attempt}}', $attempt);
|
||||||
|
$body
|
||||||
|
->setParam('{{subject}}', $subject)
|
||||||
|
->setParam('{{hello}}', $locale->getText("emails.certificate.hello"))
|
||||||
|
->setParam('{{body}}', $locale->getText("emails.certificate.body"))
|
||||||
|
->setParam('{{redirect}}', 'https://' . $domain)
|
||||||
|
->setParam('{{footer}}', $locale->getText("emails.certificate.footer"))
|
||||||
|
->setParam('{{thanks}}', $locale->getText("emails.certificate.thanks"))
|
||||||
|
->setParam('{{signature}}', $locale->getText("emails.certificate.signature"))
|
||||||
|
->setParam('{{project}}', 'Console')
|
||||||
|
->setParam('{{direction}}', $locale->getText('settings.direction'))
|
||||||
|
->setParam('{{bg-body}}', '#f7f7f7')
|
||||||
|
->setParam('{{bg-content}}', '#ffffff')
|
||||||
|
->setParam('{{text-content}}', '#000000');
|
||||||
|
|
||||||
|
$queueForMail
|
||||||
|
->setRecipient(App::getEnv('_APP_SYSTEM_SECURITY_EMAIL_ADDRESS'))
|
||||||
|
->setBody($body->render())
|
||||||
|
->setName('Appwrite Administrator')
|
||||||
|
->trigger();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Update all existing domain documents so they have relation to correct certificate document.
|
||||||
|
* This solved issues:
|
||||||
|
* - when adding a domain for which there is already a certificate
|
||||||
|
* - when renew creates new document? It might?
|
||||||
|
* - overall makes it more reliable
|
||||||
|
*
|
||||||
|
* @param string $certificateId ID of a new or updated certificate document
|
||||||
|
* @param string $domain Domain that is affected by new certificate
|
||||||
|
* @param Database $dbForConsole Database instance for console
|
||||||
|
* @return void
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
private function updateDomainDocuments(string $certificateId, string $domain, Database $dbForConsole): void
|
||||||
|
{
|
||||||
|
$domains = $dbForConsole->find('domains', [
|
||||||
|
Query::equal('domain', [$domain]),
|
||||||
|
Query::limit(1000),
|
||||||
|
]);
|
||||||
|
|
||||||
|
foreach ($domains as $domainDocument) {
|
||||||
|
$domainDocument->setAttribute('updated', DateTime::now());
|
||||||
|
$domainDocument->setAttribute('certificateId', $certificateId);
|
||||||
|
$dbForConsole->updateDocument('domains', $domainDocument->getId(), $domainDocument);
|
||||||
|
|
||||||
|
if ($domainDocument->getAttribute('projectId')) {
|
||||||
|
$dbForConsole->deleteCachedDocument('projects', $domainDocument->getAttribute('projectId'));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Save certificate data into database.
|
||||||
|
*
|
||||||
|
* @param string $domain Domain name that certificate is for
|
||||||
|
* @param Document $certificate Certificate document that we need to save
|
||||||
|
* @param Database $dbForConsole Database connection for console
|
||||||
|
* @return void
|
||||||
|
* @throws Exception|\Throwable
|
||||||
|
*/
|
||||||
|
private function saveCertificateDocument(string $domain, Document $certificate, Database $dbForConsole): void
|
||||||
|
{
|
||||||
|
// Check if update or insert required
|
||||||
|
$certificateDocument = $dbForConsole->findOne('certificates', [Query::equal('domain', [$domain])]);
|
||||||
|
if (!empty($certificateDocument) && !$certificateDocument->isEmpty()) {
|
||||||
|
// Merge new data with current data
|
||||||
|
$certificate = new Document(\array_merge($certificateDocument->getArrayCopy(), $certificate->getArrayCopy()));
|
||||||
|
$certificate = $dbForConsole->updateDocument('certificates', $certificate->getId(), $certificate);
|
||||||
|
} else {
|
||||||
|
$certificate = $dbForConsole->createDocument('certificates', $certificate);
|
||||||
|
}
|
||||||
|
|
||||||
|
$certificateId = $certificate->getId();
|
||||||
|
$this->updateDomainDocuments($certificateId, $domain, $dbForConsole);
|
||||||
|
}
|
||||||
|
}
|
321
src/Appwrite/Platform/Workers/Databases.php
Normal file
321
src/Appwrite/Platform/Workers/Databases.php
Normal file
|
@ -0,0 +1,321 @@
|
||||||
|
<?php
|
||||||
|
|
||||||
|
namespace Appwrite\Platform\Workers;
|
||||||
|
|
||||||
|
use Appwrite\Event\Event;
|
||||||
|
use Appwrite\Messaging\Adapter\Realtime;
|
||||||
|
use Exception;
|
||||||
|
use Utopia\CLI\Console;
|
||||||
|
use Utopia\Database\Database;
|
||||||
|
use Utopia\Database\Document;
|
||||||
|
use Utopia\Database\Exception\Authorization;
|
||||||
|
use Utopia\Platform\Action;
|
||||||
|
use Utopia\Queue\Message;
|
||||||
|
|
||||||
|
class Databases extends Action
|
||||||
|
{
|
||||||
|
public static function getName(): string
|
||||||
|
{
|
||||||
|
return 'databases';
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
public function __construct()
|
||||||
|
{
|
||||||
|
$this
|
||||||
|
->desc('Databases worker')
|
||||||
|
->inject('message')
|
||||||
|
->inject('dbForConsole')
|
||||||
|
->inject('dbForProject')
|
||||||
|
->callback(fn($message, $dbForConsole, $dbForProject) => $this->action($message, $dbForConsole, $dbForProject));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
public function action(Message $message, Database $dbForConsole, Database $dbForProject): void
|
||||||
|
{
|
||||||
|
$payload = $message->getPayload() ?? [];
|
||||||
|
|
||||||
|
if (empty($payload)) {
|
||||||
|
throw new Exception('Missing payload');
|
||||||
|
}
|
||||||
|
|
||||||
|
$type = $payload['type'];
|
||||||
|
$project = new Document($payload['project']);
|
||||||
|
$collection = new Document($payload['collection'] ?? []);
|
||||||
|
$document = new Document($payload['document'] ?? []);
|
||||||
|
$database = new Document($payload['database'] ?? []);
|
||||||
|
|
||||||
|
if ($collection->isEmpty()) {
|
||||||
|
throw new Exception('Missing collection');
|
||||||
|
}
|
||||||
|
|
||||||
|
if ($document->isEmpty()) {
|
||||||
|
throw new Exception('Missing document');
|
||||||
|
}
|
||||||
|
|
||||||
|
match (strval($type)) {
|
||||||
|
DATABASE_TYPE_CREATE_ATTRIBUTE => $this->createAttribute($database, $collection, $document, $project, $dbForProject),
|
||||||
|
DATABASE_TYPE_DELETE_ATTRIBUTE => $this->deleteAttribute($database, $collection, $document, $project, $dbForConsole, $dbForProject),
|
||||||
|
DATABASE_TYPE_CREATE_INDEX => $this->createIndex($database, $collection, $document, $project, $dbForProject),
|
||||||
|
DATABASE_TYPE_DELETE_INDEX => $this->deleteIndex($database, $collection, $document, $project, $dbForProject),
|
||||||
|
default => Console::error('No database operation for type: ' . $type),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
private function createAttribute(Document $database, Document $collection, Document $attribute, Document $project, $dbForProject): void
|
||||||
|
{
|
||||||
|
|
||||||
|
$events = Event::generateEvents('databases.[databaseId].collections.[collectionId].attributes.[attributeId].update', [
|
||||||
|
'databaseId' => $database->getId(),
|
||||||
|
'collectionId' => $collection->getId(),
|
||||||
|
'attributeId' => $attribute->getId()
|
||||||
|
]);
|
||||||
|
/**
|
||||||
|
* Fetch attribute from the database, since with Resque float values are loosing informations.
|
||||||
|
*/
|
||||||
|
$attribute = $dbForProject->getDocument('attributes', $attribute->getId());
|
||||||
|
$collectionId = $collection->getId();
|
||||||
|
$key = $attribute->getAttribute('key', '');
|
||||||
|
$type = $attribute->getAttribute('type', '');
|
||||||
|
$size = $attribute->getAttribute('size', 0);
|
||||||
|
$required = $attribute->getAttribute('required', false);
|
||||||
|
$default = $attribute->getAttribute('default', null);
|
||||||
|
$signed = $attribute->getAttribute('signed', true);
|
||||||
|
$array = $attribute->getAttribute('array', false);
|
||||||
|
$format = $attribute->getAttribute('format', '');
|
||||||
|
$formatOptions = $attribute->getAttribute('formatOptions', []);
|
||||||
|
$filters = $attribute->getAttribute('filters', []);
|
||||||
|
|
||||||
|
try {
|
||||||
|
if (!$dbForProject->createAttribute('database_' . $database->getInternalId() . '_collection_' . $collection->getInternalId(), $key, $type, $size, $required, $default, $signed, $array, $format, $formatOptions, $filters)) {
|
||||||
|
throw new Exception('Failed to create Attribute');
|
||||||
|
}
|
||||||
|
$dbForProject->updateDocument('attributes', $attribute->getId(), $attribute->setAttribute('status', 'available'));
|
||||||
|
} catch (\Throwable $th) {
|
||||||
|
Console::error($th->getMessage());
|
||||||
|
$dbForProject->updateDocument('attributes', $attribute->getId(), $attribute->setAttribute('status', 'failed'));
|
||||||
|
} finally {
|
||||||
|
$target = Realtime::fromPayload(
|
||||||
|
// Pass first, most verbose event pattern
|
||||||
|
event: $events[0],
|
||||||
|
payload: $attribute,
|
||||||
|
project: $project,
|
||||||
|
);
|
||||||
|
|
||||||
|
Realtime::send(
|
||||||
|
projectId: 'console',
|
||||||
|
payload: $attribute->getArrayCopy(),
|
||||||
|
events: $events,
|
||||||
|
channels: $target['channels'],
|
||||||
|
roles: $target['roles'],
|
||||||
|
options: [
|
||||||
|
'projectId' => $project->getId(),
|
||||||
|
'databaseId' => $database->getId(),
|
||||||
|
'collectionId' => $collection->getId()
|
||||||
|
]
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
$dbForProject->deleteCachedDocument('database_' . $database->getInternalId(), $collectionId);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @throws Authorization
|
||||||
|
*/
|
||||||
|
private function deleteAttribute(Document $database, Document $collection, Document $attribute, Document $project, Database $dbForConsole, Database $dbForProject)
|
||||||
|
{
|
||||||
|
$events = Event::generateEvents('databases.[databaseId].collections.[collectionId].attributes.[attributeId].delete', [
|
||||||
|
'databaseId' => $database->getId(),
|
||||||
|
'collectionId' => $collection->getId(),
|
||||||
|
'attributeId' => $attribute->getId()
|
||||||
|
]);
|
||||||
|
|
||||||
|
$collectionId = $collection->getId();
|
||||||
|
$key = $attribute->getAttribute('key', '');
|
||||||
|
$status = $attribute->getAttribute('status', '');
|
||||||
|
|
||||||
|
// possible states at this point:
|
||||||
|
// - available: should not land in queue; controller flips these to 'deleting'
|
||||||
|
// - processing: hasn't finished creating
|
||||||
|
// - deleting: was available, in deletion queue for first time
|
||||||
|
// - failed: attribute was never created
|
||||||
|
// - stuck: attribute was available but cannot be removed
|
||||||
|
try {
|
||||||
|
if ($status !== 'failed' && !$dbForProject->deleteAttribute('database_' . $database->getInternalId() . '_collection_' . $collection->getInternalId(), $key)) {
|
||||||
|
throw new Exception('Failed to delete Attribute');
|
||||||
|
}
|
||||||
|
$dbForProject->deleteDocument('attributes', $attribute->getId());
|
||||||
|
} catch (\Throwable $th) {
|
||||||
|
Console::error($th->getMessage());
|
||||||
|
$dbForProject->updateDocument('attributes', $attribute->getId(), $attribute->setAttribute('status', 'stuck'));
|
||||||
|
} finally {
|
||||||
|
$target = Realtime::fromPayload(
|
||||||
|
// Pass first, most verbose event pattern
|
||||||
|
event: $events[0],
|
||||||
|
payload: $attribute,
|
||||||
|
project: $project
|
||||||
|
);
|
||||||
|
|
||||||
|
Realtime::send(
|
||||||
|
projectId: 'console',
|
||||||
|
payload: $attribute->getArrayCopy(),
|
||||||
|
events: $events,
|
||||||
|
channels: $target['channels'],
|
||||||
|
roles: $target['roles'],
|
||||||
|
options: [
|
||||||
|
'projectId' => $project->getId(),
|
||||||
|
'databaseId' => $database->getId(),
|
||||||
|
'collectionId' => $collection->getId()
|
||||||
|
]
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
// The underlying database removes/rebuilds indexes when attribute is removed
|
||||||
|
// Update indexes table with changes
|
||||||
|
/** @var Document[] $indexes */
|
||||||
|
$indexes = $collection->getAttribute('indexes', []);
|
||||||
|
|
||||||
|
foreach ($indexes as $index) {
|
||||||
|
/** @var string[] $attributes */
|
||||||
|
$attributes = $index->getAttribute('attributes');
|
||||||
|
$lengths = $index->getAttribute('lengths');
|
||||||
|
$orders = $index->getAttribute('orders');
|
||||||
|
|
||||||
|
$found = \array_search($key, $attributes);
|
||||||
|
|
||||||
|
if ($found !== false) {
|
||||||
|
// If found, remove entry from attributes, lengths, and orders
|
||||||
|
// array_values wraps array_diff to reindex array keys
|
||||||
|
// when found attribute is removed from array
|
||||||
|
$attributes = \array_values(\array_diff($attributes, [$attributes[$found]]));
|
||||||
|
$lengths = \array_values(\array_diff($lengths, [$lengths[$found]]));
|
||||||
|
$orders = \array_values(\array_diff($orders, [$orders[$found]]));
|
||||||
|
|
||||||
|
if (empty($attributes)) {
|
||||||
|
$dbForProject->deleteDocument('indexes', $index->getId());
|
||||||
|
} else {
|
||||||
|
$index
|
||||||
|
->setAttribute('attributes', $attributes, Document::SET_TYPE_ASSIGN)
|
||||||
|
->setAttribute('lengths', $lengths, Document::SET_TYPE_ASSIGN)
|
||||||
|
->setAttribute('orders', $orders, Document::SET_TYPE_ASSIGN);
|
||||||
|
|
||||||
|
// Check if an index exists with the same attributes and orders
|
||||||
|
$exists = false;
|
||||||
|
foreach ($indexes as $existing) {
|
||||||
|
if (
|
||||||
|
$existing->getAttribute('key') !== $index->getAttribute('key') // Ignore itself
|
||||||
|
&& $existing->getAttribute('attributes') === $index->getAttribute('attributes')
|
||||||
|
&& $existing->getAttribute('orders') === $index->getAttribute('orders')
|
||||||
|
) {
|
||||||
|
$exists = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if ($exists) { // Delete the duplicate if created, else update in db
|
||||||
|
$this->deleteIndex($database, $collection, $index, $project, $dbForConsole);
|
||||||
|
} else {
|
||||||
|
$dbForProject->updateDocument('indexes', $index->getId(), $index);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
$dbForProject->deleteCachedDocument('database_' . $database->getInternalId(), $collectionId);
|
||||||
|
$dbForProject->deleteCachedCollection('database_' . $database->getInternalId() . '_collection_' . $collection->getInternalId());
|
||||||
|
}
|
||||||
|
|
||||||
|
private function createIndex(Document $database, Document $collection, Document $index, Document $project, Database $dbForProject)
|
||||||
|
{
|
||||||
|
$events = Event::generateEvents('databases.[databaseId].collections.[collectionId].indexes.[indexId].update', [
|
||||||
|
'databaseId' => $database->getId(),
|
||||||
|
'collectionId' => $collection->getId(),
|
||||||
|
'indexId' => $index->getId()
|
||||||
|
]);
|
||||||
|
$collectionId = $collection->getId();
|
||||||
|
$key = $index->getAttribute('key', '');
|
||||||
|
$type = $index->getAttribute('type', '');
|
||||||
|
$attributes = $index->getAttribute('attributes', []);
|
||||||
|
$lengths = $index->getAttribute('lengths', []);
|
||||||
|
$orders = $index->getAttribute('orders', []);
|
||||||
|
|
||||||
|
try {
|
||||||
|
if (!$dbForProject->createIndex('database_' . $database->getInternalId() . '_collection_' . $collection->getInternalId(), $key, $type, $attributes, $lengths, $orders)) {
|
||||||
|
throw new Exception('Failed to create Index');
|
||||||
|
}
|
||||||
|
$dbForProject->updateDocument('indexes', $index->getId(), $index->setAttribute('status', 'available'));
|
||||||
|
} catch (\Throwable $th) {
|
||||||
|
Console::error($th->getMessage());
|
||||||
|
$dbForProject->updateDocument('indexes', $index->getId(), $index->setAttribute('status', 'failed'));
|
||||||
|
} finally {
|
||||||
|
$target = Realtime::fromPayload(
|
||||||
|
// Pass first, most verbose event pattern
|
||||||
|
event: $events[0],
|
||||||
|
payload: $index,
|
||||||
|
project: $project
|
||||||
|
);
|
||||||
|
|
||||||
|
Realtime::send(
|
||||||
|
projectId: 'console',
|
||||||
|
payload: $index->getArrayCopy(),
|
||||||
|
events: $events,
|
||||||
|
channels: $target['channels'],
|
||||||
|
roles: $target['roles'],
|
||||||
|
options: [
|
||||||
|
'projectId' => $project->getId(),
|
||||||
|
'databaseId' => $database->getId(),
|
||||||
|
'collectionId' => $collection->getId()
|
||||||
|
]
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
$dbForProject->deleteCachedDocument('database_' . $database->getInternalId(), $collectionId);
|
||||||
|
}
|
||||||
|
|
||||||
|
private function deleteIndex(Document $database, Document $collection, Document $index, Document $project, Database $dbForProject)
|
||||||
|
{
|
||||||
|
$events = Event::generateEvents('databases.[databaseId].collections.[collectionId].indexes.[indexId].delete', [
|
||||||
|
'databaseId' => $database->getId(),
|
||||||
|
'collectionId' => $collection->getId(),
|
||||||
|
'indexId' => $index->getId()
|
||||||
|
]);
|
||||||
|
$key = $index->getAttribute('key');
|
||||||
|
$status = $index->getAttribute('status', '');
|
||||||
|
|
||||||
|
try {
|
||||||
|
if ($status !== 'failed' && !$dbForProject->deleteIndex('database_' . $database->getInternalId() . '_collection_' . $collection->getInternalId(), $key)) {
|
||||||
|
throw new Exception('Failed to delete index');
|
||||||
|
}
|
||||||
|
$dbForProject->deleteDocument('indexes', $index->getId());
|
||||||
|
} catch (\Throwable $th) {
|
||||||
|
Console::error($th->getMessage());
|
||||||
|
$dbForProject->updateDocument('indexes', $index->getId(), $index->setAttribute('status', 'stuck'));
|
||||||
|
} finally {
|
||||||
|
$target = Realtime::fromPayload(
|
||||||
|
// Pass first, most verbose event pattern
|
||||||
|
event: $events[0],
|
||||||
|
payload: $index,
|
||||||
|
project: $project
|
||||||
|
);
|
||||||
|
|
||||||
|
Realtime::send(
|
||||||
|
projectId: 'console',
|
||||||
|
payload: $index->getArrayCopy(),
|
||||||
|
events: $events,
|
||||||
|
channels: $target['channels'],
|
||||||
|
roles: $target['roles'],
|
||||||
|
options: [
|
||||||
|
'projectId' => $project->getId(),
|
||||||
|
'databaseId' => $database->getId(),
|
||||||
|
'collectionId' => $collection->getId()
|
||||||
|
]
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
$dbForProject->deleteCachedDocument('database_' . $database->getInternalId(), $collection->getId());
|
||||||
|
}
|
||||||
|
}
|
233
src/Appwrite/Platform/Workers/Usage.php
Normal file
233
src/Appwrite/Platform/Workers/Usage.php
Normal file
|
@ -0,0 +1,233 @@
|
||||||
|
<?php
|
||||||
|
|
||||||
|
namespace Appwrite\Platform\Workers;
|
||||||
|
|
||||||
|
use Exception;
|
||||||
|
use Utopia\CLI\Console;
|
||||||
|
use Utopia\Database\Database;
|
||||||
|
use Utopia\Database\Document;
|
||||||
|
use Utopia\Platform\Action;
|
||||||
|
use Utopia\Queue\Message;
|
||||||
|
|
||||||
|
class Usage extends Action
|
||||||
|
{
|
||||||
|
private $stats = [];
|
||||||
|
private array $periods = [
|
||||||
|
'1h' => 'Y-m-d H:00',
|
||||||
|
'1d' => 'Y-m-d 00:00',
|
||||||
|
'inf' => '0000-00-00 00:00'
|
||||||
|
];
|
||||||
|
|
||||||
|
|
||||||
|
const INFINITY_PERIOD = '_inf_';
|
||||||
|
|
||||||
|
|
||||||
|
public static function getName(): string
|
||||||
|
{
|
||||||
|
return 'usage';
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
public function __construct()
|
||||||
|
{
|
||||||
|
$this
|
||||||
|
->desc('Usage worker')
|
||||||
|
->inject('message')
|
||||||
|
->inject('pools')
|
||||||
|
->inject('cache')
|
||||||
|
->callback(function ($message, $pools, $cache) use (&$stats) {
|
||||||
|
$this->action($message, $pools, $cache);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
public function action(Message $message, $pools, $cache): void
|
||||||
|
{
|
||||||
|
$payload = $message->getPayload() ?? [];
|
||||||
|
|
||||||
|
if (empty($payload)) {
|
||||||
|
throw new Exception('Missing payload');
|
||||||
|
}
|
||||||
|
|
||||||
|
$payload = $message->getPayload() ?? [];
|
||||||
|
$project = new Document($payload['project'] ?? []);
|
||||||
|
$projectId = $project->getInternalId();
|
||||||
|
foreach ($payload['reduce'] ?? [] as $document) {
|
||||||
|
if (empty($document)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
$this->reduce(
|
||||||
|
database: $project->getAttribute('database'),
|
||||||
|
projectInternalId: $project->getInternalId(),
|
||||||
|
document: new Document($document),
|
||||||
|
metrics: $payload['metrics'],
|
||||||
|
pools: $pools,
|
||||||
|
cache: $cache
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
$stats[$projectId]['database'] = $project->getAttribute('database');
|
||||||
|
foreach ($payload['metrics'] ?? [] as $metric) {
|
||||||
|
if (!isset($stats[$projectId]['keys'][$metric['key']])) {
|
||||||
|
$stats[$projectId]['keys'][$metric['key']] = $metric['value'];
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
$stats[$projectId]['keys'][$metric['key']] += $metric['value'];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* On Documents that tied by relations like functions>deployments>build || documents>collection>database || buckets>files.
|
||||||
|
* When we remove a parent document we need to deduct his children aggregation from the project scope.
|
||||||
|
*/
|
||||||
|
private function reduce($database, $projectInternalId, Document $document, array &$metrics, $pools, $cache)
|
||||||
|
{
|
||||||
|
try {
|
||||||
|
$dbForProject = new Database(
|
||||||
|
$pools
|
||||||
|
->get($database)
|
||||||
|
->pop()
|
||||||
|
->getResource(),
|
||||||
|
$cache
|
||||||
|
);
|
||||||
|
|
||||||
|
$dbForProject->setNamespace('_' . $projectInternalId);
|
||||||
|
|
||||||
|
switch (true) {
|
||||||
|
case $document->getCollection() === 'users': // users
|
||||||
|
$sessions = count($document->getAttribute(METRIC_SESSIONS, 0));
|
||||||
|
if (!empty($sessions)) {
|
||||||
|
$metrics[] = [
|
||||||
|
'key' => METRIC_SESSIONS,
|
||||||
|
'value' => ($sessions * -1),
|
||||||
|
];
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case $document->getCollection() === 'databases': // databases
|
||||||
|
$collections = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{databaseInternalId}', $document->getInternalId(), METRIC_DATABASE_ID_COLLECTIONS)));
|
||||||
|
$documents = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{databaseInternalId}', $document->getInternalId(), METRIC_DATABASE_ID_DOCUMENTS)));
|
||||||
|
if (!empty($collections['value'])) {
|
||||||
|
$metrics[] = [
|
||||||
|
'key' => METRIC_COLLECTIONS,
|
||||||
|
'value' => ($collections['value'] * -1),
|
||||||
|
];
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!empty($documents['value'])) {
|
||||||
|
$metrics[] = [
|
||||||
|
'key' => METRIC_DOCUMENTS,
|
||||||
|
'value' => ($documents['value'] * -1),
|
||||||
|
];
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case str_starts_with($document->getCollection(), 'database_') && !str_contains($document->getCollection(), 'collection'): //collections
|
||||||
|
$parts = explode('_', $document->getCollection());
|
||||||
|
$databaseInternalId = $parts[1] ?? 0;
|
||||||
|
$documents = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace(['{databaseInternalId}', '{collectionInternalId}'], [$databaseInternalId, $document->getInternalId()], METRIC_DATABASE_ID_COLLECTION_ID_DOCUMENTS)));
|
||||||
|
|
||||||
|
if (!empty($documents['value'])) {
|
||||||
|
$metrics[] = [
|
||||||
|
'key' => METRIC_DOCUMENTS,
|
||||||
|
'value' => ($documents['value'] * -1),
|
||||||
|
];
|
||||||
|
$metrics[] = [
|
||||||
|
'key' => str_replace('{databaseInternalId}', $databaseInternalId, METRIC_DATABASE_ID_DOCUMENTS),
|
||||||
|
'value' => ($documents['value'] * -1),
|
||||||
|
];
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
|
||||||
|
case $document->getCollection() === 'buckets':
|
||||||
|
$files = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{bucketInternalId}', $document->getInternalId(), METRIC_BUCKET_ID_FILES)));
|
||||||
|
$storage = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{bucketInternalId}', $document->getInternalId(), METRIC_BUCKET_ID_FILES_STORAGE)));
|
||||||
|
|
||||||
|
if (!empty($files['value'])) {
|
||||||
|
$metrics[] = [
|
||||||
|
'key' => METRIC_FILES,
|
||||||
|
'value' => ($files['value'] * -1),
|
||||||
|
];
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!empty($storage['value'])) {
|
||||||
|
$metrics[] = [
|
||||||
|
'key' => METRIC_FILES_STORAGE,
|
||||||
|
'value' => ($storage['value'] * -1),
|
||||||
|
];
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
|
||||||
|
case $document->getCollection() === 'functions':
|
||||||
|
$deployments = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace(['{resourceType}', '{resourceInternalId}'], ['functions', $document->getInternalId()], METRIC_FUNCTION_ID_DEPLOYMENTS)));
|
||||||
|
$deploymentsStorage = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace(['{resourceType}', '{resourceInternalId}'], ['functions', $document->getInternalId()], METRIC_FUNCTION_ID_DEPLOYMENTS_STORAGE)));
|
||||||
|
$builds = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{functionInternalId}', $document->getInternalId(), METRIC_FUNCTION_ID_BUILDS)));
|
||||||
|
$buildsStorage = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{functionInternalId}', $document->getInternalId(), METRIC_FUNCTION_ID_BUILDS_STORAGE)));
|
||||||
|
$buildsCompute = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{functionInternalId}', $document->getInternalId(), METRIC_FUNCTION_ID_BUILDS_COMPUTE)));
|
||||||
|
$executions = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{functionInternalId}', $document->getInternalId(), METRIC_FUNCTION_ID_EXECUTIONS)));
|
||||||
|
$executionsCompute = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{functionInternalId}', $document->getInternalId(), METRIC_FUNCTION_ID_EXECUTIONS_COMPUTE)));
|
||||||
|
|
||||||
|
if (!empty($deployments['value'])) {
|
||||||
|
$metrics[] = [
|
||||||
|
'key' => METRIC_DEPLOYMENTS,
|
||||||
|
'value' => ($deployments['value'] * -1),
|
||||||
|
];
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!empty($deploymentsStorage['value'])) {
|
||||||
|
$metrics[] = [
|
||||||
|
'key' => METRIC_DEPLOYMENTS_STORAGE,
|
||||||
|
'value' => ($deploymentsStorage['value'] * -1),
|
||||||
|
];
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!empty($builds['value'])) {
|
||||||
|
$metrics[] = [
|
||||||
|
'key' => METRIC_BUILDS,
|
||||||
|
'value' => ($builds['value'] * -1),
|
||||||
|
];
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!empty($buildsStorage['value'])) {
|
||||||
|
$metrics[] = [
|
||||||
|
'key' => METRIC_BUILDS_STORAGE,
|
||||||
|
'value' => ($buildsStorage['value'] * -1),
|
||||||
|
];
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!empty($buildsCompute['value'])) {
|
||||||
|
$metrics[] = [
|
||||||
|
'key' => METRIC_BUILDS_COMPUTE,
|
||||||
|
'value' => ($buildsCompute['value'] * -1),
|
||||||
|
];
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!empty($executions['value'])) {
|
||||||
|
$metrics[] = [
|
||||||
|
'key' => METRIC_EXECUTIONS,
|
||||||
|
'value' => ($executions['value'] * -1),
|
||||||
|
];
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!empty($executionsCompute['value'])) {
|
||||||
|
$metrics[] = [
|
||||||
|
'key' => METRIC_EXECUTIONS_COMPUTE,
|
||||||
|
'value' => ($executionsCompute['value'] * -1),
|
||||||
|
];
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
} catch (\Exception $e) {
|
||||||
|
console::error("[reducer] " . " {DateTime::now()} " . " {$projectInternalId} " . " {$e->getMessage()}");
|
||||||
|
} catch (\Throwable $e) {
|
||||||
|
} finally {
|
||||||
|
$pools->reclaim();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -17,6 +17,9 @@ class Webhooks extends Action
|
||||||
return 'webhooks';
|
return 'webhooks';
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
public function __construct()
|
public function __construct()
|
||||||
{
|
{
|
||||||
$this
|
$this
|
||||||
|
@ -34,26 +37,27 @@ class Webhooks extends Action
|
||||||
|
|
||||||
if (empty($payload)) {
|
if (empty($payload)) {
|
||||||
throw new Exception('Missing payload');
|
throw new Exception('Missing payload');
|
||||||
|
}
|
||||||
|
|
||||||
$events = $payload['events'];
|
$events = $payload['events'];
|
||||||
$webhookPayload = json_encode($payload['payload']);
|
$webhookPayload = json_encode($payload['payload']);
|
||||||
$project = new Document($payload['project']);
|
$project = new Document($payload['project']);
|
||||||
$user = new Document($payload['user'] ?? []);
|
$user = new Document($payload['user'] ?? []);
|
||||||
|
|
||||||
foreach ($project->getAttribute('webhooks', []) as $webhook) {
|
foreach ($project->getAttribute('webhooks', []) as $webhook) {
|
||||||
if (array_intersect($webhook->getAttribute('events', []), $events)) {
|
if (array_intersect($webhook->getAttribute('events', []), $events)) {
|
||||||
$this->execute($events, $webhookPayload, $webhook, $user, $project);
|
$this->execute($events, $webhookPayload, $webhook, $user, $project);
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (!empty($errors)) {
|
if (!empty($errors)) {
|
||||||
throw new Exception(\implode(" / \n\n", $errors));
|
throw new Exception(\implode(" / \n\n", $errors));
|
||||||
}
|
}
|
||||||
|
|
||||||
$this->errors = [];
|
$this->errors = [];
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private function execute(array $events, string $payload, Document $webhook, Document $user, Document $project): void
|
private function execute(array $events, string $payload, Document $webhook, Document $user, Document $project): void
|
||||||
{
|
{
|
||||||
$url = \rawurldecode($webhook->getAttribute('url'));
|
$url = \rawurldecode($webhook->getAttribute('url'));
|
||||||
|
|
Loading…
Reference in a new issue