moved reduce after delete document to usage-worker
This commit is contained in:
parent
44af531881
commit
4198cbfca5
|
@ -1 +1 @@
|
|||
Subproject commit b1a81a390a05746701651fca49e0d853f430677c
|
||||
Subproject commit af3d741ae8f02c2e16b8b4ea4664a3f8970290fd
|
|
@ -53,15 +53,10 @@ $parseLabel = function (string $label, array $responsePayload, array $requestPar
|
|||
$databaseListener = function (string $event, Document $document, Document $project, Usage $queueForUsage, Database $dbForProject, Logger|null $logger) {
|
||||
|
||||
$value = 1;
|
||||
|
||||
if ($event === Database::EVENT_DOCUMENT_DELETE) {
|
||||
$value = -1;
|
||||
}
|
||||
|
||||
/**
|
||||
* On Documents that tied by relations like functions>deployments>build || documents>collection>database || buckets>files
|
||||
* When we remove a parent document we need to deduct his children aggregation from the project scope
|
||||
*/
|
||||
try {
|
||||
switch (true) {
|
||||
case $document->getCollection() === 'teams':
|
||||
|
@ -71,14 +66,9 @@ $databaseListener = function (string $event, Document $document, Document $proje
|
|||
case $document->getCollection() === 'users':
|
||||
$queueForUsage
|
||||
->addMetric("users", $value); // per project
|
||||
|
||||
//Project level sessions deduction
|
||||
if ($event === Database::EVENT_DOCUMENT_DELETE) {
|
||||
$sessions = count($document->getAttribute('sessions', 0));
|
||||
if (!empty($sessions)) {
|
||||
$queueForUsage
|
||||
->addMetric("sessions", ($sessions * -1)); // per project
|
||||
}
|
||||
$queueForUsage
|
||||
->addReduce($document);
|
||||
}
|
||||
break;
|
||||
case $document->getCollection() === 'sessions': // sessions
|
||||
|
@ -90,14 +80,8 @@ $databaseListener = function (string $event, Document $document, Document $proje
|
|||
->addMetric("databases", $value); // per project
|
||||
|
||||
if ($event === Database::EVENT_DOCUMENT_DELETE) {
|
||||
//Project level collections/documents deduction
|
||||
$collections = $dbForProject->getDocument('stats', md5("_inf_" . "{$document->getInternalId()}" . ".collections"));
|
||||
$documents = $dbForProject->getDocument('stats', md5("_inf_" . "{$document->getInternalId()}" . ".documents"));
|
||||
if (!empty($collections['value'])) {
|
||||
$queueForUsage
|
||||
->addMetric("collections", ($collections['value'] * -1))
|
||||
->addMetric("documents", ($documents['value'] * -1));
|
||||
}
|
||||
$queueForUsage
|
||||
->addReduce($document);
|
||||
}
|
||||
break;
|
||||
case str_starts_with($document->getCollection(), 'database_') && !str_contains($document->getCollection(), 'collection'): //collections
|
||||
|
@ -109,12 +93,8 @@ $databaseListener = function (string $event, Document $document, Document $proje
|
|||
;
|
||||
|
||||
if ($event === Database::EVENT_DOCUMENT_DELETE) {
|
||||
//Project documents deduction
|
||||
$documents = $dbForProject->getDocument('stats', md5("_inf_" . "{$databaseId}" . ".documents"));
|
||||
if (!empty($documents['value'])) {
|
||||
$queueForUsage
|
||||
->addMetric("documents", ($documents['value'] * -1));
|
||||
}
|
||||
$queueForUsage
|
||||
->addReduce($document);
|
||||
}
|
||||
break;
|
||||
case str_starts_with($document->getCollection(), 'database_') && str_contains($document->getCollection(), '_collection_'): //documents
|
||||
|
@ -127,20 +107,12 @@ $databaseListener = function (string $event, Document $document, Document $proje
|
|||
->addMetric("{$databaseId}" . "." . "{$collectionId}" . ".documents", $value) // per collection
|
||||
;
|
||||
break;
|
||||
case $document->getCollection() === 'buckets':
|
||||
case $document->getCollection() === 'buckets': //buckets
|
||||
$queueForUsage
|
||||
->addMetric("buckets", $value); // per project
|
||||
|
||||
if ($event === Database::EVENT_DOCUMENT_DELETE) {
|
||||
//Project files/files.storage deduction
|
||||
$files = $dbForProject->getDocument('stats', md5("_inf_" . "{$document->getInternalId()}" . ".files"));
|
||||
$storage = $dbForProject->getDocument('stats', md5("_inf_" . "{$document->getInternalId()}" . ".files.storage"));
|
||||
|
||||
if (!empty($files['value'])) {
|
||||
$queueForUsage
|
||||
->addMetric("files", ($files['value'] * -1))
|
||||
->addMetric("files.storage", ($storage['value'] * -1));
|
||||
}
|
||||
$queueForUsage
|
||||
->addReduce($document);
|
||||
}
|
||||
break;
|
||||
case str_starts_with($document->getCollection(), 'bucket_'): // files
|
||||
|
@ -155,39 +127,11 @@ $databaseListener = function (string $event, Document $document, Document $proje
|
|||
break;
|
||||
case $document->getCollection() === 'functions':
|
||||
$queueForUsage
|
||||
->addMetric("functions", $value); // per project
|
||||
->addMetric('functions', $value); // per project
|
||||
|
||||
if ($event === Database::EVENT_DOCUMENT_DELETE) {
|
||||
//Project level function/builds/executions deduction
|
||||
$deployments = $dbForProject->getDocument('stats', md5("_inf_function." . "{$document->getInternalId()}" . ".deployments"));
|
||||
$deploymentsStorage = $dbForProject->getDocument('stats', md5("_inf_function." . "{$document->getInternalId()}" . ".deployments.storage"));
|
||||
$builds = $dbForProject->getDocument('stats', md5("_inf_" . "{$document->getInternalId()}" . ".builds"));
|
||||
$buildsStorage = $dbForProject->getDocument('stats', md5("_inf_" . "{$document->getInternalId()}" . ".builds.storage"));
|
||||
$buildsCompute = $dbForProject->getDocument('stats', md5("_inf_" . "{$document->getInternalId()}" . ".builds.compute"));
|
||||
$executions = $dbForProject->getDocument('stats', md5("_inf_" . "{$document->getInternalId()}" . ".executions"));
|
||||
$executionsCompute = $dbForProject->getDocument('stats', md5("_inf_" . "{$document->getInternalId()}" . ".executions.compute"));
|
||||
|
||||
if (!empty($deployments['value'])) {
|
||||
$queueForUsage
|
||||
->addMetric("deployments", ($deployments['value'] * -1))
|
||||
->addMetric("deployments.storage", ($deploymentsStorage['value'] * -1))
|
||||
;
|
||||
}
|
||||
|
||||
if (!empty($builds['value'])) {
|
||||
$queueForUsage
|
||||
->addMetric("builds", ($builds['value'] * -1))
|
||||
->addMetric("builds.storage", ($buildsStorage['value'] * -1))
|
||||
->addMetric("builds.compute", ($buildsCompute['value'] * -1))
|
||||
;
|
||||
}
|
||||
|
||||
if (!empty($executions['value'])) {
|
||||
$queueForUsage
|
||||
->addMetric("executions", ($executions['value'] * -1))
|
||||
->addMetric("executions.compute", ($executionsCompute['value'] * -1))
|
||||
;
|
||||
}
|
||||
$queueForUsage
|
||||
->addReduce($document);
|
||||
}
|
||||
break;
|
||||
case $document->getCollection() === 'deployments':
|
||||
|
|
|
@ -4,6 +4,7 @@ require_once __DIR__ . '/../worker.php';
|
|||
|
||||
use Swoole\Timer;
|
||||
use Utopia\App;
|
||||
use Utopia\Cache\Cache;
|
||||
use Utopia\Database\Database;
|
||||
use Utopia\Database\DateTime;
|
||||
use Utopia\Database\Document;
|
||||
|
@ -11,6 +12,8 @@ use Utopia\Database\Exception\Duplicate;
|
|||
use Utopia\Database\Validator\Authorization;
|
||||
use Utopia\Queue\Message;
|
||||
use Utopia\CLI\Console;
|
||||
use Utopia\Queue\Server;
|
||||
use Utopia\Registry\Registry;
|
||||
|
||||
Authorization::disable();
|
||||
Authorization::setDefaultStatus(false);
|
||||
|
@ -22,13 +25,152 @@ $periods['1d'] = 'Y-m-d 00:00';
|
|||
//$periods['1m'] = 'Y-m-1 00:00';
|
||||
$periods['inf'] = '0000-00-00 00:00';
|
||||
|
||||
/**
|
||||
* On Documents that tied by relations like functions>deployments>build || documents>collection>database || buckets>files
|
||||
* When we remove a parent document we need to deduct his children aggregation from the project scope
|
||||
*/
|
||||
Server::setResource('reduce', function (Cache $cache, Registry $register, $pools) {
|
||||
return function ($database, $projectInternalId, Document $document, array &$metrics) use ($pools, $cache, $register): void {
|
||||
try {
|
||||
$dbForProject = new Database(
|
||||
$pools
|
||||
->get($database)
|
||||
->pop()
|
||||
->getResource(),
|
||||
$cache
|
||||
);
|
||||
|
||||
$dbForProject->setNamespace('_' . $projectInternalId);
|
||||
|
||||
switch (true) {
|
||||
case $document->getCollection() === 'users':
|
||||
$sessions = count($document->getAttribute('sessions', 0));
|
||||
if (!empty($sessions)) {
|
||||
$metrics[] = [
|
||||
'key' => 'sessions',
|
||||
'value' => ($sessions * -1),
|
||||
];
|
||||
}
|
||||
break;
|
||||
case $document->getCollection() === 'databases': // databases
|
||||
$collections = $dbForProject->getDocument('stats', md5("_inf_" . "{$document->getInternalId()}" . ".collections"));
|
||||
$documents = $dbForProject->getDocument('stats', md5("_inf_" . "{$document->getInternalId()}" . ".documents"));
|
||||
if (!empty($collections['value'])) {
|
||||
$metrics[] = [
|
||||
'key' => 'collections',
|
||||
'value' => ($collections['value'] * -1),
|
||||
];
|
||||
$metrics[] = [
|
||||
'key' => 'documents',
|
||||
'value' => ($documents['value'] * -1),
|
||||
];
|
||||
}
|
||||
break;
|
||||
case str_starts_with($document->getCollection(), 'database_') && !str_contains($document->getCollection(), 'collection'): //collections
|
||||
$parts = explode('_', $document->getCollection());
|
||||
$databaseId = $parts[1] ?? 0;
|
||||
$documents = $dbForProject->getDocument('stats', md5("_inf_" . "{$databaseId}" . ".documents"));
|
||||
if (!empty($documents['value'])) {
|
||||
$metrics[] = [
|
||||
'key' => 'documents',
|
||||
'value' => ($documents['value'] * -1),
|
||||
];
|
||||
}
|
||||
break;
|
||||
|
||||
case $document->getCollection() === 'buckets':
|
||||
$files = $dbForProject->getDocument('stats', md5("_inf_" . "{$document->getInternalId()}" . ".files"));
|
||||
$storage = $dbForProject->getDocument('stats', md5("_inf_" . "{$document->getInternalId()}" . ".files.storage"));
|
||||
|
||||
if (!empty($files['value'])) {
|
||||
$metrics[] = [
|
||||
'key' => 'files',
|
||||
'value' => ($files['value'] * -1),
|
||||
];
|
||||
$metrics[] = [
|
||||
'key' => 'files.storage',
|
||||
'value' => ($storage['value'] * -1),
|
||||
];
|
||||
}
|
||||
break;
|
||||
|
||||
case $document->getCollection() === 'functions':
|
||||
$deployments = $dbForProject->getDocument('stats', md5("_inf_function." . "{$document->getInternalId()}" . ".deployments"));
|
||||
$deploymentsStorage = $dbForProject->getDocument('stats', md5("_inf_function." . "{$document->getInternalId()}" . ".deployments.storage"));
|
||||
$builds = $dbForProject->getDocument('stats', md5("_inf_" . "{$document->getInternalId()}" . ".builds"));
|
||||
$buildsStorage = $dbForProject->getDocument('stats', md5("_inf_" . "{$document->getInternalId()}" . ".builds.storage"));
|
||||
$buildsCompute = $dbForProject->getDocument('stats', md5("_inf_" . "{$document->getInternalId()}" . ".builds.compute"));
|
||||
$executions = $dbForProject->getDocument('stats', md5("_inf_" . "{$document->getInternalId()}" . ".executions"));
|
||||
$executionsCompute = $dbForProject->getDocument('stats', md5("_inf_" . "{$document->getInternalId()}" . ".executions.compute"));
|
||||
|
||||
if (!empty($deployments['value'])) {
|
||||
$metrics[] = [
|
||||
'key' => 'deployments',
|
||||
'value' => ($deployments['value'] * -1),
|
||||
];
|
||||
$metrics[] = [
|
||||
'key' => 'deployments.storage',
|
||||
'value' => ($deploymentsStorage['value'] * -1),
|
||||
];
|
||||
}
|
||||
|
||||
if (!empty($builds['value'])) {
|
||||
$metrics[] = [
|
||||
'key' => 'builds',
|
||||
'value' => ($builds['value'] * -1),
|
||||
];
|
||||
$metrics[] = [
|
||||
'key' => 'builds.storage',
|
||||
'value' => ($buildsStorage['value'] * -1),
|
||||
];
|
||||
$metrics[] = [
|
||||
'key' => 'builds.compute',
|
||||
'value' => ($buildsCompute['value'] * -1),
|
||||
];
|
||||
}
|
||||
|
||||
if (!empty($executions['value'])) {
|
||||
$metrics[] = [
|
||||
'key' => 'executions',
|
||||
'value' => ($executions['value'] * -1),
|
||||
];
|
||||
$metrics[] = [
|
||||
'key' => 'executions.compute',
|
||||
'value' => ($executionsCompute['value'] * -1),
|
||||
];
|
||||
}
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
} catch (\Exception $e) {
|
||||
console::error($e->getMessage());
|
||||
} finally {
|
||||
$pools->reclaim();
|
||||
}
|
||||
};
|
||||
}, ['cache', 'register', 'pools']);
|
||||
|
||||
|
||||
$server->job()
|
||||
->inject('message')
|
||||
->action(function (Message $message) use (&$stats) {
|
||||
->inject('reduce')
|
||||
|
||||
->action(function (Message $message, callable $reduce) use (&$stats) {
|
||||
|
||||
$payload = $message->getPayload() ?? [];
|
||||
$project = new Document($payload['project'] ?? []);
|
||||
$projectId = $project->getInternalId();
|
||||
|
||||
foreach ($payload['reduce'] ?? [] as $document) {
|
||||
$reduce(
|
||||
database: $project->getAttribute('database'),
|
||||
projectInternalId: $project->getInternalId(),
|
||||
document: new Document($document ?? []),
|
||||
metrics: $payload['metrics'],
|
||||
);
|
||||
}
|
||||
|
||||
$stats[$projectId]['database'] = $project->getAttribute('database');
|
||||
|
||||
foreach ($payload['metrics'] ?? [] as $metric) {
|
||||
|
@ -56,9 +198,9 @@ $server
|
|||
try {
|
||||
$dbForProject = new Database(
|
||||
$pools
|
||||
->get($project['database'])
|
||||
->pop()
|
||||
->getResource(),
|
||||
->get($project['database'])
|
||||
->pop()
|
||||
->getResource(),
|
||||
$cache
|
||||
);
|
||||
|
||||
|
|
|
@ -4,16 +4,32 @@ namespace Appwrite\Event;
|
|||
|
||||
use Utopia\Queue\Client;
|
||||
use Utopia\Queue\Connection;
|
||||
use Utopia\Database\Document;
|
||||
|
||||
class Usage extends Event
|
||||
{
|
||||
protected array $metrics = [];
|
||||
protected array $reduce = [];
|
||||
|
||||
public function __construct(protected Connection $connection)
|
||||
{
|
||||
parent::__construct(Event::USAGE_QUEUE_NAME, Event::USAGE_CLASS_NAME);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add reduce.
|
||||
*
|
||||
* @param Document $document
|
||||
* @return self
|
||||
*/
|
||||
public function addReduce(Document $document): self
|
||||
{
|
||||
$this->reduce[] = $document;
|
||||
|
||||
return $this;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Add metric.
|
||||
*
|
||||
|
@ -31,6 +47,8 @@ class Usage extends Event
|
|||
return $this;
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Sends metrics to the usage worker.
|
||||
*
|
||||
|
@ -42,6 +60,7 @@ class Usage extends Event
|
|||
|
||||
return $client->enqueue([
|
||||
'project' => $this->getProject(),
|
||||
'reduce' => $this->reduce,
|
||||
'metrics' => $this->metrics,
|
||||
]);
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue