From 4198cbfca535a7d007aff503a095006a5292612b Mon Sep 17 00:00:00 2001 From: shimon Date: Wed, 28 Dec 2022 18:27:26 +0200 Subject: [PATCH] moved reduce after delete document to usage-worker --- app/console | 2 +- app/controllers/shared/api.php | 80 +++--------------- app/workers/usage.php | 150 ++++++++++++++++++++++++++++++++- src/Appwrite/Event/Usage.php | 19 +++++ 4 files changed, 178 insertions(+), 73 deletions(-) diff --git a/app/console b/app/console index b1a81a390a..af3d741ae8 160000 --- a/app/console +++ b/app/console @@ -1 +1 @@ -Subproject commit b1a81a390a05746701651fca49e0d853f430677c +Subproject commit af3d741ae8f02c2e16b8b4ea4664a3f8970290fd diff --git a/app/controllers/shared/api.php b/app/controllers/shared/api.php index bd518fa5be..ea2def52b3 100644 --- a/app/controllers/shared/api.php +++ b/app/controllers/shared/api.php @@ -53,15 +53,10 @@ $parseLabel = function (string $label, array $responsePayload, array $requestPar $databaseListener = function (string $event, Document $document, Document $project, Usage $queueForUsage, Database $dbForProject, Logger|null $logger) { $value = 1; - if ($event === Database::EVENT_DOCUMENT_DELETE) { $value = -1; } - /** - * On Documents that tied by relations like functions>deployments>build || documents>collection>database || buckets>files - * When we remove a parent document we need to deduct his children aggregation from the project scope - */ try { switch (true) { case $document->getCollection() === 'teams': @@ -71,14 +66,9 @@ $databaseListener = function (string $event, Document $document, Document $proje case $document->getCollection() === 'users': $queueForUsage ->addMetric("users", $value); // per project - - //Project level sessions deduction if ($event === Database::EVENT_DOCUMENT_DELETE) { - $sessions = count($document->getAttribute('sessions', 0)); - if (!empty($sessions)) { - $queueForUsage - ->addMetric("sessions", ($sessions * -1)); // per project - } + $queueForUsage + ->addReduce($document); } break; case $document->getCollection() === 'sessions': // sessions @@ -90,14 +80,8 @@ $databaseListener = function (string $event, Document $document, Document $proje ->addMetric("databases", $value); // per project if ($event === Database::EVENT_DOCUMENT_DELETE) { - //Project level collections/documents deduction - $collections = $dbForProject->getDocument('stats', md5("_inf_" . "{$document->getInternalId()}" . ".collections")); - $documents = $dbForProject->getDocument('stats', md5("_inf_" . "{$document->getInternalId()}" . ".documents")); - if (!empty($collections['value'])) { - $queueForUsage - ->addMetric("collections", ($collections['value'] * -1)) - ->addMetric("documents", ($documents['value'] * -1)); - } + $queueForUsage + ->addReduce($document); } break; case str_starts_with($document->getCollection(), 'database_') && !str_contains($document->getCollection(), 'collection'): //collections @@ -109,12 +93,8 @@ $databaseListener = function (string $event, Document $document, Document $proje ; if ($event === Database::EVENT_DOCUMENT_DELETE) { - //Project documents deduction - $documents = $dbForProject->getDocument('stats', md5("_inf_" . "{$databaseId}" . ".documents")); - if (!empty($documents['value'])) { - $queueForUsage - ->addMetric("documents", ($documents['value'] * -1)); - } + $queueForUsage + ->addReduce($document); } break; case str_starts_with($document->getCollection(), 'database_') && str_contains($document->getCollection(), '_collection_'): //documents @@ -127,20 +107,12 @@ $databaseListener = function (string $event, Document $document, Document $proje ->addMetric("{$databaseId}" . "." . "{$collectionId}" . ".documents", $value) // per collection ; break; - case $document->getCollection() === 'buckets': + case $document->getCollection() === 'buckets': //buckets $queueForUsage ->addMetric("buckets", $value); // per project - if ($event === Database::EVENT_DOCUMENT_DELETE) { - //Project files/files.storage deduction - $files = $dbForProject->getDocument('stats', md5("_inf_" . "{$document->getInternalId()}" . ".files")); - $storage = $dbForProject->getDocument('stats', md5("_inf_" . "{$document->getInternalId()}" . ".files.storage")); - - if (!empty($files['value'])) { - $queueForUsage - ->addMetric("files", ($files['value'] * -1)) - ->addMetric("files.storage", ($storage['value'] * -1)); - } + $queueForUsage + ->addReduce($document); } break; case str_starts_with($document->getCollection(), 'bucket_'): // files @@ -155,39 +127,11 @@ $databaseListener = function (string $event, Document $document, Document $proje break; case $document->getCollection() === 'functions': $queueForUsage - ->addMetric("functions", $value); // per project + ->addMetric('functions', $value); // per project if ($event === Database::EVENT_DOCUMENT_DELETE) { - //Project level function/builds/executions deduction - $deployments = $dbForProject->getDocument('stats', md5("_inf_function." . "{$document->getInternalId()}" . ".deployments")); - $deploymentsStorage = $dbForProject->getDocument('stats', md5("_inf_function." . "{$document->getInternalId()}" . ".deployments.storage")); - $builds = $dbForProject->getDocument('stats', md5("_inf_" . "{$document->getInternalId()}" . ".builds")); - $buildsStorage = $dbForProject->getDocument('stats', md5("_inf_" . "{$document->getInternalId()}" . ".builds.storage")); - $buildsCompute = $dbForProject->getDocument('stats', md5("_inf_" . "{$document->getInternalId()}" . ".builds.compute")); - $executions = $dbForProject->getDocument('stats', md5("_inf_" . "{$document->getInternalId()}" . ".executions")); - $executionsCompute = $dbForProject->getDocument('stats', md5("_inf_" . "{$document->getInternalId()}" . ".executions.compute")); - - if (!empty($deployments['value'])) { - $queueForUsage - ->addMetric("deployments", ($deployments['value'] * -1)) - ->addMetric("deployments.storage", ($deploymentsStorage['value'] * -1)) - ; - } - - if (!empty($builds['value'])) { - $queueForUsage - ->addMetric("builds", ($builds['value'] * -1)) - ->addMetric("builds.storage", ($buildsStorage['value'] * -1)) - ->addMetric("builds.compute", ($buildsCompute['value'] * -1)) - ; - } - - if (!empty($executions['value'])) { - $queueForUsage - ->addMetric("executions", ($executions['value'] * -1)) - ->addMetric("executions.compute", ($executionsCompute['value'] * -1)) - ; - } + $queueForUsage + ->addReduce($document); } break; case $document->getCollection() === 'deployments': diff --git a/app/workers/usage.php b/app/workers/usage.php index 947583947c..a3f707b41a 100644 --- a/app/workers/usage.php +++ b/app/workers/usage.php @@ -4,6 +4,7 @@ require_once __DIR__ . '/../worker.php'; use Swoole\Timer; use Utopia\App; +use Utopia\Cache\Cache; use Utopia\Database\Database; use Utopia\Database\DateTime; use Utopia\Database\Document; @@ -11,6 +12,8 @@ use Utopia\Database\Exception\Duplicate; use Utopia\Database\Validator\Authorization; use Utopia\Queue\Message; use Utopia\CLI\Console; +use Utopia\Queue\Server; +use Utopia\Registry\Registry; Authorization::disable(); Authorization::setDefaultStatus(false); @@ -22,13 +25,152 @@ $periods['1d'] = 'Y-m-d 00:00'; //$periods['1m'] = 'Y-m-1 00:00'; $periods['inf'] = '0000-00-00 00:00'; +/** + * On Documents that tied by relations like functions>deployments>build || documents>collection>database || buckets>files + * When we remove a parent document we need to deduct his children aggregation from the project scope + */ +Server::setResource('reduce', function (Cache $cache, Registry $register, $pools) { + return function ($database, $projectInternalId, Document $document, array &$metrics) use ($pools, $cache, $register): void { + try { + $dbForProject = new Database( + $pools + ->get($database) + ->pop() + ->getResource(), + $cache + ); + + $dbForProject->setNamespace('_' . $projectInternalId); + + switch (true) { + case $document->getCollection() === 'users': + $sessions = count($document->getAttribute('sessions', 0)); + if (!empty($sessions)) { + $metrics[] = [ + 'key' => 'sessions', + 'value' => ($sessions * -1), + ]; + } + break; + case $document->getCollection() === 'databases': // databases + $collections = $dbForProject->getDocument('stats', md5("_inf_" . "{$document->getInternalId()}" . ".collections")); + $documents = $dbForProject->getDocument('stats', md5("_inf_" . "{$document->getInternalId()}" . ".documents")); + if (!empty($collections['value'])) { + $metrics[] = [ + 'key' => 'collections', + 'value' => ($collections['value'] * -1), + ]; + $metrics[] = [ + 'key' => 'documents', + 'value' => ($documents['value'] * -1), + ]; + } + break; + case str_starts_with($document->getCollection(), 'database_') && !str_contains($document->getCollection(), 'collection'): //collections + $parts = explode('_', $document->getCollection()); + $databaseId = $parts[1] ?? 0; + $documents = $dbForProject->getDocument('stats', md5("_inf_" . "{$databaseId}" . ".documents")); + if (!empty($documents['value'])) { + $metrics[] = [ + 'key' => 'documents', + 'value' => ($documents['value'] * -1), + ]; + } + break; + + case $document->getCollection() === 'buckets': + $files = $dbForProject->getDocument('stats', md5("_inf_" . "{$document->getInternalId()}" . ".files")); + $storage = $dbForProject->getDocument('stats', md5("_inf_" . "{$document->getInternalId()}" . ".files.storage")); + + if (!empty($files['value'])) { + $metrics[] = [ + 'key' => 'files', + 'value' => ($files['value'] * -1), + ]; + $metrics[] = [ + 'key' => 'files.storage', + 'value' => ($storage['value'] * -1), + ]; + } + break; + + case $document->getCollection() === 'functions': + $deployments = $dbForProject->getDocument('stats', md5("_inf_function." . "{$document->getInternalId()}" . ".deployments")); + $deploymentsStorage = $dbForProject->getDocument('stats', md5("_inf_function." . "{$document->getInternalId()}" . ".deployments.storage")); + $builds = $dbForProject->getDocument('stats', md5("_inf_" . "{$document->getInternalId()}" . ".builds")); + $buildsStorage = $dbForProject->getDocument('stats', md5("_inf_" . "{$document->getInternalId()}" . ".builds.storage")); + $buildsCompute = $dbForProject->getDocument('stats', md5("_inf_" . "{$document->getInternalId()}" . ".builds.compute")); + $executions = $dbForProject->getDocument('stats', md5("_inf_" . "{$document->getInternalId()}" . ".executions")); + $executionsCompute = $dbForProject->getDocument('stats', md5("_inf_" . "{$document->getInternalId()}" . ".executions.compute")); + + if (!empty($deployments['value'])) { + $metrics[] = [ + 'key' => 'deployments', + 'value' => ($deployments['value'] * -1), + ]; + $metrics[] = [ + 'key' => 'deployments.storage', + 'value' => ($deploymentsStorage['value'] * -1), + ]; + } + + if (!empty($builds['value'])) { + $metrics[] = [ + 'key' => 'builds', + 'value' => ($builds['value'] * -1), + ]; + $metrics[] = [ + 'key' => 'builds.storage', + 'value' => ($buildsStorage['value'] * -1), + ]; + $metrics[] = [ + 'key' => 'builds.compute', + 'value' => ($buildsCompute['value'] * -1), + ]; + } + + if (!empty($executions['value'])) { + $metrics[] = [ + 'key' => 'executions', + 'value' => ($executions['value'] * -1), + ]; + $metrics[] = [ + 'key' => 'executions.compute', + 'value' => ($executionsCompute['value'] * -1), + ]; + } + break; + default: + break; + } + } catch (\Exception $e) { + console::error($e->getMessage()); + } finally { + $pools->reclaim(); + } + }; +}, ['cache', 'register', 'pools']); + + $server->job() ->inject('message') - ->action(function (Message $message) use (&$stats) { + ->inject('reduce') + + ->action(function (Message $message, callable $reduce) use (&$stats) { $payload = $message->getPayload() ?? []; $project = new Document($payload['project'] ?? []); $projectId = $project->getInternalId(); + + foreach ($payload['reduce'] ?? [] as $document) { + $reduce( + database: $project->getAttribute('database'), + projectInternalId: $project->getInternalId(), + document: new Document($document ?? []), + metrics: $payload['metrics'], + ); + } + $stats[$projectId]['database'] = $project->getAttribute('database'); foreach ($payload['metrics'] ?? [] as $metric) { @@ -56,9 +198,9 @@ $server try { $dbForProject = new Database( $pools - ->get($project['database']) - ->pop() - ->getResource(), + ->get($project['database']) + ->pop() + ->getResource(), $cache ); diff --git a/src/Appwrite/Event/Usage.php b/src/Appwrite/Event/Usage.php index 00ef7c17fe..f046914c88 100644 --- a/src/Appwrite/Event/Usage.php +++ b/src/Appwrite/Event/Usage.php @@ -4,16 +4,32 @@ namespace Appwrite\Event; use Utopia\Queue\Client; use Utopia\Queue\Connection; +use Utopia\Database\Document; class Usage extends Event { protected array $metrics = []; + protected array $reduce = []; public function __construct(protected Connection $connection) { parent::__construct(Event::USAGE_QUEUE_NAME, Event::USAGE_CLASS_NAME); } + /** + * Add reduce. + * + * @param Document $document + * @return self + */ + public function addReduce(Document $document): self + { + $this->reduce[] = $document; + + return $this; + } + + /** * Add metric. * @@ -31,6 +47,8 @@ class Usage extends Event return $this; } + + /** * Sends metrics to the usage worker. * @@ -42,6 +60,7 @@ class Usage extends Event return $client->enqueue([ 'project' => $this->getProject(), + 'reduce' => $this->reduce, 'metrics' => $this->metrics, ]); }