1
0
Fork 0
mirror of synced 2024-06-03 03:14:50 +12:00
This commit is contained in:
Eldad Fux 2021-08-30 10:19:29 +03:00
parent 95a20159a7
commit c84dc0fa92

View file

@ -11,15 +11,19 @@ use Utopia\CLI\Console;
use Utopia\Database\Adapter\MariaDB; use Utopia\Database\Adapter\MariaDB;
use Utopia\Database\Database; use Utopia\Database\Database;
use Utopia\Database\Document; use Utopia\Database\Document;
use Utopia\Database\Query;
use Utopia\Database\Validator\Authorization; use Utopia\Database\Validator\Authorization;
/** /**
* Metrics We collect * Metrics We collect
* *
* General
*
* requests * requests
* network * network
* executions * executions
*
* Database
*
* database.collections.create * database.collections.create
* database.collections.read * database.collections.read
* database.collections.update * database.collections.update
@ -32,10 +36,16 @@ use Utopia\Database\Validator\Authorization;
* database.collections.{collectionId}.documents.read * database.collections.{collectionId}.documents.read
* database.collections.{collectionId}.documents.update * database.collections.{collectionId}.documents.update
* database.collections.{collectionId}.documents.delete * database.collections.{collectionId}.documents.delete
*
* Storage
*
* storage.buckets.{bucketId}.files.create * storage.buckets.{bucketId}.files.create
* storage.buckets.{bucketId}.files.read * storage.buckets.{bucketId}.files.read
* storage.buckets.{bucketId}.files.update * storage.buckets.{bucketId}.files.update
* storage.buckets.{bucketId}.files.delete * storage.buckets.{bucketId}.files.delete
*
* Users
*
* users.create * users.create
* users.read * users.read
* users.update * users.update
@ -71,7 +81,7 @@ $cli
Console::title('Usage Aggregation V1'); Console::title('Usage Aggregation V1');
Console::success(APP_NAME . ' usage aggregation process v1 has started'); Console::success(APP_NAME . ' usage aggregation process v1 has started');
$interval = (int) App::getEnv('_APP_USAGE_AGGREGATION_INTERVAL', '30'); //30 seconds $interval = (int) App::getEnv('_APP_USAGE_AGGREGATION_INTERVAL', '30'); // 30 seconds (by default)
$periods = [ $periods = [
[ [
'key' => '30m', 'key' => '30m',
@ -189,16 +199,18 @@ $cli
], ],
]; ];
// TODO Maybe move this to the setResource method, and reuse in the http.php file
$attempts = 0; $attempts = 0;
$max = 10; $max = 10;
$sleep = 1; $sleep = 1;
do { // connect to db do { // connect to db
try { try {
$attempts++; $attempts++;
$db = $register->get('db'); $db = $register->get('db');
$redis = $register->get('cache'); $redis = $register->get('cache');
break; // leave the do-while if successful break; // leave the do-while if successful
} catch (\Exception$e) { } catch (\Exception $e) {
Console::warning("Database not ready. Retrying connection ({$attempts})..."); Console::warning("Database not ready. Retrying connection ({$attempts})...");
if ($attempts >= $max) { if ($attempts >= $max) {
throw new \Exception('Failed to connect to database: ' . $e->getMessage()); throw new \Exception('Failed to connect to database: ' . $e->getMessage());
@ -207,6 +219,7 @@ $cli
} }
} while ($attempts < $max); } while ($attempts < $max);
// TODO use inject
$cacheAdapter = new Cache(new Redis($redis)); $cacheAdapter = new Cache(new Redis($redis));
$dbForProject = new Database(new MariaDB($db), $cacheAdapter); $dbForProject = new Database(new MariaDB($db), $cacheAdapter);
$dbForConsole = new Database(new MariaDB($db), $cacheAdapter); $dbForConsole = new Database(new MariaDB($db), $cacheAdapter);
@ -223,9 +236,13 @@ $cli
$loopStart = microtime(true); $loopStart = microtime(true);
/**
* Aggregate InfluxDB every 30 seconds
*/
$client = $register->get('influxdb'); $client = $register->get('influxdb');
if ($client) { if ($client) {
$database = $client->selectDB('telegraf'); $database = $client->selectDB('telegraf');
// sync data // sync data
foreach ($globalMetrics as $metric => $options) { //for each metrics foreach ($globalMetrics as $metric => $options) { //for each metrics
foreach ($periods as $period) { // aggregate data for each period foreach ($periods as $period) { // aggregate data for each period
@ -250,12 +267,11 @@ $cli
$points = $result->getPoints(); $points = $result->getPoints();
foreach ($points as $point) { foreach ($points as $point) {
$projectId = $point['projectId']; $projectId = $point['projectId'];
if (!empty($projectId) && $projectId != 'console') { if (!empty($projectId) && $projectId != 'console') {
$dbForProject->setNamespace('project_' . $projectId . '_internal'); $dbForProject->setNamespace('project_' . $projectId . '_internal');
if($metric == 'functions.functionId.executions') {
var_dump($points);
}
$metricUpdated = $metric; $metricUpdated = $metric;
if (!empty($groupBy)) { if (!empty($groupBy)) {
$groupedBy = $point[$options['groupBy']] ?? ''; $groupedBy = $point[$options['groupBy']] ?? '';
if (empty($groupedBy)) { if (empty($groupedBy)) {
@ -263,9 +279,11 @@ $cli
} }
$metricUpdated = str_replace($options['groupBy'], $groupedBy, $metric); $metricUpdated = str_replace($options['groupBy'], $groupedBy, $metric);
} }
$time = \strtotime($point['time']); $time = \strtotime($point['time']);
$id = \md5($time . '_' . $period['key'] . '_' . $metricUpdated); //construct unique id for each metric using time, period and metric $id = \md5($time . '_' . $period['key'] . '_' . $metricUpdated); //construct unique id for each metric using time, period and metric
$value = (!empty($point['value'])) ? $point['value'] : 0; $value = (!empty($point['value'])) ? $point['value'] : 0;
try { try {
$document = $dbForProject->getDocument('stats', $id); $document = $dbForProject->getDocument('stats', $id);
if ($document->isEmpty()) { if ($document->isEmpty()) {
@ -279,12 +297,11 @@ $cli
])); ]));
} else { } else {
$dbForProject->updateDocument('stats', $document->getId(), $dbForProject->updateDocument('stats', $document->getId(),
$document->setAttribute('value', $value)); $document->setAttribute('value', $value));
} }
$latestTime[$metric][$period['key']] = $time; $latestTime[$metric][$period['key']] = $time;
} catch (\Exception$e) { } catch (\Exception $e) { // if projects are deleted this might fail
// if projects are deleted this might fail Console::warning("Failed to save data for project {$projectId} and metric {$metricUpdated}: {$e->getMessage()}");
Console::warning("Failed to save data for project {$projectId} and metric {$metricUpdated}");
} }
} }
} }
@ -292,119 +309,140 @@ $cli
} }
} }
if ($iterations % 30 == 0) { //every 15 minutes /**
// aggregate number of objects in database * Aggregate MariaDB every 15 minutes
// get count of all the documents per collection - * Some of the queries here might contain full-table scans.
// buckets will have the same */
if ($iterations % 30 == 0) { // Every 15 minutes
// Aggregate number of objects in database
// Get count of all the documents per collection -
// Buckets will have the same
$latestProject = null; $latestProject = null;
do { do {
$projects = $dbForConsole->find('projects', [], 100, orderAfter:$latestProject); $projects = $dbForConsole->find('projects', [], 100, orderAfter:$latestProject);
if (!empty($projects)) {
$latestProject = $projects[array_key_last($projects)]; if (empty($projects)) {
continue;
}
foreach ($projects as $project) { $latestProject = $projects[array_key_last($projects)];
$id = $project->getId();
// get total storage foreach ($projects as $project) {
$dbForProject->setNamespace('project_' . $id . '_internal'); $id = $project->getId();
$storageTotal = $dbForProject->sum('files', 'sizeOriginal') + $dbForProject->sum('tags', 'size');
$dbForProject->createDocument('stats', new Document([ // Get total storage
'$id' => $dbForProject->getId(), $dbForProject->setNamespace('project_' . $id . '_internal');
'period' => '15m', $storageTotal = $dbForProject->sum('files', 'sizeOriginal') + $dbForProject->sum('tags', 'size');
'time' => time(),
'metric' => 'storage.total',
'value' => $storageTotal,
'type' => 1,
]));
$collections = [ $dbForProject->createDocument('stats', new Document([
'users' => [ '$id' => $dbForProject->getId(),
'namespace' => 'internal', 'period' => '15m',
], 'time' => time(),
'collections' => [ 'metric' => 'storage.total',
'metricPrefix' => 'database', 'value' => $storageTotal,
'namespace' => 'internal', 'type' => 1,
'subCollections' => [ ]));
'documents' => [
'namespace' => 'external', $collections = [
], 'users' => [
'namespace' => 'internal',
],
'collections' => [
'metricPrefix' => 'database',
'namespace' => 'internal',
'subCollections' => [ // TODO better document this key
'documents' => [
'namespace' => 'external',
], ],
], ],
'files' => [ ],
'metricPrefix' => 'storage', 'files' => [
'namespace' => 'internal', 'metricPrefix' => 'storage',
], 'namespace' => 'internal',
]; ],
foreach ($collections as $collection => $options) { ];
try {
foreach ($collections as $collection => $options) {
try {
$dbForProject->setNamespace("project_{$id}_{$options['namespace']}");
$count = $dbForProject->count($collection);
$dbForProject->setNamespace("project_{$id}_internal");
$metricPrefix = $options['metricPrefix'] ?? '';
$metric = empty($metricPrefix) ? "{$collection}.count" : "{$metricPrefix}.{$collection}.count";
$dbForProject->createDocument('stats', new Document([
'$id' => $dbForProject->getId(),
'time' => time(),
'period' => '15m',
'metric' => $metric,
'value' => $count,
'type' => 1,
]));
$subCollections = $options['subCollections'] ?? [];
if (empty($subCollections)) {
continue;
}
$latestParent = null;
$subCollectionCounts = []; //total project level count of sub collections
do {
$dbForProject->setNamespace("project_{$id}_{$options['namespace']}"); $dbForProject->setNamespace("project_{$id}_{$options['namespace']}");
$count = $dbForProject->count($collection); $parents = $dbForProject->find($collection, [], 100, orderAfter:$latestParent);
$dbForProject->setNamespace("project_{$id}_internal");
$metricPrefix = $options['metricPrefix'] ?? '';
$metric = empty($metricPrefix) ? "{$collection}.count" : "{$metricPrefix}.{$collection}.count";
$dbForProject->createDocument('stats', new Document([
'$id' => $dbForProject->getId(),
'time' => time(),
'period' => '15m',
'metric' => $metric,
'value' => $count,
'type' => 1,
]));
$subCollections = $options['subCollections'] ?? []; if (empty($parents)) {
if (!empty($subCollections)) { continue;
$latestParent = null; }
$subCollectionCounts = []; //total project level count of sub collections
do {
$dbForProject->setNamespace("project_{$id}_{$options['namespace']}");
$parents = $dbForProject->find($collection, [], 100, orderAfter:$latestParent);
if (!empty($parents)) {
$latestParent = $parents[array_key_last($parents)];
foreach ($parents as $parent) {
foreach ($subCollections as $subCollection => $subOptions) {
$dbForProject->setNamespace("project_{$id}_{$subOptions['namespace']}");
$count = $dbForProject->count($parent->getId());
$subCollectionsCounts[$subCollection] = ($subCollectionCounts[$subCollection] ?? 0) + $count;
$dbForProject->setNamespace("project_{$id}_internal"); $latestParent = $parents[array_key_last($parents)];
$dbForProject->createDocument('stats', new Document([
'$id' => $dbForProject->getId(), foreach ($parents as $parent) {
'time' => time(), foreach ($subCollections as $subCollection => $subOptions) {
'period' => '15m', $dbForProject->setNamespace("project_{$id}_{$subOptions['namespace']}");
'metric' => empty($metricPrefix) ? "{$collection}.{$parent->getId()}.{$subCollection}.count" : "{$metricPrefix}.{$collection}.{$parent->getId()}.{$subCollection}.count", $count = $dbForProject->count($parent->getId());
'value' => $count, $subCollectionsCounts[$subCollection] = ($subCollectionCounts[$subCollection] ?? 0) + $count;
'type' => 1,
]));
}
}
}
} while (!empty($parents));
foreach ($subCollectionsCounts as $subCollection => $count) {
$dbForProject->setNamespace("project_{$id}_internal"); $dbForProject->setNamespace("project_{$id}_internal");
$dbForProject->createDocument('stats', new Document([ $dbForProject->createDocument('stats', new Document([
'$id' => $dbForProject->getId(), '$id' => $dbForProject->getId(),
'time' => time(), 'time' => time(),
'period' => '15m', 'period' => '15m',
'metric' => empty($metricPrefix) ? "{$subCollection}.count" : "{$metricPrefix}.{$subCollection}.count", 'metric' => empty($metricPrefix) ? "{$collection}.{$parent->getId()}.{$subCollection}.count" : "{$metricPrefix}.{$collection}.{$parent->getId()}.{$subCollection}.count",
'value' => $count, 'value' => $count,
'type' => 1, 'type' => 1,
])); ]));
} }
} }
} catch (\Exception$e) { } while (!empty($parents));
Console::warning("Failed to save database counters data for project {$collection}");
foreach ($subCollectionsCounts as $subCollection => $count) {
$dbForProject->setNamespace("project_{$id}_internal");
$dbForProject->createDocument('stats', new Document([
'$id' => $dbForProject->getId(),
'time' => time(),
'period' => '15m',
'metric' => empty($metricPrefix) ? "{$subCollection}.count" : "{$metricPrefix}.{$subCollection}.count",
'value' => $count,
'type' => 1,
]));
} }
} catch (\Exception $e) {
Console::warning("Failed to save database counters data for project {$collection}: {$e->getMessage()}");
} }
} }
} }
} while (!empty($projects)); } while (!empty($projects));
} }
$iterations++; $iterations++;
$loopTook = microtime(true) - $loopStart; $loopTook = microtime(true) - $loopStart;
$now = date('d-m-Y H:i:s', time()); $now = date('d-m-Y H:i:s', time());
Console::info("[{$now}] Aggregation took {$loopTook} seconds"); Console::info("[{$now}] Aggregation took {$loopTook} seconds");
}, $interval); }, $interval);
}); });