From 141d55bd361e53d79df8f78c8c4e6ad0b85fc64c Mon Sep 17 00:00:00 2001 From: shimon Date: Tue, 13 Dec 2022 09:35:05 +0200 Subject: [PATCH] build+functions worker --- app/config/collections.php | 39 ++++++++++++++++- app/controllers/api/functions.php | 13 +++--- app/controllers/shared/api.php | 26 +++++------ app/init.php | 4 +- app/worker.php | 15 +++++-- app/workers/builds.php | 18 +++++++- app/workers/functions.php | 24 ++++++++--- app/workers/usage.php | 43 ++++++++++++++----- docker-compose.yml | 3 +- src/Appwrite/Resque/Worker.php | 31 ++++++++++--- .../Utopia/Response/Model/UsageFunction.php | 37 ++++++---------- .../Utopia/Response/Model/UsageFunctions.php | 4 +- src/Executor/Executor.php | 1 - 13 files changed, 177 insertions(+), 81 deletions(-) diff --git a/app/config/collections.php b/app/config/collections.php index 9f63830f7..06a267ed1 100644 --- a/app/config/collections.php +++ b/app/config/collections.php @@ -3174,7 +3174,44 @@ $collections = [ ], ], ], - + 'statsLogger' => [ + '$collection' => ID::custom(Database::METADATA), + '$id' => ID::custom('statsLogger'), + 'name' => 'StatsLogger', + 'attributes' => [ + [ + '$id' => ID::custom('time'), + 'type' => Database::VAR_DATETIME, + 'format' => '', + 'size' => 0, + 'signed' => false, + 'required' => false, + 'default' => null, + 'array' => false, + 'filters' => ['datetime'], + ], + [ + '$id' => ID::custom('metrics'), + 'type' => Database::VAR_STRING, + 'format' => '', + 'size' => 1024, + 'signed' => true, + 'required' => false, + 'default' => [], + 'array' => false, + 'filters' => ['json'], + ], + ], + 'indexes' => [ + [ + '$id' => ID::custom('_key_time'), + 'type' => Database::INDEX_KEY, + 'attributes' => ['time'], + 'lengths' => [], + 'orders' => [Database::ORDER_DESC], + ], + ], + ], 'realtime' => [ '$collection' => ID::custom(Database::METADATA), '$id' => ID::custom('realtime'), diff --git a/app/controllers/api/functions.php b/app/controllers/api/functions.php index 0e184b4cd..115276cc6 100644 --- a/app/controllers/api/functions.php +++ b/app/controllers/api/functions.php @@ -1373,10 +1373,10 @@ App::get('/v1/functions/:functionId/usage') $stats = $usage = []; $days = $periods[$range]; $metrics = [ - 'function.' . $function->getId() . '.deployments', - 'function.' . $function->getId() . '.deployments.storage', + 'functions.' . $function->getId() . '.deployments', + 'functions.' . $function->getId() . '.deployments.storage', $function->getId() . '.builds', - $function->getId() . '.builds.storage', + $function->getId() . '.builds.compute', $function->getId() . '.executions', $function->getId() . '.executions.compute', ]; @@ -1424,7 +1424,7 @@ App::get('/v1/functions/:functionId/usage') 'deployments' => $usage[$metrics[0]], 'deploymentsStorage' => $usage[$metrics[1]], 'builds' => $usage[$metrics[2]], - 'buildsStorage' => $usage[$metrics[3]], + 'buildsCompute' => $usage[$metrics[3]], 'executions' => $usage[$metrics[4]], 'executionsCompute' => $usage[$metrics[5]], ]), Response::MODEL_USAGE_FUNCTION); @@ -1453,7 +1453,7 @@ App::get('/v1/functions/usage') 'deployments', 'deployments.storage', 'builds', - 'builds.storage', + 'builds.compute', 'executions', 'executions.compute', ]; @@ -1495,14 +1495,13 @@ App::get('/v1/functions/usage') } $usage[$metric] = array_reverse($usage[$metric]); } - $response->dynamic(new Document([ 'range' => $range, 'functions' => $usage[$metrics[0]], 'deployments' => $usage[$metrics[1]], 'deploymentsStorage' => $usage[$metrics[2]], 'builds' => $usage[$metrics[3]], - 'buildsStorage' => $usage[$metrics[4]], + 'buildsCompute' => $usage[$metrics[4]], 'executions' => $usage[$metrics[5]], 'executionsCompute' => $usage[$metrics[6]], ]), Response::MODEL_USAGE_FUNCTIONS); diff --git a/app/controllers/shared/api.php b/app/controllers/shared/api.php index 8bf0be6f7..6da2d2bb2 100644 --- a/app/controllers/shared/api.php +++ b/app/controllers/shared/api.php @@ -62,14 +62,13 @@ $databaseListener = function (string $event, array $args, Document $project, Usa * On Documents that tied by relations like functions>deployments>build || documents>collection>database || buckets>files * When we remove a parent document we need to deduct his children aggregation from the project scope */ - + var_dump($document->getCollection()); switch (true) { case $document->getCollection() === 'teams': $queueForUsage->addMetric("teams", $value); // per project break; case $document->getCollection() === 'users': $queueForUsage->addMetric("users", $value); // per project - // Project sessions deduction if ($event === Database::EVENT_DOCUMENT_DELETE) { $userSessions = (count($document->getAttribute('sessions'))); @@ -84,14 +83,13 @@ $databaseListener = function (string $event, array $args, Document $project, Usa } } break; - case $document->getCollection() === 'sessions': // Todo sessions count offset issue + case $document->getCollection() === 'sessions': // sessions Todo sessions count offset issue $queueForUsage->addMetric("sessions", $value); // per project break; - case $document->getCollection() === 'databases': + case $document->getCollection() === 'databases': // databases $queueForUsage->addMetric("databases", $value); // per project if ($event === Database::EVENT_DOCUMENT_DELETE) { - //Project collections deduction $dbCollections = $dbForProject->getDocument('stats', md5("_inf_" . "{$document->getId()}" . ".collections")); $projectCollections = $dbForProject->getDocument('stats', md5("_inf_collections")); @@ -117,14 +115,13 @@ $databaseListener = function (string $event, array $args, Document $project, Usa } } break; - case $document->getCollection() === 'database_' . $document['$internalId']: // collections + case str_starts_with($document->getCollection(), 'database_') && !str_contains($document->getCollection(), 'collection'): //collections $queueForUsage - ->addMetric("collections", $value) // per project - ->addMetric("{$document['databaseId']}" . ".collections", $value) // per database - ; + ->addMetric("collections", $value) // per project + ->addMetric("{$document['databaseId']}" . ".collections", $value) // per database + ; if ($event === Database::EVENT_DOCUMENT_DELETE) { - //Project documents deduction $dbDocuments = $dbForProject->getDocument('stats', md5("_inf_" . "{$document['databaseId']}" . ".documents")); $projectDocuments = $dbForProject->getDocument('stats', md5("_inf_documents")); @@ -150,7 +147,6 @@ $databaseListener = function (string $event, array $args, Document $project, Usa $queueForUsage->addMetric("buckets", $value); // per project if ($event === Database::EVENT_DOCUMENT_DELETE) { - //Project files deduction $bucketFiles = $dbForProject->getDocument('stats', md5("_inf_" . "{$document->getId()}" . ".files")); $projectFiles = $dbForProject->getDocument('stats', md5("_inf_files")); @@ -175,7 +171,7 @@ $databaseListener = function (string $event, array $args, Document $project, Usa } } break; - case $document->getCollection() === 'bucket_' . $document['$internalId']: // files + case str_starts_with($document->getCollection(), 'bucket_'): // files $queueForUsage ->addMetric("files", $value) // per project ->addMetric("files.storage", $document->getAttribute('sizeOriginal') * $value) // per project @@ -187,7 +183,6 @@ $databaseListener = function (string $event, array $args, Document $project, Usa $queueForUsage->addMetric("functions", $value); // per project if ($event === Database::EVENT_DOCUMENT_DELETE) { - //Project deployments deduction $functionDeployments = $dbForProject->getDocument('stats', md5("_inf_function." . "{$document->getId()}" . ".deployments")); $projectDeployments = $dbForProject->getDocument('stats', md5("_inf_deployments")); @@ -274,9 +269,9 @@ $databaseListener = function (string $event, array $args, Document $project, Usa $queueForUsage ->addMetric("builds", $value) // per project - ->addMetric("builds.storage", $document->getAttribute('size') * $value) // per project + ->addMetric("builds.compute", $document->getAttribute('duration') * $value) // per project ->addMetric("{$deployment['resourceId']}" . ".builds", $value) // per function - ->addMetric("{$deployment['resourceId']}" . ".builds.storage", $document->getAttribute('size') * $value) // per function + ->addMetric("{$deployment['resourceId']}" . ".builds.compute", $document->getAttribute('duration') * $value) // per function ; break; case $document->getCollection() === 'executions': @@ -645,4 +640,5 @@ App::shutdown() ->addMetric("network.outbound", $response->getSize()) ->trigger(); } + var_dump(1); }); diff --git a/app/init.php b/app/init.php index 0516d9d3e..395144206 100644 --- a/app/init.php +++ b/app/init.php @@ -39,7 +39,6 @@ use Appwrite\Network\Validator\URL; use Appwrite\OpenSSL\OpenSSL; use Appwrite\URL\URL as AppwriteURL; use Utopia\App; -use Utopia\Queue\Connection; use Utopia\Validator\Range; use Utopia\Validator\WhiteList; use Utopia\Database\ID; @@ -74,7 +73,10 @@ use Appwrite\Event\Func; use MaxMind\Db\Reader; use PHPMailer\PHPMailer\PHPMailer; use Swoole\Database\PDOProxy; +use Utopia\CLI\Console; use Utopia\Queue; +use Utopia\Queue\Connection; +use Utopia\Storage\Storage; const APP_NAME = 'Appwrite'; const APP_DOMAIN = 'appwrite.io'; diff --git a/app/worker.php b/app/worker.php index 8151381d4..78671d731 100644 --- a/app/worker.php +++ b/app/worker.php @@ -3,6 +3,7 @@ require_once __DIR__ . '/init.php'; use Appwrite\Event\Func; +use Appwrite\Event\Usage; use Swoole\Runtime; use Utopia\App; use Utopia\Cache\Adapter\Sharding; @@ -85,12 +86,18 @@ Server::setResource('queueForFunctions', function (Registry $register) { ); }, ['register']); -Server::setResource('logger', function ($register) { - return $register->get('logger'); +Server::setResource('queueForUsage', function (Registry $register) { + $pools = $register->get('pools'); + return new Usage( + $pools + ->get('queue') + ->pop() + ->getResource() + ); }, ['register']); -Server::setResource('statsd', function ($register) { - return $register->get('statsd'); +Server::setResource('logger', function ($register) { + return $register->get('logger'); }, ['register']); Server::setResource('pools', function ($register) { diff --git a/app/workers/builds.php b/app/workers/builds.php index 7754e3ec6..ff8bafae5 100644 --- a/app/workers/builds.php +++ b/app/workers/builds.php @@ -56,6 +56,11 @@ class BuildsV1 extends Worker } } + /** + * @throws \Utopia\Database\Exception\Authorization + * @throws Throwable + * @throws \Utopia\Database\Exception\Structure + */ protected function buildDeployment(Document $project, Document $function, Document $deployment) { global $register; @@ -169,8 +174,8 @@ class BuildsV1 extends Worker try { $response = $this->executor->createRuntime( - projectId: $project->getId(), deploymentId: $deployment->getId(), + projectId: $project->getId(), source: $source, image: $runtime['image'], remove: true, @@ -248,6 +253,17 @@ class BuildsV1 extends Worker roles: $target['roles'] ); } + + /** Trigger usage queue */ + $this + ->getUsageQueue() + ->setProject($project) + ->addMetric("builds", 1) // per project + ->addMetric("builds.compute", $build->getAttribute('duration')) // per project + ->addMetric("{$function->getId()}" . ".builds", 1) // per function + ->addMetric("{$function->getId()}" . ".builds.compute", $build->getAttribute('duration')) // per function + ->trigger() + ; } public function shutdown(): void diff --git a/app/workers/functions.php b/app/workers/functions.php index 132241f60..d90a3438e 100644 --- a/app/workers/functions.php +++ b/app/workers/functions.php @@ -2,12 +2,12 @@ require_once __DIR__ . '/../worker.php'; +use Appwrite\Event\Usage; use Utopia\Queue\Message; use Appwrite\Event\Event; use Appwrite\Event\Func; use Appwrite\Messaging\Adapter\Realtime; use Appwrite\Utopia\Response\Model\Execution; -use Domnikl\Statsd\Client; use Executor\Executor; use Utopia\App; use Utopia\CLI\Console; @@ -28,7 +28,7 @@ Server::setResource('execute', function () { return function ( Func $queueForFunctions, Database $dbForProject, - Client $statsd, + Usage $queueForUsage, Document $project, Document $function, string $trigger, @@ -39,6 +39,7 @@ Server::setResource('execute', function () { string $eventData = null, string $executionId = null, ) { + $user ??= new Document(); $functionId = $function->getId(); $deploymentId = $function->getAttribute('deployment', ''); @@ -201,6 +202,15 @@ Server::setResource('execute', function () { channels: $target['channels'], roles: $target['roles'] ); + + /** Trigger usage queue */ + $queueForUsage + ->setProject($project) + ->addMetric('executions', 1) // per project + ->addMetric('executions.compute', $execution->getAttribute('duration'))// per project + ->addMetric("{$function->getId()}" . ".executions", 1) + ->addMetric("{$function->getId()}" . ".executions.compute", $execution->getAttribute('duration')) + ->trigger(); }; }); @@ -208,9 +218,9 @@ $server->job() ->inject('message') ->inject('dbForProject') ->inject('queueForFunctions') - ->inject('statsd') + ->inject('queueForUsage') ->inject('execute') - ->action(function (Message $message, Database $dbForProject, Func $queueForFunctions, Client $statsd, callable $execute) { + ->action(function (Message $message, Database $dbForProject, Func $queueForFunctions, Usage $queueForUsage, callable $execute) { $payload = $message->getPayload() ?? []; if (empty($payload)) { @@ -253,7 +263,7 @@ $server->job() } Console::success('Iterating function: ' . $function->getAttribute('name')); $execute( - statsd: $statsd, + queueForUsage: $queueForUsage, dbForProject: $dbForProject, project: $project, function: $function, @@ -292,7 +302,7 @@ $server->job() data: $data, user: $user, jwt: $jwt, - statsd: $statsd, + queueForUsage: $queueForUsage, ); break; case 'schedule': @@ -308,7 +318,7 @@ $server->job() data: null, user: null, jwt: null, - statsd: $statsd, + queueForUsage: $queueForUsage, ); break; } diff --git a/app/workers/usage.php b/app/workers/usage.php index 57e733aa2..662c73973 100644 --- a/app/workers/usage.php +++ b/app/workers/usage.php @@ -2,10 +2,13 @@ require_once __DIR__ . '/../worker.php'; +use Appwrite\Extend\Exception; use Swoole\Timer; use Utopia\App; use Utopia\Database\Database; +use Utopia\Database\DateTime; use Utopia\Database\Document; +use Utopia\Database\Exception\Duplicate; use Utopia\Database\Validator\Authorization; use Utopia\Queue\Message; use Utopia\CLI\Console; @@ -43,8 +46,6 @@ $server->job() } }); - -$server->start(); $server ->workerStart() ->inject('register') @@ -54,17 +55,21 @@ $server Timer::tick(30000, function () use ($register, $cache, $pools, $periods, &$stats) { $slice = array_slice($stats, 0, count($stats)); array_splice($stats, 0, count($stats)); + $log = []; foreach ($slice as $metric) { + $dbForProject = new Database( + $pools + ->get($metric['database']) + ->pop() + ->getResource(), + $cache + ); + $dbForProject->setNamespace('_' . $metric['projectInternalId']); foreach ($periods as $period => $format) { $time = 'inf' === $period ? null : date($format, time()); $id = \md5("{$time}_{$period}_{$metric['key']}"); - $dbForProject = new Database($pools->get($metric['database'])->pop()->getResource(), $cache); - $dbForProject->setNamespace('_' . $metric['projectInternalId']); - try { - $document = $dbForProject->getDocument('stats', $id); - if ($document->isEmpty()) { - //console::log("{$period}, {$time}, {$metric['key']}={$metric['value']}"); + try { $dbForProject->createDocument('stats', new Document([ '$id' => $id, 'period' => $period, @@ -73,21 +78,37 @@ $server 'value' => $metric['value'], 'region' => App::getEnv('_APP_REGION', 'default'), ])); - } else { - //console::info("{$document->getAttribute('period')}, {$document->getAttribute('time')}, {$document->getAttribute('metric')} = {$value}"); + } catch (Duplicate $th) { $dbForProject->increaseDocumentAttribute( 'stats', - $document->getId(), + $id, 'value', $metric['value'] ); } + + $log[] = [ + 'id' => $id, + 'period' => $period, + 'time' => $time, + 'metric' => $metric['key'], + 'value' => $metric['value'], + 'region' => App::getEnv('_APP_REGION', 'default'), + ]; } catch (\Exception $e) { console::error($e->getMessage()); } finally { $pools->reclaim(); } } + + if (!empty($metrics)) { + $dbForProject->createDocument('statsLogger', new Document([ + 'time' => DateTime::now(), + 'metrics' => $log, + ])); + } } }); }); +$server->start(); diff --git a/docker-compose.yml b/docker-compose.yml index d3b1e5f8d..557c52ea7 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -93,7 +93,7 @@ services: - ./public:/usr/src/code/public - ./src:/usr/src/code/src - ./dev:/usr/local/dev - #- ./vendor/utopia-php/database:/usr/src/code/vendor/utopia-php/database + - ./vendor/utopia-php/database:/usr/src/code/vendor/utopia-php/database depends_on: - mariadb - redis @@ -390,6 +390,7 @@ services: - _APP_CONNECTIONS_QUEUE - _APP_LOGGING_PROVIDER - _APP_LOGGING_CONFIG + - _APP_CONNECTIONS_STORAGE appwrite-worker-certificates: entrypoint: worker-certificates diff --git a/src/Appwrite/Resque/Worker.php b/src/Appwrite/Resque/Worker.php index e96c24d75..50ea7d3ec 100644 --- a/src/Appwrite/Resque/Worker.php +++ b/src/Appwrite/Resque/Worker.php @@ -2,6 +2,8 @@ namespace Appwrite\Resque; +use Appwrite\Event\Func; +use Appwrite\Event\Usage; use Exception; use Utopia\App; use Utopia\Cache\Cache; @@ -9,6 +11,8 @@ use Utopia\Config\Config; use Utopia\Cache\Adapter\Sharding; use Utopia\CLI\Console; use Utopia\Database\Database; +use Utopia\Pools\Group; +use Utopia\Queue\Connection; use Utopia\Storage\Device; use Utopia\Storage\Device\Local; use Utopia\Storage\Device\DOSpaces; @@ -141,7 +145,7 @@ abstract class Worker global $register; try { - $pools = $register->get('pools'); /** @var \Utopia\Pools\Group $pools */ + $pools = $register->get('pools'); /** @var Group $pools */ $pools->reclaim(); $this->shutdown(); @@ -176,7 +180,7 @@ abstract class Worker { global $register; - $pools = $register->get('pools'); /** @var \Utopia\Pools\Group $pools */ + $pools = $register->get('pools'); /** @var Group $pools */ if ($project->isEmpty() || $project->getId() === 'console') { return $this->getConsoleDB(); @@ -213,7 +217,7 @@ abstract class Worker { global $register; - $pools = $register->get('pools'); /** @var \Utopia\Pools\Group $pools */ + $pools = $register->get('pools'); /** @var Group $pools */ $dbAdapter = $pools ->get('console') @@ -237,7 +241,7 @@ abstract class Worker { global $register; - $pools = $register->get('pools'); /** @var \Utopia\Pools\Group $pools */ + $pools = $register->get('pools'); /** @var Group $pools */ $list = Config::getParam('pools-cache', []); $adapters = []; @@ -253,6 +257,24 @@ abstract class Worker return new Cache(new Sharding($adapters)); } + /** + * Get usage queue + * @return Usage + * @throws Exception + */ + protected function getUsageQueue(): Usage + { + global $register; + + $pools = $register->get('pools'); /** @var Group $pools */ + $queue = $pools + ->get('queue') + ->pop() + ->getResource(); + + return new Usage($queue); + } + /** * Get Functions Storage Device * @param string $projectId of the project @@ -291,7 +313,6 @@ abstract class Worker public function getDevice($root): Device { $connection = App::getEnv('_APP_CONNECTIONS_STORAGE', ''); - $acl = 'private'; $device = Storage::DEVICE_LOCAL; $accessKey = ''; diff --git a/src/Appwrite/Utopia/Response/Model/UsageFunction.php b/src/Appwrite/Utopia/Response/Model/UsageFunction.php index 58d76bbf4..410db7333 100644 --- a/src/Appwrite/Utopia/Response/Model/UsageFunction.php +++ b/src/Appwrite/Utopia/Response/Model/UsageFunction.php @@ -16,58 +16,45 @@ class UsageFunction extends Model 'default' => '', 'example' => '30d', ]) - ->addRule('executionsTotal', [ + ->addRule('deployments', [ 'type' => Response::MODEL_METRIC, - 'description' => 'Aggregated stats for number of function executions.', + 'description' => 'Aggregated stats for number of function deployments.', 'default' => [], 'example' => [], 'array' => true ]) - ->addRule('executionsFailure', [ + ->addRule('deploymentsStorage', [ 'type' => Response::MODEL_METRIC, - 'description' => 'Aggregated stats for function execution failures.', + 'description' => 'Aggregated stats for function deployments storage.', 'default' => [], 'example' => [], 'array' => true ]) - ->addRule('executionsSuccess', [ - 'type' => Response::MODEL_METRIC, - 'description' => 'Aggregated stats for function execution successes.', - 'default' => [], - 'example' => [], - 'array' => true - ]) - ->addRule('executionsTime', [ - 'type' => Response::MODEL_METRIC, - 'description' => 'Aggregated stats for function execution duration.', - 'default' => [], - 'example' => [], - 'array' => true - ]) - ->addRule('buildsTotal', [ + ->addRule('builds', [ 'type' => Response::MODEL_METRIC, 'description' => 'Aggregated stats for number of function builds.', 'default' => [], 'example' => [], 'array' => true ]) - ->addRule('buildsFailure', [ + ->addRule('buildsCompute', [ 'type' => Response::MODEL_METRIC, - 'description' => 'Aggregated stats for function build failures.', + 'description' => 'Aggregated stats for function build compute.', 'default' => [], 'example' => [], 'array' => true ]) - ->addRule('buildsSuccess', [ + ->addRule('executions', [ 'type' => Response::MODEL_METRIC, - 'description' => 'Aggregated stats for function build successes.', + 'description' => 'Aggregated stats for number of function executions.', 'default' => [], 'example' => [], 'array' => true ]) - ->addRule('buildsTime', [ + + ->addRule('executionsCompute', [ 'type' => Response::MODEL_METRIC, - 'description' => 'Aggregated stats for function build duration.', + 'description' => 'Aggregated stats for function execution compute.', 'default' => [], 'example' => [], 'array' => true diff --git a/src/Appwrite/Utopia/Response/Model/UsageFunctions.php b/src/Appwrite/Utopia/Response/Model/UsageFunctions.php index b3bfa8629..85ab2111e 100644 --- a/src/Appwrite/Utopia/Response/Model/UsageFunctions.php +++ b/src/Appwrite/Utopia/Response/Model/UsageFunctions.php @@ -44,9 +44,9 @@ class UsageFunctions extends Model 'example' => [], 'array' => true ]) - ->addRule('buildsStorage', [ + ->addRule('buildsCompute', [ 'type' => Response::MODEL_METRIC, - 'description' => 'Aggregated stats for function build storage.', + 'description' => 'Aggregated stats for function build compute.', 'default' => [], 'example' => [], 'array' => true diff --git a/src/Executor/Executor.php b/src/Executor/Executor.php index 43b2228e1..dfb4ccf46 100644 --- a/src/Executor/Executor.php +++ b/src/Executor/Executor.php @@ -133,7 +133,6 @@ class Executor 'variables' => $variables, 'payload' => $payload, 'timeout' => $timeout, - 'image' => $image, 'source' => $source, 'entrypoint' => $entrypoint,