1
0
Fork 0
mirror of synced 2024-06-28 11:10:46 +12:00

Merge branch 'feat-usage-daemon' of github.com:appwrite/appwrite into feat-new-usage-endpoints

This commit is contained in:
Christy Jacob 2021-08-30 18:01:47 +05:30
commit 4902bcea74
2 changed files with 260 additions and 111 deletions

View file

@ -11,15 +11,19 @@ use Utopia\CLI\Console;
use Utopia\Database\Adapter\MariaDB;
use Utopia\Database\Database;
use Utopia\Database\Document;
use Utopia\Database\Query;
use Utopia\Database\Validator\Authorization;
/**
* Metrics We collect
*
* General
*
* requests
* network
* executions
*
* Database
*
* database.collections.create
* database.collections.read
* database.collections.update
@ -32,10 +36,16 @@ use Utopia\Database\Validator\Authorization;
* database.collections.{collectionId}.documents.read
* database.collections.{collectionId}.documents.update
* database.collections.{collectionId}.documents.delete
*
* Storage
*
* storage.buckets.{bucketId}.files.create
* storage.buckets.{bucketId}.files.read
* storage.buckets.{bucketId}.files.update
* storage.buckets.{bucketId}.files.delete
*
* Users
*
* users.create
* users.read
* users.update
@ -71,7 +81,7 @@ $cli
Console::title('Usage Aggregation V1');
Console::success(APP_NAME . ' usage aggregation process v1 has started');
$interval = (int) App::getEnv('_APP_USAGE_AGGREGATION_INTERVAL', '30'); //30 seconds
$interval = (int) App::getEnv('_APP_USAGE_AGGREGATION_INTERVAL', '30'); // 30 seconds (by default)
$periods = [
[
'key' => '30m',
@ -189,16 +199,18 @@ $cli
],
];
// TODO Maybe move this to the setResource method, and reuse in the http.php file
$attempts = 0;
$max = 10;
$sleep = 1;
do { // connect to db
try {
$attempts++;
$db = $register->get('db');
$redis = $register->get('cache');
break; // leave the do-while if successful
} catch (\Exception$e) {
} catch (\Exception $e) {
Console::warning("Database not ready. Retrying connection ({$attempts})...");
if ($attempts >= $max) {
throw new \Exception('Failed to connect to database: ' . $e->getMessage());
@ -207,6 +219,7 @@ $cli
}
} while ($attempts < $max);
// TODO use inject
$cacheAdapter = new Cache(new Redis($redis));
$dbForProject = new Database(new MariaDB($db), $cacheAdapter);
$dbForConsole = new Database(new MariaDB($db), $cacheAdapter);
@ -223,9 +236,13 @@ $cli
$loopStart = microtime(true);
/**
* Aggregate InfluxDB every 30 seconds
*/
$client = $register->get('influxdb');
if ($client) {
$database = $client->selectDB('telegraf');
// sync data
foreach ($globalMetrics as $metric => $options) { //for each metrics
foreach ($periods as $period) { // aggregate data for each period
@ -235,10 +252,10 @@ $cli
}
$end = DateTime::createFromFormat('U', \strtotime('now'))->format(DateTime::RFC3339);
$table = $options['table']; //which influxdb table to query for this metric
$groupBy = empty($options['groupBy']) ? '' : ', "' . $options['groupBy'] . '"'; //some sub level metrics may be grouped by other tags like collectionId, bucketId, etc
$table = $options['table']; //Which influxdb table to query for this metric
$groupBy = empty($options['groupBy']) ? '' : ', "' . $options['groupBy'] . '"'; //Some sub level metrics may be grouped by other tags like collectionId, bucketId, etc
$filters = $options['filters'] ?? [];
$filters = $options['filters'] ?? []; // Some metrics might have additional filters, like function's status
if (!empty($filters)) {
$filters = ' AND ' . implode(' AND ', array_map(function ($filter, $value) {
return '"' . $filter . '"=\'' . $value . '\'';
@ -246,16 +263,15 @@ $cli
}
$result = $database->query('SELECT sum(value) AS "value" FROM "' . $table . '" WHERE time > \'' . $start . '\' AND time < \'' . $end . '\' AND "metric_type"=\'counter\'' . (empty($filters) ? '' : $filters) . ' GROUP BY time(' . $period['key'] . '), "projectId"' . $groupBy . ' FILL(null)');
$points = $result->getPoints();
foreach ($points as $point) {
$projectId = $point['projectId'];
if (!empty($projectId) && $projectId != 'console') {
$dbForProject->setNamespace('project_' . $projectId . '_internal');
if($metric == 'functions.functionId.executions') {
var_dump($points);
}
$metricUpdated = $metric;
if (!empty($groupBy)) {
$groupedBy = $point[$options['groupBy']] ?? '';
if (empty($groupedBy)) {
@ -263,9 +279,11 @@ $cli
}
$metricUpdated = str_replace($options['groupBy'], $groupedBy, $metric);
}
$time = \strtotime($point['time']);
$id = \md5($time . '_' . $period['key'] . '_' . $metricUpdated); //construct unique id for each metric using time, period and metric
$id = \md5($time . '_' . $period['key'] . '_' . $metricUpdated); //Construct unique id for each metric using time, period and metric
$value = (!empty($point['value'])) ? $point['value'] : 0;
try {
$document = $dbForProject->getDocument('stats', $id);
if ($document->isEmpty()) {
@ -279,12 +297,11 @@ $cli
]));
} else {
$dbForProject->updateDocument('stats', $document->getId(),
$document->setAttribute('value', $value));
$document->setAttribute('value', $value));
}
$latestTime[$metric][$period['key']] = $time;
} catch (\Exception$e) {
// if projects are deleted this might fail
Console::warning("Failed to save data for project {$projectId} and metric {$metricUpdated}");
} catch (\Exception $e) { // if projects are deleted this might fail
Console::warning("Failed to save data for project {$projectId} and metric {$metricUpdated}: {$e->getMessage()}");
}
}
}
@ -292,119 +309,245 @@ $cli
}
}
if ($iterations % 30 == 0) { //every 15 minutes
// aggregate number of objects in database
// get count of all the documents per collection -
// buckets will have the same
/**
* Aggregate MariaDB every 15 minutes
* Some of the queries here might contain full-table scans.
*/
if ($iterations % 30 == 0) { // Every 15 minutes aggregate number of objects in database
$latestProject = null;
do {
do { // Loop over all the projects
$projects = $dbForConsole->find('projects', [], 100, orderAfter:$latestProject);
if (!empty($projects)) {
$latestProject = $projects[array_key_last($projects)];
foreach ($projects as $project) {
$id = $project->getId();
if (empty($projects)) {
continue;
}
// get total storage
$dbForProject->setNamespace('project_' . $id . '_internal');
$storageTotal = $dbForProject->sum('files', 'sizeOriginal') + $dbForProject->sum('tags', 'size');
$latestProject = $projects[array_key_last($projects)];
foreach ($projects as $project) {
$projectId = $project->getId();
// Get total storage
$dbForProject->setNamespace('project_' . $projectId . '_internal');
$storageTotal = $dbForProject->sum('files', 'sizeOriginal') + $dbForProject->sum('tags', 'size');
$time = (int) (floor(time() / 1800) * 1800); // Time rounded to nearest 30 minutes
$id = \md5($time . '_30m_storage.total'); //Construct unique id for each metric using time, period and metric
$document = $dbForProject->getDocument('stats', $id);
if ($document->isEmpty()) {
$dbForProject->createDocument('stats', new Document([
'$id' => $dbForProject->getId(),
'period' => '15m',
'time' => time(),
'$id' => $id,
'period' => '30m',
'time' => $time,
'metric' => 'storage.total',
'value' => $storageTotal,
'type' => 1,
]));
} else {
$dbForProject->updateDocument('stats', $document->getId(),
$document->setAttribute('value', $storageTotal));
}
$time = (int) (floor(time() / 86400) * 86400); // Time rounded to nearest day
$id = \md5($time . '_1d_storage.total'); //Construct unique id for each metric using time, period and metric
$document = $dbForProject->getDocument('stats', $id);
if ($document->isEmpty()) {
$dbForProject->createDocument('stats', new Document([
'$id' => $id,
'period' => '1d',
'time' => $time,
'metric' => 'storage.total',
'value' => $storageTotal,
'type' => 1,
]));
} else {
$dbForProject->updateDocument('stats', $document->getId(),
$document->setAttribute('value', $storageTotal));
}
$collections = [
'users' => [
'namespace' => 'internal',
],
'collections' => [
'metricPrefix' => 'database',
'namespace' => 'internal',
'subCollections' => [
'documents' => [
'namespace' => 'external',
],
$collections = [
'users' => [
'namespace' => 'internal',
],
'collections' => [
'metricPrefix' => 'database',
'namespace' => 'internal',
'subCollections' => [ // Some collections, like collections and later buckets have child collections that need counting
'documents' => [
'namespace' => 'external',
],
],
'files' => [
'metricPrefix' => 'storage',
'namespace' => 'internal',
],
];
foreach ($collections as $collection => $options) {
try {
$dbForProject->setNamespace("project_{$id}_{$options['namespace']}");
$count = $dbForProject->count($collection);
$dbForProject->setNamespace("project_{$id}_internal");
$metricPrefix = $options['metricPrefix'] ?? '';
$metric = empty($metricPrefix) ? "{$collection}.count" : "{$metricPrefix}.{$collection}.count";
],
'files' => [
'metricPrefix' => 'storage',
'namespace' => 'internal',
],
];
foreach ($collections as $collection => $options) {
try {
$dbForProject->setNamespace("project_{$projectId}_{$options['namespace']}");
$count = $dbForProject->count($collection);
$dbForProject->setNamespace("project_{$projectId}_internal");
$metricPrefix = $options['metricPrefix'] ?? '';
$metric = empty($metricPrefix) ? "{$collection}.count" : "{$metricPrefix}.{$collection}.count";
$time = (int) (floor(time() / 1800) * 1800); // Time rounded to nearest 30 minutes
$id = \md5($time . '_30m_' . $metric); //Construct unique id for each metric using time, period and metric
$document = $dbForProject->getDocument('stats', $id);
if ($document->isEmpty()) {
$dbForProject->createDocument('stats', new Document([
'$id' => $dbForProject->getId(),
'time' => time(),
'period' => '15m',
'$id' => $id,
'time' => $time,
'period' => '30m',
'metric' => $metric,
'value' => $count,
'type' => 1,
]));
} else {
$dbForProject->updateDocument('stats', $document->getId(),
$document->setAttribute('value', $count));
}
$subCollections = $options['subCollections'] ?? [];
if (!empty($subCollections)) {
$latestParent = null;
$subCollectionCounts = []; //total project level count of sub collections
do {
$dbForProject->setNamespace("project_{$id}_{$options['namespace']}");
$parents = $dbForProject->find($collection, [], 100, orderAfter:$latestParent);
if (!empty($parents)) {
$latestParent = $parents[array_key_last($parents)];
foreach ($parents as $parent) {
foreach ($subCollections as $subCollection => $subOptions) {
$dbForProject->setNamespace("project_{$id}_{$subOptions['namespace']}");
$count = $dbForProject->count($parent->getId());
$subCollectionsCounts[$subCollection] = ($subCollectionCounts[$subCollection] ?? 0) + $count;
$time = (int) (floor(time() / 86400) * 86400); // Time rounded to nearest day
$id = \md5($time . '_1d_' . $metric); //Construct unique id for each metric using time, period and metric
$document = $dbForProject->getDocument('stats', $id);
if ($document->isEmpty()) {
$dbForProject->createDocument('stats', new Document([
'$id' => $id,
'time' => $time,
'period' => '1d',
'metric' => $metric,
'value' => $count,
'type' => 1,
]));
} else {
$dbForProject->updateDocument('stats', $document->getId(),
$document->setAttribute('value', $count));
}
$dbForProject->setNamespace("project_{$id}_internal");
$dbForProject->createDocument('stats', new Document([
'$id' => $dbForProject->getId(),
'time' => time(),
'period' => '15m',
'metric' => empty($metricPrefix) ? "{$collection}.{$parent->getId()}.{$subCollection}.count" : "{$metricPrefix}.{$collection}.{$parent->getId()}.{$subCollection}.count",
'value' => $count,
'type' => 1,
]));
}
}
$subCollections = $options['subCollections'] ?? [];
if (empty($subCollections)) {
continue;
}
$latestParent = null;
$subCollectionCounts = []; //total project level count of sub collections
do { // Loop over all the parent collection document for each sub collection
$dbForProject->setNamespace("project_{$projectId}_{$options['namespace']}");
$parents = $dbForProject->find($collection, [], 100, orderAfter:$latestParent); // Get all the parents for the sub collections for example for documents, this will get all the collections
if (empty($parents)) {
continue;
}
$latestParent = $parents[array_key_last($parents)];
foreach ($parents as $parent) {
foreach ($subCollections as $subCollection => $subOptions) { // Sub collection counts, like database.collections.collectionId.documents.count
$dbForProject->setNamespace("project_{$projectId}_{$subOptions['namespace']}");
$count = $dbForProject->count($parent->getId());
$subCollectionCounts[$subCollection] = ($subCollectionCounts[$subCollection] ?? 0) + $count; // Project level counts for sub collections like database.documents.count
$dbForProject->setNamespace("project_{$projectId}_internal");
$metric = empty($metricPrefix) ? "{$collection}.{$parent->getId()}.{$subCollection}.count" : "{$metricPrefix}.{$collection}.{$parent->getId()}.{$subCollection}.count";
$time = (int) (floor(time() / 1800) * 1800); // Time rounded to nearest 30 minutes
$id = \md5($time . '_30m_' . $metric); //Construct unique id for each metric using time, period and metric
$document = $dbForProject->getDocument('stats', $id);
if ($document->isEmpty()) {
$dbForProject->createDocument('stats', new Document([
'$id' => $id,
'time' => $time,
'period' => '30m',
'metric' => $metric,
'value' => $count,
'type' => 1,
]));
} else {
$dbForProject->updateDocument('stats', $document->getId(),
$document->setAttribute('value', $count));
}
} while (!empty($parents));
foreach ($subCollectionsCounts as $subCollection => $count) {
$dbForProject->setNamespace("project_{$id}_internal");
$dbForProject->createDocument('stats', new Document([
'$id' => $dbForProject->getId(),
'time' => time(),
'period' => '15m',
'metric' => empty($metricPrefix) ? "{$subCollection}.count" : "{$metricPrefix}.{$subCollection}.count",
'value' => $count,
'type' => 1,
]));
$time = (int) (floor(time() / 86400) * 86400); // Time rounded to nearest day
$id = \md5($time . '_1d_' . $metric); //Construct unique id for each metric using time, period and metric
$document = $dbForProject->getDocument('stats', $id);
if ($document->isEmpty()) {
$dbForProject->createDocument('stats', new Document([
'$id' => $id,
'time' => $time,
'period' => '1d',
'metric' => $metric,
'value' => $count,
'type' => 1,
]));
} else {
$dbForProject->updateDocument('stats', $document->getId(),
$document->setAttribute('value', $count));
}
}
}
} catch (\Exception$e) {
Console::warning("Failed to save database counters data for project {$collection}");
} while (!empty($parents));
/**
* Inserting project level counts for sub collections like database.documents.count
*/
foreach ($subCollectionCounts as $subCollection => $count) {
$dbForProject->setNamespace("project_{$projectId}_internal");
$metric = empty($metricPrefix) ? "{$subCollection}.count" : "{$metricPrefix}.{$subCollection}.count";
$time = (int) (floor(time() / 1800) * 1800); // Time rounded to nearest 30 minutes
$id = \md5($time . '_30m_' . $metric); //Construct unique id for each metric using time, period and metric
$document = $dbForProject->getDocument('stats', $id);
if ($document->isEmpty()) {
$dbForProject->createDocument('stats', new Document([
'$id' => $id,
'time' => $time,
'period' => '30m',
'metric' => $metric,
'value' => $count,
'type' => 1,
]));
} else {
$dbForProject->updateDocument('stats', $document->getId(),
$document->setAttribute('value', $count));
}
$time = (int) (floor(time() / 86400) * 86400); // Time rounded to nearest day
$id = \md5($time . '_1d_' . $metric); //Construct unique id for each metric using time, period and metric
$document = $dbForProject->getDocument('stats', $id);
if ($document->isEmpty()) {
$dbForProject->createDocument('stats', new Document([
'$id' => $id,
'time' => $time,
'period' => '1d',
'metric' => $metric,
'value' => $count,
'type' => 1,
]));
} else {
$dbForProject->updateDocument('stats', $document->getId(),
$document->setAttribute('value', $count));
}
}
} catch (\Exception$e) {
Console::warning("Failed to save database counters data for project {$collection}: {$e->getMessage()}");
}
}
}
} while (!empty($projects));
}
$iterations++;
$loopTook = microtime(true) - $loopStart;
$now = date('d-m-Y H:i:s', time());
Console::info("[{$now}] Aggregation took {$loopTook} seconds");
}, $interval);
});

View file

@ -18,8 +18,14 @@ Console::success(APP_NAME.' deletes worker v1 has started'."\n");
class DeletesV1 extends Worker
{
/**
* @var array
*/
public $args = [];
/**
* @var Database
*/
protected $consoleDB = null;
public function init(): void
@ -33,7 +39,7 @@ class DeletesV1 extends Worker
switch (strval($type)) {
case DELETE_TYPE_DOCUMENT:
$document = $this->args['document'] ?? '';
$document = $this->args['document'] ?? [];
$document = new Document($document);
switch ($document->getCollection()) {
@ -87,7 +93,8 @@ class DeletesV1 extends Worker
* @param Document $document teams document
* @param string $projectId
*/
protected function deleteMemberships(Document $document, $projectId) {
protected function deleteMemberships(Document $document, string $projectId): void
{
$teamId = $document->getAttribute('teamId', '');
// Delete Memberships
@ -99,7 +106,7 @@ class DeletesV1 extends Worker
/**
* @param Document $document project document
*/
protected function deleteProject(Document $document)
protected function deleteProject(Document $document): void
{
$projectId = $document->getId();
// Delete all DBs
@ -118,7 +125,7 @@ class DeletesV1 extends Worker
* @param Document $document user document
* @param string $projectId
*/
protected function deleteUser(Document $document, $projectId)
protected function deleteUser(Document $document, string $projectId): void
{
$userId = $document->getId();
@ -143,9 +150,9 @@ class DeletesV1 extends Worker
/**
* @param int $timestamp
*/
protected function deleteExecutionLogs($timestamp)
protected function deleteExecutionLogs(int $timestamp): void
{
$this->deleteForProjectIds(function($projectId) use ($timestamp) {
$this->deleteForProjectIds(function(string $projectId) use ($timestamp) {
if (!($dbForInternal = $this->getInternalDB($projectId))) {
throw new Exception('Failed to get projectDB for project '.$projectId);
}
@ -160,7 +167,7 @@ class DeletesV1 extends Worker
/**
* @param int $timestamp
*/
protected function deleteAbuseLogs($timestamp)
protected function deleteAbuseLogs(int $timestamp): void
{
if($timestamp == 0) {
throw new Exception('Failed to delete audit logs. No timestamp provided');
@ -180,7 +187,7 @@ class DeletesV1 extends Worker
/**
* @param int $timestamp
*/
protected function deleteAuditLogs($timestamp)
protected function deleteAuditLogs(int $timestamp): void
{
if($timestamp == 0) {
throw new Exception('Failed to delete audit logs. No timestamp provided');
@ -198,7 +205,7 @@ class DeletesV1 extends Worker
* @param Document $document function document
* @param string $projectId
*/
protected function deleteFunction(Document $document, $projectId)
protected function deleteFunction(Document $document, string $projectId): void
{
$dbForInternal = $this->getInternalDB($projectId);
$device = new Local(APP_STORAGE_FUNCTIONS.'/app-'.$projectId);
@ -255,7 +262,7 @@ class DeletesV1 extends Worker
/**
* @param callable $callback
*/
protected function deleteForProjectIds(callable $callback)
protected function deleteForProjectIds(callable $callback): void
{
$count = 0;
$chunk = 0;
@ -266,12 +273,12 @@ class DeletesV1 extends Worker
$executionStart = \microtime(true);
while($sum === $limit) {
$chunk++;
Authorization::disable();
$projects = $this->getConsoleDB()->find('projects', [], $limit);
$projects = $this->getConsoleDB()->find('projects', [], $limit, ($chunk * $limit));
Authorization::reset();
$chunk++;
$projectIds = array_map (function ($project) {
return $project->getId();
}, $projects);
@ -295,7 +302,7 @@ class DeletesV1 extends Worker
* @param Database $database
* @param callable $callback
*/
protected function deleteByGroup(string $collection, array $queries, Database $database, callable $callback = null)
protected function deleteByGroup(string $collection, array $queries, Database $database, callable $callback = null): void
{
$count = 0;
$chunk = 0;
@ -331,9 +338,8 @@ class DeletesV1 extends Worker
/**
* @param Document $document certificates document
* @return Database
*/
protected function deleteCertificates(Document $document)
protected function deleteCertificates(Document $document): void
{
$domain = $document->getAttribute('domain');
$directory = APP_STORAGE_CERTIFICATES . '/' . $domain;