functions/builds/deletes worker
This commit is contained in:
parent
d820d93ac7
commit
62c619434e
|
@ -26,10 +26,13 @@ use Utopia\Database\Document;
|
||||||
use Utopia\Logger\Log;
|
use Utopia\Logger\Log;
|
||||||
use Utopia\Pools\Group;
|
use Utopia\Pools\Group;
|
||||||
use Utopia\Queue\Connection;
|
use Utopia\Queue\Connection;
|
||||||
|
use Utopia\Queue\Server;
|
||||||
use Utopia\Registry\Registry;
|
use Utopia\Registry\Registry;
|
||||||
|
|
||||||
Authorization::disable();
|
Authorization::disable();
|
||||||
|
|
||||||
|
global $register;
|
||||||
|
|
||||||
CLI::setResource('register', fn () => $register);
|
CLI::setResource('register', fn () => $register);
|
||||||
|
|
||||||
CLI::setResource('cache', function ($pools) {
|
CLI::setResource('cache', function ($pools) {
|
||||||
|
@ -95,7 +98,7 @@ CLI::setResource('dbForConsole', function ($pools, $cache) {
|
||||||
CLI::setResource('getProjectDB', function (Group $pools, Database $dbForConsole, $cache) {
|
CLI::setResource('getProjectDB', function (Group $pools, Database $dbForConsole, $cache) {
|
||||||
$databases = []; // TODO: @Meldiron This should probably be responsibility of utopia-php/pools
|
$databases = []; // TODO: @Meldiron This should probably be responsibility of utopia-php/pools
|
||||||
|
|
||||||
$getProjectDB = function (Document $project) use ($pools, $dbForConsole, $cache, &$databases) {
|
return function (Document $project) use ($pools, $dbForConsole, $cache, &$databases) {
|
||||||
if ($project->isEmpty() || $project->getId() === 'console') {
|
if ($project->isEmpty() || $project->getId() === 'console') {
|
||||||
return $dbForConsole;
|
return $dbForConsole;
|
||||||
}
|
}
|
||||||
|
@ -121,8 +124,6 @@ CLI::setResource('getProjectDB', function (Group $pools, Database $dbForConsole,
|
||||||
|
|
||||||
return $database;
|
return $database;
|
||||||
};
|
};
|
||||||
|
|
||||||
return $getProjectDB;
|
|
||||||
}, ['pools', 'dbForConsole', 'cache']);
|
}, ['pools', 'dbForConsole', 'cache']);
|
||||||
|
|
||||||
CLI::setResource('queue', function (Group $pools) {
|
CLI::setResource('queue', function (Group $pools) {
|
||||||
|
@ -144,7 +145,7 @@ CLI::setResource('queueForDeletes', function (Connection $queue) {
|
||||||
return new Delete($queue);
|
return new Delete($queue);
|
||||||
}, ['queue']);
|
}, ['queue']);
|
||||||
CLI::setResource('queueForEvents', function (Connection $queue) {
|
CLI::setResource('queueForEvents', function (Connection $queue) {
|
||||||
return new Event('', '', $queue);
|
return new Event($queue);
|
||||||
}, ['queue']);
|
}, ['queue']);
|
||||||
CLI::setResource('queueForAudits', function (Connection $queue) {
|
CLI::setResource('queueForAudits', function (Connection $queue) {
|
||||||
return new Audit($queue);
|
return new Audit($queue);
|
||||||
|
|
|
@ -614,13 +614,13 @@ $register->set('pools', function () {
|
||||||
foreach ($connections as $key => $connection) {
|
foreach ($connections as $key => $connection) {
|
||||||
$type = $connection['type'] ?? '';
|
$type = $connection['type'] ?? '';
|
||||||
$dsns = $connection['dsns'] ?? '';
|
$dsns = $connection['dsns'] ?? '';
|
||||||
$multipe = $connection['multiple'] ?? false;
|
$multiple = $connection['multiple'] ?? false;
|
||||||
$schemes = $connection['schemes'] ?? [];
|
$schemes = $connection['schemes'] ?? [];
|
||||||
$config = [];
|
$config = [];
|
||||||
$dsns = explode(',', $connection['dsns'] ?? '');
|
$dsns = explode(',', $connection['dsns'] ?? '');
|
||||||
foreach ($dsns as &$dsn) {
|
foreach ($dsns as &$dsn) {
|
||||||
$dsn = explode('=', $dsn);
|
$dsn = explode('=', $dsn);
|
||||||
$name = ($multipe) ? $key . '_' . $dsn[0] : $key;
|
$name = ($multiple) ? $key . '_' . $dsn[0] : $key;
|
||||||
$dsn = $dsn[1] ?? '';
|
$dsn = $dsn[1] ?? '';
|
||||||
$config[] = $name;
|
$config[] = $name;
|
||||||
if (empty($dsn)) {
|
if (empty($dsn)) {
|
||||||
|
|
|
@ -12,7 +12,6 @@ use Appwrite\Event\Func;
|
||||||
use Appwrite\Event\Mail;
|
use Appwrite\Event\Mail;
|
||||||
use Appwrite\Event\Phone;
|
use Appwrite\Event\Phone;
|
||||||
use Appwrite\Event\Usage;
|
use Appwrite\Event\Usage;
|
||||||
use Appwrite\Extend\Exception;
|
|
||||||
use Appwrite\Platform\Appwrite;
|
use Appwrite\Platform\Appwrite;
|
||||||
use Swoole\Runtime;
|
use Swoole\Runtime;
|
||||||
use Utopia\App;
|
use Utopia\App;
|
||||||
|
@ -23,9 +22,7 @@ use Utopia\Config\Config;
|
||||||
use Utopia\Database\Database;
|
use Utopia\Database\Database;
|
||||||
use Utopia\Database\Document;
|
use Utopia\Database\Document;
|
||||||
use Utopia\Database\Validator\Authorization;
|
use Utopia\Database\Validator\Authorization;
|
||||||
use Utopia\DSN\DSN;
|
|
||||||
use Utopia\Platform\Service;
|
use Utopia\Platform\Service;
|
||||||
use Utopia\Queue\Adapter\Swoole;
|
|
||||||
use Utopia\Queue\Message;
|
use Utopia\Queue\Message;
|
||||||
use Utopia\Queue\Server;
|
use Utopia\Queue\Server;
|
||||||
use Utopia\Registry\Registry;
|
use Utopia\Registry\Registry;
|
||||||
|
@ -34,13 +31,6 @@ use Utopia\Logger\Logger;
|
||||||
use Utopia\Pools\Group;
|
use Utopia\Pools\Group;
|
||||||
use Utopia\Queue\Connection;
|
use Utopia\Queue\Connection;
|
||||||
use Utopia\Storage\Device;
|
use Utopia\Storage\Device;
|
||||||
use Utopia\Storage\Device\Backblaze;
|
|
||||||
use Utopia\Storage\Device\DOSpaces;
|
|
||||||
use Utopia\Storage\Device\Linode;
|
|
||||||
use Utopia\Storage\Device\Local;
|
|
||||||
use Utopia\Storage\Device\S3;
|
|
||||||
use Utopia\Storage\Device\Wasabi;
|
|
||||||
use Utopia\Storage\Storage;
|
|
||||||
|
|
||||||
Authorization::disable();
|
Authorization::disable();
|
||||||
Runtime::enableCoroutine(SWOOLE_HOOK_ALL);
|
Runtime::enableCoroutine(SWOOLE_HOOK_ALL);
|
||||||
|
@ -81,6 +71,37 @@ Server::setResource('dbForProject', function (Cache $cache, Registry $register,
|
||||||
return $adapter;
|
return $adapter;
|
||||||
}, ['cache', 'register', 'message', 'dbForConsole']);
|
}, ['cache', 'register', 'message', 'dbForConsole']);
|
||||||
|
|
||||||
|
Server::setResource('getProjectDB', function (Group $pools, Database $dbForConsole, $cache) {
|
||||||
|
$databases = []; // TODO: @Meldiron This should probably be responsibility of utopia-php/pools
|
||||||
|
|
||||||
|
return function (Document $project) use ($pools, $dbForConsole, $cache, &$databases) {
|
||||||
|
if ($project->isEmpty() || $project->getId() === 'console') {
|
||||||
|
return $dbForConsole;
|
||||||
|
}
|
||||||
|
|
||||||
|
$databaseName = $project->getAttribute('database');
|
||||||
|
|
||||||
|
if (isset($databases[$databaseName])) {
|
||||||
|
$database = $databases[$databaseName];
|
||||||
|
$database->setNamespace('_' . $project->getInternalId());
|
||||||
|
return $database;
|
||||||
|
}
|
||||||
|
|
||||||
|
$dbAdapter = $pools
|
||||||
|
->get($databaseName)
|
||||||
|
->pop()
|
||||||
|
->getResource();
|
||||||
|
|
||||||
|
$database = new Database($dbAdapter, $cache);
|
||||||
|
|
||||||
|
$databases[$databaseName] = $database;
|
||||||
|
|
||||||
|
$database->setNamespace('_' . $project->getInternalId());
|
||||||
|
|
||||||
|
return $database;
|
||||||
|
};
|
||||||
|
}, ['pools', 'dbForConsole', 'cache']);
|
||||||
|
|
||||||
Server::setResource('cache', function (Registry $register) {
|
Server::setResource('cache', function (Registry $register) {
|
||||||
$pools = $register->get('pools');
|
$pools = $register->get('pools');
|
||||||
$list = Config::getParam('pools-cache', []);
|
$list = Config::getParam('pools-cache', []);
|
||||||
|
@ -95,20 +116,12 @@ Server::setResource('cache', function (Registry $register) {
|
||||||
|
|
||||||
return new Cache(new Sharding($adapters));
|
return new Cache(new Sharding($adapters));
|
||||||
}, ['register']);
|
}, ['register']);
|
||||||
|
|
||||||
Server::setResource('queueForDatabase', function (Registry $register) {
|
|
||||||
$pools = $register->get('pools');
|
|
||||||
return new EventDatabase(
|
|
||||||
$pools
|
|
||||||
->get('queue')
|
|
||||||
->pop()
|
|
||||||
->getResource()
|
|
||||||
);
|
|
||||||
}, ['register']);
|
|
||||||
|
|
||||||
Server::setResource('queue', function (Group $pools) {
|
Server::setResource('queue', function (Group $pools) {
|
||||||
return $pools->get('queue')->pop()->getResource();
|
return $pools->get('queue')->pop()->getResource();
|
||||||
}, ['pools']);
|
}, ['pools']);
|
||||||
|
Server::setResource('queueForDatabase', function (Connection $queue) {
|
||||||
|
return new EventDatabase($queue);
|
||||||
|
}, ['queue']);
|
||||||
Server::setResource('queueForMessaging', function (Connection $queue) {
|
Server::setResource('queueForMessaging', function (Connection $queue) {
|
||||||
return new Phone($queue);
|
return new Phone($queue);
|
||||||
}, ['queue']);
|
}, ['queue']);
|
||||||
|
@ -125,7 +138,7 @@ Server::setResource('queueForDeletes', function (Connection $queue) {
|
||||||
return new Delete($queue);
|
return new Delete($queue);
|
||||||
}, ['queue']);
|
}, ['queue']);
|
||||||
Server::setResource('queueForEvents', function (Connection $queue) {
|
Server::setResource('queueForEvents', function (Connection $queue) {
|
||||||
return new Event('', '', $queue);
|
return new Event($queue);
|
||||||
}, ['queue']);
|
}, ['queue']);
|
||||||
Server::setResource('queueForAudits', function (Connection $queue) {
|
Server::setResource('queueForAudits', function (Connection $queue) {
|
||||||
return new Audit($queue);
|
return new Audit($queue);
|
||||||
|
|
|
@ -1,10 +1,3 @@
|
||||||
#!/bin/sh
|
#!/bin/sh
|
||||||
|
|
||||||
if [ -z "$_APP_REDIS_USER" ] && [ -z "$_APP_REDIS_PASS" ]
|
php /usr/src/code/app/worker.php builds $@
|
||||||
then
|
|
||||||
REDIS_BACKEND="${_APP_REDIS_HOST}:${_APP_REDIS_PORT}"
|
|
||||||
else
|
|
||||||
REDIS_BACKEND="redis://${_APP_REDIS_USER}:${_APP_REDIS_PASS}@${_APP_REDIS_HOST}:${_APP_REDIS_PORT}"
|
|
||||||
fi
|
|
||||||
|
|
||||||
INTERVAL=0.1 QUEUE='v1-builds' APP_INCLUDE='/usr/src/code/app/workers/builds.php' php /usr/src/code/vendor/bin/resque -dopcache.preload=opcache.preload=/usr/src/code/app/preload.php
|
|
|
@ -1,10 +1,3 @@
|
||||||
#!/bin/sh
|
#!/bin/sh
|
||||||
|
|
||||||
if [ -z "$_APP_REDIS_USER" ] && [ -z "$_APP_REDIS_PASS" ]
|
php /usr/src/code/app/worker.php deletes $@
|
||||||
then
|
|
||||||
REDIS_BACKEND="${_APP_REDIS_HOST}:${_APP_REDIS_PORT}"
|
|
||||||
else
|
|
||||||
REDIS_BACKEND="redis://${_APP_REDIS_USER}:${_APP_REDIS_PASS}@${_APP_REDIS_HOST}:${_APP_REDIS_PORT}"
|
|
||||||
fi
|
|
||||||
|
|
||||||
INTERVAL=1 QUEUE='v1-deletes' APP_INCLUDE='/usr/src/code/app/workers/deletes.php' php /usr/src/code/vendor/bin/resque -dopcache.preload=opcache.preload=/usr/src/code/app/preload.php
|
|
|
@ -1,3 +1,3 @@
|
||||||
#!/bin/sh
|
#!/bin/sh
|
||||||
|
|
||||||
QUEUE=v1-functions php /usr/src/code/app/workers/functions.php $@
|
php /usr/src/code/app/worker.php functions $@
|
|
@ -235,6 +235,7 @@ services:
|
||||||
- ./src:/usr/src/code/src
|
- ./src:/usr/src/code/src
|
||||||
- ./vendor/utopia-php/platform:/usr/src/code/vendor/utopia-php/platform
|
- ./vendor/utopia-php/platform:/usr/src/code/vendor/utopia-php/platform
|
||||||
- ./vendor/utopia-php/queue:/usr/src/code/vendor/utopia-php/queue
|
- ./vendor/utopia-php/queue:/usr/src/code/vendor/utopia-php/queue
|
||||||
|
- ./vendor/utopia-php/pools:/usr/src/code/vendor/utopia-php/pools
|
||||||
|
|
||||||
depends_on:
|
depends_on:
|
||||||
- redis
|
- redis
|
||||||
|
@ -592,6 +593,8 @@ services:
|
||||||
volumes:
|
volumes:
|
||||||
- ./app:/usr/src/code/app
|
- ./app:/usr/src/code/app
|
||||||
- ./src:/usr/src/code/src
|
- ./src:/usr/src/code/src
|
||||||
|
- ./vendor/utopia-php/pools:/usr/src/code/vendor/utopia-php/pools
|
||||||
|
|
||||||
depends_on:
|
depends_on:
|
||||||
- redis
|
- redis
|
||||||
environment:
|
environment:
|
||||||
|
@ -614,6 +617,7 @@ services:
|
||||||
- _APP_CONNECTIONS_DB_CONSOLE
|
- _APP_CONNECTIONS_DB_CONSOLE
|
||||||
- _APP_CONNECTIONS_DB_PROJECT
|
- _APP_CONNECTIONS_DB_PROJECT
|
||||||
- _APP_CONNECTIONS_CACHE
|
- _APP_CONNECTIONS_CACHE
|
||||||
|
- _APP_CONNECTIONS_QUEUE
|
||||||
- _APP_MAINTENANCE_INTERVAL
|
- _APP_MAINTENANCE_INTERVAL
|
||||||
- _APP_MAINTENANCE_RETENTION_EXECUTION
|
- _APP_MAINTENANCE_RETENTION_EXECUTION
|
||||||
- _APP_MAINTENANCE_RETENTION_CACHE
|
- _APP_MAINTENANCE_RETENTION_CACHE
|
||||||
|
|
|
@ -9,6 +9,9 @@ use Appwrite\Platform\Workers\Mails;
|
||||||
use Appwrite\Platform\Workers\Messaging;
|
use Appwrite\Platform\Workers\Messaging;
|
||||||
use Appwrite\Platform\Workers\Certificates;
|
use Appwrite\Platform\Workers\Certificates;
|
||||||
use Appwrite\Platform\Workers\Databases;
|
use Appwrite\Platform\Workers\Databases;
|
||||||
|
use Appwrite\Platform\Workers\Functions;
|
||||||
|
use Appwrite\Platform\Workers\Builds;
|
||||||
|
use Appwrite\Platform\Workers\Deletes;
|
||||||
use Appwrite\Platform\Workers\Usage;
|
use Appwrite\Platform\Workers\Usage;
|
||||||
|
|
||||||
class Workers extends Service
|
class Workers extends Service
|
||||||
|
@ -23,6 +26,9 @@ class Workers extends Service
|
||||||
->addAction(Messaging::getName(), new Messaging())
|
->addAction(Messaging::getName(), new Messaging())
|
||||||
->addAction(Certificates::getName(), new Certificates())
|
->addAction(Certificates::getName(), new Certificates())
|
||||||
->addAction(Databases::getName(), new Databases())
|
->addAction(Databases::getName(), new Databases())
|
||||||
|
->addAction(Functions::getName(), new Functions())
|
||||||
|
->addAction(Builds::getName(), new Builds())
|
||||||
|
->addAction(Deletes::getName(), new Deletes())
|
||||||
//->addAction(Usage::getName(), new Usage())
|
//->addAction(Usage::getName(), new Usage())
|
||||||
;
|
;
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,10 +2,8 @@
|
||||||
|
|
||||||
namespace Appwrite\Platform\Tasks;
|
namespace Appwrite\Platform\Tasks;
|
||||||
|
|
||||||
use Appwrite\Auth\Auth;
|
|
||||||
use Appwrite\Event\Certificate;
|
use Appwrite\Event\Certificate;
|
||||||
use Appwrite\Event\Delete;
|
use Appwrite\Event\Delete;
|
||||||
use Appwrite\Event\Func;
|
|
||||||
use Utopia\App;
|
use Utopia\App;
|
||||||
use Utopia\CLI\Console;
|
use Utopia\CLI\Console;
|
||||||
use Utopia\Database\Database;
|
use Utopia\Database\Database;
|
||||||
|
@ -13,8 +11,6 @@ use Utopia\Database\Document;
|
||||||
use Utopia\Database\DateTime;
|
use Utopia\Database\DateTime;
|
||||||
use Utopia\Database\Query;
|
use Utopia\Database\Query;
|
||||||
use Utopia\Platform\Action;
|
use Utopia\Platform\Action;
|
||||||
use Utopia\Queue\Connection;
|
|
||||||
use Utopia\Registry\Registry;
|
|
||||||
|
|
||||||
class Maintenance extends Action
|
class Maintenance extends Action
|
||||||
{
|
{
|
||||||
|
@ -38,104 +34,12 @@ class Maintenance extends Action
|
||||||
Console::title('Maintenance V1');
|
Console::title('Maintenance V1');
|
||||||
Console::success(APP_NAME . ' maintenance process v1 has started');
|
Console::success(APP_NAME . ' maintenance process v1 has started');
|
||||||
|
|
||||||
function notifyDeleteExecutionLogs(int $interval, Delete $queueForDeletes)
|
|
||||||
{
|
|
||||||
($queueForDeletes)
|
|
||||||
->setType(DELETE_TYPE_EXECUTIONS)
|
|
||||||
->setDatetime(DateTime::addSeconds(new \DateTime(), -1 * $interval))
|
|
||||||
->trigger();
|
|
||||||
}
|
|
||||||
|
|
||||||
function notifyDeleteAbuseLogs(int $interval, Delete $queueForDeletes)
|
|
||||||
{
|
|
||||||
($queueForDeletes)
|
|
||||||
->setType(DELETE_TYPE_ABUSE)
|
|
||||||
->setDatetime(DateTime::addSeconds(new \DateTime(), -1 * $interval))
|
|
||||||
->trigger();
|
|
||||||
}
|
|
||||||
|
|
||||||
function notifyDeleteAuditLogs(int $interval, Delete $queueForDeletes)
|
|
||||||
{
|
|
||||||
($queueForDeletes)
|
|
||||||
->setType(DELETE_TYPE_AUDIT)
|
|
||||||
->setDatetime(DateTime::addSeconds(new \DateTime(), -1 * $interval))
|
|
||||||
->trigger();
|
|
||||||
}
|
|
||||||
|
|
||||||
function notifyDeleteUsageStats(int $usageStatsRetentionHourly, Delete $queueForDeletes)
|
|
||||||
{
|
|
||||||
($queueForDeletes)
|
|
||||||
->setType(DELETE_TYPE_USAGE)
|
|
||||||
->setUsageRetentionHourlyDateTime(DateTime::addSeconds(new \DateTime(), -1 * $usageStatsRetentionHourly))
|
|
||||||
->trigger();
|
|
||||||
}
|
|
||||||
|
|
||||||
function notifyDeleteConnections(Delete $queueForDeletes)
|
|
||||||
{
|
|
||||||
($queueForDeletes)
|
|
||||||
->setType(DELETE_TYPE_REALTIME)
|
|
||||||
->setDatetime(DateTime::addSeconds(new \DateTime(), -60))
|
|
||||||
->trigger();
|
|
||||||
}
|
|
||||||
|
|
||||||
function notifyDeleteExpiredSessions(Delete $queueForDeletes)
|
|
||||||
{
|
|
||||||
($queueForDeletes)
|
|
||||||
->setType(DELETE_TYPE_SESSIONS)
|
|
||||||
->trigger();
|
|
||||||
}
|
|
||||||
|
|
||||||
function renewCertificates(Database $dbForConsole, Certificate $queueForCertificate)
|
|
||||||
{
|
|
||||||
$time = DateTime::now();
|
|
||||||
|
|
||||||
$certificates = $dbForConsole->find('certificates', [
|
|
||||||
Query::lessThan('attempts', 5), // Maximum 5 attempts
|
|
||||||
Query::lessThanEqual('renewDate', $time), // includes 60 days cooldown (we have 30 days to renew)
|
|
||||||
Query::limit(200), // Limit 200 comes from LetsEncrypt (300 orders per 3 hours, keeping some for new domains)
|
|
||||||
]);
|
|
||||||
|
|
||||||
|
|
||||||
if (\count($certificates) > 0) {
|
|
||||||
Console::info("[{$time}] Found " . \count($certificates) . " certificates for renewal, scheduling jobs.");
|
|
||||||
|
|
||||||
foreach ($certificates as $certificate) {
|
|
||||||
$queueForCertificate
|
|
||||||
->setDomain(new Document([
|
|
||||||
'domain' => $certificate->getAttribute('domain')
|
|
||||||
]))
|
|
||||||
->trigger();
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
Console::info("[{$time}] No certificates for renewal.");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
function notifyDeleteCache($interval, Delete $queueForDeletes)
|
|
||||||
{
|
|
||||||
|
|
||||||
($queueForDeletes)
|
|
||||||
->setType(DELETE_TYPE_CACHE_BY_TIMESTAMP)
|
|
||||||
->setDatetime(DateTime::addSeconds(new \DateTime(), -1 * $interval))
|
|
||||||
->trigger();
|
|
||||||
}
|
|
||||||
|
|
||||||
function notifyDeleteSchedules($interval, Delete $queueForDeletes)
|
|
||||||
{
|
|
||||||
|
|
||||||
($queueForDeletes)
|
|
||||||
->setType(DELETE_TYPE_SCHEDULES)
|
|
||||||
->setDatetime(DateTime::addSeconds(new \DateTime(), -1 * $interval))
|
|
||||||
->trigger();
|
|
||||||
}
|
|
||||||
|
|
||||||
// # of days in seconds (1 day = 86400s)
|
// # of days in seconds (1 day = 86400s)
|
||||||
$interval = (int) App::getEnv('_APP_MAINTENANCE_INTERVAL', '86400');
|
$interval = (int) App::getEnv('_APP_MAINTENANCE_INTERVAL', '86400');
|
||||||
$executionLogsRetention = (int) App::getEnv('_APP_MAINTENANCE_RETENTION_EXECUTION', '1209600');
|
$executionLogsRetention = (int) App::getEnv('_APP_MAINTENANCE_RETENTION_EXECUTION', '1209600');
|
||||||
$auditLogRetention = (int) App::getEnv('_APP_MAINTENANCE_RETENTION_AUDIT', '1209600');
|
$auditLogRetention = (int) App::getEnv('_APP_MAINTENANCE_RETENTION_AUDIT', '1209600');
|
||||||
$abuseLogsRetention = (int) App::getEnv('_APP_MAINTENANCE_RETENTION_ABUSE', '86400');
|
$abuseLogsRetention = (int) App::getEnv('_APP_MAINTENANCE_RETENTION_ABUSE', '86400');
|
||||||
$usageStatsRetentionHourly = (int) App::getEnv('_APP_MAINTENANCE_RETENTION_USAGE_HOURLY', '8640000'); //100 days
|
$usageStatsRetentionHourly = (int) App::getEnv('_APP_MAINTENANCE_RETENTION_USAGE_HOURLY', '8640000'); //100 days
|
||||||
|
|
||||||
$cacheRetention = (int) App::getEnv('_APP_MAINTENANCE_RETENTION_CACHE', '2592000'); // 30 days
|
$cacheRetention = (int) App::getEnv('_APP_MAINTENANCE_RETENTION_CACHE', '2592000'); // 30 days
|
||||||
$schedulesDeletionRetention = (int) App::getEnv('_APP_MAINTENANCE_RETENTION_SCHEDULES', '86400'); // 1 Day
|
$schedulesDeletionRetention = (int) App::getEnv('_APP_MAINTENANCE_RETENTION_SCHEDULES', '86400'); // 1 Day
|
||||||
|
|
||||||
|
@ -143,15 +47,106 @@ class Maintenance extends Action
|
||||||
$time = DateTime::now();
|
$time = DateTime::now();
|
||||||
|
|
||||||
Console::info("[{$time}] Notifying workers with maintenance tasks every {$interval} seconds");
|
Console::info("[{$time}] Notifying workers with maintenance tasks every {$interval} seconds");
|
||||||
notifyDeleteExecutionLogs($executionLogsRetention, $queueForDeletes);
|
$this->notifyDeleteExecutionLogs($executionLogsRetention, $queueForDeletes);
|
||||||
notifyDeleteAbuseLogs($abuseLogsRetention, $queueForDeletes);
|
$this->notifyDeleteAbuseLogs($abuseLogsRetention, $queueForDeletes);
|
||||||
notifyDeleteAuditLogs($auditLogRetention, $queueForDeletes);
|
$this->notifyDeleteAuditLogs($auditLogRetention, $queueForDeletes);
|
||||||
notifyDeleteUsageStats($usageStatsRetentionHourly, $queueForDeletes);
|
$this->notifyDeleteUsageStats($usageStatsRetentionHourly, $queueForDeletes);
|
||||||
notifyDeleteConnections($queueForDeletes);
|
$this->notifyDeleteConnections($queueForDeletes);
|
||||||
notifyDeleteExpiredSessions($queueForDeletes);
|
$this->notifyDeleteExpiredSessions($queueForDeletes);
|
||||||
renewCertificates($dbForConsole, $queueForCertificates);
|
$this->renewCertificates($dbForConsole, $queueForCertificates);
|
||||||
notifyDeleteCache($cacheRetention, $queueForDeletes);
|
$this->notifyDeleteCache($cacheRetention, $queueForDeletes);
|
||||||
notifyDeleteSchedules($schedulesDeletionRetention, $queueForDeletes);
|
$this->notifyDeleteSchedules($schedulesDeletionRetention, $queueForDeletes);
|
||||||
}, $interval);
|
}, $interval);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private function notifyDeleteExecutionLogs(int $interval, Delete $queueForDeletes): void
|
||||||
|
{
|
||||||
|
($queueForDeletes)
|
||||||
|
->setType(DELETE_TYPE_EXECUTIONS)
|
||||||
|
->setDatetime(DateTime::addSeconds(new \DateTime(), -1 * $interval))
|
||||||
|
->trigger();
|
||||||
|
}
|
||||||
|
|
||||||
|
private function notifyDeleteAbuseLogs(int $interval, Delete $queueForDeletes): void
|
||||||
|
{
|
||||||
|
($queueForDeletes)
|
||||||
|
->setType(DELETE_TYPE_ABUSE)
|
||||||
|
->setDatetime(DateTime::addSeconds(new \DateTime(), -1 * $interval))
|
||||||
|
->trigger();
|
||||||
|
}
|
||||||
|
|
||||||
|
private function notifyDeleteAuditLogs(int $interval, Delete $queueForDeletes): void
|
||||||
|
{
|
||||||
|
($queueForDeletes)
|
||||||
|
->setType(DELETE_TYPE_AUDIT)
|
||||||
|
->setDatetime(DateTime::addSeconds(new \DateTime(), -1 * $interval))
|
||||||
|
->trigger();
|
||||||
|
}
|
||||||
|
|
||||||
|
private function notifyDeleteUsageStats(int $usageStatsRetentionHourly, Delete $queueForDeletes): void
|
||||||
|
{
|
||||||
|
($queueForDeletes)
|
||||||
|
->setType(DELETE_TYPE_USAGE)
|
||||||
|
->setUsageRetentionHourlyDateTime(DateTime::addSeconds(new \DateTime(), -1 * $usageStatsRetentionHourly))
|
||||||
|
->trigger();
|
||||||
|
}
|
||||||
|
|
||||||
|
private function notifyDeleteConnections(Delete $queueForDeletes): void
|
||||||
|
{
|
||||||
|
($queueForDeletes)
|
||||||
|
->setType(DELETE_TYPE_REALTIME)
|
||||||
|
->setDatetime(DateTime::addSeconds(new \DateTime(), -60))
|
||||||
|
->trigger();
|
||||||
|
}
|
||||||
|
|
||||||
|
private function notifyDeleteExpiredSessions(Delete $queueForDeletes): void
|
||||||
|
{
|
||||||
|
($queueForDeletes)
|
||||||
|
->setType(DELETE_TYPE_SESSIONS)
|
||||||
|
->trigger();
|
||||||
|
}
|
||||||
|
|
||||||
|
private function renewCertificates(Database $dbForConsole, Certificate $queueForCertificate): void
|
||||||
|
{
|
||||||
|
$time = DateTime::now();
|
||||||
|
|
||||||
|
$certificates = $dbForConsole->find('certificates', [
|
||||||
|
Query::lessThan('attempts', 5), // Maximum 5 attempts
|
||||||
|
Query::lessThanEqual('renewDate', $time), // includes 60 days cooldown (we have 30 days to renew)
|
||||||
|
Query::limit(200), // Limit 200 comes from LetsEncrypt (300 orders per 3 hours, keeping some for new domains)
|
||||||
|
]);
|
||||||
|
|
||||||
|
|
||||||
|
if (\count($certificates) > 0) {
|
||||||
|
Console::info("[{$time}] Found " . \count($certificates) . " certificates for renewal, scheduling jobs.");
|
||||||
|
|
||||||
|
foreach ($certificates as $certificate) {
|
||||||
|
$queueForCertificate
|
||||||
|
->setDomain(new Document([
|
||||||
|
'domain' => $certificate->getAttribute('domain')
|
||||||
|
]))
|
||||||
|
->trigger();
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
Console::info("[{$time}] No certificates for renewal.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private function notifyDeleteCache($interval, Delete $queueForDeletes): void
|
||||||
|
{
|
||||||
|
|
||||||
|
($queueForDeletes)
|
||||||
|
->setType(DELETE_TYPE_CACHE_BY_TIMESTAMP)
|
||||||
|
->setDatetime(DateTime::addSeconds(new \DateTime(), -1 * $interval))
|
||||||
|
->trigger();
|
||||||
|
}
|
||||||
|
|
||||||
|
private function notifyDeleteSchedules($interval, Delete $queueForDeletes): void
|
||||||
|
{
|
||||||
|
|
||||||
|
($queueForDeletes)
|
||||||
|
->setType(DELETE_TYPE_SCHEDULES)
|
||||||
|
->setDatetime(DateTime::addSeconds(new \DateTime(), -1 * $interval))
|
||||||
|
->trigger();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
290
src/Appwrite/Platform/Workers/Builds.php
Normal file
290
src/Appwrite/Platform/Workers/Builds.php
Normal file
|
@ -0,0 +1,290 @@
|
||||||
|
<?php
|
||||||
|
|
||||||
|
namespace Appwrite\Platform\Workers;
|
||||||
|
|
||||||
|
use Appwrite\Event\Event;
|
||||||
|
use Appwrite\Event\Func;
|
||||||
|
use Appwrite\Event\Usage;
|
||||||
|
use Appwrite\Messaging\Adapter\Realtime;
|
||||||
|
use Appwrite\Utopia\Response\Model\Deployment;
|
||||||
|
use Exception;
|
||||||
|
use Executor\Executor;
|
||||||
|
use Utopia\App;
|
||||||
|
use Utopia\CLI\Console;
|
||||||
|
use Utopia\Config\Config;
|
||||||
|
use Utopia\Database\Database;
|
||||||
|
use Utopia\Database\DateTime;
|
||||||
|
use Utopia\Database\Document;
|
||||||
|
use Utopia\Database\Exception\Authorization;
|
||||||
|
use Utopia\Database\Exception\Structure;
|
||||||
|
use Utopia\Database\Helpers\ID;
|
||||||
|
use Utopia\DSN\DSN;
|
||||||
|
use Utopia\Platform\Action;
|
||||||
|
use Utopia\Queue\Message;
|
||||||
|
use Utopia\Storage\Storage;
|
||||||
|
|
||||||
|
class Builds extends Action
|
||||||
|
{
|
||||||
|
public static function getName(): string
|
||||||
|
{
|
||||||
|
return 'builds';
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
public function __construct()
|
||||||
|
{
|
||||||
|
$this
|
||||||
|
->desc('Builds worker')
|
||||||
|
->inject('message')
|
||||||
|
->inject('dbForProject')
|
||||||
|
->inject('queueForEvents')
|
||||||
|
->inject('queueForFunctions')
|
||||||
|
->inject('queueForUsage')
|
||||||
|
->callback(fn($message, Database $dbForProject, Event $queueForEvents, Func $queueForFunctions, Usage $queueForUsage) => $this->action($message, $dbForProject, $queueForEvents, $queueForFunctions, $queueForUsage));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @throws Exception|\Throwable
|
||||||
|
*/
|
||||||
|
public function action(Message $message, Database $dbForProject, Event $queueForEvents, Func $queueForFunctions, Usage $queueForUsage): void
|
||||||
|
{
|
||||||
|
$payload = $message->getPayload() ?? [];
|
||||||
|
|
||||||
|
if (empty($payload)) {
|
||||||
|
throw new Exception('Missing payload');
|
||||||
|
}
|
||||||
|
|
||||||
|
$type = $payload['type'] ?? '';
|
||||||
|
$project = new Document($payload['project'] ?? []);
|
||||||
|
$resource = new Document($payload['resource'] ?? []);
|
||||||
|
$deployment = new Document($payload['deployment'] ?? []);
|
||||||
|
|
||||||
|
switch ($type) {
|
||||||
|
case BUILD_TYPE_DEPLOYMENT:
|
||||||
|
case BUILD_TYPE_RETRY:
|
||||||
|
Console::info('Creating build for deployment: ' . $deployment->getId());
|
||||||
|
$this->buildDeployment(
|
||||||
|
dbForProject: $dbForProject,
|
||||||
|
queueForEvents: $queueForEvents,
|
||||||
|
queueForFunctions: $queueForFunctions,
|
||||||
|
queueForUsage: $queueForUsage,
|
||||||
|
deployment: $deployment,
|
||||||
|
project: $project,
|
||||||
|
function: $resource
|
||||||
|
);
|
||||||
|
break;
|
||||||
|
|
||||||
|
default:
|
||||||
|
throw new \Exception('Invalid build type');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @throws Authorization
|
||||||
|
* @throws \Throwable
|
||||||
|
* @throws Structure
|
||||||
|
*/
|
||||||
|
private function buildDeployment(Database $dbForProject, Event $queueForEvents, Func $queueForFunctions, Usage $queueForUsage, Document $deployment, Document $project, Document $function): void
|
||||||
|
{
|
||||||
|
$executor = new Executor(App::getEnv('_APP_EXECUTOR_HOST'));
|
||||||
|
$function = $dbForProject->getDocument('functions', $function->getId());
|
||||||
|
|
||||||
|
if ($function->isEmpty()) {
|
||||||
|
throw new Exception('Function not found', 404);
|
||||||
|
}
|
||||||
|
|
||||||
|
$deployment = $dbForProject->getDocument('deployments', $deployment->getId());
|
||||||
|
if ($deployment->isEmpty()) {
|
||||||
|
throw new Exception('Deployment not found', 404);
|
||||||
|
}
|
||||||
|
|
||||||
|
$runtimes = Config::getParam('runtimes', []);
|
||||||
|
$key = $function->getAttribute('runtime');
|
||||||
|
$runtime = $runtimes[$key] ?? null;
|
||||||
|
if (\is_null($runtime)) {
|
||||||
|
throw new Exception('Runtime "' . $function->getAttribute('runtime', '') . '" is not supported');
|
||||||
|
}
|
||||||
|
|
||||||
|
$connection = App::getEnv('_APP_CONNECTIONS_STORAGE', '');
|
||||||
|
/** @TODO : move this to the registry or someplace else */
|
||||||
|
$device = Storage::DEVICE_LOCAL;
|
||||||
|
try {
|
||||||
|
$dsn = new DSN($connection);
|
||||||
|
$device = $dsn->getScheme();
|
||||||
|
} catch (\Exception $e) {
|
||||||
|
Console::error($e->getMessage() . 'Invalid DSN. Defaulting to Local device.');
|
||||||
|
}
|
||||||
|
|
||||||
|
$buildId = $deployment->getAttribute('buildId', '');
|
||||||
|
$startTime = DateTime::now();
|
||||||
|
|
||||||
|
if (empty($buildId)) {
|
||||||
|
$buildId = ID::unique();
|
||||||
|
$build = $dbForProject->createDocument('builds', new Document([
|
||||||
|
'$id' => $buildId,
|
||||||
|
'$permissions' => [],
|
||||||
|
'startTime' => $startTime,
|
||||||
|
'deploymentId' => $deployment->getId(),
|
||||||
|
'status' => 'processing',
|
||||||
|
'path' => '',
|
||||||
|
'size' => 0,
|
||||||
|
'runtime' => $function->getAttribute('runtime'),
|
||||||
|
'source' => $deployment->getAttribute('path'),
|
||||||
|
'sourceType' => $device,
|
||||||
|
'stdout' => '',
|
||||||
|
'stderr' => '',
|
||||||
|
'duration' => 0
|
||||||
|
]));
|
||||||
|
|
||||||
|
$deployment->setAttribute('buildId', $buildId);
|
||||||
|
$deployment = $dbForProject->updateDocument('deployments', $deployment->getId(), $deployment);
|
||||||
|
} else {
|
||||||
|
$build = $dbForProject->getDocument('builds', $buildId);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Request the executor to build the code... */
|
||||||
|
$build->setAttribute('status', 'building');
|
||||||
|
$build = $dbForProject->updateDocument('builds', $buildId, $build);
|
||||||
|
|
||||||
|
/** Trigger Webhook */
|
||||||
|
$deploymentUpdate = $queueForEvents
|
||||||
|
->setQueue(Event::WEBHOOK_QUEUE_NAME)
|
||||||
|
->setClass(Event::WEBHOOK_CLASS_NAME)
|
||||||
|
->setProject($project)
|
||||||
|
->setEvent('functions.[functionId].deployments.[deploymentId].update')
|
||||||
|
->setParam('functionId', $function->getId())
|
||||||
|
->setParam('deploymentId', $deployment->getId())
|
||||||
|
->setPayload($deployment->getArrayCopy(
|
||||||
|
array_keys(
|
||||||
|
(new Deployment())->getRules()
|
||||||
|
)
|
||||||
|
));
|
||||||
|
|
||||||
|
$deploymentUpdate->trigger();
|
||||||
|
|
||||||
|
/** Trigger Functions */
|
||||||
|
$queueForFunctions
|
||||||
|
->from($deploymentUpdate)
|
||||||
|
->trigger();
|
||||||
|
|
||||||
|
|
||||||
|
/** Trigger Realtime */
|
||||||
|
$allEvents = Event::generateEvents('functions.[functionId].deployments.[deploymentId].update', [
|
||||||
|
'functionId' => $function->getId(),
|
||||||
|
'deploymentId' => $deployment->getId()
|
||||||
|
]);
|
||||||
|
$target = Realtime::fromPayload(
|
||||||
|
// Pass first, most verbose event pattern
|
||||||
|
event: $allEvents[0],
|
||||||
|
payload: $build,
|
||||||
|
project: $project
|
||||||
|
);
|
||||||
|
|
||||||
|
Realtime::send(
|
||||||
|
projectId: 'console',
|
||||||
|
payload: $build->getArrayCopy(),
|
||||||
|
events: $allEvents,
|
||||||
|
channels: $target['channels'],
|
||||||
|
roles: $target['roles']
|
||||||
|
);
|
||||||
|
|
||||||
|
$source = $deployment->getAttribute('path');
|
||||||
|
|
||||||
|
$vars = array_reduce($function->getAttribute('vars', []), function (array $carry, Document $var) {
|
||||||
|
$carry[$var->getAttribute('key')] = $var->getAttribute('value');
|
||||||
|
return $carry;
|
||||||
|
}, []);
|
||||||
|
|
||||||
|
try {
|
||||||
|
$response = $executor->createRuntime(
|
||||||
|
deploymentId: $deployment->getId(),
|
||||||
|
projectId: $project->getId(),
|
||||||
|
source: $source,
|
||||||
|
image: $runtime['image'],
|
||||||
|
remove: true,
|
||||||
|
entrypoint: $deployment->getAttribute('entrypoint'),
|
||||||
|
workdir: '/usr/code',
|
||||||
|
destination: APP_STORAGE_BUILDS . "/app-{$project->getId()}",
|
||||||
|
variables: $vars,
|
||||||
|
commands: [
|
||||||
|
'sh', '-c',
|
||||||
|
'tar -zxf /tmp/code.tar.gz -C /usr/code && \
|
||||||
|
cd /usr/local/src/ && ./build.sh'
|
||||||
|
]
|
||||||
|
);
|
||||||
|
|
||||||
|
/** Update the build document */
|
||||||
|
$build->setAttribute('startTime', DateTime::format((new \DateTime())->setTimestamp($response['startTime'])));
|
||||||
|
$build->setAttribute('duration', \intval($response['duration']));
|
||||||
|
$build->setAttribute('status', $response['status']);
|
||||||
|
$build->setAttribute('path', $response['path']);
|
||||||
|
$build->setAttribute('size', $response['size']);
|
||||||
|
$build->setAttribute('stderr', $response['stderr']);
|
||||||
|
$build->setAttribute('stdout', $response['stdout']);
|
||||||
|
|
||||||
|
/* Also update the deployment buildTime */
|
||||||
|
$deployment->setAttribute('buildTime', $response['duration']);
|
||||||
|
|
||||||
|
Console::success("Build id: $buildId created");
|
||||||
|
|
||||||
|
$function->setAttribute('scheduleUpdatedAt', DateTime::now());
|
||||||
|
|
||||||
|
/** Set auto deploy */
|
||||||
|
if ($deployment->getAttribute('activate') === true) {
|
||||||
|
$function->setAttribute('deployment', $deployment->getId());
|
||||||
|
$function = $dbForProject->updateDocument('functions', $function->getId(), $function);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Update function schedule */
|
||||||
|
$dbForConsole = getConsoleDB();
|
||||||
|
$schedule = $dbForConsole->getDocument('schedules', $function->getAttribute('scheduleId'));
|
||||||
|
$schedule->setAttribute('resourceUpdatedAt', $function->getAttribute('scheduleUpdatedAt'));
|
||||||
|
|
||||||
|
$schedule
|
||||||
|
->setAttribute('schedule', $function->getAttribute('schedule'))
|
||||||
|
->setAttribute('active', !empty($function->getAttribute('schedule')) && !empty($function->getAttribute('deployment')));
|
||||||
|
|
||||||
|
|
||||||
|
\Utopia\Database\Validator\Authorization::skip(fn() => $dbForConsole->updateDocument('schedules', $schedule->getId(), $schedule));
|
||||||
|
} catch (\Throwable $th) {
|
||||||
|
$endTime = DateTime::now();
|
||||||
|
$interval = (new \DateTime($endTime))->diff(new \DateTime($startTime));
|
||||||
|
$build->setAttribute('duration', $interval->format('%s') + 0);
|
||||||
|
$build->setAttribute('status', 'failed');
|
||||||
|
$build->setAttribute('stderr', $th->getMessage());
|
||||||
|
Console::error($th->getMessage());
|
||||||
|
} finally {
|
||||||
|
$build = $dbForProject->updateDocument('builds', $buildId, $build);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Send realtime Event
|
||||||
|
*/
|
||||||
|
$target = Realtime::fromPayload(
|
||||||
|
// Pass first, most verbose event pattern
|
||||||
|
event: $allEvents[0],
|
||||||
|
payload: $build,
|
||||||
|
project: $project
|
||||||
|
);
|
||||||
|
Realtime::send(
|
||||||
|
projectId: 'console',
|
||||||
|
payload: $build->getArrayCopy(),
|
||||||
|
events: $allEvents,
|
||||||
|
channels: $target['channels'],
|
||||||
|
roles: $target['roles']
|
||||||
|
);
|
||||||
|
|
||||||
|
/** Trigger usage queue */
|
||||||
|
$queueForUsage
|
||||||
|
->setProject($project)
|
||||||
|
->addMetric(METRIC_BUILDS, 1) // per project
|
||||||
|
->addMetric(METRIC_BUILDS_STORAGE, $build->getAttribute('size', 0))
|
||||||
|
->addMetric(METRIC_BUILDS_COMPUTE, (int)$build->getAttribute('duration', 0) * 1000)
|
||||||
|
->addMetric(str_replace('{functionInternalId}', $function->getInternalId(), METRIC_FUNCTION_ID_BUILDS), 1) // per function
|
||||||
|
->addMetric(str_replace('{functionInternalId}', $function->getInternalId(), METRIC_FUNCTION_ID_BUILDS_STORAGE), $build->getAttribute('size', 0))
|
||||||
|
->addMetric(str_replace('{functionInternalId}', $function->getInternalId(), METRIC_FUNCTION_ID_BUILDS_COMPUTE), (int)$build->getAttribute('duration', 0) * 1000)
|
||||||
|
->trigger();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
851
src/Appwrite/Platform/Workers/Deletes.php
Normal file
851
src/Appwrite/Platform/Workers/Deletes.php
Normal file
|
@ -0,0 +1,851 @@
|
||||||
|
<?php
|
||||||
|
|
||||||
|
namespace Appwrite\Platform\Workers;
|
||||||
|
|
||||||
|
use Utopia\Abuse\Abuse;
|
||||||
|
use Utopia\Abuse\Adapters\TimeLimit;
|
||||||
|
use Utopia\Audit\Audit;
|
||||||
|
use Utopia\Cache\Adapter\Filesystem;
|
||||||
|
use Utopia\Cache\Cache;
|
||||||
|
use Utopia\Database\Database;
|
||||||
|
use Exception;
|
||||||
|
use Utopia\App;
|
||||||
|
use Utopia\CLI\Console;
|
||||||
|
use Utopia\Database\DateTime;
|
||||||
|
use Utopia\Database\Document;
|
||||||
|
use Utopia\Database\Exception\Authorization;
|
||||||
|
use Utopia\Database\Query;
|
||||||
|
use Utopia\Platform\Action;
|
||||||
|
use Utopia\Queue\Message;
|
||||||
|
|
||||||
|
class Deletes extends Action
|
||||||
|
{
|
||||||
|
public static function getName(): string
|
||||||
|
{
|
||||||
|
return 'deletes';
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
public function __construct()
|
||||||
|
{
|
||||||
|
$this
|
||||||
|
->desc('Deletes worker')
|
||||||
|
->inject('message')
|
||||||
|
->inject('dbForConsole')
|
||||||
|
->inject('getProjectDB')
|
||||||
|
->inject('getFilesDevice')
|
||||||
|
->inject('getFunctionsDevice')
|
||||||
|
->inject('getBuildsDevice')
|
||||||
|
->inject('getCacheDevice')
|
||||||
|
->callback(fn($message, $dbForConsole, $getProjectDB, $getFilesDevice, $getFunctionsDevice, $getBuildsDevice, $getCacheDevice) => $this->action($message, $dbForConsole, $getProjectDB, $getFilesDevice, $getFunctionsDevice, $getBuildsDevice, $getCacheDevice));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @throws Exception
|
||||||
|
* @throws \Throwable
|
||||||
|
*/
|
||||||
|
public function action(Message $message, Database $dbForConsole, callable $getProjectDB, callable $getFilesDevice, callable $getFunctionsDevice, callable $getBuildsDevice, callable $getCacheDevice): void
|
||||||
|
{
|
||||||
|
$payload = $message->getPayload() ?? [];
|
||||||
|
|
||||||
|
if (empty($payload)) {
|
||||||
|
throw new Exception('Missing payload');
|
||||||
|
}
|
||||||
|
|
||||||
|
$type = $payload['type'] ?? '';
|
||||||
|
$datetime = $payload['datetime'] ?? null;
|
||||||
|
$hourlyUsageRetentionDatetime = $payload['hourlyUsageRetentionDatetime'] ?? null;
|
||||||
|
$resource = $payload['resource'] ?? null;
|
||||||
|
$document = new Document($payload['document'] ?? []);
|
||||||
|
$project = new Document($payload['project'] ?? []);
|
||||||
|
|
||||||
|
switch (strval($type)) {
|
||||||
|
case DELETE_TYPE_DOCUMENT:
|
||||||
|
switch ($document->getCollection()) {
|
||||||
|
case DELETE_TYPE_DATABASES:
|
||||||
|
$this->deleteDatabase($getProjectDB, $document, $project);
|
||||||
|
break;
|
||||||
|
case DELETE_TYPE_COLLECTIONS:
|
||||||
|
$this->deleteCollection($getProjectDB, $document, $project);
|
||||||
|
break;
|
||||||
|
case DELETE_TYPE_PROJECTS:
|
||||||
|
$this->deleteProject($dbForConsole, $getProjectDB, $getFilesDevice, $getFunctionsDevice, $getBuildsDevice, $getCacheDevice, $document);
|
||||||
|
break;
|
||||||
|
case DELETE_TYPE_FUNCTIONS:
|
||||||
|
$this->deleteFunction($getProjectDB, $getFunctionsDevice, $getBuildsDevice, $document, $project);
|
||||||
|
break;
|
||||||
|
case DELETE_TYPE_DEPLOYMENTS:
|
||||||
|
$this->deleteDeployment($getProjectDB, $getFunctionsDevice, $getBuildsDevice, $document, $project);
|
||||||
|
break;
|
||||||
|
case DELETE_TYPE_USERS:
|
||||||
|
$this->deleteUser($getProjectDB, $document, $project);
|
||||||
|
break;
|
||||||
|
case DELETE_TYPE_TEAMS:
|
||||||
|
$this->deleteMemberships($getProjectDB, $document, $project);
|
||||||
|
break;
|
||||||
|
case DELETE_TYPE_BUCKETS:
|
||||||
|
$this->deleteBucket($getProjectDB, $getFilesDevice, $document, $project);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
if (\str_starts_with($document->getCollection(), 'database_')) {
|
||||||
|
$this->deleteCollection($getProjectDB, $document, $project);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Console::error('No lazy delete operation available for document of type: ' . $document->getCollection());
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
|
||||||
|
case DELETE_TYPE_EXECUTIONS:
|
||||||
|
$this->deleteExecutionLogs($dbForConsole, $getProjectDB, $datetime);
|
||||||
|
break;
|
||||||
|
|
||||||
|
case DELETE_TYPE_AUDIT:
|
||||||
|
if (!empty($datetime)) {
|
||||||
|
$this->deleteAuditLogs($dbForConsole, $getProjectDB, $datetime);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!$document->isEmpty()) {
|
||||||
|
$this->deleteAuditLogsByResource($getProjectDB, 'document/' . $document->getId(), $project);
|
||||||
|
}
|
||||||
|
|
||||||
|
break;
|
||||||
|
|
||||||
|
case DELETE_TYPE_ABUSE:
|
||||||
|
$this->deleteAbuseLogs($dbForConsole, $getProjectDB, $datetime);
|
||||||
|
break;
|
||||||
|
|
||||||
|
case DELETE_TYPE_REALTIME:
|
||||||
|
$this->deleteRealtimeUsage($dbForConsole, $getProjectDB, $datetime);
|
||||||
|
break;
|
||||||
|
|
||||||
|
case DELETE_TYPE_SESSIONS:
|
||||||
|
$this->deleteExpiredSessions($dbForConsole, $getProjectDB);
|
||||||
|
break;
|
||||||
|
|
||||||
|
case DELETE_TYPE_CERTIFICATES:
|
||||||
|
$this->deleteCertificates($dbForConsole, $document);
|
||||||
|
break;
|
||||||
|
|
||||||
|
case DELETE_TYPE_USAGE:
|
||||||
|
$this->deleteUsageStats($dbForConsole, $getProjectDB, $hourlyUsageRetentionDatetime);
|
||||||
|
break;
|
||||||
|
|
||||||
|
case DELETE_TYPE_CACHE_BY_RESOURCE:
|
||||||
|
$this->deleteCacheByResource($getProjectDB, $resource);
|
||||||
|
break;
|
||||||
|
case DELETE_TYPE_CACHE_BY_TIMESTAMP:
|
||||||
|
$this->deleteCacheByDate($getProjectDB);
|
||||||
|
break;
|
||||||
|
case DELETE_TYPE_SCHEDULES:
|
||||||
|
$this->deleteSchedules($dbForConsole, $getProjectDB, $datetime);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
Console::error('No delete operation for type: ' . $type);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param Database $dbForConsole
|
||||||
|
* @param callable $getProjectDB
|
||||||
|
* @param string $datetime
|
||||||
|
* @return void
|
||||||
|
* @throws Authorization
|
||||||
|
* @throws \Throwable
|
||||||
|
*/
|
||||||
|
protected function deleteSchedules(Database $dbForConsole, callable $getProjectDB, string $datetime): void
|
||||||
|
{
|
||||||
|
$this->listByGroup(
|
||||||
|
'schedules',
|
||||||
|
[
|
||||||
|
Query::equal('region', [App::getEnv('_APP_REGION', 'default')]),
|
||||||
|
Query::equal('resourceType', ['function']),
|
||||||
|
Query::lessThanEqual('resourceUpdatedAt', $datetime),
|
||||||
|
Query::equal('active', [false]),
|
||||||
|
],
|
||||||
|
$dbForConsole,
|
||||||
|
function (Document $document) use ($dbForConsole, $getProjectDB) {
|
||||||
|
$project = $dbForConsole->getDocument('projects', $document->getAttribute('projectId'));
|
||||||
|
|
||||||
|
if ($project->isEmpty()) {
|
||||||
|
Console::warning('Unable to delete schedule for function ' . $document->getAttribute('resourceId'));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
$function = $getProjectDB($project)->getDocument('functions', $document->getAttribute('resourceId'));
|
||||||
|
|
||||||
|
if ($function->isEmpty()) {
|
||||||
|
$dbForConsole->deleteDocument('schedules', $document->getId());
|
||||||
|
Console::success('Deleting schedule for function ' . $document->getAttribute('resourceId'));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param callable $getProjectDB
|
||||||
|
* @param string $resource
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
protected function deleteCacheByResource(callable $getProjectDB, string $resource): void
|
||||||
|
{
|
||||||
|
$this->deleteCacheFiles($getProjectDB, [
|
||||||
|
Query::equal('resource', [$resource]),
|
||||||
|
]);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
protected function deleteCacheByDate(callable $getProjectDB): void
|
||||||
|
{
|
||||||
|
$this->deleteCacheFiles($getProjectDB, [
|
||||||
|
Query::lessThan('accessedAt', $this->args['datetime']),
|
||||||
|
]);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param Database $dbForConsole
|
||||||
|
* @param callable $getProjectDB
|
||||||
|
* @param array $query
|
||||||
|
* @return void
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
protected function deleteCacheFiles(Database $dbForConsole, callable $getProjectDB, array $query): void
|
||||||
|
{
|
||||||
|
$this->deleteForProjectIds($dbForConsole, function (Document $project) use ($query, $getProjectDB) {
|
||||||
|
$projectId = $project->getId();
|
||||||
|
$dbForProject = $getProjectDB($project);
|
||||||
|
$cache = new Cache(
|
||||||
|
new Filesystem(APP_STORAGE_CACHE . DIRECTORY_SEPARATOR . 'app-' . $projectId)
|
||||||
|
);
|
||||||
|
|
||||||
|
$this->deleteByGroup(
|
||||||
|
'cache',
|
||||||
|
$query,
|
||||||
|
$dbForProject,
|
||||||
|
function (Document $document) use ($cache, $projectId) {
|
||||||
|
$path = APP_STORAGE_CACHE . DIRECTORY_SEPARATOR . 'app-' . $projectId . DIRECTORY_SEPARATOR . $document->getId();
|
||||||
|
|
||||||
|
if ($cache->purge($document->getId())) {
|
||||||
|
Console::success('Deleting cache file: ' . $path);
|
||||||
|
} else {
|
||||||
|
Console::error('Failed to delete cache file: ' . $path);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param callable $getProjectDB
|
||||||
|
* @param Document $document database document
|
||||||
|
* @param Document $project
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
protected function deleteDatabase(callable $getProjectDB, Document $document, Document $project): void
|
||||||
|
{
|
||||||
|
$databaseId = $document->getId();
|
||||||
|
$dbForProject = $getProjectDB($project);
|
||||||
|
|
||||||
|
$this->deleteByGroup('database_' . $document->getInternalId(), [], $dbForProject, function ($document) use ($getProjectDB, $project) {
|
||||||
|
$this->deleteCollection($getProjectDB, $document, $project);
|
||||||
|
});
|
||||||
|
|
||||||
|
$dbForProject->deleteCollection('database_' . $document->getInternalId());
|
||||||
|
$this->deleteAuditLogsByResource($getProjectDB, 'database/' . $databaseId, $project);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param callable $getProjectDB
|
||||||
|
* @param Document $document teams document
|
||||||
|
* @param Document $project
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
protected function deleteCollection(callable $getProjectDB, Document $document, Document $project): void
|
||||||
|
{
|
||||||
|
$collectionId = $document->getId();
|
||||||
|
$databaseId = $document->getAttribute('databaseId');
|
||||||
|
$databaseInternalId = $document->getAttribute('databaseInternalId');
|
||||||
|
$dbForProject = $getProjectDB($project);
|
||||||
|
|
||||||
|
$dbForProject->deleteCollection('database_' . $databaseInternalId . '_collection_' . $document->getInternalId());
|
||||||
|
$this->deleteByGroup('attributes', [
|
||||||
|
Query::equal('databaseId', [$databaseId]),
|
||||||
|
Query::equal('collectionId', [$collectionId])
|
||||||
|
], $dbForProject);
|
||||||
|
|
||||||
|
$this->deleteByGroup('indexes', [
|
||||||
|
Query::equal('databaseId', [$databaseId]),
|
||||||
|
Query::equal('collectionId', [$collectionId])
|
||||||
|
], $dbForProject);
|
||||||
|
|
||||||
|
$this->deleteAuditLogsByResource($getProjectDB, 'database/' . $databaseId . '/collection/' . $collectionId, $project);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param Database $dbForConsole
|
||||||
|
* @param callable $getProjectDB
|
||||||
|
* @param string $hourlyUsageRetentionDatetime
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
protected function deleteUsageStats(Database $dbForConsole, callable $getProjectDB, string $hourlyUsageRetentionDatetime)
|
||||||
|
{
|
||||||
|
$this->deleteForProjectIds($dbForConsole, function (Document $project) use ($getProjectDB, $hourlyUsageRetentionDatetime) {
|
||||||
|
$dbForProject = $getProjectDB($project);
|
||||||
|
// Delete Usage stats
|
||||||
|
$this->deleteByGroup('stats', [
|
||||||
|
Query::lessThan('time', $hourlyUsageRetentionDatetime),
|
||||||
|
Query::equal('period', ['1h']),
|
||||||
|
], $dbForProject);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param callable $getProjectDB
|
||||||
|
* @param Document $document teams document
|
||||||
|
* @param Document $project
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
protected function deleteMemberships(callable $getProjectDB, Document $document, Document $project): void
|
||||||
|
{
|
||||||
|
$teamId = $document->getAttribute('teamId', '');
|
||||||
|
|
||||||
|
// Delete Memberships
|
||||||
|
$this->deleteByGroup('memberships', [
|
||||||
|
Query::equal('teamId', [$teamId])
|
||||||
|
], $getProjectDB($project));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param Database $dbForConsole
|
||||||
|
* @param callable $getProjectDB
|
||||||
|
* @param callable $getFilesDevice
|
||||||
|
* @param callable $getFunctionsDevice
|
||||||
|
* @param callable $getBuildsDevice
|
||||||
|
* @param callable $getCacheDevice
|
||||||
|
* @param Document $document project document
|
||||||
|
* @throws Authorization
|
||||||
|
*/
|
||||||
|
protected function deleteProject(Database $dbForConsole, callable $getProjectDB, callable $getFilesDevice, callable $getFunctionsDevice, callable $getBuildsDevice, callable $getCacheDevice, Document $document): void
|
||||||
|
{
|
||||||
|
$projectId = $document->getId();
|
||||||
|
|
||||||
|
// Delete project domains and certificates
|
||||||
|
$domains = $dbForConsole->find('domains', [
|
||||||
|
Query::equal('projectInternalId', [$document->getInternalId()])
|
||||||
|
]);
|
||||||
|
|
||||||
|
foreach ($domains as $domain) {
|
||||||
|
$this->deleteCertificates($dbForConsole, $domain);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete project tables
|
||||||
|
$dbForProject = $getProjectDB($projectId, $document);
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
$collections = $dbForProject->listCollections();
|
||||||
|
|
||||||
|
if (empty($collections)) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
foreach ($collections as $collection) {
|
||||||
|
$dbForProject->deleteCollection($collection->getId());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete metadata tables
|
||||||
|
try {
|
||||||
|
$dbForProject->deleteCollection('_metadata');
|
||||||
|
} catch (Exception) {
|
||||||
|
// Ignore: deleteCollection tries to delete a metadata entry after the collection is deleted,
|
||||||
|
// which will throw an exception here because the metadata collection is already deleted.
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete all storage directories
|
||||||
|
$uploads = $getFilesDevice($projectId);
|
||||||
|
$functions = $getFunctionsDevice($projectId);
|
||||||
|
$builds = $getBuildsDevice($projectId);
|
||||||
|
$cache = $getCacheDevice($projectId);
|
||||||
|
|
||||||
|
$uploads->delete($uploads->getRoot(), true);
|
||||||
|
$functions->delete($functions->getRoot(), true);
|
||||||
|
$builds->delete($builds->getRoot(), true);
|
||||||
|
$cache->delete($cache->getRoot(), true);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param callable $getProjectDB
|
||||||
|
* @param Document $document user document
|
||||||
|
* @param Document $project
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
protected function deleteUser(callable $getProjectDB, Document $document, Document $project): void
|
||||||
|
{
|
||||||
|
$userId = $document->getId();
|
||||||
|
$dbForProject = $getProjectDB($project);
|
||||||
|
|
||||||
|
// Delete all sessions of this user from the sessions table and update the sessions field of the user record
|
||||||
|
$this->deleteByGroup('sessions', [
|
||||||
|
Query::equal('userId', [$userId])
|
||||||
|
], $dbForProject);
|
||||||
|
|
||||||
|
$dbForProject->deleteCachedDocument('users', $userId);
|
||||||
|
|
||||||
|
// Delete Memberships and decrement team membership counts
|
||||||
|
$this->deleteByGroup('memberships', [
|
||||||
|
Query::equal('userId', [$userId])
|
||||||
|
], $dbForProject, function (Document $document) use ($dbForProject) {
|
||||||
|
if ($document->getAttribute('confirm')) { // Count only confirmed members
|
||||||
|
$teamId = $document->getAttribute('teamId');
|
||||||
|
$team = $dbForProject->getDocument('teams', $teamId);
|
||||||
|
if (!$team->isEmpty()) {
|
||||||
|
$team = $dbForProject->updateDocument(
|
||||||
|
'teams',
|
||||||
|
$teamId,
|
||||||
|
// Ensure that total >= 0
|
||||||
|
$team->setAttribute('total', \max($team->getAttribute('total', 0) - 1, 0))
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Delete tokens
|
||||||
|
$this->deleteByGroup('tokens', [
|
||||||
|
Query::equal('userId', [$userId])
|
||||||
|
], $dbForProject);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param database $dbForConsole
|
||||||
|
* @param callable $getProjectDB
|
||||||
|
* @param string $datetime
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
protected function deleteExecutionLogs(database $dbForConsole, callable $getProjectDB, string $datetime): void
|
||||||
|
{
|
||||||
|
$this->deleteForProjectIds($dbForConsole, function (Document $project) use ($getProjectDB, $datetime) {
|
||||||
|
$dbForProject = $getProjectDB($project);
|
||||||
|
// Delete Executions
|
||||||
|
$this->deleteByGroup('executions', [
|
||||||
|
Query::lessThan('$createdAt', $datetime)
|
||||||
|
], $dbForProject);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param Database $dbForConsole
|
||||||
|
* @param callable $getProjectDB
|
||||||
|
* @return void
|
||||||
|
* @throws Exception|\Throwable
|
||||||
|
*/
|
||||||
|
protected function deleteExpiredSessions(Database $dbForConsole, callable $getProjectDB): void
|
||||||
|
{
|
||||||
|
|
||||||
|
$this->deleteForProjectIds($dbForConsole, function (Document $project) use ($dbForConsole, $getProjectDB) {
|
||||||
|
$dbForProject = $getProjectDB($project);
|
||||||
|
$project = $dbForConsole->getDocument('projects', $project->getId());
|
||||||
|
$duration = $project->getAttribute('auths', [])['duration'] ?? Auth::TOKEN_EXPIRATION_LOGIN_LONG;
|
||||||
|
$expired = DateTime::addSeconds(new \DateTime(), -1 * $duration);
|
||||||
|
|
||||||
|
// Delete Sessions
|
||||||
|
$this->deleteByGroup('sessions', [
|
||||||
|
Query::lessThan('$createdAt', $expired)
|
||||||
|
], $dbForProject);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param Database $dbForConsole
|
||||||
|
* @param callable $getProjectDB
|
||||||
|
* @param string $datetime
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
protected function deleteRealtimeUsage(Database $dbForConsole, callable $getProjectDB, string $datetime): void
|
||||||
|
{
|
||||||
|
$this->deleteForProjectIds($dbForConsole, function (Document $project) use ($datetime, $getProjectDB) {
|
||||||
|
$dbForProject = $getProjectDB($project);
|
||||||
|
// Delete Dead Realtime Logs
|
||||||
|
$this->deleteByGroup('realtime', [
|
||||||
|
Query::lessThan('timestamp', $datetime)
|
||||||
|
], $dbForProject);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param Database $dbForConsole
|
||||||
|
* @param callable $getProjectDB
|
||||||
|
* @param string $datetime
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
protected function deleteAbuseLogs(Database $dbForConsole, callable $getProjectDB, string $datetime): void
|
||||||
|
{
|
||||||
|
if (empty($datetime)) {
|
||||||
|
throw new Exception('Failed to delete audit logs. No datetime provided');
|
||||||
|
}
|
||||||
|
|
||||||
|
$this->deleteForProjectIds($dbForConsole, function (Document $project) use ($getProjectDB, $datetime) {
|
||||||
|
$projectId = $project->getId();
|
||||||
|
$dbForProject = $getProjectDB($project);
|
||||||
|
$timeLimit = new TimeLimit("", 0, 1, $dbForProject);
|
||||||
|
$abuse = new Abuse($timeLimit);
|
||||||
|
$status = $abuse->cleanup($datetime);
|
||||||
|
if (!$status) {
|
||||||
|
throw new Exception('Failed to delete Abuse logs for project ' . $projectId);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param Database $dbForConsole
|
||||||
|
* @param callable $getProjectDB
|
||||||
|
* @param string $datetime
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
protected function deleteAuditLogs(Database $dbForConsole, callable $getProjectDB, string $datetime): void
|
||||||
|
{
|
||||||
|
if (empty($datetime)) {
|
||||||
|
throw new Exception('Failed to delete audit logs. No datetime provided');
|
||||||
|
}
|
||||||
|
|
||||||
|
$this->deleteForProjectIds($dbForConsole, function (Document $project) use ($getProjectDB, $datetime) {
|
||||||
|
$projectId = $project->getId();
|
||||||
|
$dbForProject = $getProjectDB($project);
|
||||||
|
$audit = new Audit($dbForProject);
|
||||||
|
$status = $audit->cleanup($datetime);
|
||||||
|
if (!$status) {
|
||||||
|
throw new Exception('Failed to delete Audit logs for project' . $projectId);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param callable $getProjectDB
|
||||||
|
* @param string $resource
|
||||||
|
* @param Document $project
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
protected function deleteAuditLogsByResource(callable $getProjectDB, string $resource, Document $project): void
|
||||||
|
{
|
||||||
|
$dbForProject = $getProjectDB($project);
|
||||||
|
|
||||||
|
$this->deleteByGroup(Audit::COLLECTION, [
|
||||||
|
Query::equal('resource', [$resource])
|
||||||
|
], $dbForProject);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param callable $getProjectDB
|
||||||
|
* @param callable $getFunctionsDevice
|
||||||
|
* @param callable $getBuildsDevice
|
||||||
|
* @param Document $document function document
|
||||||
|
* @param Document $project
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
protected function deleteFunction(callable $getProjectDB, callable $getFunctionsDevice, callable $getBuildsDevice, Document $document, Document $project): void
|
||||||
|
{
|
||||||
|
$projectId = $project->getId();
|
||||||
|
$dbForProject = $getProjectDB($project);
|
||||||
|
$functionId = $document->getId();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Delete Variables
|
||||||
|
*/
|
||||||
|
Console::info("Deleting variables for function " . $functionId);
|
||||||
|
$this->deleteByGroup('variables', [
|
||||||
|
Query::equal('functionId', [$functionId])
|
||||||
|
], $dbForProject);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Delete Deployments
|
||||||
|
*/
|
||||||
|
Console::info("Deleting deployments for function " . $functionId);
|
||||||
|
$storageFunctions = $getFunctionsDevice($projectId);
|
||||||
|
$deploymentIds = [];
|
||||||
|
$this->deleteByGroup('deployments', [
|
||||||
|
Query::equal('resourceId', [$functionId])
|
||||||
|
], $dbForProject, function (Document $document) use ($storageFunctions, &$deploymentIds) {
|
||||||
|
$deploymentIds[] = $document->getId();
|
||||||
|
if ($storageFunctions->delete($document->getAttribute('path', ''), true)) {
|
||||||
|
Console::success('Deleted deployment files: ' . $document->getAttribute('path', ''));
|
||||||
|
} else {
|
||||||
|
Console::error('Failed to delete deployment files: ' . $document->getAttribute('path', ''));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Delete builds
|
||||||
|
*/
|
||||||
|
Console::info("Deleting builds for function " . $functionId);
|
||||||
|
$storageBuilds = $getBuildsDevice($projectId);
|
||||||
|
foreach ($deploymentIds as $deploymentId) {
|
||||||
|
$this->deleteByGroup('builds', [
|
||||||
|
Query::equal('deploymentId', [$deploymentId])
|
||||||
|
], $dbForProject, function (Document $document) use ($storageBuilds, $deploymentId) {
|
||||||
|
if ($storageBuilds->delete($document->getAttribute('outputPath', ''), true)) {
|
||||||
|
Console::success('Deleted build files: ' . $document->getAttribute('outputPath', ''));
|
||||||
|
} else {
|
||||||
|
Console::error('Failed to delete build files: ' . $document->getAttribute('outputPath', ''));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Delete Executions
|
||||||
|
*/
|
||||||
|
Console::info("Deleting executions for function " . $functionId);
|
||||||
|
$this->deleteByGroup('executions', [
|
||||||
|
Query::equal('functionId', [$functionId])
|
||||||
|
], $dbForProject);
|
||||||
|
|
||||||
|
// TODO: Request executor to delete runtime
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param callable $getProjectDB
|
||||||
|
* @param callable $getFunctionsDevice
|
||||||
|
* @param callable $getBuildsDevice
|
||||||
|
* @param Document $document deployment document
|
||||||
|
* @param Document $project
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
protected function deleteDeployment(callable $getProjectDB, callable $getFunctionsDevice, callable $getBuildsDevice, Document $document, Document $project): void
|
||||||
|
{
|
||||||
|
$projectId = $project->getId();
|
||||||
|
$dbForProject = $getProjectDB($project);
|
||||||
|
$deploymentId = $document->getId();
|
||||||
|
$functionId = $document->getAttribute('resourceId');
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Delete deployment files
|
||||||
|
*/
|
||||||
|
Console::info("Deleting deployment files for deployment " . $deploymentId);
|
||||||
|
$storageFunctions = $getFunctionsDevice($projectId);
|
||||||
|
if ($storageFunctions->delete($document->getAttribute('path', ''), true)) {
|
||||||
|
Console::success('Deleted deployment files: ' . $document->getAttribute('path', ''));
|
||||||
|
} else {
|
||||||
|
Console::error('Failed to delete deployment files: ' . $document->getAttribute('path', ''));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Delete builds
|
||||||
|
*/
|
||||||
|
Console::info("Deleting builds for deployment " . $deploymentId);
|
||||||
|
$storageBuilds = $getBuildsDevice($projectId);
|
||||||
|
$this->deleteByGroup('builds', [
|
||||||
|
Query::equal('deploymentId', [$deploymentId])
|
||||||
|
], $dbForProject, function (Document $document) use ($storageBuilds) {
|
||||||
|
if ($storageBuilds->delete($document->getAttribute('outputPath', ''), true)) {
|
||||||
|
Console::success('Deleted build files: ' . $document->getAttribute('outputPath', ''));
|
||||||
|
} else {
|
||||||
|
Console::error('Failed to delete build files: ' . $document->getAttribute('outputPath', ''));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// TODO: Request executor to delete runtime
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param Document $document to be deleted
|
||||||
|
* @param Database $database to delete it from
|
||||||
|
* @param callable|null $callback to perform after document is deleted
|
||||||
|
* @return bool
|
||||||
|
* @throws Authorization
|
||||||
|
*/
|
||||||
|
protected function deleteById(Document $document, Database $database, callable $callback = null): bool
|
||||||
|
{
|
||||||
|
if ($database->deleteDocument($document->getCollection(), $document->getId())) {
|
||||||
|
Console::success('Deleted document "' . $document->getId() . '" successfully');
|
||||||
|
|
||||||
|
if (is_callable($callback)) {
|
||||||
|
$callback($document);
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
} else {
|
||||||
|
Console::error('Failed to delete document: ' . $document->getId());
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param Database $dbForConsole
|
||||||
|
* @param callable $callback
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
protected function deleteForProjectIds(database $dbForConsole, callable $callback): void
|
||||||
|
{
|
||||||
|
// TODO: @Meldiron name of this method no longer matches. It does not delete, and it gives whole document
|
||||||
|
$count = 0;
|
||||||
|
$chunk = 0;
|
||||||
|
$limit = 50;
|
||||||
|
$projects = [];
|
||||||
|
$sum = $limit;
|
||||||
|
|
||||||
|
$executionStart = \microtime(true);
|
||||||
|
|
||||||
|
while ($sum === $limit) {
|
||||||
|
$projects = $dbForConsole->find('projects', [Query::limit($limit), Query::offset($chunk * $limit)]);
|
||||||
|
|
||||||
|
$chunk++;
|
||||||
|
|
||||||
|
/** @var string[] $projectIds */
|
||||||
|
$sum = count($projects);
|
||||||
|
|
||||||
|
Console::info('Executing delete function for chunk #' . $chunk . '. Found ' . $sum . ' projects');
|
||||||
|
foreach ($projects as $project) {
|
||||||
|
$callback($project);
|
||||||
|
$count++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
$executionEnd = \microtime(true);
|
||||||
|
Console::info("Found {$count} projects " . ($executionEnd - $executionStart) . " seconds");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param string $collection collectionID
|
||||||
|
* @param array $queries
|
||||||
|
* @param Database $database
|
||||||
|
* @param callable|null $callback
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
protected function deleteByGroup(string $collection, array $queries, Database $database, callable $callback = null): void
|
||||||
|
{
|
||||||
|
$count = 0;
|
||||||
|
$chunk = 0;
|
||||||
|
$limit = 50;
|
||||||
|
$results = [];
|
||||||
|
$sum = $limit;
|
||||||
|
|
||||||
|
$executionStart = \microtime(true);
|
||||||
|
|
||||||
|
while ($sum === $limit) {
|
||||||
|
$chunk++;
|
||||||
|
|
||||||
|
$results = $database->find($collection, \array_merge([Query::limit($limit)], $queries));
|
||||||
|
|
||||||
|
$sum = count($results);
|
||||||
|
|
||||||
|
Console::info('Deleting chunk #' . $chunk . '. Found ' . $sum . ' documents');
|
||||||
|
|
||||||
|
foreach ($results as $document) {
|
||||||
|
$this->deleteById($document, $database, $callback);
|
||||||
|
$count++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
$executionEnd = \microtime(true);
|
||||||
|
|
||||||
|
Console::info("Deleted {$count} document by group in " . ($executionEnd - $executionStart) . " seconds");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param string $collection collectionID
|
||||||
|
* @param Query[] $queries
|
||||||
|
* @param Database $database
|
||||||
|
* @param callable|null $callback
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
protected function listByGroup(string $collection, array $queries, Database $database, callable $callback = null): void
|
||||||
|
{
|
||||||
|
$count = 0;
|
||||||
|
$chunk = 0;
|
||||||
|
$limit = 50;
|
||||||
|
$results = [];
|
||||||
|
$sum = $limit;
|
||||||
|
|
||||||
|
$executionStart = \microtime(true);
|
||||||
|
|
||||||
|
while ($sum === $limit) {
|
||||||
|
$chunk++;
|
||||||
|
|
||||||
|
$results = $database->find($collection, \array_merge([Query::limit($limit)], $queries));
|
||||||
|
|
||||||
|
$sum = count($results);
|
||||||
|
|
||||||
|
foreach ($results as $document) {
|
||||||
|
if (is_callable($callback)) {
|
||||||
|
$callback($document);
|
||||||
|
}
|
||||||
|
|
||||||
|
$count++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
$executionEnd = \microtime(true);
|
||||||
|
|
||||||
|
Console::info("Listed {$count} document by group in " . ($executionEnd - $executionStart) . " seconds");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param Database $dbForConsole
|
||||||
|
* @param Document $document certificates document
|
||||||
|
* @throws Authorization
|
||||||
|
*/
|
||||||
|
protected function deleteCertificates(Database $dbForConsole, Document $document): void
|
||||||
|
{
|
||||||
|
// If domain has certificate generated
|
||||||
|
if (isset($document['certificateId'])) {
|
||||||
|
$domainUsingCertificate = $dbForConsole->findOne('domains', [
|
||||||
|
Query::equal('certificateId', [$document['certificateId']])
|
||||||
|
]);
|
||||||
|
|
||||||
|
if (!$domainUsingCertificate) {
|
||||||
|
$mainDomain = App::getEnv('_APP_DOMAIN_TARGET', '');
|
||||||
|
if ($mainDomain === $document->getAttribute('domain')) {
|
||||||
|
$domainUsingCertificate = $mainDomain;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// If certificate is still used by some domain, mark we can't delete.
|
||||||
|
// Current domain should not be found, because we only have copy. Original domain is already deleted from database.
|
||||||
|
if ($domainUsingCertificate) {
|
||||||
|
Console::warning("Skipping certificate deletion, because a domain is still using it.");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
$domain = $document->getAttribute('domain');
|
||||||
|
$directory = APP_STORAGE_CERTIFICATES . '/' . $domain;
|
||||||
|
$checkTraversal = realpath($directory) === $directory;
|
||||||
|
|
||||||
|
if ($domain && $checkTraversal && is_dir($directory)) {
|
||||||
|
// Delete certificate document, so Appwrite is aware of change
|
||||||
|
if (isset($document['certificateId'])) {
|
||||||
|
$dbForConsole->deleteDocument('certificates', $document['certificateId']);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete files, so Traefik is aware of change
|
||||||
|
array_map('unlink', glob($directory . '/*.*'));
|
||||||
|
rmdir($directory);
|
||||||
|
Console::info("Deleted certificate files for {$domain}");
|
||||||
|
} else {
|
||||||
|
Console::info("No certificate files found for {$domain}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param callable $getProjectDB
|
||||||
|
* @param callable $getFilesDevice
|
||||||
|
* @param Document $document
|
||||||
|
* @param Document $project
|
||||||
|
* @return void
|
||||||
|
*/
|
||||||
|
protected function deleteBucket(callable $getProjectDB, callable $getFilesDevice, Document $document, Document $project): void
|
||||||
|
{
|
||||||
|
$projectId = $project->getId();
|
||||||
|
$dbForProject = $getProjectDB($project);
|
||||||
|
|
||||||
|
$dbForProject->deleteCollection('bucket_' . $document->getInternalId());
|
||||||
|
|
||||||
|
$device = $getFilesDevice($projectId);
|
||||||
|
|
||||||
|
$device->deletePath($document->getId());
|
||||||
|
}
|
||||||
|
}
|
350
src/Appwrite/Platform/Workers/Functions.php
Normal file
350
src/Appwrite/Platform/Workers/Functions.php
Normal file
|
@ -0,0 +1,350 @@
|
||||||
|
<?php
|
||||||
|
|
||||||
|
namespace Appwrite\Platform\Workers;
|
||||||
|
|
||||||
|
use Appwrite\Event\Event;
|
||||||
|
use Appwrite\Event\Func;
|
||||||
|
use Appwrite\Event\Usage;
|
||||||
|
use Appwrite\Messaging\Adapter\Realtime;
|
||||||
|
use Appwrite\Utopia\Response\Model\Execution;
|
||||||
|
use Exception;
|
||||||
|
use Executor\Executor;
|
||||||
|
use Utopia\App;
|
||||||
|
use Utopia\CLI\Console;
|
||||||
|
use Utopia\Config\Config;
|
||||||
|
use Utopia\Database\Database;
|
||||||
|
use Utopia\Database\Document;
|
||||||
|
use Utopia\Database\Exception\Authorization;
|
||||||
|
use Utopia\Database\Exception\Structure;
|
||||||
|
use Utopia\Database\Helpers\ID;
|
||||||
|
use Utopia\Database\Helpers\Permission;
|
||||||
|
use Utopia\Database\Helpers\Role;
|
||||||
|
use Utopia\Database\Query;
|
||||||
|
use Utopia\Platform\Action;
|
||||||
|
use Utopia\Queue\Message;
|
||||||
|
|
||||||
|
class Functions extends Action
|
||||||
|
{
|
||||||
|
public static function getName(): string
|
||||||
|
{
|
||||||
|
return 'functions';
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
public function __construct()
|
||||||
|
{
|
||||||
|
$this
|
||||||
|
->desc('Functions worker')
|
||||||
|
->inject('message')
|
||||||
|
->inject('dbForProject')
|
||||||
|
->inject('queueForFunctions')
|
||||||
|
->inject('queueForUsage')
|
||||||
|
->callback(fn($message, $dbForProject, $queueForFunctions, $queueForUsage) => $this->action($message, $dbForProject, $queueForFunctions, $queueForUsage));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @throws Exception|\Throwable
|
||||||
|
*/
|
||||||
|
public function action(Message $message, Database $dbForProject, Func $queueForFunctions, Usage $queueForUsage): void
|
||||||
|
{
|
||||||
|
$payload = $message->getPayload() ?? [];
|
||||||
|
|
||||||
|
if (empty($payload)) {
|
||||||
|
throw new Exception('Missing payload');
|
||||||
|
}
|
||||||
|
|
||||||
|
$payload = $message->getPayload() ?? [];
|
||||||
|
|
||||||
|
if (empty($payload)) {
|
||||||
|
throw new Exception('Missing payload');
|
||||||
|
}
|
||||||
|
|
||||||
|
$type = $payload['type'] ?? '';
|
||||||
|
$events = $payload['events'] ?? [];
|
||||||
|
$data = $payload['data'] ?? '';
|
||||||
|
$eventData = $payload['payload'] ?? '';
|
||||||
|
$project = new Document($payload['project'] ?? []);
|
||||||
|
$function = new Document($payload['function'] ?? []);
|
||||||
|
$user = new Document($payload['user'] ?? []);
|
||||||
|
|
||||||
|
if ($project->getId() === 'console') {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!empty($events)) {
|
||||||
|
$limit = 30;
|
||||||
|
$sum = 30;
|
||||||
|
$offset = 0;
|
||||||
|
$functions = [];
|
||||||
|
/** @var Document[] $functions */
|
||||||
|
while ($sum >= $limit) {
|
||||||
|
$functions = $dbForProject->find('functions', [
|
||||||
|
Query::limit($limit),
|
||||||
|
Query::offset($offset),
|
||||||
|
Query::orderAsc('name'),
|
||||||
|
]);
|
||||||
|
|
||||||
|
$sum = \count($functions);
|
||||||
|
$offset = $offset + $limit;
|
||||||
|
|
||||||
|
Console::log('Fetched ' . $sum . ' functions...');
|
||||||
|
|
||||||
|
foreach ($functions as $function) {
|
||||||
|
if (!array_intersect($events, $function->getAttribute('events', []))) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
Console::success('Iterating function: ' . $function->getAttribute('name'));
|
||||||
|
|
||||||
|
$this->execute(
|
||||||
|
dbForProject: $dbForProject,
|
||||||
|
queueForFunctions: $queueForFunctions,
|
||||||
|
queueForUsage: $queueForUsage,
|
||||||
|
project: $project,
|
||||||
|
function: $function,
|
||||||
|
trigger: 'event',
|
||||||
|
user: $user,
|
||||||
|
event: $events[0],
|
||||||
|
eventData: \is_string($eventData) ? $eventData : \json_encode($eventData),
|
||||||
|
);
|
||||||
|
Console::success('Triggered function: ' . $events[0]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Handle Schedule and HTTP execution.
|
||||||
|
*/
|
||||||
|
switch ($type) {
|
||||||
|
case 'http':
|
||||||
|
$jwt = $payload['jwt'] ?? '';
|
||||||
|
$execution = new Document($payload['execution'] ?? []);
|
||||||
|
$user = new Document($payload['user'] ?? []);
|
||||||
|
$this->execute(
|
||||||
|
dbForProject: $dbForProject,
|
||||||
|
queueForFunctions: $queueForFunctions,
|
||||||
|
queueForUsage: $queueForUsage,
|
||||||
|
project: $project,
|
||||||
|
function: $function,
|
||||||
|
trigger: 'http',
|
||||||
|
data: $data,
|
||||||
|
user: $user,
|
||||||
|
jwt: $jwt,
|
||||||
|
executionId: $execution->getId(),
|
||||||
|
);
|
||||||
|
break;
|
||||||
|
case 'schedule':
|
||||||
|
$this->execute(
|
||||||
|
dbForProject: $dbForProject,
|
||||||
|
queueForFunctions: $queueForFunctions,
|
||||||
|
queueForUsage: $queueForUsage,
|
||||||
|
project: $project,
|
||||||
|
function: $function,
|
||||||
|
trigger: 'schedule',
|
||||||
|
);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @throws Authorization
|
||||||
|
* @throws \Throwable
|
||||||
|
* @throws Structure
|
||||||
|
*/
|
||||||
|
private function execute(
|
||||||
|
Database $dbForProject,
|
||||||
|
Func $queueForFunctions,
|
||||||
|
Usage $queueForUsage,
|
||||||
|
Document $project,
|
||||||
|
Document $function,
|
||||||
|
string $trigger,
|
||||||
|
string $data = null,
|
||||||
|
?Document $user = null,
|
||||||
|
string $jwt = null,
|
||||||
|
string $event = null,
|
||||||
|
string $eventData = null,
|
||||||
|
string $executionId = null,
|
||||||
|
): void {
|
||||||
|
$user ??= new Document();
|
||||||
|
$functionId = $function->getId();
|
||||||
|
$functionInternalId = $function->getInternalId();
|
||||||
|
$deploymentId = $function->getAttribute('deployment', '');
|
||||||
|
|
||||||
|
/** Check if deployment exists */
|
||||||
|
$deployment = $dbForProject->getDocument('deployments', $deploymentId);
|
||||||
|
$deploymentInternalId = $deployment->getInternalId();
|
||||||
|
|
||||||
|
if ($deployment->getAttribute('resourceId') !== $functionId) {
|
||||||
|
throw new Exception('Deployment not found. Create deployment before trying to execute a function');
|
||||||
|
}
|
||||||
|
|
||||||
|
if ($deployment->isEmpty()) {
|
||||||
|
throw new Exception('Deployment not found. Create deployment before trying to execute a function');
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Check if build has exists */
|
||||||
|
$build = $dbForProject->getDocument('builds', $deployment->getAttribute('buildId', ''));
|
||||||
|
if ($build->isEmpty()) {
|
||||||
|
throw new Exception('Build not found');
|
||||||
|
}
|
||||||
|
|
||||||
|
if ($build->getAttribute('status') !== 'ready') {
|
||||||
|
throw new Exception('Build not ready');
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Check if runtime is supported */
|
||||||
|
$runtimes = Config::getParam('runtimes', []);
|
||||||
|
|
||||||
|
if (!\array_key_exists($function->getAttribute('runtime'), $runtimes)) {
|
||||||
|
throw new Exception('Runtime "' . $function->getAttribute('runtime', '') . '" is not supported');
|
||||||
|
}
|
||||||
|
|
||||||
|
$runtime = $runtimes[$function->getAttribute('runtime')];
|
||||||
|
|
||||||
|
/** Create execution or update execution status */
|
||||||
|
$execution = $dbForProject->getDocument('executions', $executionId ?? '');
|
||||||
|
if ($execution->isEmpty()) {
|
||||||
|
$executionId = ID::unique();
|
||||||
|
$execution = $dbForProject->createDocument('executions', new Document([
|
||||||
|
'$id' => $executionId,
|
||||||
|
'$permissions' => $user->isEmpty() ? [] : [Permission::read(Role::user($user->getId()))],
|
||||||
|
'functionId' => $functionId,
|
||||||
|
'functionInternalId' => $functionInternalId,
|
||||||
|
'deploymentInternalId' => $deploymentInternalId,
|
||||||
|
'deploymentId' => $deploymentId,
|
||||||
|
'trigger' => $trigger,
|
||||||
|
'status' => 'waiting',
|
||||||
|
'statusCode' => 0,
|
||||||
|
'response' => '',
|
||||||
|
'stderr' => '',
|
||||||
|
'duration' => 0.0,
|
||||||
|
'search' => implode(' ', [$function->getId(), $executionId]),
|
||||||
|
]));
|
||||||
|
|
||||||
|
// TODO: @Meldiron Trigger executions.create event here
|
||||||
|
|
||||||
|
if ($execution->isEmpty()) {
|
||||||
|
throw new Exception('Failed to create or read execution');
|
||||||
|
}
|
||||||
|
|
||||||
|
/*** Usage */
|
||||||
|
$queueForUsage
|
||||||
|
->addMetric(METRIC_EXECUTIONS, 1) // per project
|
||||||
|
->addMetric(str_replace('{functionInternalId}', $function->getInternalId(), METRIC_FUNCTION_ID_EXECUTIONS), 1); // per function
|
||||||
|
}
|
||||||
|
|
||||||
|
$execution->setAttribute('status', 'processing');
|
||||||
|
$execution = $dbForProject->updateDocument('executions', $executionId, $execution);
|
||||||
|
|
||||||
|
$vars = array_reduce($function->getAttribute('vars', []), function (array $carry, Document $var) {
|
||||||
|
$carry[$var->getAttribute('key')] = $var->getAttribute('value');
|
||||||
|
return $carry;
|
||||||
|
}, []);
|
||||||
|
|
||||||
|
/** Collect environment variables */
|
||||||
|
$vars = \array_merge($vars, [
|
||||||
|
'APPWRITE_FUNCTION_ID' => $functionId,
|
||||||
|
'APPWRITE_FUNCTION_NAME' => $function->getAttribute('name'),
|
||||||
|
'APPWRITE_FUNCTION_DEPLOYMENT' => $deploymentId,
|
||||||
|
'APPWRITE_FUNCTION_TRIGGER' => $trigger,
|
||||||
|
'APPWRITE_FUNCTION_PROJECT_ID' => $project->getId(),
|
||||||
|
'APPWRITE_FUNCTION_RUNTIME_NAME' => $runtime['name'] ?? '',
|
||||||
|
'APPWRITE_FUNCTION_RUNTIME_VERSION' => $runtime['version'] ?? '',
|
||||||
|
'APPWRITE_FUNCTION_EVENT' => $event ?? '',
|
||||||
|
'APPWRITE_FUNCTION_EVENT_DATA' => $eventData ?? '',
|
||||||
|
'APPWRITE_FUNCTION_DATA' => $data ?? '',
|
||||||
|
'APPWRITE_FUNCTION_USER_ID' => $user->getId() ?? '',
|
||||||
|
'APPWRITE_FUNCTION_JWT' => $jwt ?? '',
|
||||||
|
]);
|
||||||
|
|
||||||
|
/** Execute function */
|
||||||
|
try {
|
||||||
|
$client = new Executor(App::getEnv('_APP_EXECUTOR_HOST'));
|
||||||
|
$executionResponse = $client->createExecution(
|
||||||
|
projectId: $project->getId(),
|
||||||
|
deploymentId: $deploymentId,
|
||||||
|
payload: $vars['APPWRITE_FUNCTION_DATA'] ?? '',
|
||||||
|
variables: $vars,
|
||||||
|
timeout: $function->getAttribute('timeout', 0),
|
||||||
|
image: $runtime['image'],
|
||||||
|
source: $build->getAttribute('outputPath', ''),
|
||||||
|
entrypoint: $deployment->getAttribute('entrypoint', ''),
|
||||||
|
);
|
||||||
|
|
||||||
|
/** Update execution status */
|
||||||
|
$execution
|
||||||
|
->setAttribute('status', $executionResponse['status'])
|
||||||
|
->setAttribute('statusCode', $executionResponse['statusCode'])
|
||||||
|
->setAttribute('response', $executionResponse['response'])
|
||||||
|
->setAttribute('stdout', $executionResponse['stdout'])
|
||||||
|
->setAttribute('stderr', $executionResponse['stderr'])
|
||||||
|
->setAttribute('duration', $executionResponse['duration']);
|
||||||
|
} catch (\Throwable $th) {
|
||||||
|
$interval = (new \DateTime())->diff(new \DateTime($execution->getCreatedAt()));
|
||||||
|
$execution
|
||||||
|
->setAttribute('duration', (float)$interval->format('%s.%f'))
|
||||||
|
->setAttribute('status', 'failed')
|
||||||
|
->setAttribute('statusCode', $th->getCode())
|
||||||
|
->setAttribute('stderr', $th->getMessage());
|
||||||
|
|
||||||
|
Console::error($th->getTraceAsString());
|
||||||
|
Console::error($th->getFile());
|
||||||
|
Console::error($th->getLine());
|
||||||
|
Console::error($th->getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
$execution = $dbForProject->updateDocument('executions', $executionId, $execution);
|
||||||
|
|
||||||
|
/** Trigger Webhook */
|
||||||
|
$executionModel = new Execution();
|
||||||
|
$executionUpdate = new Event(Event::WEBHOOK_QUEUE_NAME, Event::WEBHOOK_CLASS_NAME);
|
||||||
|
$executionUpdate
|
||||||
|
->setProject($project)
|
||||||
|
->setUser($user)
|
||||||
|
->setEvent('functions.[functionId].executions.[executionId].update')
|
||||||
|
->setParam('functionId', $function->getId())
|
||||||
|
->setParam('executionId', $execution->getId())
|
||||||
|
->setPayload($execution->getArrayCopy(array_keys($executionModel->getRules())))
|
||||||
|
->trigger();
|
||||||
|
|
||||||
|
/** Trigger Functions */
|
||||||
|
$queueForFunctions
|
||||||
|
->from($executionUpdate)
|
||||||
|
->trigger();
|
||||||
|
|
||||||
|
/** Trigger realtime event */
|
||||||
|
$allEvents = Event::generateEvents('functions.[functionId].executions.[executionId].update', [
|
||||||
|
'functionId' => $function->getId(),
|
||||||
|
'executionId' => $execution->getId()
|
||||||
|
]);
|
||||||
|
$target = Realtime::fromPayload(
|
||||||
|
// Pass first, most verbose event pattern
|
||||||
|
event: $allEvents[0],
|
||||||
|
payload: $execution
|
||||||
|
);
|
||||||
|
Realtime::send(
|
||||||
|
projectId: 'console',
|
||||||
|
payload: $execution->getArrayCopy(),
|
||||||
|
events: $allEvents,
|
||||||
|
channels: $target['channels'],
|
||||||
|
roles: $target['roles']
|
||||||
|
);
|
||||||
|
Realtime::send(
|
||||||
|
projectId: $project->getId(),
|
||||||
|
payload: $execution->getArrayCopy(),
|
||||||
|
events: $allEvents,
|
||||||
|
channels: $target['channels'],
|
||||||
|
roles: $target['roles']
|
||||||
|
);
|
||||||
|
|
||||||
|
/** Trigger usage queue */
|
||||||
|
$queueForUsage
|
||||||
|
->setProject($project)
|
||||||
|
->addMetric(METRIC_EXECUTIONS_COMPUTE, (int)($execution->getAttribute('duration') * 1000))// per project
|
||||||
|
->addMetric(str_replace('{functionInternalId}', $function->getInternalId(), METRIC_FUNCTION_ID_EXECUTIONS_COMPUTE), (int)($execution->getAttribute('duration') * 1000))
|
||||||
|
->trigger()
|
||||||
|
;
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in a new issue