From 62c619434e25f94ab8e23e00c3ab754a45abe2ec Mon Sep 17 00:00:00 2001 From: shimon Date: Mon, 5 Jun 2023 19:13:00 +0300 Subject: [PATCH] functions/builds/deletes worker --- app/cli.php | 9 +- app/init.php | 4 +- app/worker.php | 57 +- bin/worker-builds | 9 +- bin/worker-deletes | 9 +- bin/worker-functions | 2 +- docker-compose.yml | 4 + src/Appwrite/Platform/Services/Workers.php | 6 + src/Appwrite/Platform/Tasks/Maintenance.php | 205 +++-- src/Appwrite/Platform/Workers/Builds.php | 290 +++++++ src/Appwrite/Platform/Workers/Deletes.php | 851 ++++++++++++++++++++ src/Appwrite/Platform/Workers/Functions.php | 350 ++++++++ 12 files changed, 1646 insertions(+), 150 deletions(-) create mode 100644 src/Appwrite/Platform/Workers/Builds.php create mode 100644 src/Appwrite/Platform/Workers/Deletes.php create mode 100644 src/Appwrite/Platform/Workers/Functions.php diff --git a/app/cli.php b/app/cli.php index 5da82fa7d..7beab2871 100644 --- a/app/cli.php +++ b/app/cli.php @@ -26,10 +26,13 @@ use Utopia\Database\Document; use Utopia\Logger\Log; use Utopia\Pools\Group; use Utopia\Queue\Connection; +use Utopia\Queue\Server; use Utopia\Registry\Registry; Authorization::disable(); +global $register; + CLI::setResource('register', fn () => $register); CLI::setResource('cache', function ($pools) { @@ -95,7 +98,7 @@ CLI::setResource('dbForConsole', function ($pools, $cache) { CLI::setResource('getProjectDB', function (Group $pools, Database $dbForConsole, $cache) { $databases = []; // TODO: @Meldiron This should probably be responsibility of utopia-php/pools - $getProjectDB = function (Document $project) use ($pools, $dbForConsole, $cache, &$databases) { + return function (Document $project) use ($pools, $dbForConsole, $cache, &$databases) { if ($project->isEmpty() || $project->getId() === 'console') { return $dbForConsole; } @@ -121,8 +124,6 @@ CLI::setResource('getProjectDB', function (Group $pools, Database $dbForConsole, return $database; }; - - return $getProjectDB; }, ['pools', 'dbForConsole', 'cache']); CLI::setResource('queue', function (Group $pools) { @@ -144,7 +145,7 @@ CLI::setResource('queueForDeletes', function (Connection $queue) { return new Delete($queue); }, ['queue']); CLI::setResource('queueForEvents', function (Connection $queue) { - return new Event('', '', $queue); + return new Event($queue); }, ['queue']); CLI::setResource('queueForAudits', function (Connection $queue) { return new Audit($queue); diff --git a/app/init.php b/app/init.php index d473f5937..7ded34755 100644 --- a/app/init.php +++ b/app/init.php @@ -614,13 +614,13 @@ $register->set('pools', function () { foreach ($connections as $key => $connection) { $type = $connection['type'] ?? ''; $dsns = $connection['dsns'] ?? ''; - $multipe = $connection['multiple'] ?? false; + $multiple = $connection['multiple'] ?? false; $schemes = $connection['schemes'] ?? []; $config = []; $dsns = explode(',', $connection['dsns'] ?? ''); foreach ($dsns as &$dsn) { $dsn = explode('=', $dsn); - $name = ($multipe) ? $key . '_' . $dsn[0] : $key; + $name = ($multiple) ? $key . '_' . $dsn[0] : $key; $dsn = $dsn[1] ?? ''; $config[] = $name; if (empty($dsn)) { diff --git a/app/worker.php b/app/worker.php index b76d59912..08c38ebd8 100644 --- a/app/worker.php +++ b/app/worker.php @@ -12,7 +12,6 @@ use Appwrite\Event\Func; use Appwrite\Event\Mail; use Appwrite\Event\Phone; use Appwrite\Event\Usage; -use Appwrite\Extend\Exception; use Appwrite\Platform\Appwrite; use Swoole\Runtime; use Utopia\App; @@ -23,9 +22,7 @@ use Utopia\Config\Config; use Utopia\Database\Database; use Utopia\Database\Document; use Utopia\Database\Validator\Authorization; -use Utopia\DSN\DSN; use Utopia\Platform\Service; -use Utopia\Queue\Adapter\Swoole; use Utopia\Queue\Message; use Utopia\Queue\Server; use Utopia\Registry\Registry; @@ -34,13 +31,6 @@ use Utopia\Logger\Logger; use Utopia\Pools\Group; use Utopia\Queue\Connection; use Utopia\Storage\Device; -use Utopia\Storage\Device\Backblaze; -use Utopia\Storage\Device\DOSpaces; -use Utopia\Storage\Device\Linode; -use Utopia\Storage\Device\Local; -use Utopia\Storage\Device\S3; -use Utopia\Storage\Device\Wasabi; -use Utopia\Storage\Storage; Authorization::disable(); Runtime::enableCoroutine(SWOOLE_HOOK_ALL); @@ -81,6 +71,37 @@ Server::setResource('dbForProject', function (Cache $cache, Registry $register, return $adapter; }, ['cache', 'register', 'message', 'dbForConsole']); +Server::setResource('getProjectDB', function (Group $pools, Database $dbForConsole, $cache) { + $databases = []; // TODO: @Meldiron This should probably be responsibility of utopia-php/pools + + return function (Document $project) use ($pools, $dbForConsole, $cache, &$databases) { + if ($project->isEmpty() || $project->getId() === 'console') { + return $dbForConsole; + } + + $databaseName = $project->getAttribute('database'); + + if (isset($databases[$databaseName])) { + $database = $databases[$databaseName]; + $database->setNamespace('_' . $project->getInternalId()); + return $database; + } + + $dbAdapter = $pools + ->get($databaseName) + ->pop() + ->getResource(); + + $database = new Database($dbAdapter, $cache); + + $databases[$databaseName] = $database; + + $database->setNamespace('_' . $project->getInternalId()); + + return $database; + }; +}, ['pools', 'dbForConsole', 'cache']); + Server::setResource('cache', function (Registry $register) { $pools = $register->get('pools'); $list = Config::getParam('pools-cache', []); @@ -95,20 +116,12 @@ Server::setResource('cache', function (Registry $register) { return new Cache(new Sharding($adapters)); }, ['register']); - -Server::setResource('queueForDatabase', function (Registry $register) { - $pools = $register->get('pools'); - return new EventDatabase( - $pools - ->get('queue') - ->pop() - ->getResource() - ); -}, ['register']); - Server::setResource('queue', function (Group $pools) { return $pools->get('queue')->pop()->getResource(); }, ['pools']); +Server::setResource('queueForDatabase', function (Connection $queue) { + return new EventDatabase($queue); +}, ['queue']); Server::setResource('queueForMessaging', function (Connection $queue) { return new Phone($queue); }, ['queue']); @@ -125,7 +138,7 @@ Server::setResource('queueForDeletes', function (Connection $queue) { return new Delete($queue); }, ['queue']); Server::setResource('queueForEvents', function (Connection $queue) { - return new Event('', '', $queue); + return new Event($queue); }, ['queue']); Server::setResource('queueForAudits', function (Connection $queue) { return new Audit($queue); diff --git a/bin/worker-builds b/bin/worker-builds index 2ba26ef4d..3400111cb 100644 --- a/bin/worker-builds +++ b/bin/worker-builds @@ -1,10 +1,3 @@ #!/bin/sh -if [ -z "$_APP_REDIS_USER" ] && [ -z "$_APP_REDIS_PASS" ] -then - REDIS_BACKEND="${_APP_REDIS_HOST}:${_APP_REDIS_PORT}" -else - REDIS_BACKEND="redis://${_APP_REDIS_USER}:${_APP_REDIS_PASS}@${_APP_REDIS_HOST}:${_APP_REDIS_PORT}" -fi - -INTERVAL=0.1 QUEUE='v1-builds' APP_INCLUDE='/usr/src/code/app/workers/builds.php' php /usr/src/code/vendor/bin/resque -dopcache.preload=opcache.preload=/usr/src/code/app/preload.php \ No newline at end of file +php /usr/src/code/app/worker.php builds $@ \ No newline at end of file diff --git a/bin/worker-deletes b/bin/worker-deletes index 02c2311fa..7c9793e6c 100644 --- a/bin/worker-deletes +++ b/bin/worker-deletes @@ -1,10 +1,3 @@ #!/bin/sh -if [ -z "$_APP_REDIS_USER" ] && [ -z "$_APP_REDIS_PASS" ] -then - REDIS_BACKEND="${_APP_REDIS_HOST}:${_APP_REDIS_PORT}" -else - REDIS_BACKEND="redis://${_APP_REDIS_USER}:${_APP_REDIS_PASS}@${_APP_REDIS_HOST}:${_APP_REDIS_PORT}" -fi - -INTERVAL=1 QUEUE='v1-deletes' APP_INCLUDE='/usr/src/code/app/workers/deletes.php' php /usr/src/code/vendor/bin/resque -dopcache.preload=opcache.preload=/usr/src/code/app/preload.php \ No newline at end of file +php /usr/src/code/app/worker.php deletes $@ \ No newline at end of file diff --git a/bin/worker-functions b/bin/worker-functions index 687f9fd0c..4757b1b72 100644 --- a/bin/worker-functions +++ b/bin/worker-functions @@ -1,3 +1,3 @@ #!/bin/sh -QUEUE=v1-functions php /usr/src/code/app/workers/functions.php $@ \ No newline at end of file +php /usr/src/code/app/worker.php functions $@ \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index 658912aa6..59baa08ba 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -235,6 +235,7 @@ services: - ./src:/usr/src/code/src - ./vendor/utopia-php/platform:/usr/src/code/vendor/utopia-php/platform - ./vendor/utopia-php/queue:/usr/src/code/vendor/utopia-php/queue + - ./vendor/utopia-php/pools:/usr/src/code/vendor/utopia-php/pools depends_on: - redis @@ -592,6 +593,8 @@ services: volumes: - ./app:/usr/src/code/app - ./src:/usr/src/code/src + - ./vendor/utopia-php/pools:/usr/src/code/vendor/utopia-php/pools + depends_on: - redis environment: @@ -614,6 +617,7 @@ services: - _APP_CONNECTIONS_DB_CONSOLE - _APP_CONNECTIONS_DB_PROJECT - _APP_CONNECTIONS_CACHE + - _APP_CONNECTIONS_QUEUE - _APP_MAINTENANCE_INTERVAL - _APP_MAINTENANCE_RETENTION_EXECUTION - _APP_MAINTENANCE_RETENTION_CACHE diff --git a/src/Appwrite/Platform/Services/Workers.php b/src/Appwrite/Platform/Services/Workers.php index 8bd7522cc..6ad47e32b 100644 --- a/src/Appwrite/Platform/Services/Workers.php +++ b/src/Appwrite/Platform/Services/Workers.php @@ -9,6 +9,9 @@ use Appwrite\Platform\Workers\Mails; use Appwrite\Platform\Workers\Messaging; use Appwrite\Platform\Workers\Certificates; use Appwrite\Platform\Workers\Databases; +use Appwrite\Platform\Workers\Functions; +use Appwrite\Platform\Workers\Builds; +use Appwrite\Platform\Workers\Deletes; use Appwrite\Platform\Workers\Usage; class Workers extends Service @@ -23,6 +26,9 @@ class Workers extends Service ->addAction(Messaging::getName(), new Messaging()) ->addAction(Certificates::getName(), new Certificates()) ->addAction(Databases::getName(), new Databases()) + ->addAction(Functions::getName(), new Functions()) + ->addAction(Builds::getName(), new Builds()) + ->addAction(Deletes::getName(), new Deletes()) //->addAction(Usage::getName(), new Usage()) ; } diff --git a/src/Appwrite/Platform/Tasks/Maintenance.php b/src/Appwrite/Platform/Tasks/Maintenance.php index 4327ca30e..214505ff7 100644 --- a/src/Appwrite/Platform/Tasks/Maintenance.php +++ b/src/Appwrite/Platform/Tasks/Maintenance.php @@ -2,10 +2,8 @@ namespace Appwrite\Platform\Tasks; -use Appwrite\Auth\Auth; use Appwrite\Event\Certificate; use Appwrite\Event\Delete; -use Appwrite\Event\Func; use Utopia\App; use Utopia\CLI\Console; use Utopia\Database\Database; @@ -13,8 +11,6 @@ use Utopia\Database\Document; use Utopia\Database\DateTime; use Utopia\Database\Query; use Utopia\Platform\Action; -use Utopia\Queue\Connection; -use Utopia\Registry\Registry; class Maintenance extends Action { @@ -38,104 +34,12 @@ class Maintenance extends Action Console::title('Maintenance V1'); Console::success(APP_NAME . ' maintenance process v1 has started'); - function notifyDeleteExecutionLogs(int $interval, Delete $queueForDeletes) - { - ($queueForDeletes) - ->setType(DELETE_TYPE_EXECUTIONS) - ->setDatetime(DateTime::addSeconds(new \DateTime(), -1 * $interval)) - ->trigger(); - } - - function notifyDeleteAbuseLogs(int $interval, Delete $queueForDeletes) - { - ($queueForDeletes) - ->setType(DELETE_TYPE_ABUSE) - ->setDatetime(DateTime::addSeconds(new \DateTime(), -1 * $interval)) - ->trigger(); - } - - function notifyDeleteAuditLogs(int $interval, Delete $queueForDeletes) - { - ($queueForDeletes) - ->setType(DELETE_TYPE_AUDIT) - ->setDatetime(DateTime::addSeconds(new \DateTime(), -1 * $interval)) - ->trigger(); - } - - function notifyDeleteUsageStats(int $usageStatsRetentionHourly, Delete $queueForDeletes) - { - ($queueForDeletes) - ->setType(DELETE_TYPE_USAGE) - ->setUsageRetentionHourlyDateTime(DateTime::addSeconds(new \DateTime(), -1 * $usageStatsRetentionHourly)) - ->trigger(); - } - - function notifyDeleteConnections(Delete $queueForDeletes) - { - ($queueForDeletes) - ->setType(DELETE_TYPE_REALTIME) - ->setDatetime(DateTime::addSeconds(new \DateTime(), -60)) - ->trigger(); - } - - function notifyDeleteExpiredSessions(Delete $queueForDeletes) - { - ($queueForDeletes) - ->setType(DELETE_TYPE_SESSIONS) - ->trigger(); - } - - function renewCertificates(Database $dbForConsole, Certificate $queueForCertificate) - { - $time = DateTime::now(); - - $certificates = $dbForConsole->find('certificates', [ - Query::lessThan('attempts', 5), // Maximum 5 attempts - Query::lessThanEqual('renewDate', $time), // includes 60 days cooldown (we have 30 days to renew) - Query::limit(200), // Limit 200 comes from LetsEncrypt (300 orders per 3 hours, keeping some for new domains) - ]); - - - if (\count($certificates) > 0) { - Console::info("[{$time}] Found " . \count($certificates) . " certificates for renewal, scheduling jobs."); - - foreach ($certificates as $certificate) { - $queueForCertificate - ->setDomain(new Document([ - 'domain' => $certificate->getAttribute('domain') - ])) - ->trigger(); - } - } else { - Console::info("[{$time}] No certificates for renewal."); - } - } - - function notifyDeleteCache($interval, Delete $queueForDeletes) - { - - ($queueForDeletes) - ->setType(DELETE_TYPE_CACHE_BY_TIMESTAMP) - ->setDatetime(DateTime::addSeconds(new \DateTime(), -1 * $interval)) - ->trigger(); - } - - function notifyDeleteSchedules($interval, Delete $queueForDeletes) - { - - ($queueForDeletes) - ->setType(DELETE_TYPE_SCHEDULES) - ->setDatetime(DateTime::addSeconds(new \DateTime(), -1 * $interval)) - ->trigger(); - } - // # of days in seconds (1 day = 86400s) $interval = (int) App::getEnv('_APP_MAINTENANCE_INTERVAL', '86400'); $executionLogsRetention = (int) App::getEnv('_APP_MAINTENANCE_RETENTION_EXECUTION', '1209600'); $auditLogRetention = (int) App::getEnv('_APP_MAINTENANCE_RETENTION_AUDIT', '1209600'); $abuseLogsRetention = (int) App::getEnv('_APP_MAINTENANCE_RETENTION_ABUSE', '86400'); $usageStatsRetentionHourly = (int) App::getEnv('_APP_MAINTENANCE_RETENTION_USAGE_HOURLY', '8640000'); //100 days - $cacheRetention = (int) App::getEnv('_APP_MAINTENANCE_RETENTION_CACHE', '2592000'); // 30 days $schedulesDeletionRetention = (int) App::getEnv('_APP_MAINTENANCE_RETENTION_SCHEDULES', '86400'); // 1 Day @@ -143,15 +47,106 @@ class Maintenance extends Action $time = DateTime::now(); Console::info("[{$time}] Notifying workers with maintenance tasks every {$interval} seconds"); - notifyDeleteExecutionLogs($executionLogsRetention, $queueForDeletes); - notifyDeleteAbuseLogs($abuseLogsRetention, $queueForDeletes); - notifyDeleteAuditLogs($auditLogRetention, $queueForDeletes); - notifyDeleteUsageStats($usageStatsRetentionHourly, $queueForDeletes); - notifyDeleteConnections($queueForDeletes); - notifyDeleteExpiredSessions($queueForDeletes); - renewCertificates($dbForConsole, $queueForCertificates); - notifyDeleteCache($cacheRetention, $queueForDeletes); - notifyDeleteSchedules($schedulesDeletionRetention, $queueForDeletes); + $this->notifyDeleteExecutionLogs($executionLogsRetention, $queueForDeletes); + $this->notifyDeleteAbuseLogs($abuseLogsRetention, $queueForDeletes); + $this->notifyDeleteAuditLogs($auditLogRetention, $queueForDeletes); + $this->notifyDeleteUsageStats($usageStatsRetentionHourly, $queueForDeletes); + $this->notifyDeleteConnections($queueForDeletes); + $this->notifyDeleteExpiredSessions($queueForDeletes); + $this->renewCertificates($dbForConsole, $queueForCertificates); + $this->notifyDeleteCache($cacheRetention, $queueForDeletes); + $this->notifyDeleteSchedules($schedulesDeletionRetention, $queueForDeletes); }, $interval); } + + private function notifyDeleteExecutionLogs(int $interval, Delete $queueForDeletes): void + { + ($queueForDeletes) + ->setType(DELETE_TYPE_EXECUTIONS) + ->setDatetime(DateTime::addSeconds(new \DateTime(), -1 * $interval)) + ->trigger(); + } + + private function notifyDeleteAbuseLogs(int $interval, Delete $queueForDeletes): void + { + ($queueForDeletes) + ->setType(DELETE_TYPE_ABUSE) + ->setDatetime(DateTime::addSeconds(new \DateTime(), -1 * $interval)) + ->trigger(); + } + + private function notifyDeleteAuditLogs(int $interval, Delete $queueForDeletes): void + { + ($queueForDeletes) + ->setType(DELETE_TYPE_AUDIT) + ->setDatetime(DateTime::addSeconds(new \DateTime(), -1 * $interval)) + ->trigger(); + } + + private function notifyDeleteUsageStats(int $usageStatsRetentionHourly, Delete $queueForDeletes): void + { + ($queueForDeletes) + ->setType(DELETE_TYPE_USAGE) + ->setUsageRetentionHourlyDateTime(DateTime::addSeconds(new \DateTime(), -1 * $usageStatsRetentionHourly)) + ->trigger(); + } + + private function notifyDeleteConnections(Delete $queueForDeletes): void + { + ($queueForDeletes) + ->setType(DELETE_TYPE_REALTIME) + ->setDatetime(DateTime::addSeconds(new \DateTime(), -60)) + ->trigger(); + } + + private function notifyDeleteExpiredSessions(Delete $queueForDeletes): void + { + ($queueForDeletes) + ->setType(DELETE_TYPE_SESSIONS) + ->trigger(); + } + + private function renewCertificates(Database $dbForConsole, Certificate $queueForCertificate): void + { + $time = DateTime::now(); + + $certificates = $dbForConsole->find('certificates', [ + Query::lessThan('attempts', 5), // Maximum 5 attempts + Query::lessThanEqual('renewDate', $time), // includes 60 days cooldown (we have 30 days to renew) + Query::limit(200), // Limit 200 comes from LetsEncrypt (300 orders per 3 hours, keeping some for new domains) + ]); + + + if (\count($certificates) > 0) { + Console::info("[{$time}] Found " . \count($certificates) . " certificates for renewal, scheduling jobs."); + + foreach ($certificates as $certificate) { + $queueForCertificate + ->setDomain(new Document([ + 'domain' => $certificate->getAttribute('domain') + ])) + ->trigger(); + } + } else { + Console::info("[{$time}] No certificates for renewal."); + } + } + + private function notifyDeleteCache($interval, Delete $queueForDeletes): void + { + + ($queueForDeletes) + ->setType(DELETE_TYPE_CACHE_BY_TIMESTAMP) + ->setDatetime(DateTime::addSeconds(new \DateTime(), -1 * $interval)) + ->trigger(); + } + + private function notifyDeleteSchedules($interval, Delete $queueForDeletes): void + { + + ($queueForDeletes) + ->setType(DELETE_TYPE_SCHEDULES) + ->setDatetime(DateTime::addSeconds(new \DateTime(), -1 * $interval)) + ->trigger(); + } } diff --git a/src/Appwrite/Platform/Workers/Builds.php b/src/Appwrite/Platform/Workers/Builds.php new file mode 100644 index 000000000..93c0bc758 --- /dev/null +++ b/src/Appwrite/Platform/Workers/Builds.php @@ -0,0 +1,290 @@ +desc('Builds worker') + ->inject('message') + ->inject('dbForProject') + ->inject('queueForEvents') + ->inject('queueForFunctions') + ->inject('queueForUsage') + ->callback(fn($message, Database $dbForProject, Event $queueForEvents, Func $queueForFunctions, Usage $queueForUsage) => $this->action($message, $dbForProject, $queueForEvents, $queueForFunctions, $queueForUsage)); + } + + /** + * @throws Exception|\Throwable + */ + public function action(Message $message, Database $dbForProject, Event $queueForEvents, Func $queueForFunctions, Usage $queueForUsage): void + { + $payload = $message->getPayload() ?? []; + + if (empty($payload)) { + throw new Exception('Missing payload'); + } + + $type = $payload['type'] ?? ''; + $project = new Document($payload['project'] ?? []); + $resource = new Document($payload['resource'] ?? []); + $deployment = new Document($payload['deployment'] ?? []); + + switch ($type) { + case BUILD_TYPE_DEPLOYMENT: + case BUILD_TYPE_RETRY: + Console::info('Creating build for deployment: ' . $deployment->getId()); + $this->buildDeployment( + dbForProject: $dbForProject, + queueForEvents: $queueForEvents, + queueForFunctions: $queueForFunctions, + queueForUsage: $queueForUsage, + deployment: $deployment, + project: $project, + function: $resource + ); + break; + + default: + throw new \Exception('Invalid build type'); + } + } + + /** + * @throws Authorization + * @throws \Throwable + * @throws Structure + */ + private function buildDeployment(Database $dbForProject, Event $queueForEvents, Func $queueForFunctions, Usage $queueForUsage, Document $deployment, Document $project, Document $function): void + { + $executor = new Executor(App::getEnv('_APP_EXECUTOR_HOST')); + $function = $dbForProject->getDocument('functions', $function->getId()); + + if ($function->isEmpty()) { + throw new Exception('Function not found', 404); + } + + $deployment = $dbForProject->getDocument('deployments', $deployment->getId()); + if ($deployment->isEmpty()) { + throw new Exception('Deployment not found', 404); + } + + $runtimes = Config::getParam('runtimes', []); + $key = $function->getAttribute('runtime'); + $runtime = $runtimes[$key] ?? null; + if (\is_null($runtime)) { + throw new Exception('Runtime "' . $function->getAttribute('runtime', '') . '" is not supported'); + } + + $connection = App::getEnv('_APP_CONNECTIONS_STORAGE', ''); + /** @TODO : move this to the registry or someplace else */ + $device = Storage::DEVICE_LOCAL; + try { + $dsn = new DSN($connection); + $device = $dsn->getScheme(); + } catch (\Exception $e) { + Console::error($e->getMessage() . 'Invalid DSN. Defaulting to Local device.'); + } + + $buildId = $deployment->getAttribute('buildId', ''); + $startTime = DateTime::now(); + + if (empty($buildId)) { + $buildId = ID::unique(); + $build = $dbForProject->createDocument('builds', new Document([ + '$id' => $buildId, + '$permissions' => [], + 'startTime' => $startTime, + 'deploymentId' => $deployment->getId(), + 'status' => 'processing', + 'path' => '', + 'size' => 0, + 'runtime' => $function->getAttribute('runtime'), + 'source' => $deployment->getAttribute('path'), + 'sourceType' => $device, + 'stdout' => '', + 'stderr' => '', + 'duration' => 0 + ])); + + $deployment->setAttribute('buildId', $buildId); + $deployment = $dbForProject->updateDocument('deployments', $deployment->getId(), $deployment); + } else { + $build = $dbForProject->getDocument('builds', $buildId); + } + + /** Request the executor to build the code... */ + $build->setAttribute('status', 'building'); + $build = $dbForProject->updateDocument('builds', $buildId, $build); + + /** Trigger Webhook */ + $deploymentUpdate = $queueForEvents + ->setQueue(Event::WEBHOOK_QUEUE_NAME) + ->setClass(Event::WEBHOOK_CLASS_NAME) + ->setProject($project) + ->setEvent('functions.[functionId].deployments.[deploymentId].update') + ->setParam('functionId', $function->getId()) + ->setParam('deploymentId', $deployment->getId()) + ->setPayload($deployment->getArrayCopy( + array_keys( + (new Deployment())->getRules() + ) + )); + + $deploymentUpdate->trigger(); + + /** Trigger Functions */ + $queueForFunctions + ->from($deploymentUpdate) + ->trigger(); + + + /** Trigger Realtime */ + $allEvents = Event::generateEvents('functions.[functionId].deployments.[deploymentId].update', [ + 'functionId' => $function->getId(), + 'deploymentId' => $deployment->getId() + ]); + $target = Realtime::fromPayload( + // Pass first, most verbose event pattern + event: $allEvents[0], + payload: $build, + project: $project + ); + + Realtime::send( + projectId: 'console', + payload: $build->getArrayCopy(), + events: $allEvents, + channels: $target['channels'], + roles: $target['roles'] + ); + + $source = $deployment->getAttribute('path'); + + $vars = array_reduce($function->getAttribute('vars', []), function (array $carry, Document $var) { + $carry[$var->getAttribute('key')] = $var->getAttribute('value'); + return $carry; + }, []); + + try { + $response = $executor->createRuntime( + deploymentId: $deployment->getId(), + projectId: $project->getId(), + source: $source, + image: $runtime['image'], + remove: true, + entrypoint: $deployment->getAttribute('entrypoint'), + workdir: '/usr/code', + destination: APP_STORAGE_BUILDS . "/app-{$project->getId()}", + variables: $vars, + commands: [ + 'sh', '-c', + 'tar -zxf /tmp/code.tar.gz -C /usr/code && \ + cd /usr/local/src/ && ./build.sh' + ] + ); + + /** Update the build document */ + $build->setAttribute('startTime', DateTime::format((new \DateTime())->setTimestamp($response['startTime']))); + $build->setAttribute('duration', \intval($response['duration'])); + $build->setAttribute('status', $response['status']); + $build->setAttribute('path', $response['path']); + $build->setAttribute('size', $response['size']); + $build->setAttribute('stderr', $response['stderr']); + $build->setAttribute('stdout', $response['stdout']); + + /* Also update the deployment buildTime */ + $deployment->setAttribute('buildTime', $response['duration']); + + Console::success("Build id: $buildId created"); + + $function->setAttribute('scheduleUpdatedAt', DateTime::now()); + + /** Set auto deploy */ + if ($deployment->getAttribute('activate') === true) { + $function->setAttribute('deployment', $deployment->getId()); + $function = $dbForProject->updateDocument('functions', $function->getId(), $function); + } + + /** Update function schedule */ + $dbForConsole = getConsoleDB(); + $schedule = $dbForConsole->getDocument('schedules', $function->getAttribute('scheduleId')); + $schedule->setAttribute('resourceUpdatedAt', $function->getAttribute('scheduleUpdatedAt')); + + $schedule + ->setAttribute('schedule', $function->getAttribute('schedule')) + ->setAttribute('active', !empty($function->getAttribute('schedule')) && !empty($function->getAttribute('deployment'))); + + + \Utopia\Database\Validator\Authorization::skip(fn() => $dbForConsole->updateDocument('schedules', $schedule->getId(), $schedule)); + } catch (\Throwable $th) { + $endTime = DateTime::now(); + $interval = (new \DateTime($endTime))->diff(new \DateTime($startTime)); + $build->setAttribute('duration', $interval->format('%s') + 0); + $build->setAttribute('status', 'failed'); + $build->setAttribute('stderr', $th->getMessage()); + Console::error($th->getMessage()); + } finally { + $build = $dbForProject->updateDocument('builds', $buildId, $build); + + /** + * Send realtime Event + */ + $target = Realtime::fromPayload( + // Pass first, most verbose event pattern + event: $allEvents[0], + payload: $build, + project: $project + ); + Realtime::send( + projectId: 'console', + payload: $build->getArrayCopy(), + events: $allEvents, + channels: $target['channels'], + roles: $target['roles'] + ); + + /** Trigger usage queue */ + $queueForUsage + ->setProject($project) + ->addMetric(METRIC_BUILDS, 1) // per project + ->addMetric(METRIC_BUILDS_STORAGE, $build->getAttribute('size', 0)) + ->addMetric(METRIC_BUILDS_COMPUTE, (int)$build->getAttribute('duration', 0) * 1000) + ->addMetric(str_replace('{functionInternalId}', $function->getInternalId(), METRIC_FUNCTION_ID_BUILDS), 1) // per function + ->addMetric(str_replace('{functionInternalId}', $function->getInternalId(), METRIC_FUNCTION_ID_BUILDS_STORAGE), $build->getAttribute('size', 0)) + ->addMetric(str_replace('{functionInternalId}', $function->getInternalId(), METRIC_FUNCTION_ID_BUILDS_COMPUTE), (int)$build->getAttribute('duration', 0) * 1000) + ->trigger(); + } + } +} diff --git a/src/Appwrite/Platform/Workers/Deletes.php b/src/Appwrite/Platform/Workers/Deletes.php new file mode 100644 index 000000000..bf53c396c --- /dev/null +++ b/src/Appwrite/Platform/Workers/Deletes.php @@ -0,0 +1,851 @@ +desc('Deletes worker') + ->inject('message') + ->inject('dbForConsole') + ->inject('getProjectDB') + ->inject('getFilesDevice') + ->inject('getFunctionsDevice') + ->inject('getBuildsDevice') + ->inject('getCacheDevice') + ->callback(fn($message, $dbForConsole, $getProjectDB, $getFilesDevice, $getFunctionsDevice, $getBuildsDevice, $getCacheDevice) => $this->action($message, $dbForConsole, $getProjectDB, $getFilesDevice, $getFunctionsDevice, $getBuildsDevice, $getCacheDevice)); + } + + /** + * @throws Exception + * @throws \Throwable + */ + public function action(Message $message, Database $dbForConsole, callable $getProjectDB, callable $getFilesDevice, callable $getFunctionsDevice, callable $getBuildsDevice, callable $getCacheDevice): void + { + $payload = $message->getPayload() ?? []; + + if (empty($payload)) { + throw new Exception('Missing payload'); + } + + $type = $payload['type'] ?? ''; + $datetime = $payload['datetime'] ?? null; + $hourlyUsageRetentionDatetime = $payload['hourlyUsageRetentionDatetime'] ?? null; + $resource = $payload['resource'] ?? null; + $document = new Document($payload['document'] ?? []); + $project = new Document($payload['project'] ?? []); + + switch (strval($type)) { + case DELETE_TYPE_DOCUMENT: + switch ($document->getCollection()) { + case DELETE_TYPE_DATABASES: + $this->deleteDatabase($getProjectDB, $document, $project); + break; + case DELETE_TYPE_COLLECTIONS: + $this->deleteCollection($getProjectDB, $document, $project); + break; + case DELETE_TYPE_PROJECTS: + $this->deleteProject($dbForConsole, $getProjectDB, $getFilesDevice, $getFunctionsDevice, $getBuildsDevice, $getCacheDevice, $document); + break; + case DELETE_TYPE_FUNCTIONS: + $this->deleteFunction($getProjectDB, $getFunctionsDevice, $getBuildsDevice, $document, $project); + break; + case DELETE_TYPE_DEPLOYMENTS: + $this->deleteDeployment($getProjectDB, $getFunctionsDevice, $getBuildsDevice, $document, $project); + break; + case DELETE_TYPE_USERS: + $this->deleteUser($getProjectDB, $document, $project); + break; + case DELETE_TYPE_TEAMS: + $this->deleteMemberships($getProjectDB, $document, $project); + break; + case DELETE_TYPE_BUCKETS: + $this->deleteBucket($getProjectDB, $getFilesDevice, $document, $project); + break; + default: + if (\str_starts_with($document->getCollection(), 'database_')) { + $this->deleteCollection($getProjectDB, $document, $project); + break; + } + Console::error('No lazy delete operation available for document of type: ' . $document->getCollection()); + break; + } + break; + + case DELETE_TYPE_EXECUTIONS: + $this->deleteExecutionLogs($dbForConsole, $getProjectDB, $datetime); + break; + + case DELETE_TYPE_AUDIT: + if (!empty($datetime)) { + $this->deleteAuditLogs($dbForConsole, $getProjectDB, $datetime); + } + + if (!$document->isEmpty()) { + $this->deleteAuditLogsByResource($getProjectDB, 'document/' . $document->getId(), $project); + } + + break; + + case DELETE_TYPE_ABUSE: + $this->deleteAbuseLogs($dbForConsole, $getProjectDB, $datetime); + break; + + case DELETE_TYPE_REALTIME: + $this->deleteRealtimeUsage($dbForConsole, $getProjectDB, $datetime); + break; + + case DELETE_TYPE_SESSIONS: + $this->deleteExpiredSessions($dbForConsole, $getProjectDB); + break; + + case DELETE_TYPE_CERTIFICATES: + $this->deleteCertificates($dbForConsole, $document); + break; + + case DELETE_TYPE_USAGE: + $this->deleteUsageStats($dbForConsole, $getProjectDB, $hourlyUsageRetentionDatetime); + break; + + case DELETE_TYPE_CACHE_BY_RESOURCE: + $this->deleteCacheByResource($getProjectDB, $resource); + break; + case DELETE_TYPE_CACHE_BY_TIMESTAMP: + $this->deleteCacheByDate($getProjectDB); + break; + case DELETE_TYPE_SCHEDULES: + $this->deleteSchedules($dbForConsole, $getProjectDB, $datetime); + break; + default: + Console::error('No delete operation for type: ' . $type); + break; + } + } + + /** + * @param Database $dbForConsole + * @param callable $getProjectDB + * @param string $datetime + * @return void + * @throws Authorization + * @throws \Throwable + */ + protected function deleteSchedules(Database $dbForConsole, callable $getProjectDB, string $datetime): void + { + $this->listByGroup( + 'schedules', + [ + Query::equal('region', [App::getEnv('_APP_REGION', 'default')]), + Query::equal('resourceType', ['function']), + Query::lessThanEqual('resourceUpdatedAt', $datetime), + Query::equal('active', [false]), + ], + $dbForConsole, + function (Document $document) use ($dbForConsole, $getProjectDB) { + $project = $dbForConsole->getDocument('projects', $document->getAttribute('projectId')); + + if ($project->isEmpty()) { + Console::warning('Unable to delete schedule for function ' . $document->getAttribute('resourceId')); + return; + } + + $function = $getProjectDB($project)->getDocument('functions', $document->getAttribute('resourceId')); + + if ($function->isEmpty()) { + $dbForConsole->deleteDocument('schedules', $document->getId()); + Console::success('Deleting schedule for function ' . $document->getAttribute('resourceId')); + } + } + ); + } + + /** + * @param callable $getProjectDB + * @param string $resource + * @throws Exception + */ + protected function deleteCacheByResource(callable $getProjectDB, string $resource): void + { + $this->deleteCacheFiles($getProjectDB, [ + Query::equal('resource', [$resource]), + ]); + } + + /** + * @throws Exception + */ + protected function deleteCacheByDate(callable $getProjectDB): void + { + $this->deleteCacheFiles($getProjectDB, [ + Query::lessThan('accessedAt', $this->args['datetime']), + ]); + } + + /** + * @param Database $dbForConsole + * @param callable $getProjectDB + * @param array $query + * @return void + * @throws Exception + */ + protected function deleteCacheFiles(Database $dbForConsole, callable $getProjectDB, array $query): void + { + $this->deleteForProjectIds($dbForConsole, function (Document $project) use ($query, $getProjectDB) { + $projectId = $project->getId(); + $dbForProject = $getProjectDB($project); + $cache = new Cache( + new Filesystem(APP_STORAGE_CACHE . DIRECTORY_SEPARATOR . 'app-' . $projectId) + ); + + $this->deleteByGroup( + 'cache', + $query, + $dbForProject, + function (Document $document) use ($cache, $projectId) { + $path = APP_STORAGE_CACHE . DIRECTORY_SEPARATOR . 'app-' . $projectId . DIRECTORY_SEPARATOR . $document->getId(); + + if ($cache->purge($document->getId())) { + Console::success('Deleting cache file: ' . $path); + } else { + Console::error('Failed to delete cache file: ' . $path); + } + } + ); + }); + } + + /** + * @param callable $getProjectDB + * @param Document $document database document + * @param Document $project + * @throws Exception + */ + protected function deleteDatabase(callable $getProjectDB, Document $document, Document $project): void + { + $databaseId = $document->getId(); + $dbForProject = $getProjectDB($project); + + $this->deleteByGroup('database_' . $document->getInternalId(), [], $dbForProject, function ($document) use ($getProjectDB, $project) { + $this->deleteCollection($getProjectDB, $document, $project); + }); + + $dbForProject->deleteCollection('database_' . $document->getInternalId()); + $this->deleteAuditLogsByResource($getProjectDB, 'database/' . $databaseId, $project); + } + + /** + * @param callable $getProjectDB + * @param Document $document teams document + * @param Document $project + * @throws Exception + */ + protected function deleteCollection(callable $getProjectDB, Document $document, Document $project): void + { + $collectionId = $document->getId(); + $databaseId = $document->getAttribute('databaseId'); + $databaseInternalId = $document->getAttribute('databaseInternalId'); + $dbForProject = $getProjectDB($project); + + $dbForProject->deleteCollection('database_' . $databaseInternalId . '_collection_' . $document->getInternalId()); + $this->deleteByGroup('attributes', [ + Query::equal('databaseId', [$databaseId]), + Query::equal('collectionId', [$collectionId]) + ], $dbForProject); + + $this->deleteByGroup('indexes', [ + Query::equal('databaseId', [$databaseId]), + Query::equal('collectionId', [$collectionId]) + ], $dbForProject); + + $this->deleteAuditLogsByResource($getProjectDB, 'database/' . $databaseId . '/collection/' . $collectionId, $project); + } + + /** + * @param Database $dbForConsole + * @param callable $getProjectDB + * @param string $hourlyUsageRetentionDatetime + * @throws Exception + */ + protected function deleteUsageStats(Database $dbForConsole, callable $getProjectDB, string $hourlyUsageRetentionDatetime) + { + $this->deleteForProjectIds($dbForConsole, function (Document $project) use ($getProjectDB, $hourlyUsageRetentionDatetime) { + $dbForProject = $getProjectDB($project); + // Delete Usage stats + $this->deleteByGroup('stats', [ + Query::lessThan('time', $hourlyUsageRetentionDatetime), + Query::equal('period', ['1h']), + ], $dbForProject); + }); + } + + /** + * @param callable $getProjectDB + * @param Document $document teams document + * @param Document $project + * @throws Exception + */ + protected function deleteMemberships(callable $getProjectDB, Document $document, Document $project): void + { + $teamId = $document->getAttribute('teamId', ''); + + // Delete Memberships + $this->deleteByGroup('memberships', [ + Query::equal('teamId', [$teamId]) + ], $getProjectDB($project)); + } + + /** + * @param Database $dbForConsole + * @param callable $getProjectDB + * @param callable $getFilesDevice + * @param callable $getFunctionsDevice + * @param callable $getBuildsDevice + * @param callable $getCacheDevice + * @param Document $document project document + * @throws Authorization + */ + protected function deleteProject(Database $dbForConsole, callable $getProjectDB, callable $getFilesDevice, callable $getFunctionsDevice, callable $getBuildsDevice, callable $getCacheDevice, Document $document): void + { + $projectId = $document->getId(); + + // Delete project domains and certificates + $domains = $dbForConsole->find('domains', [ + Query::equal('projectInternalId', [$document->getInternalId()]) + ]); + + foreach ($domains as $domain) { + $this->deleteCertificates($dbForConsole, $domain); + } + + // Delete project tables + $dbForProject = $getProjectDB($projectId, $document); + + while (true) { + $collections = $dbForProject->listCollections(); + + if (empty($collections)) { + break; + } + + foreach ($collections as $collection) { + $dbForProject->deleteCollection($collection->getId()); + } + } + + // Delete metadata tables + try { + $dbForProject->deleteCollection('_metadata'); + } catch (Exception) { + // Ignore: deleteCollection tries to delete a metadata entry after the collection is deleted, + // which will throw an exception here because the metadata collection is already deleted. + } + + // Delete all storage directories + $uploads = $getFilesDevice($projectId); + $functions = $getFunctionsDevice($projectId); + $builds = $getBuildsDevice($projectId); + $cache = $getCacheDevice($projectId); + + $uploads->delete($uploads->getRoot(), true); + $functions->delete($functions->getRoot(), true); + $builds->delete($builds->getRoot(), true); + $cache->delete($cache->getRoot(), true); + } + + /** + * @param callable $getProjectDB + * @param Document $document user document + * @param Document $project + * @throws Exception + */ + protected function deleteUser(callable $getProjectDB, Document $document, Document $project): void + { + $userId = $document->getId(); + $dbForProject = $getProjectDB($project); + + // Delete all sessions of this user from the sessions table and update the sessions field of the user record + $this->deleteByGroup('sessions', [ + Query::equal('userId', [$userId]) + ], $dbForProject); + + $dbForProject->deleteCachedDocument('users', $userId); + + // Delete Memberships and decrement team membership counts + $this->deleteByGroup('memberships', [ + Query::equal('userId', [$userId]) + ], $dbForProject, function (Document $document) use ($dbForProject) { + if ($document->getAttribute('confirm')) { // Count only confirmed members + $teamId = $document->getAttribute('teamId'); + $team = $dbForProject->getDocument('teams', $teamId); + if (!$team->isEmpty()) { + $team = $dbForProject->updateDocument( + 'teams', + $teamId, + // Ensure that total >= 0 + $team->setAttribute('total', \max($team->getAttribute('total', 0) - 1, 0)) + ); + } + } + }); + + // Delete tokens + $this->deleteByGroup('tokens', [ + Query::equal('userId', [$userId]) + ], $dbForProject); + } + + /** + * @param database $dbForConsole + * @param callable $getProjectDB + * @param string $datetime + * @throws Exception + */ + protected function deleteExecutionLogs(database $dbForConsole, callable $getProjectDB, string $datetime): void + { + $this->deleteForProjectIds($dbForConsole, function (Document $project) use ($getProjectDB, $datetime) { + $dbForProject = $getProjectDB($project); + // Delete Executions + $this->deleteByGroup('executions', [ + Query::lessThan('$createdAt', $datetime) + ], $dbForProject); + }); + } + + /** + * @param Database $dbForConsole + * @param callable $getProjectDB + * @return void + * @throws Exception|\Throwable + */ + protected function deleteExpiredSessions(Database $dbForConsole, callable $getProjectDB): void + { + + $this->deleteForProjectIds($dbForConsole, function (Document $project) use ($dbForConsole, $getProjectDB) { + $dbForProject = $getProjectDB($project); + $project = $dbForConsole->getDocument('projects', $project->getId()); + $duration = $project->getAttribute('auths', [])['duration'] ?? Auth::TOKEN_EXPIRATION_LOGIN_LONG; + $expired = DateTime::addSeconds(new \DateTime(), -1 * $duration); + + // Delete Sessions + $this->deleteByGroup('sessions', [ + Query::lessThan('$createdAt', $expired) + ], $dbForProject); + }); + } + + /** + * @param Database $dbForConsole + * @param callable $getProjectDB + * @param string $datetime + * @throws Exception + */ + protected function deleteRealtimeUsage(Database $dbForConsole, callable $getProjectDB, string $datetime): void + { + $this->deleteForProjectIds($dbForConsole, function (Document $project) use ($datetime, $getProjectDB) { + $dbForProject = $getProjectDB($project); + // Delete Dead Realtime Logs + $this->deleteByGroup('realtime', [ + Query::lessThan('timestamp', $datetime) + ], $dbForProject); + }); + } + + /** + * @param Database $dbForConsole + * @param callable $getProjectDB + * @param string $datetime + * @throws Exception + */ + protected function deleteAbuseLogs(Database $dbForConsole, callable $getProjectDB, string $datetime): void + { + if (empty($datetime)) { + throw new Exception('Failed to delete audit logs. No datetime provided'); + } + + $this->deleteForProjectIds($dbForConsole, function (Document $project) use ($getProjectDB, $datetime) { + $projectId = $project->getId(); + $dbForProject = $getProjectDB($project); + $timeLimit = new TimeLimit("", 0, 1, $dbForProject); + $abuse = new Abuse($timeLimit); + $status = $abuse->cleanup($datetime); + if (!$status) { + throw new Exception('Failed to delete Abuse logs for project ' . $projectId); + } + }); + } + + /** + * @param Database $dbForConsole + * @param callable $getProjectDB + * @param string $datetime + * @throws Exception + */ + protected function deleteAuditLogs(Database $dbForConsole, callable $getProjectDB, string $datetime): void + { + if (empty($datetime)) { + throw new Exception('Failed to delete audit logs. No datetime provided'); + } + + $this->deleteForProjectIds($dbForConsole, function (Document $project) use ($getProjectDB, $datetime) { + $projectId = $project->getId(); + $dbForProject = $getProjectDB($project); + $audit = new Audit($dbForProject); + $status = $audit->cleanup($datetime); + if (!$status) { + throw new Exception('Failed to delete Audit logs for project' . $projectId); + } + }); + } + + /** + * @param callable $getProjectDB + * @param string $resource + * @param Document $project + * @throws Exception + */ + protected function deleteAuditLogsByResource(callable $getProjectDB, string $resource, Document $project): void + { + $dbForProject = $getProjectDB($project); + + $this->deleteByGroup(Audit::COLLECTION, [ + Query::equal('resource', [$resource]) + ], $dbForProject); + } + + /** + * @param callable $getProjectDB + * @param callable $getFunctionsDevice + * @param callable $getBuildsDevice + * @param Document $document function document + * @param Document $project + * @throws Exception + */ + protected function deleteFunction(callable $getProjectDB, callable $getFunctionsDevice, callable $getBuildsDevice, Document $document, Document $project): void + { + $projectId = $project->getId(); + $dbForProject = $getProjectDB($project); + $functionId = $document->getId(); + + /** + * Delete Variables + */ + Console::info("Deleting variables for function " . $functionId); + $this->deleteByGroup('variables', [ + Query::equal('functionId', [$functionId]) + ], $dbForProject); + + /** + * Delete Deployments + */ + Console::info("Deleting deployments for function " . $functionId); + $storageFunctions = $getFunctionsDevice($projectId); + $deploymentIds = []; + $this->deleteByGroup('deployments', [ + Query::equal('resourceId', [$functionId]) + ], $dbForProject, function (Document $document) use ($storageFunctions, &$deploymentIds) { + $deploymentIds[] = $document->getId(); + if ($storageFunctions->delete($document->getAttribute('path', ''), true)) { + Console::success('Deleted deployment files: ' . $document->getAttribute('path', '')); + } else { + Console::error('Failed to delete deployment files: ' . $document->getAttribute('path', '')); + } + }); + + /** + * Delete builds + */ + Console::info("Deleting builds for function " . $functionId); + $storageBuilds = $getBuildsDevice($projectId); + foreach ($deploymentIds as $deploymentId) { + $this->deleteByGroup('builds', [ + Query::equal('deploymentId', [$deploymentId]) + ], $dbForProject, function (Document $document) use ($storageBuilds, $deploymentId) { + if ($storageBuilds->delete($document->getAttribute('outputPath', ''), true)) { + Console::success('Deleted build files: ' . $document->getAttribute('outputPath', '')); + } else { + Console::error('Failed to delete build files: ' . $document->getAttribute('outputPath', '')); + } + }); + } + + /** + * Delete Executions + */ + Console::info("Deleting executions for function " . $functionId); + $this->deleteByGroup('executions', [ + Query::equal('functionId', [$functionId]) + ], $dbForProject); + + // TODO: Request executor to delete runtime + } + + /** + * @param callable $getProjectDB + * @param callable $getFunctionsDevice + * @param callable $getBuildsDevice + * @param Document $document deployment document + * @param Document $project + * @throws Exception + */ + protected function deleteDeployment(callable $getProjectDB, callable $getFunctionsDevice, callable $getBuildsDevice, Document $document, Document $project): void + { + $projectId = $project->getId(); + $dbForProject = $getProjectDB($project); + $deploymentId = $document->getId(); + $functionId = $document->getAttribute('resourceId'); + + /** + * Delete deployment files + */ + Console::info("Deleting deployment files for deployment " . $deploymentId); + $storageFunctions = $getFunctionsDevice($projectId); + if ($storageFunctions->delete($document->getAttribute('path', ''), true)) { + Console::success('Deleted deployment files: ' . $document->getAttribute('path', '')); + } else { + Console::error('Failed to delete deployment files: ' . $document->getAttribute('path', '')); + } + + /** + * Delete builds + */ + Console::info("Deleting builds for deployment " . $deploymentId); + $storageBuilds = $getBuildsDevice($projectId); + $this->deleteByGroup('builds', [ + Query::equal('deploymentId', [$deploymentId]) + ], $dbForProject, function (Document $document) use ($storageBuilds) { + if ($storageBuilds->delete($document->getAttribute('outputPath', ''), true)) { + Console::success('Deleted build files: ' . $document->getAttribute('outputPath', '')); + } else { + Console::error('Failed to delete build files: ' . $document->getAttribute('outputPath', '')); + } + }); + + // TODO: Request executor to delete runtime + } + + + /** + * @param Document $document to be deleted + * @param Database $database to delete it from + * @param callable|null $callback to perform after document is deleted + * @return bool + * @throws Authorization + */ + protected function deleteById(Document $document, Database $database, callable $callback = null): bool + { + if ($database->deleteDocument($document->getCollection(), $document->getId())) { + Console::success('Deleted document "' . $document->getId() . '" successfully'); + + if (is_callable($callback)) { + $callback($document); + } + + return true; + } else { + Console::error('Failed to delete document: ' . $document->getId()); + return false; + } + } + + /** + * @param Database $dbForConsole + * @param callable $callback + * @throws Exception + */ + protected function deleteForProjectIds(database $dbForConsole, callable $callback): void + { + // TODO: @Meldiron name of this method no longer matches. It does not delete, and it gives whole document + $count = 0; + $chunk = 0; + $limit = 50; + $projects = []; + $sum = $limit; + + $executionStart = \microtime(true); + + while ($sum === $limit) { + $projects = $dbForConsole->find('projects', [Query::limit($limit), Query::offset($chunk * $limit)]); + + $chunk++; + + /** @var string[] $projectIds */ + $sum = count($projects); + + Console::info('Executing delete function for chunk #' . $chunk . '. Found ' . $sum . ' projects'); + foreach ($projects as $project) { + $callback($project); + $count++; + } + } + + $executionEnd = \microtime(true); + Console::info("Found {$count} projects " . ($executionEnd - $executionStart) . " seconds"); + } + + /** + * @param string $collection collectionID + * @param array $queries + * @param Database $database + * @param callable|null $callback + * @throws Exception + */ + protected function deleteByGroup(string $collection, array $queries, Database $database, callable $callback = null): void + { + $count = 0; + $chunk = 0; + $limit = 50; + $results = []; + $sum = $limit; + + $executionStart = \microtime(true); + + while ($sum === $limit) { + $chunk++; + + $results = $database->find($collection, \array_merge([Query::limit($limit)], $queries)); + + $sum = count($results); + + Console::info('Deleting chunk #' . $chunk . '. Found ' . $sum . ' documents'); + + foreach ($results as $document) { + $this->deleteById($document, $database, $callback); + $count++; + } + } + + $executionEnd = \microtime(true); + + Console::info("Deleted {$count} document by group in " . ($executionEnd - $executionStart) . " seconds"); + } + + /** + * @param string $collection collectionID + * @param Query[] $queries + * @param Database $database + * @param callable|null $callback + * @throws Exception + */ + protected function listByGroup(string $collection, array $queries, Database $database, callable $callback = null): void + { + $count = 0; + $chunk = 0; + $limit = 50; + $results = []; + $sum = $limit; + + $executionStart = \microtime(true); + + while ($sum === $limit) { + $chunk++; + + $results = $database->find($collection, \array_merge([Query::limit($limit)], $queries)); + + $sum = count($results); + + foreach ($results as $document) { + if (is_callable($callback)) { + $callback($document); + } + + $count++; + } + } + + $executionEnd = \microtime(true); + + Console::info("Listed {$count} document by group in " . ($executionEnd - $executionStart) . " seconds"); + } + + /** + * @param Database $dbForConsole + * @param Document $document certificates document + * @throws Authorization + */ + protected function deleteCertificates(Database $dbForConsole, Document $document): void + { + // If domain has certificate generated + if (isset($document['certificateId'])) { + $domainUsingCertificate = $dbForConsole->findOne('domains', [ + Query::equal('certificateId', [$document['certificateId']]) + ]); + + if (!$domainUsingCertificate) { + $mainDomain = App::getEnv('_APP_DOMAIN_TARGET', ''); + if ($mainDomain === $document->getAttribute('domain')) { + $domainUsingCertificate = $mainDomain; + } + } + + // If certificate is still used by some domain, mark we can't delete. + // Current domain should not be found, because we only have copy. Original domain is already deleted from database. + if ($domainUsingCertificate) { + Console::warning("Skipping certificate deletion, because a domain is still using it."); + return; + } + } + + $domain = $document->getAttribute('domain'); + $directory = APP_STORAGE_CERTIFICATES . '/' . $domain; + $checkTraversal = realpath($directory) === $directory; + + if ($domain && $checkTraversal && is_dir($directory)) { + // Delete certificate document, so Appwrite is aware of change + if (isset($document['certificateId'])) { + $dbForConsole->deleteDocument('certificates', $document['certificateId']); + } + + // Delete files, so Traefik is aware of change + array_map('unlink', glob($directory . '/*.*')); + rmdir($directory); + Console::info("Deleted certificate files for {$domain}"); + } else { + Console::info("No certificate files found for {$domain}"); + } + } + + /** + * @param callable $getProjectDB + * @param callable $getFilesDevice + * @param Document $document + * @param Document $project + * @return void + */ + protected function deleteBucket(callable $getProjectDB, callable $getFilesDevice, Document $document, Document $project): void + { + $projectId = $project->getId(); + $dbForProject = $getProjectDB($project); + + $dbForProject->deleteCollection('bucket_' . $document->getInternalId()); + + $device = $getFilesDevice($projectId); + + $device->deletePath($document->getId()); + } +} diff --git a/src/Appwrite/Platform/Workers/Functions.php b/src/Appwrite/Platform/Workers/Functions.php new file mode 100644 index 000000000..995356052 --- /dev/null +++ b/src/Appwrite/Platform/Workers/Functions.php @@ -0,0 +1,350 @@ +desc('Functions worker') + ->inject('message') + ->inject('dbForProject') + ->inject('queueForFunctions') + ->inject('queueForUsage') + ->callback(fn($message, $dbForProject, $queueForFunctions, $queueForUsage) => $this->action($message, $dbForProject, $queueForFunctions, $queueForUsage)); + } + + /** + * @throws Exception|\Throwable + */ + public function action(Message $message, Database $dbForProject, Func $queueForFunctions, Usage $queueForUsage): void + { + $payload = $message->getPayload() ?? []; + + if (empty($payload)) { + throw new Exception('Missing payload'); + } + + $payload = $message->getPayload() ?? []; + + if (empty($payload)) { + throw new Exception('Missing payload'); + } + + $type = $payload['type'] ?? ''; + $events = $payload['events'] ?? []; + $data = $payload['data'] ?? ''; + $eventData = $payload['payload'] ?? ''; + $project = new Document($payload['project'] ?? []); + $function = new Document($payload['function'] ?? []); + $user = new Document($payload['user'] ?? []); + + if ($project->getId() === 'console') { + return; + } + + if (!empty($events)) { + $limit = 30; + $sum = 30; + $offset = 0; + $functions = []; + /** @var Document[] $functions */ + while ($sum >= $limit) { + $functions = $dbForProject->find('functions', [ + Query::limit($limit), + Query::offset($offset), + Query::orderAsc('name'), + ]); + + $sum = \count($functions); + $offset = $offset + $limit; + + Console::log('Fetched ' . $sum . ' functions...'); + + foreach ($functions as $function) { + if (!array_intersect($events, $function->getAttribute('events', []))) { + continue; + } + Console::success('Iterating function: ' . $function->getAttribute('name')); + + $this->execute( + dbForProject: $dbForProject, + queueForFunctions: $queueForFunctions, + queueForUsage: $queueForUsage, + project: $project, + function: $function, + trigger: 'event', + user: $user, + event: $events[0], + eventData: \is_string($eventData) ? $eventData : \json_encode($eventData), + ); + Console::success('Triggered function: ' . $events[0]); + } + } + return; + } + + /** + * Handle Schedule and HTTP execution. + */ + switch ($type) { + case 'http': + $jwt = $payload['jwt'] ?? ''; + $execution = new Document($payload['execution'] ?? []); + $user = new Document($payload['user'] ?? []); + $this->execute( + dbForProject: $dbForProject, + queueForFunctions: $queueForFunctions, + queueForUsage: $queueForUsage, + project: $project, + function: $function, + trigger: 'http', + data: $data, + user: $user, + jwt: $jwt, + executionId: $execution->getId(), + ); + break; + case 'schedule': + $this->execute( + dbForProject: $dbForProject, + queueForFunctions: $queueForFunctions, + queueForUsage: $queueForUsage, + project: $project, + function: $function, + trigger: 'schedule', + ); + break; + } + } + + /** + * @throws Authorization + * @throws \Throwable + * @throws Structure + */ + private function execute( + Database $dbForProject, + Func $queueForFunctions, + Usage $queueForUsage, + Document $project, + Document $function, + string $trigger, + string $data = null, + ?Document $user = null, + string $jwt = null, + string $event = null, + string $eventData = null, + string $executionId = null, + ): void { + $user ??= new Document(); + $functionId = $function->getId(); + $functionInternalId = $function->getInternalId(); + $deploymentId = $function->getAttribute('deployment', ''); + + /** Check if deployment exists */ + $deployment = $dbForProject->getDocument('deployments', $deploymentId); + $deploymentInternalId = $deployment->getInternalId(); + + if ($deployment->getAttribute('resourceId') !== $functionId) { + throw new Exception('Deployment not found. Create deployment before trying to execute a function'); + } + + if ($deployment->isEmpty()) { + throw new Exception('Deployment not found. Create deployment before trying to execute a function'); + } + + /** Check if build has exists */ + $build = $dbForProject->getDocument('builds', $deployment->getAttribute('buildId', '')); + if ($build->isEmpty()) { + throw new Exception('Build not found'); + } + + if ($build->getAttribute('status') !== 'ready') { + throw new Exception('Build not ready'); + } + + /** Check if runtime is supported */ + $runtimes = Config::getParam('runtimes', []); + + if (!\array_key_exists($function->getAttribute('runtime'), $runtimes)) { + throw new Exception('Runtime "' . $function->getAttribute('runtime', '') . '" is not supported'); + } + + $runtime = $runtimes[$function->getAttribute('runtime')]; + + /** Create execution or update execution status */ + $execution = $dbForProject->getDocument('executions', $executionId ?? ''); + if ($execution->isEmpty()) { + $executionId = ID::unique(); + $execution = $dbForProject->createDocument('executions', new Document([ + '$id' => $executionId, + '$permissions' => $user->isEmpty() ? [] : [Permission::read(Role::user($user->getId()))], + 'functionId' => $functionId, + 'functionInternalId' => $functionInternalId, + 'deploymentInternalId' => $deploymentInternalId, + 'deploymentId' => $deploymentId, + 'trigger' => $trigger, + 'status' => 'waiting', + 'statusCode' => 0, + 'response' => '', + 'stderr' => '', + 'duration' => 0.0, + 'search' => implode(' ', [$function->getId(), $executionId]), + ])); + + // TODO: @Meldiron Trigger executions.create event here + + if ($execution->isEmpty()) { + throw new Exception('Failed to create or read execution'); + } + + /*** Usage */ + $queueForUsage + ->addMetric(METRIC_EXECUTIONS, 1) // per project + ->addMetric(str_replace('{functionInternalId}', $function->getInternalId(), METRIC_FUNCTION_ID_EXECUTIONS), 1); // per function + } + + $execution->setAttribute('status', 'processing'); + $execution = $dbForProject->updateDocument('executions', $executionId, $execution); + + $vars = array_reduce($function->getAttribute('vars', []), function (array $carry, Document $var) { + $carry[$var->getAttribute('key')] = $var->getAttribute('value'); + return $carry; + }, []); + + /** Collect environment variables */ + $vars = \array_merge($vars, [ + 'APPWRITE_FUNCTION_ID' => $functionId, + 'APPWRITE_FUNCTION_NAME' => $function->getAttribute('name'), + 'APPWRITE_FUNCTION_DEPLOYMENT' => $deploymentId, + 'APPWRITE_FUNCTION_TRIGGER' => $trigger, + 'APPWRITE_FUNCTION_PROJECT_ID' => $project->getId(), + 'APPWRITE_FUNCTION_RUNTIME_NAME' => $runtime['name'] ?? '', + 'APPWRITE_FUNCTION_RUNTIME_VERSION' => $runtime['version'] ?? '', + 'APPWRITE_FUNCTION_EVENT' => $event ?? '', + 'APPWRITE_FUNCTION_EVENT_DATA' => $eventData ?? '', + 'APPWRITE_FUNCTION_DATA' => $data ?? '', + 'APPWRITE_FUNCTION_USER_ID' => $user->getId() ?? '', + 'APPWRITE_FUNCTION_JWT' => $jwt ?? '', + ]); + + /** Execute function */ + try { + $client = new Executor(App::getEnv('_APP_EXECUTOR_HOST')); + $executionResponse = $client->createExecution( + projectId: $project->getId(), + deploymentId: $deploymentId, + payload: $vars['APPWRITE_FUNCTION_DATA'] ?? '', + variables: $vars, + timeout: $function->getAttribute('timeout', 0), + image: $runtime['image'], + source: $build->getAttribute('outputPath', ''), + entrypoint: $deployment->getAttribute('entrypoint', ''), + ); + + /** Update execution status */ + $execution + ->setAttribute('status', $executionResponse['status']) + ->setAttribute('statusCode', $executionResponse['statusCode']) + ->setAttribute('response', $executionResponse['response']) + ->setAttribute('stdout', $executionResponse['stdout']) + ->setAttribute('stderr', $executionResponse['stderr']) + ->setAttribute('duration', $executionResponse['duration']); + } catch (\Throwable $th) { + $interval = (new \DateTime())->diff(new \DateTime($execution->getCreatedAt())); + $execution + ->setAttribute('duration', (float)$interval->format('%s.%f')) + ->setAttribute('status', 'failed') + ->setAttribute('statusCode', $th->getCode()) + ->setAttribute('stderr', $th->getMessage()); + + Console::error($th->getTraceAsString()); + Console::error($th->getFile()); + Console::error($th->getLine()); + Console::error($th->getMessage()); + } + + $execution = $dbForProject->updateDocument('executions', $executionId, $execution); + + /** Trigger Webhook */ + $executionModel = new Execution(); + $executionUpdate = new Event(Event::WEBHOOK_QUEUE_NAME, Event::WEBHOOK_CLASS_NAME); + $executionUpdate + ->setProject($project) + ->setUser($user) + ->setEvent('functions.[functionId].executions.[executionId].update') + ->setParam('functionId', $function->getId()) + ->setParam('executionId', $execution->getId()) + ->setPayload($execution->getArrayCopy(array_keys($executionModel->getRules()))) + ->trigger(); + + /** Trigger Functions */ + $queueForFunctions + ->from($executionUpdate) + ->trigger(); + + /** Trigger realtime event */ + $allEvents = Event::generateEvents('functions.[functionId].executions.[executionId].update', [ + 'functionId' => $function->getId(), + 'executionId' => $execution->getId() + ]); + $target = Realtime::fromPayload( + // Pass first, most verbose event pattern + event: $allEvents[0], + payload: $execution + ); + Realtime::send( + projectId: 'console', + payload: $execution->getArrayCopy(), + events: $allEvents, + channels: $target['channels'], + roles: $target['roles'] + ); + Realtime::send( + projectId: $project->getId(), + payload: $execution->getArrayCopy(), + events: $allEvents, + channels: $target['channels'], + roles: $target['roles'] + ); + + /** Trigger usage queue */ + $queueForUsage + ->setProject($project) + ->addMetric(METRIC_EXECUTIONS_COMPUTE, (int)($execution->getAttribute('duration') * 1000))// per project + ->addMetric(str_replace('{functionInternalId}', $function->getInternalId(), METRIC_FUNCTION_ID_EXECUTIONS_COMPUTE), (int)($execution->getAttribute('duration') * 1000)) + ->trigger() + ; + } +}