1
0
Fork 0
mirror of synced 2024-06-14 00:34:51 +12:00

cleaning up refactors

This commit is contained in:
Damodar Lohani 2022-06-13 16:56:26 +05:45
parent da533571b3
commit 2a13cc33ac
3 changed files with 245 additions and 254 deletions

View file

@ -11,7 +11,6 @@ use Utopia\Cache\Cache;
use Utopia\CLI\Console;
use Utopia\Database\Adapter\MariaDB;
use Utopia\Database\Database;
use Utopia\Database\Document;
use Utopia\Database\Validator\Authorization;
use Utopia\Registry\Registry;
@ -31,8 +30,8 @@ function getDatabase(Registry&$register, string $namespace): Database
$database->setDefaultDatabase(App::getEnv('_APP_DB_SCHEMA', 'appwrite'));
$database->setNamespace($namespace);
if (!$database->exists($database->getDefaultDatabase(), 'realtime')) {
throw new Exception('Collection not ready');
if (!$database->exists($database->getDefaultDatabase(), 'projects')) {
throw new Exception('Projects collection not ready');
}
break; // leave loop if successful
} catch (\Exception$e) {
@ -73,81 +72,15 @@ function getInfluxDB(Registry&$register): InfluxDatabase
return $database;
}
/**
* Metrics We collect
*
* General
*
* requests
* network
* executions
*
* Database
*
* database.collections.create
* database.collections.read
* database.collections.update
* database.collections.delete
* database.documents.create
* database.documents.read
* database.documents.update
* database.documents.delete
* database.collections.{collectionId}.documents.create
* database.collections.{collectionId}.documents.read
* database.collections.{collectionId}.documents.update
* database.collections.{collectionId}.documents.delete
*
* Storage
*
* storage.buckets.create
* storage.buckets.read
* storage.buckets.update
* storage.buckets.delete
* storage.files.create
* storage.files.read
* storage.files.update
* storage.files.delete
* storage.buckets.{bucketId}.files.create
* storage.buckets.{bucketId}.files.read
* storage.buckets.{bucketId}.files.update
* storage.buckets.{bucketId}.files.delete
*
* Users
*
* users.create
* users.read
* users.update
* users.delete
* users.sessions.create
* users.sessions.{provider}.create
* users.sessions.delete
*
* Functions
*
* functions.{functionId}.executions
* functions.{functionId}.failures
* functions.{functionId}.compute
*
* Counters
*
* users.count
* storage.buckets.count
* storage.files.count
* storage.buckets.{bucketId}.files.count
* database.collections.count
* database.documents.count
* database.collections.{collectionId}.documents.count
*
* Totals
*
* storage.total
*
*/
$logError = function($message, $stackTrace) {
Console::warning("Failed: {$message}");
Console::warning($stackTrace);
};
$cli
->task('usage')
->desc('Schedules syncing data from influxdb to Appwrite console db')
->action(function () use ($register) {
->action(function () use ($register, $logError) {
Console::title('Usage Aggregation V1');
Console::success(APP_NAME . ' usage aggregation process v1 has started');
@ -156,37 +89,25 @@ $cli
$database = getDatabase($register, '_console');
$influxDB = getInfluxDB($register);
$usage = new Usage($database, $influxDB);
$usageDB = new UsageDB($database);
$usage = new Usage($database, $influxDB, $logError);
$latestTime = [];
$usageDB = new UsageDB($database, $logError);
Authorization::disable();
$iterations = 0;
Console::loop(function () use ($interval, $database, $usage, $usageDB, &$latestTime, &$iterations) {
Console::loop(function () use ($interval, $usage, $usageDB, &$iterations) {
$now = date('d-m-Y H:i:s', time());
Console::info("[{$now}] Aggregating usage data every {$interval} seconds");
$loopStart = microtime(true);
/**
* Aggregate InfluxDB every 30 seconds
*/
* Aggregate InfluxDB every 30 seconds
*/
$usage->collect();
// sync data
foreach ($usage->getMetrics() as $metric => $options) { //for each metrics
foreach ($usage->getPeriods() as $period) { // aggregate data for each period
try {
$usage->syncFromInfluxDB($metric, $options, $period, $latestTime);
} catch (\Exception$e) {
Console::warning("Failed: {$e->getMessage()}");
Console::warning($e->getTraceAsString());
}
}
}
if ($iterations % 30 != 0) { // Aggregate aggregate number of objects in database only after 15 minutes
if ($iterations % 30 != 0) { // return if 30 iterations has not passed
$iterations++;
$loopTook = microtime(true) - $loopStart;
$now = date('d-m-Y H:i:s', time());
@ -194,104 +115,15 @@ $cli
return;
}
$iterations = 0; // Reset iterations to prevent overflow when running for long time
/**
* Aggregate MariaDB every 15 minutes
* Some of the queries here might contain full-table scans.
*/
* Aggregate MariaDB every 15 minutes
* Some of the queries here might contain full-table scans.
*/
$now = date('d-m-Y H:i:s', time());
Console::info("[{$now}] Aggregating database counters.");
$usageDB->foreachDocument('console', 'projects', [], function ($project) use ($usageDB) {
$projectId = $project->getId();
// Get total storage of deployments
try {
$deploymentsTotal = $usageDB->sum($projectId, 'deployments', 'size', 'storage.deployments.total');
} catch (\Exception$e) {
Console::warning("Failed to save data for project {$projectId} and metric storage.deployments.total: {$e->getMessage()}");
Console::warning($e->getTraceAsString());
}
foreach ($usageDB->getCollections() as $collection => $options) {
try {
$metricPrefix = $options['metricPrefix'] ?? '';
$metric = empty($metricPrefix) ? "{$collection}.count" : "{$metricPrefix}.{$collection}.count";
$usageDB->count($projectId, $collection, $metric);
$subCollections = $options['subCollections'] ?? [];
if (empty($subCollections)) {
continue;
}
$subCollectionCounts = []; //total project level count of sub collections
$subCollectionTotals = []; //total project level sum of sub collections
$usageDB->foreachDocument($projectId, $collection, [], function ($parent) use (&$subCollectionCounts, &$subCollectionTotals, $subCollections, $projectId, $usageDB, $collection) {
foreach ($subCollections as $subCollection => $subOptions) { // Sub collection counts, like database.collections.collectionId.documents.count
$metric = empty($metricPrefix) ? "{$collection}.{$parent->getId()}.{$subCollection}.count" : "{$metricPrefix}.{$collection}.{$parent->getInternalId()}.{$subCollection}.count";
$count = $usageDB->count($projectId, ($subOptions['collectionPrefix'] ?? '') . $parent->getInternalId(), $metric);
$subCollectionCounts[$subCollection] = ($subCollectionCounts[$subCollection] ?? 0) + $count; // Project level counts for sub collections like database.documents.count
// check if sum calculation is required
$total = $subOptions['total'] ?? [];
if (empty($total)) {
continue;
}
$metric = empty($metricPrefix) ? "{$collection}.{$parent->getId()}.{$subCollection}.total" : "{$metricPrefix}.{$collection}.{$parent->getInternalId()}.{$subCollection}.total";
$total = $usageDB->sum($projectId, ($subOptions['collectionPrefix'] ?? '') . $parent->getInternalId(), $total['field'], $metric);
$subCollectionTotals[$subCollection] = ($subCollectionTotals[$subCollection] ?? 0) + $total; // Project level sum for sub collections like storage.total
}
});
/**
* Inserting project level counts for sub collections like database.documents.count
*/
foreach ($subCollectionCounts as $subCollection => $count) {
$metric = empty($metricPrefix) ? "{$subCollection}.count" : "{$metricPrefix}.{$subCollection}.count";
$time = (int) (floor(time() / 1800) * 1800); // Time rounded to nearest 30 minutes
$usageDB->createOrUpdateMetric($projectId, $time, '30m', $metric, $count, 1);
$time = (int) (floor(time() / 86400) * 86400); // Time rounded to nearest day
$usageDB->createOrUpdateMetric($projectId, $time, '1d', $metric, $count, 1);
}
/**
* Inserting project level sums for sub collections like storage.files.total
*/
foreach ($subCollectionTotals as $subCollection => $count) {
$metric = empty($metricPrefix) ? "{$subCollection}.total" : "{$metricPrefix}.{$subCollection}.total";
$time = (int) (floor(time() / 1800) * 1800); // Time rounded to nearest 30 minutes
$usageDB->createOrUpdateMetric($projectId, $time, '30m', $metric, $count, 1);
$time = (int) (floor(time() / 86400) * 86400); // Time rounded to nearest day
$usageDB->createOrUpdateMetric($projectId, $time, '1d', $metric, $count, 1);
// aggregate storage.total = storage.files.total + storage.deployments.total
if ($metricPrefix === 'storage' && $subCollection === 'files') {
$metric = 'storage.total';
$time = (int) (floor(time() / 1800) * 1800); // Time rounded to nearest 30 minutes
$usageDB->createOrUpdateMetric($projectId, $time, '30m', $metric, $count + $deploymentsTotal, 1);
$time = (int) (floor(time() / 86400) * 86400); // Time rounded to nearest day
$usageDB->createOrUpdateMetric($projectId, $time, '1d', $metric, $count + $deploymentsTotal, 1);
}
}
} catch (\Exception$e) {
Console::warning("Failed: {$e->getMessage()}");
Console::warning($e->getTraceAsString());
}
}
});
$usageDB->collect();
$iterations++;
$loopTook = microtime(true) - $loopStart;

View file

@ -10,6 +10,8 @@ use DateTime;
class Usage {
protected InfluxDatabase $influxDB;
protected Database $database;
protected $errorHandler;
private array $latestTime = [];
// all the mertics that we are collecting
protected array $metrics = [
@ -144,28 +146,21 @@ class Usage {
protected array $periods = [
[
'key' => '30m',
'multiplier' => 1800,
'startTime' => '-24 hours',
],
[
'key' => '1d',
'multiplier' => 86400,
'startTime' => '-90 days',
],
];
public function __construct(Database $database, InfluxDatabase $influxDB)
public function __construct(Database $database, InfluxDatabase $influxDB, callable $errorHandler = null)
{
$this->database = $database;
$this->influxDB = $influxDB;
}
public function getMetrics(): array
{
return $this->metrics;
}
public function getPeriods(): array
{
return $this->periods;
$this->errorHandler = $errorHandler;
}
/**
@ -181,7 +176,7 @@ class Usage {
*
* @return void
*/
public function createOrUpdateMetric(string $projectId, int $time, string $period, string $metric, int $value, int $type): void
private function createOrUpdateMetric(string $projectId, int $time, string $period, string $metric, int $value, int $type): void
{
$id = \md5("{$time}_{$period}_{$metric}");
$this->database->setNamespace('_' . $projectId);
@ -203,9 +198,13 @@ class Usage {
$document->setAttribute('value', $value)
);
}
$latestTime[$metric][$period['key']] = $time;
$this->latestTime[$metric][$period['key']] = $time;
} catch (\Exception $e) { // if projects are deleted this might fail
throw new \Exception("Unable to save data for project {$projectId} and metric {$metric}: {$e->getMessage()}");
if(is_callable($this->errorHandler)) {
call_user_func($this->errorHandler, "Unable to save data for project {$projectId} and metric {$metric}: {$e->getMessage()}", $e->getTraceAsString());
} else {
throw $e;
}
}
}
@ -216,15 +215,14 @@ class Usage {
* @param string $metric
* @param array $options
* @param array $period
* @param array $latestTime
*
* @return void
*/
public function syncFromInfluxDB(string $metric, array $options, array $period, array &$latestTime): void
private function syncFromInfluxDB(string $metric, array $options, array $period): void
{
$start = DateTime::createFromFormat('U', \strtotime($period['startTime']))->format(DateTime::RFC3339);
if (!empty($latestTime[$metric][$period['key']])) {
$start = DateTime::createFromFormat('U', $latestTime[$metric][$period['key']])->format(DateTime::RFC3339);
if (!empty($this->latestTime[$metric][$period['key']])) {
$start = DateTime::createFromFormat('U', $this->latestTime[$metric][$period['key']])->format(DateTime::RFC3339);
}
$end = DateTime::createFromFormat('U', \strtotime('now'))->format(DateTime::RFC3339);
@ -270,4 +268,27 @@ class Usage {
}
}
}
/**
* Collect Stats
* Collect all the stats from Influd DB to Database
*
* @return void
*/
public function collect(): void
{
foreach ($this->metrics as $metric => $options) { //for each metrics
foreach ($this->periods as $period) { // aggregate data for each period
try {
$this->syncFromInfluxDB($metric, $options, $period);
} catch (\Exception $e) {
if(is_callable($this->errorHandler)) {
call_user_func($this->errorHandler, $e->getMessage(), $e->getTraceAsString());
} else {
throw $e;
}
}
}
}
}
}

View file

@ -3,49 +3,72 @@
namespace Appwrite\Stats;
use Utopia\Database\Database;
use Utopia\Database\Document;
class UsageDB extends Usage
{
protected array $collections = [
'users' => [
'namespace' => '',
],
'collections' => [
'metricPrefix' => 'database',
'namespace' => '',
'subCollections' => [ // Some collections, like collections and later buckets have child collections that need counting
'documents' => [
'collectionPrefix' => 'collection_',
'namespace' => '',
],
],
],
'buckets' => [
'metricPrefix' => 'storage',
'namespace' => '',
'subCollections' => [
'files' => [
'namespace' => '',
'collectionPrefix' => 'bucket_',
'total' => [
'field' => 'sizeOriginal',
],
],
],
],
];
public function __construct(Database $database)
public function __construct(Database $database, callable $errorHandler = null)
{
$this->database = $database;
$this->errorHandler = $errorHandler;
}
public function getCollections(): array
/**
* Create or Update Mertic
* Create or update each metric in the stats collection for the given project
*
* @param string $projectId
* @param string $metric
* @param int $value
*
* @return void
*/
private function createOrUpdateMetric(string $projectId, string $metric, int $value): void
{
return $this->collections;
foreach ($this->periods as $options) {
$period = $options['key'];
$time = (int) (floor(time() / $options['multiplier']) * $options['multiplier']);
$id = \md5("{$time}_{$period}_{$metric}");
$this->database->setNamespace('_' . $projectId);
try {
$document = $this->database->getDocument('stats', $id);
if ($document->isEmpty()) {
$this->database->createDocument('stats', new Document([
'$id' => $id,
'period' => $period['key'],
'time' => $time,
'metric' => $metric,
'value' => $value,
'type' => 1,
]));
} else {
$this->database->updateDocument(
'stats',
$document->getId(),
$document->setAttribute('value', $value)
);
}
} catch (\Exception$e) { // if projects are deleted this might fail
if (is_callable($this->errorHandler)) {
call_user_func($this->errorHandler, "Unable to save data for project {$projectId} and metric {$metric}: {$e->getMessage()}", $e->getTraceAsString());
} else {
throw $e;
}
}
}
}
public function foreachDocument(string $projectId, string $collection, array $queries, callable $callback): void
/**
* Foreach Document
* Call provided callback for each document in the collection
*
* @param string $projectId
* @param string $collection
* @param array $queries
* @param callable $callback
*
* @return void
*/
private function foreachDocument(string $projectId, string $collection, array $queries, callable $callback): void
{
$limit = 50;
$results = [];
@ -67,31 +90,146 @@ class UsageDB extends Usage
}
}
public function sum(string $projectId, string $collection, string $attribute, string $metric): int
/**
* Sum
* Calculate sum of a attribute of documents in collection
*
* @param string $projectId
* @param string $collection
* @param string $attribute
* @param string $metric
*
* @return int
*/
private function sum(string $projectId, string $collection, string $attribute, string $metric): int
{
$this->database->setNamespace('_' . $projectId);
$sum = (int) $this->database->sum($collection, $attribute);
$time = (int) (floor(time() / 1800) * 1800); // Time rounded to nearest 30 minutes
$this->createOrUpdateMetric($projectId, $time, '30m', $metric, $sum, 1);
$time = (int) (floor(time() / 86400) * 86400); // Time rounded to nearest day
$this->createOrUpdateMetric($projectId, $time, '1d', $metric, $sum, 1);
$this->createOrUpdateMetric($projectId, $metric, $sum);
return $sum;
}
public function count(string $projectId, string $collection, string $metric): int
/**
* Count
* Count number of documents in collection
*
* @param string $projectId
* @param string $collection
* @param string $metric
*
* @return int
*/
private function count(string $projectId, string $collection, string $metric): int
{
$this->database->setNamespace("_{$projectId}");
$count = $this->database->count($collection);
$metricPrefix = $options['metricPrefix'] ?? '';
$metric = empty($metricPrefix) ? "{$collection}.count" : "{$metricPrefix}.{$collection}.count";
$time = (int) (floor(time() / 1800) * 1800); // Time rounded to nearest 30 minutes
$this->createOrUpdateMetric($projectId, $time, '30m', $metric, $count, 1);
$time = (int) (floor(time() / 86400) * 86400); // Time rounded to nearest day
$this->createOrUpdateMetric($projectId, $time, '1d', $metric, $count, 1);
$this->createOrUpdateMetric($projectId, $metric, $count);
return $count;
}
/**
* Deployments Total
* Total sum of storage used by deployments
*
* @param string $projectId
*
* @return int
*/
private function deploymentsTotal(string $projectId): int
{
return $this->sum($projectId, 'deployments', 'size', 'stroage.deployments.total');
}
/**
* Users Stats
* Metric: users.count
*
* @param string $projectId
*
* @return void
*/
private function usersStats(string $projectId): void
{
$this->count($projectId, 'users', 'users.count');
}
/**
* Storage Stats
* Metrics: storage.total, storage.files.total, storage.buckets.{bucketId}.files.total,
* storage.buckets.count, storage.files.count, storage.buckets.{bucketId}.files.count
*
* @param string $projectId
*
* @return void
*/
private function storageStats(string $projectId): void
{
$deploymentsTotal = $this->deploymentsTotal($projectId);
$projectFilesTotal = 0;
$projectFilesCount = 0;
$metric = 'storage.buckets.count';
$this->count($projectId, 'buckets', $metric);
$this->foreachDocument($projectId, 'buckets', [], function ($bucket) use (&$projectFilesCount, &$projectFilesTotal, $projectId, ) {
$metric = "storage.buckets.{$bucket->getId()}.files.count";
$count = $this->count($projectId, 'buckets_' . $bucket->getInternalId(), $metric);
$projectFilesCount += $count;
$metric = "storage.buckets.{$bucket->getId()}.files.total";
$sum = $this->sum($projectId, 'bucket_' . $bucket->getInternalId(), 'sizeOriginal', $metric);
$projectFilesTotal += $sum;
});
$this->createOrUpdateMetric($projectId, 'storage.files.count', $projectFilesCount);
$this->createOrUpdateMetric($projectId, 'storage.files.total', $projectFilesTotal);
$this->createOrUpdateMetric($projectId, 'storage.total', $projectFilesTotal + $deploymentsTotal);
}
/**
* Database Stats
* Collect all database stats
* Metrics: database.collections.count, database.collections.{collectionId}.documents.count,
* database.documents.count
*
* @param string $projectId
*
* @return void
*/
private function databaseStats(string $projectId): void
{
$projectDocumentsCount = 0;
$metric = 'database.collections.count';
$this->count($projectId, 'collections', $metric);
$this->foreachDocument($projectId, 'collections', [], function ($collection) use (&$projectDocumentsCount, $projectId, ) {
$metric = "database.collections.{$collection->getId()}.documents.count";
$count = $this->count($projectId, 'collection_' . $collection->getInternalId(), $metric);
$projectDocumentsCount += $count;
});
$this->createOrUpdateMetric($projectId, 'database.documents.count', $projectDocumentsCount);
}
/**
* Collect Stats
* Collect all database related stats
*
* @return void
*/
public function collect(): void
{
$this->foreachDocument('console', 'projects', [], function ($project) {
$projectId = $project->getId();
$this->usersStats($projectId);
$this->databaseStats($projectId);
$this->storageStats($projectId);
});
}
}