Merge pull request #3373 from appwrite/feat-usage-refactor
This commit is contained in:
commit
2c89d063c1
|
@ -2,271 +2,131 @@
|
|||
|
||||
global $cli, $register;
|
||||
|
||||
use Appwrite\Stats\Usage;
|
||||
use Appwrite\Stats\UsageDB;
|
||||
use InfluxDB\Database as InfluxDatabase;
|
||||
use Utopia\App;
|
||||
use Utopia\Cache\Adapter\Redis;
|
||||
use Utopia\Cache\Adapter\Redis as RedisCache;
|
||||
use Utopia\Cache\Cache;
|
||||
use Utopia\CLI\Console;
|
||||
use Utopia\Database\Adapter\MariaDB;
|
||||
use Utopia\Database\Database;
|
||||
use Utopia\Database\Document;
|
||||
use Utopia\Database\Validator\Authorization;
|
||||
use Utopia\Registry\Registry;
|
||||
use Utopia\Logger\Log;
|
||||
|
||||
/**
|
||||
* Metrics We collect
|
||||
*
|
||||
* General
|
||||
*
|
||||
* requests
|
||||
* network
|
||||
* executions
|
||||
*
|
||||
* Database
|
||||
*
|
||||
* database.collections.create
|
||||
* database.collections.read
|
||||
* database.collections.update
|
||||
* database.collections.delete
|
||||
* database.documents.create
|
||||
* database.documents.read
|
||||
* database.documents.update
|
||||
* database.documents.delete
|
||||
* database.collections.{collectionId}.documents.create
|
||||
* database.collections.{collectionId}.documents.read
|
||||
* database.collections.{collectionId}.documents.update
|
||||
* database.collections.{collectionId}.documents.delete
|
||||
*
|
||||
* Storage
|
||||
*
|
||||
* storage.buckets.create
|
||||
* storage.buckets.read
|
||||
* storage.buckets.update
|
||||
* storage.buckets.delete
|
||||
* storage.files.create
|
||||
* storage.files.read
|
||||
* storage.files.update
|
||||
* storage.files.delete
|
||||
* storage.buckets.{bucketId}.files.create
|
||||
* storage.buckets.{bucketId}.files.read
|
||||
* storage.buckets.{bucketId}.files.update
|
||||
* storage.buckets.{bucketId}.files.delete
|
||||
*
|
||||
* Users
|
||||
*
|
||||
* users.create
|
||||
* users.read
|
||||
* users.update
|
||||
* users.delete
|
||||
* users.sessions.create
|
||||
* users.sessions.{provider}.create
|
||||
* users.sessions.delete
|
||||
*
|
||||
* Functions
|
||||
*
|
||||
* functions.{functionId}.executions
|
||||
* functions.{functionId}.failures
|
||||
* functions.{functionId}.compute
|
||||
*
|
||||
* Counters
|
||||
*
|
||||
* users.count
|
||||
* storage.buckets.count
|
||||
* storage.files.count
|
||||
* storage.buckets.{bucketId}.files.count
|
||||
* database.collections.count
|
||||
* database.documents.count
|
||||
* database.collections.{collectionId}.documents.count
|
||||
*
|
||||
* Totals
|
||||
*
|
||||
* storage.total
|
||||
*
|
||||
*/
|
||||
function getDatabase(Registry &$register, string $namespace): Database
|
||||
{
|
||||
$attempts = 0;
|
||||
|
||||
do {
|
||||
try {
|
||||
$attempts++;
|
||||
|
||||
$db = $register->get('db');
|
||||
$redis = $register->get('cache');
|
||||
|
||||
$cache = new Cache(new RedisCache($redis));
|
||||
$database = new Database(new MariaDB($db), $cache);
|
||||
$database->setDefaultDatabase(App::getEnv('_APP_DB_SCHEMA', 'appwrite'));
|
||||
$database->setNamespace($namespace);
|
||||
|
||||
if (!$database->exists($database->getDefaultDatabase(), 'projects')) {
|
||||
throw new Exception('Projects collection not ready');
|
||||
}
|
||||
break; // leave loop if successful
|
||||
} catch (\Exception$e) {
|
||||
Console::warning("Database not ready. Retrying connection ({$attempts})...");
|
||||
if ($attempts >= DATABASE_RECONNECT_MAX_ATTEMPTS) {
|
||||
throw new \Exception('Failed to connect to database: ' . $e->getMessage());
|
||||
}
|
||||
sleep(DATABASE_RECONNECT_SLEEP);
|
||||
}
|
||||
} while ($attempts < DATABASE_RECONNECT_MAX_ATTEMPTS);
|
||||
|
||||
return $database;
|
||||
}
|
||||
|
||||
function getInfluxDB(Registry &$register): InfluxDatabase
|
||||
{
|
||||
/** @var InfluxDB\Client $client */
|
||||
$client = $register->get('influxdb');
|
||||
$attempts = 0;
|
||||
$max = 10;
|
||||
$sleep = 1;
|
||||
|
||||
do { // check if telegraf database is ready
|
||||
try {
|
||||
$attempts++;
|
||||
$database = $client->selectDB('telegraf');
|
||||
if (in_array('telegraf', $client->listDatabases())) {
|
||||
break; // leave the do-while if successful
|
||||
}
|
||||
} catch (\Throwable$th) {
|
||||
Console::warning("InfluxDB not ready. Retrying connection ({$attempts})...");
|
||||
if ($attempts >= $max) {
|
||||
throw new \Exception('InfluxDB database not ready yet');
|
||||
}
|
||||
sleep($sleep);
|
||||
}
|
||||
} while ($attempts < $max);
|
||||
return $database;
|
||||
}
|
||||
|
||||
$logError = function (Throwable $error, string $action = 'syncUsageStats') use ($register) {
|
||||
$logger = $register->get('logger');
|
||||
|
||||
if ($logger) {
|
||||
$version = App::getEnv('_APP_VERSION', 'UNKNOWN');
|
||||
|
||||
$log = new Log();
|
||||
$log->setNamespace("realtime");
|
||||
$log->setServer(\gethostname());
|
||||
$log->setVersion($version);
|
||||
$log->setType(Log::TYPE_ERROR);
|
||||
$log->setMessage($error->getMessage());
|
||||
|
||||
$log->addTag('code', $error->getCode());
|
||||
$log->addTag('verboseType', get_class($error));
|
||||
|
||||
$log->addExtra('file', $error->getFile());
|
||||
$log->addExtra('line', $error->getLine());
|
||||
$log->addExtra('trace', $error->getTraceAsString());
|
||||
$log->addExtra('detailedTrace', $error->getTrace());
|
||||
|
||||
$log->setAction($action);
|
||||
|
||||
$isProduction = App::getEnv('_APP_ENV', 'development') === 'production';
|
||||
$log->setEnvironment($isProduction ? Log::ENVIRONMENT_PRODUCTION : Log::ENVIRONMENT_STAGING);
|
||||
|
||||
$responseCode = $logger->addLog($log);
|
||||
Console::info('Usage stats log pushed with status code: ' . $responseCode);
|
||||
}
|
||||
|
||||
Console::warning("Failed: {$error->getMessage()}");
|
||||
Console::warning($error->getTraceAsString());
|
||||
};
|
||||
|
||||
$cli
|
||||
->task('usage')
|
||||
->desc('Schedules syncing data from influxdb to Appwrite console db')
|
||||
->action(function () use ($register) {
|
||||
->action(function () use ($register, $logError) {
|
||||
Console::title('Usage Aggregation V1');
|
||||
Console::success(APP_NAME . ' usage aggregation process v1 has started');
|
||||
|
||||
$interval = (int) App::getEnv('_APP_USAGE_AGGREGATION_INTERVAL', '30'); // 30 seconds (by default)
|
||||
$periods = [
|
||||
[
|
||||
'key' => '30m',
|
||||
'startTime' => '-24 hours',
|
||||
],
|
||||
[
|
||||
'key' => '1d',
|
||||
'startTime' => '-90 days',
|
||||
],
|
||||
];
|
||||
|
||||
// all the metrics that we are collecting at the moment
|
||||
$globalMetrics = [
|
||||
'requests' => [
|
||||
'table' => 'appwrite_usage_requests_all',
|
||||
],
|
||||
'network' => [
|
||||
'table' => 'appwrite_usage_network_all',
|
||||
],
|
||||
'executions' => [
|
||||
'table' => 'appwrite_usage_executions_all',
|
||||
],
|
||||
'database.collections.create' => [
|
||||
'table' => 'appwrite_usage_database_collections_create',
|
||||
],
|
||||
'database.collections.read' => [
|
||||
'table' => 'appwrite_usage_database_collections_read',
|
||||
],
|
||||
'database.collections.update' => [
|
||||
'table' => 'appwrite_usage_database_collections_update',
|
||||
],
|
||||
'database.collections.delete' => [
|
||||
'table' => 'appwrite_usage_database_collections_delete',
|
||||
],
|
||||
'database.documents.create' => [
|
||||
'table' => 'appwrite_usage_database_documents_create',
|
||||
],
|
||||
'database.documents.read' => [
|
||||
'table' => 'appwrite_usage_database_documents_read',
|
||||
],
|
||||
'database.documents.update' => [
|
||||
'table' => 'appwrite_usage_database_documents_update',
|
||||
],
|
||||
'database.documents.delete' => [
|
||||
'table' => 'appwrite_usage_database_documents_delete',
|
||||
],
|
||||
'database.collections.collectionId.documents.create' => [
|
||||
'table' => 'appwrite_usage_database_documents_create',
|
||||
'groupBy' => 'collectionId',
|
||||
],
|
||||
'database.collections.collectionId.documents.read' => [
|
||||
'table' => 'appwrite_usage_database_documents_read',
|
||||
'groupBy' => 'collectionId',
|
||||
],
|
||||
'database.collections.collectionId.documents.update' => [
|
||||
'table' => 'appwrite_usage_database_documents_update',
|
||||
'groupBy' => 'collectionId',
|
||||
],
|
||||
'database.collections.collectionId.documents.delete' => [
|
||||
'table' => 'appwrite_usage_database_documents_delete',
|
||||
'groupBy' => 'collectionId',
|
||||
],
|
||||
'storage.buckets.create' => [
|
||||
'table' => 'appwrite_usage_storage_buckets_create',
|
||||
],
|
||||
'storage.buckets.read' => [
|
||||
'table' => 'appwrite_usage_storage_buckets_read',
|
||||
],
|
||||
'storage.buckets.update' => [
|
||||
'table' => 'appwrite_usage_storage_buckets_update',
|
||||
],
|
||||
'storage.buckets.delete' => [
|
||||
'table' => 'appwrite_usage_storage_buckets_delete',
|
||||
],
|
||||
'storage.files.create' => [
|
||||
'table' => 'appwrite_usage_storage_files_create',
|
||||
],
|
||||
'storage.files.read' => [
|
||||
'table' => 'appwrite_usage_storage_files_read',
|
||||
],
|
||||
'storage.files.update' => [
|
||||
'table' => 'appwrite_usage_storage_files_update',
|
||||
],
|
||||
'storage.files.delete' => [
|
||||
'table' => 'appwrite_usage_storage_files_delete',
|
||||
],
|
||||
'storage.buckets.bucketId.files.create' => [
|
||||
'table' => 'appwrite_usage_storage_files_create',
|
||||
'groupBy' => 'bucketId',
|
||||
],
|
||||
'storage.buckets.bucketId.files.read' => [
|
||||
'table' => 'appwrite_usage_storage_files_read',
|
||||
'groupBy' => 'bucketId',
|
||||
],
|
||||
'storage.buckets.bucketId.files.update' => [
|
||||
'table' => 'appwrite_usage_storage_files_update',
|
||||
'groupBy' => 'bucketId',
|
||||
],
|
||||
'storage.buckets.bucketId.files.delete' => [
|
||||
'table' => 'appwrite_usage_storage_files_delete',
|
||||
'groupBy' => 'bucketId',
|
||||
],
|
||||
'users.create' => [
|
||||
'table' => 'appwrite_usage_users_create',
|
||||
],
|
||||
'users.read' => [
|
||||
'table' => 'appwrite_usage_users_read',
|
||||
],
|
||||
'users.update' => [
|
||||
'table' => 'appwrite_usage_users_update',
|
||||
],
|
||||
'users.delete' => [
|
||||
'table' => 'appwrite_usage_users_delete',
|
||||
],
|
||||
'users.sessions.create' => [
|
||||
'table' => 'appwrite_usage_users_sessions_create',
|
||||
],
|
||||
'users.sessions.provider.create' => [
|
||||
'table' => 'appwrite_usage_users_sessions_create',
|
||||
'groupBy' => 'provider',
|
||||
],
|
||||
'users.sessions.delete' => [
|
||||
'table' => 'appwrite_usage_users_sessions_delete',
|
||||
],
|
||||
'functions.functionId.executions' => [
|
||||
'table' => 'appwrite_usage_executions_all',
|
||||
'groupBy' => 'functionId',
|
||||
],
|
||||
'functions.functionId.compute' => [
|
||||
'table' => 'appwrite_usage_executions_time',
|
||||
'groupBy' => 'functionId',
|
||||
],
|
||||
'functions.functionId.failures' => [
|
||||
'table' => 'appwrite_usage_executions_all',
|
||||
'groupBy' => 'functionId',
|
||||
'filters' => [
|
||||
'functionStatus' => 'failed',
|
||||
],
|
||||
],
|
||||
];
|
||||
$database = getDatabase($register, '_console');
|
||||
$influxDB = getInfluxDB($register);
|
||||
|
||||
// TODO Maybe move this to the setResource method, and reuse in the http.php file
|
||||
$attempts = 0;
|
||||
$max = 10;
|
||||
$sleep = 1;
|
||||
$usage = new Usage($database, $influxDB, $logError);
|
||||
|
||||
$db = null;
|
||||
$redis = null;
|
||||
do { // connect to db
|
||||
try {
|
||||
$attempts++;
|
||||
$db = $register->get('db');
|
||||
$redis = $register->get('cache');
|
||||
break; // leave the do-while if successful
|
||||
} catch (\Exception $e) {
|
||||
Console::warning("Database not ready. Retrying connection ({$attempts})...");
|
||||
if ($attempts >= $max) {
|
||||
throw new \Exception('Failed to connect to database: ' . $e->getMessage());
|
||||
}
|
||||
sleep($sleep);
|
||||
}
|
||||
} while ($attempts < $max);
|
||||
|
||||
// TODO use inject
|
||||
$cacheAdapter = new Cache(new Redis($redis));
|
||||
$dbForProject = new Database(new MariaDB($db), $cacheAdapter);
|
||||
$dbForConsole = new Database(new MariaDB($db), $cacheAdapter);
|
||||
$dbForProject->setDefaultDatabase(App::getEnv('_APP_DB_SCHEMA', 'appwrite'));
|
||||
$dbForConsole->setDefaultDatabase(App::getEnv('_APP_DB_SCHEMA', 'appwrite'));
|
||||
$dbForConsole->setNamespace('_console');
|
||||
|
||||
$latestTime = [];
|
||||
$usageDB = new UsageDB($database, $logError);
|
||||
|
||||
Authorization::disable();
|
||||
|
||||
$iterations = 0;
|
||||
Console::loop(function () use ($interval, $register, $dbForProject, $dbForConsole, $globalMetrics, $periods, &$latestTime, &$iterations) {
|
||||
Console::loop(function () use ($interval, $usage, $usageDB, &$iterations) {
|
||||
$now = date('d-m-Y H:i:s', time());
|
||||
Console::info("[{$now}] Aggregating usage data every {$interval} seconds");
|
||||
|
||||
|
@ -274,105 +134,10 @@ $cli
|
|||
|
||||
/**
|
||||
* Aggregate InfluxDB every 30 seconds
|
||||
* @var InfluxDB\Client $client
|
||||
*/
|
||||
$client = $register->get('influxdb');
|
||||
$attempts = 0;
|
||||
$max = 10;
|
||||
$sleep = 1;
|
||||
$usage->collect();
|
||||
|
||||
do { // check if telegraf database is ready
|
||||
try {
|
||||
$attempts++;
|
||||
$database = $client->selectDB('telegraf');
|
||||
if (in_array('telegraf', $client->listDatabases())) {
|
||||
break; // leave the do-while if successful
|
||||
}
|
||||
} catch (\Throwable $th) {
|
||||
Console::warning("InfluxDB not ready. Retrying connection ({$attempts})...");
|
||||
if ($attempts >= $max) {
|
||||
throw new \Exception('InfluxDB database not ready yet');
|
||||
}
|
||||
sleep($sleep);
|
||||
}
|
||||
} while ($attempts < $max);
|
||||
|
||||
// sync data
|
||||
foreach ($globalMetrics as $metric => $options) { //for each metrics
|
||||
foreach ($periods as $period) { // aggregate data for each period
|
||||
$start = DateTime::createFromFormat('U', \strtotime($period['startTime']))->format(DateTime::RFC3339);
|
||||
if (!empty($latestTime[$metric][$period['key']])) {
|
||||
$start = DateTime::createFromFormat('U', $latestTime[$metric][$period['key']])->format(DateTime::RFC3339);
|
||||
}
|
||||
$end = DateTime::createFromFormat('U', \strtotime('now'))->format(DateTime::RFC3339);
|
||||
|
||||
$table = $options['table']; //Which influxdb table to query for this metric
|
||||
$groupBy = empty($options['groupBy']) ? '' : ', "' . $options['groupBy'] . '"'; //Some sub level metrics may be grouped by other tags like collectionId, bucketId, etc
|
||||
|
||||
$filters = $options['filters'] ?? []; // Some metrics might have additional filters, like function's status
|
||||
if (!empty($filters)) {
|
||||
$filters = ' AND ' . implode(' AND ', array_map(fn ($filter, $value) => "\"{$filter}\"='{$value}'", array_keys($filters), array_values($filters)));
|
||||
} else {
|
||||
$filters = '';
|
||||
}
|
||||
|
||||
$query = "SELECT sum(value) AS \"value\" FROM \"{$table}\" WHERE \"time\" > '{$start}' AND \"time\" < '{$end}' AND \"metric_type\"='counter' {$filters} GROUP BY time({$period['key']}), \"projectId\" {$groupBy} FILL(null)";
|
||||
try {
|
||||
$result = $database->query($query);
|
||||
|
||||
$points = $result->getPoints();
|
||||
foreach ($points as $point) {
|
||||
$projectId = $point['projectId'];
|
||||
|
||||
if (!empty($projectId) && $projectId !== 'console') {
|
||||
$dbForProject->setNamespace('_' . $projectId);
|
||||
$metricUpdated = $metric;
|
||||
|
||||
if (!empty($groupBy)) {
|
||||
$groupedBy = $point[$options['groupBy']] ?? '';
|
||||
if (empty($groupedBy)) {
|
||||
continue;
|
||||
}
|
||||
$metricUpdated = str_replace($options['groupBy'], $groupedBy, $metric);
|
||||
}
|
||||
|
||||
$time = \strtotime($point['time']);
|
||||
$id = \md5($time . '_' . $period['key'] . '_' . $metricUpdated); //Construct unique id for each metric using time, period and metric
|
||||
$value = (!empty($point['value'])) ? $point['value'] : 0;
|
||||
|
||||
try {
|
||||
$document = $dbForProject->getDocument('stats', $id);
|
||||
if ($document->isEmpty()) {
|
||||
$dbForProject->createDocument('stats', new Document([
|
||||
'$id' => $id,
|
||||
'period' => $period['key'],
|
||||
'time' => $time,
|
||||
'metric' => $metricUpdated,
|
||||
'value' => $value,
|
||||
'type' => 0,
|
||||
]));
|
||||
} else {
|
||||
$dbForProject->updateDocument(
|
||||
'stats',
|
||||
$document->getId(),
|
||||
$document->setAttribute('value', $value)
|
||||
);
|
||||
}
|
||||
$latestTime[$metric][$period['key']] = $time;
|
||||
} catch (\Exception $e) { // if projects are deleted this might fail
|
||||
Console::warning("Failed to save data for project {$projectId} and metric {$metricUpdated}: {$e->getMessage()}");
|
||||
Console::warning($e->getTraceAsString());
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (\Exception $e) {
|
||||
Console::warning("Failed to Query: {$e->getMessage()}");
|
||||
Console::warning($e->getTraceAsString());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if ($iterations % 30 != 0) { // Aggregate aggregate number of objects in database only after 15 minutes
|
||||
if ($iterations % 30 != 0) { // return if 30 iterations has not passed
|
||||
$iterations++;
|
||||
$loopTook = microtime(true) - $loopStart;
|
||||
$now = date('d-m-Y H:i:s', time());
|
||||
|
@ -380,6 +145,7 @@ $cli
|
|||
return;
|
||||
}
|
||||
|
||||
$iterations = 0; // Reset iterations to prevent overflow when running for long time
|
||||
/**
|
||||
* Aggregate MariaDB every 15 minutes
|
||||
* Some of the queries here might contain full-table scans.
|
||||
|
@ -387,435 +153,7 @@ $cli
|
|||
$now = date('d-m-Y H:i:s', time());
|
||||
Console::info("[{$now}] Aggregating database counters.");
|
||||
|
||||
$latestProject = null;
|
||||
do { // Loop over all the projects
|
||||
$attempts = 0;
|
||||
$max = 10;
|
||||
$sleep = 1;
|
||||
|
||||
do { // list projects
|
||||
try {
|
||||
$attempts++;
|
||||
$projects = $dbForConsole->find('projects', [], 100, cursor: $latestProject);
|
||||
break; // leave the do-while if successful
|
||||
} catch (\Exception $e) {
|
||||
Console::warning("Console DB not ready yet. Retrying ({$attempts})...");
|
||||
if ($attempts >= $max) {
|
||||
throw new \Exception('Failed access console db: ' . $e->getMessage());
|
||||
}
|
||||
sleep($sleep);
|
||||
}
|
||||
} while ($attempts < $max);
|
||||
|
||||
if (empty($projects)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
$latestProject = $projects[array_key_last($projects)];
|
||||
|
||||
foreach ($projects as $project) {
|
||||
$projectId = $project->getId();
|
||||
|
||||
// Get total storage
|
||||
$dbForProject->setNamespace('_' . $projectId);
|
||||
$deploymentsTotal = $dbForProject->sum('deployments', 'size');
|
||||
|
||||
$time = (int) (floor(time() / 1800) * 1800); // Time rounded to nearest 30 minutes
|
||||
$id = \md5($time . '_30m_storage.deployments.total'); //Construct unique id for each metric using time, period and metric
|
||||
$document = $dbForProject->getDocument('stats', $id);
|
||||
try {
|
||||
if ($document->isEmpty()) {
|
||||
$dbForProject->createDocument('stats', new Document([
|
||||
'$id' => $id,
|
||||
'period' => '30m',
|
||||
'time' => $time,
|
||||
'metric' => 'storage.deployments.total',
|
||||
'value' => $deploymentsTotal,
|
||||
'type' => 1,
|
||||
]));
|
||||
} else {
|
||||
$dbForProject->updateDocument(
|
||||
'stats',
|
||||
$document->getId(),
|
||||
$document->setAttribute('value', $deploymentsTotal)
|
||||
);
|
||||
}
|
||||
$time = (int) (floor(time() / 86400) * 86400); // Time rounded to nearest day
|
||||
$id = \md5($time . '_1d_storage.deployments.total'); //Construct unique id for each metric using time, period and metric
|
||||
$document = $dbForProject->getDocument('stats', $id);
|
||||
if ($document->isEmpty()) {
|
||||
$dbForProject->createDocument('stats', new Document([
|
||||
'$id' => $id,
|
||||
'period' => '1d',
|
||||
'time' => $time,
|
||||
'metric' => 'storage.deployments.total',
|
||||
'value' => $deploymentsTotal,
|
||||
'type' => 1,
|
||||
]));
|
||||
} else {
|
||||
$dbForProject->updateDocument(
|
||||
'stats',
|
||||
$document->getId(),
|
||||
$document->setAttribute('value', $deploymentsTotal)
|
||||
);
|
||||
}
|
||||
} catch (\Exception $e) {
|
||||
Console::warning("Failed to save data for project {$projectId} and metric storage.deployments.total: {$e->getMessage()}");
|
||||
Console::warning($e->getTraceAsString());
|
||||
}
|
||||
|
||||
|
||||
$collections = [
|
||||
'users' => [
|
||||
'namespace' => '',
|
||||
],
|
||||
'collections' => [
|
||||
'metricPrefix' => 'database',
|
||||
'namespace' => '',
|
||||
'subCollections' => [ // Some collections, like collections and later buckets have child collections that need counting
|
||||
'documents' => [
|
||||
'collectionPrefix' => 'collection_',
|
||||
'namespace' => '',
|
||||
],
|
||||
],
|
||||
],
|
||||
'buckets' => [
|
||||
'metricPrefix' => 'storage',
|
||||
'namespace' => '',
|
||||
'subCollections' => [
|
||||
'files' => [
|
||||
'namespace' => '',
|
||||
'collectionPrefix' => 'bucket_',
|
||||
'total' => [
|
||||
'field' => 'sizeOriginal'
|
||||
]
|
||||
],
|
||||
]
|
||||
]
|
||||
];
|
||||
|
||||
foreach ($collections as $collection => $options) {
|
||||
try {
|
||||
$dbForProject->setNamespace("_{$projectId}");
|
||||
$count = $dbForProject->count($collection);
|
||||
$metricPrefix = $options['metricPrefix'] ?? '';
|
||||
$metric = empty($metricPrefix) ? "{$collection}.count" : "{$metricPrefix}.{$collection}.count";
|
||||
|
||||
$time = (int) (floor(time() / 1800) * 1800); // Time rounded to nearest 30 minutes
|
||||
$id = \md5($time . '_30m_' . $metric); //Construct unique id for each metric using time, period and metric
|
||||
$document = $dbForProject->getDocument('stats', $id);
|
||||
if ($document->isEmpty()) {
|
||||
$dbForProject->createDocument('stats', new Document([
|
||||
'$id' => $id,
|
||||
'time' => $time,
|
||||
'period' => '30m',
|
||||
'metric' => $metric,
|
||||
'value' => $count,
|
||||
'type' => 1,
|
||||
]));
|
||||
} else {
|
||||
$dbForProject->updateDocument(
|
||||
'stats',
|
||||
$document->getId(),
|
||||
$document->setAttribute('value', $count)
|
||||
);
|
||||
}
|
||||
|
||||
$time = (int) (floor(time() / 86400) * 86400); // Time rounded to nearest day
|
||||
$id = \md5($time . '_1d_' . $metric); //Construct unique id for each metric using time, period and metric
|
||||
$document = $dbForProject->getDocument('stats', $id);
|
||||
if ($document->isEmpty()) {
|
||||
$dbForProject->createDocument('stats', new Document([
|
||||
'$id' => $id,
|
||||
'time' => $time,
|
||||
'period' => '1d',
|
||||
'metric' => $metric,
|
||||
'value' => $count,
|
||||
'type' => 1,
|
||||
]));
|
||||
} else {
|
||||
$dbForProject->updateDocument(
|
||||
'stats',
|
||||
$document->getId(),
|
||||
$document->setAttribute('value', $count)
|
||||
);
|
||||
}
|
||||
|
||||
$subCollections = $options['subCollections'] ?? [];
|
||||
|
||||
if (empty($subCollections)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
$latestParent = null;
|
||||
$subCollectionCounts = []; //total project level count of sub collections
|
||||
$subCollectionTotals = []; //total project level sum of sub collections
|
||||
|
||||
do { // Loop over all the parent collection document for each sub collection
|
||||
$dbForProject->setNamespace("_{$projectId}");
|
||||
$parents = $dbForProject->find($collection, [], 100, cursor: $latestParent); // Get all the parents for the sub collections for example for documents, this will get all the collections
|
||||
|
||||
if (empty($parents)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
$latestParent = $parents[array_key_last($parents)];
|
||||
|
||||
foreach ($parents as $parent) {
|
||||
foreach ($subCollections as $subCollection => $subOptions) { // Sub collection counts, like database.collections.collectionId.documents.count
|
||||
$dbForProject->setNamespace("_{$projectId}");
|
||||
$count = $dbForProject->count(($subOptions['collectionPrefix'] ?? '') . $parent->getInternalId());
|
||||
|
||||
$subCollectionCounts[$subCollection] = ($subCollectionCounts[$subCollection] ?? 0) + $count; // Project level counts for sub collections like database.documents.count
|
||||
|
||||
$dbForProject->setNamespace("_{$projectId}");
|
||||
|
||||
$metric = empty($metricPrefix) ? "{$collection}.{$parent->getId()}.{$subCollection}.count" : "{$metricPrefix}.{$collection}.{$parent->getInternalId()}.{$subCollection}.count";
|
||||
$time = (int) (floor(time() / 1800) * 1800); // Time rounded to nearest 30 minutes
|
||||
$id = \md5($time . '_30m_' . $metric); //Construct unique id for each metric using time, period and metric
|
||||
$document = $dbForProject->getDocument('stats', $id);
|
||||
if ($document->isEmpty()) {
|
||||
$dbForProject->createDocument('stats', new Document([
|
||||
'$id' => $id,
|
||||
'time' => $time,
|
||||
'period' => '30m',
|
||||
'metric' => $metric,
|
||||
'value' => $count,
|
||||
'type' => 1,
|
||||
]));
|
||||
} else {
|
||||
$dbForProject->updateDocument(
|
||||
'stats',
|
||||
$document->getId(),
|
||||
$document->setAttribute('value', $count)
|
||||
);
|
||||
}
|
||||
|
||||
$time = (int) (floor(time() / 86400) * 86400); // Time rounded to nearest day
|
||||
$id = \md5($time . '_1d_' . $metric); //Construct unique id for each metric using time, period and metric
|
||||
$document = $dbForProject->getDocument('stats', $id);
|
||||
if ($document->isEmpty()) {
|
||||
$dbForProject->createDocument('stats', new Document([
|
||||
'$id' => $id,
|
||||
'time' => $time,
|
||||
'period' => '1d',
|
||||
'metric' => $metric,
|
||||
'value' => $count,
|
||||
'type' => 1,
|
||||
]));
|
||||
} else {
|
||||
$dbForProject->updateDocument(
|
||||
'stats',
|
||||
$document->getId(),
|
||||
$document->setAttribute('value', $count)
|
||||
);
|
||||
}
|
||||
|
||||
// check if sum calculation is required
|
||||
$total = $subOptions['total'] ?? [];
|
||||
if (empty($total)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
$dbForProject->setNamespace("_{$projectId}");
|
||||
$total = (int) $dbForProject->sum(($subOptions['collectionPrefix'] ?? '') . $parent->getInternalId(), $total['field']);
|
||||
|
||||
$subCollectionTotals[$subCollection] = ($ssubCollectionTotals[$subCollection] ?? 0) + $total; // Project level sum for sub collections like storage.total
|
||||
|
||||
$dbForProject->setNamespace("_{$projectId}");
|
||||
|
||||
$metric = empty($metricPrefix) ? "{$collection}.{$parent->getId()}.{$subCollection}.total" : "{$metricPrefix}.{$collection}.{$parent->getInternalId()}.{$subCollection}.total";
|
||||
$time = (int) (floor(time() / 1800) * 1800); // Time rounded to nearest 30 minutes
|
||||
$id = \md5($time . '_30m_' . $metric); //Construct unique id for each metric using time, period and metric
|
||||
$document = $dbForProject->getDocument('stats', $id);
|
||||
if ($document->isEmpty()) {
|
||||
$dbForProject->createDocument('stats', new Document([
|
||||
'$id' => $id,
|
||||
'time' => $time,
|
||||
'period' => '30m',
|
||||
'metric' => $metric,
|
||||
'value' => $total,
|
||||
'type' => 1,
|
||||
]));
|
||||
} else {
|
||||
$dbForProject->updateDocument(
|
||||
'stats',
|
||||
$document->getId(),
|
||||
$document->setAttribute('value', $total)
|
||||
);
|
||||
}
|
||||
|
||||
$time = (int) (floor(time() / 86400) * 86400); // Time rounded to nearest day
|
||||
$id = \md5($time . '_1d_' . $metric); //Construct unique id for each metric using time, period and metric
|
||||
$document = $dbForProject->getDocument('stats', $id);
|
||||
if ($document->isEmpty()) {
|
||||
$dbForProject->createDocument('stats', new Document([
|
||||
'$id' => $id,
|
||||
'time' => $time,
|
||||
'period' => '1d',
|
||||
'metric' => $metric,
|
||||
'value' => $total,
|
||||
'type' => 1,
|
||||
]));
|
||||
} else {
|
||||
$dbForProject->updateDocument(
|
||||
'stats',
|
||||
$document->getId(),
|
||||
$document->setAttribute('value', $total)
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
} while (!empty($parents));
|
||||
|
||||
/**
|
||||
* Inserting project level counts for sub collections like database.documents.count
|
||||
*/
|
||||
foreach ($subCollectionCounts as $subCollection => $count) {
|
||||
$dbForProject->setNamespace("_{$projectId}");
|
||||
|
||||
$metric = empty($metricPrefix) ? "{$subCollection}.count" : "{$metricPrefix}.{$subCollection}.count";
|
||||
|
||||
$time = (int) (floor(time() / 1800) * 1800); // Time rounded to nearest 30 minutes
|
||||
$id = \md5($time . '_30m_' . $metric); //Construct unique id for each metric using time, period and metric
|
||||
$document = $dbForProject->getDocument('stats', $id);
|
||||
if ($document->isEmpty()) {
|
||||
$dbForProject->createDocument('stats', new Document([
|
||||
'$id' => $id,
|
||||
'time' => $time,
|
||||
'period' => '30m',
|
||||
'metric' => $metric,
|
||||
'value' => $count,
|
||||
'type' => 1,
|
||||
]));
|
||||
} else {
|
||||
$dbForProject->updateDocument(
|
||||
'stats',
|
||||
$document->getId(),
|
||||
$document->setAttribute('value', $count)
|
||||
);
|
||||
}
|
||||
|
||||
$time = (int) (floor(time() / 86400) * 86400); // Time rounded to nearest day
|
||||
$id = \md5($time . '_1d_' . $metric); //Construct unique id for each metric using time, period and metric
|
||||
$document = $dbForProject->getDocument('stats', $id);
|
||||
if ($document->isEmpty()) {
|
||||
$dbForProject->createDocument('stats', new Document([
|
||||
'$id' => $id,
|
||||
'time' => $time,
|
||||
'period' => '1d',
|
||||
'metric' => $metric,
|
||||
'value' => $count,
|
||||
'type' => 1,
|
||||
]));
|
||||
} else {
|
||||
$dbForProject->updateDocument(
|
||||
'stats',
|
||||
$document->getId(),
|
||||
$document->setAttribute('value', $count)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Inserting project level sums for sub collections like storage.files.total
|
||||
*/
|
||||
foreach ($subCollectionTotals as $subCollection => $count) {
|
||||
$dbForProject->setNamespace("_{$projectId}");
|
||||
|
||||
$metric = empty($metricPrefix) ? "{$subCollection}.total" : "{$metricPrefix}.{$subCollection}.total";
|
||||
|
||||
$time = (int) (floor(time() / 1800) * 1800); // Time rounded to nearest 30 minutes
|
||||
$id = \md5($time . '_30m_' . $metric); //Construct unique id for each metric using time, period and metric
|
||||
$document = $dbForProject->getDocument('stats', $id);
|
||||
if ($document->isEmpty()) {
|
||||
$dbForProject->createDocument('stats', new Document([
|
||||
'$id' => $id,
|
||||
'time' => $time,
|
||||
'period' => '30m',
|
||||
'metric' => $metric,
|
||||
'value' => $count,
|
||||
'type' => 1,
|
||||
]));
|
||||
} else {
|
||||
$dbForProject->updateDocument(
|
||||
'stats',
|
||||
$document->getId(),
|
||||
$document->setAttribute('value', $count)
|
||||
);
|
||||
}
|
||||
|
||||
$time = (int) (floor(time() / 86400) * 86400); // Time rounded to nearest day
|
||||
$id = \md5($time . '_1d_' . $metric); //Construct unique id for each metric using time, period and metric
|
||||
$document = $dbForProject->getDocument('stats', $id);
|
||||
if ($document->isEmpty()) {
|
||||
$dbForProject->createDocument('stats', new Document([
|
||||
'$id' => $id,
|
||||
'time' => $time,
|
||||
'period' => '1d',
|
||||
'metric' => $metric,
|
||||
'value' => $count,
|
||||
'type' => 1,
|
||||
]));
|
||||
} else {
|
||||
$dbForProject->updateDocument(
|
||||
'stats',
|
||||
$document->getId(),
|
||||
$document->setAttribute('value', $count)
|
||||
);
|
||||
}
|
||||
|
||||
// aggregate storage.total = storage.files.total + storage.deployments.total
|
||||
if ($metricPrefix === 'storage' && $subCollection === 'files') {
|
||||
$metric = 'storage.total';
|
||||
$time = (int) (floor(time() / 1800) * 1800); // Time rounded to nearest 30 minutes
|
||||
$id = \md5($time . '_30m_' . $metric); //Construct unique id for each metric using time, period and metric
|
||||
$document = $dbForProject->getDocument('stats', $id);
|
||||
if ($document->isEmpty()) {
|
||||
$dbForProject->createDocument('stats', new Document([
|
||||
'$id' => $id,
|
||||
'time' => $time,
|
||||
'period' => '30m',
|
||||
'metric' => $metric,
|
||||
'value' => $count + $deploymentsTotal,
|
||||
'type' => 1,
|
||||
]));
|
||||
} else {
|
||||
$dbForProject->updateDocument(
|
||||
'stats',
|
||||
$document->getId(),
|
||||
$document->setAttribute('value', $count + $deploymentsTotal)
|
||||
);
|
||||
}
|
||||
|
||||
$time = (int) (floor(time() / 86400) * 86400); // Time rounded to nearest day
|
||||
$id = \md5($time . '_1d_' . $metric); //Construct unique id for each metric using time, period and metric
|
||||
$document = $dbForProject->getDocument('stats', $id);
|
||||
if ($document->isEmpty()) {
|
||||
$dbForProject->createDocument('stats', new Document([
|
||||
'$id' => $id,
|
||||
'time' => $time,
|
||||
'period' => '1d',
|
||||
'metric' => $metric,
|
||||
'value' => $count + $deploymentsTotal,
|
||||
'type' => 1,
|
||||
]));
|
||||
} else {
|
||||
$dbForProject->updateDocument(
|
||||
'stats',
|
||||
$document->getId(),
|
||||
$document->setAttribute('value', $count + $deploymentsTotal)
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (\Exception$e) {
|
||||
Console::warning("Failed to save database counters data for project {$collection}: {$e->getMessage()}");
|
||||
Console::warning($e->getTraceAsString());
|
||||
}
|
||||
}
|
||||
}
|
||||
} while (!empty($projects));
|
||||
$usageDB->collect();
|
||||
|
||||
$iterations++;
|
||||
$loopTook = microtime(true) - $loopStart;
|
||||
|
|
|
@ -552,6 +552,8 @@ services:
|
|||
- _APP_REDIS_PORT
|
||||
- _APP_REDIS_USER
|
||||
- _APP_REDIS_PASS
|
||||
- _APP_LOGGING_PROVIDER
|
||||
- _APP_LOGGING_CONFIG
|
||||
|
||||
appwrite-schedule:
|
||||
image: <?php echo $organization; ?>/<?php echo $image; ?>:<?php echo $version."\n"; ?>
|
||||
|
|
|
@ -588,6 +588,8 @@ services:
|
|||
- _APP_REDIS_PORT
|
||||
- _APP_REDIS_USER
|
||||
- _APP_REDIS_PASS
|
||||
- _APP_LOGGING_PROVIDER
|
||||
- _APP_LOGGING_CONFIG
|
||||
|
||||
appwrite-schedule:
|
||||
entrypoint: schedule
|
||||
|
|
301
src/Appwrite/Stats/Usage.php
Normal file
301
src/Appwrite/Stats/Usage.php
Normal file
|
@ -0,0 +1,301 @@
|
|||
<?php
|
||||
|
||||
namespace Appwrite\Stats;
|
||||
|
||||
use Utopia\Database\Database;
|
||||
use Utopia\Database\Document;
|
||||
use InfluxDB\Database as InfluxDatabase;
|
||||
use DateTime;
|
||||
|
||||
class Usage
|
||||
{
|
||||
protected InfluxDatabase $influxDB;
|
||||
protected Database $database;
|
||||
protected $errorHandler;
|
||||
private array $latestTime = [];
|
||||
|
||||
// all the mertics that we are collecting
|
||||
protected array $metrics = [
|
||||
'requests' => [
|
||||
'table' => 'appwrite_usage_requests_all',
|
||||
],
|
||||
'network' => [
|
||||
'table' => 'appwrite_usage_network_all',
|
||||
],
|
||||
'executions' => [
|
||||
'table' => 'appwrite_usage_executions_all',
|
||||
],
|
||||
'database.collections.create' => [
|
||||
'table' => 'appwrite_usage_database_collections_create',
|
||||
],
|
||||
'database.collections.read' => [
|
||||
'table' => 'appwrite_usage_database_collections_read',
|
||||
],
|
||||
'database.collections.update' => [
|
||||
'table' => 'appwrite_usage_database_collections_update',
|
||||
],
|
||||
'database.collections.delete' => [
|
||||
'table' => 'appwrite_usage_database_collections_delete',
|
||||
],
|
||||
'database.documents.create' => [
|
||||
'table' => 'appwrite_usage_database_documents_create',
|
||||
],
|
||||
'database.documents.read' => [
|
||||
'table' => 'appwrite_usage_database_documents_read',
|
||||
],
|
||||
'database.documents.update' => [
|
||||
'table' => 'appwrite_usage_database_documents_update',
|
||||
],
|
||||
'database.documents.delete' => [
|
||||
'table' => 'appwrite_usage_database_documents_delete',
|
||||
],
|
||||
'database.collections.collectionId.documents.create' => [
|
||||
'table' => 'appwrite_usage_database_documents_create',
|
||||
'groupBy' => 'collectionId',
|
||||
],
|
||||
'database.collections.collectionId.documents.read' => [
|
||||
'table' => 'appwrite_usage_database_documents_read',
|
||||
'groupBy' => 'collectionId',
|
||||
],
|
||||
'database.collections.collectionId.documents.update' => [
|
||||
'table' => 'appwrite_usage_database_documents_update',
|
||||
'groupBy' => 'collectionId',
|
||||
],
|
||||
'database.collections.collectionId.documents.delete' => [
|
||||
'table' => 'appwrite_usage_database_documents_delete',
|
||||
'groupBy' => 'collectionId',
|
||||
],
|
||||
'storage.buckets.create' => [
|
||||
'table' => 'appwrite_usage_storage_buckets_create',
|
||||
],
|
||||
'storage.buckets.read' => [
|
||||
'table' => 'appwrite_usage_storage_buckets_read',
|
||||
],
|
||||
'storage.buckets.update' => [
|
||||
'table' => 'appwrite_usage_storage_buckets_update',
|
||||
],
|
||||
'storage.buckets.delete' => [
|
||||
'table' => 'appwrite_usage_storage_buckets_delete',
|
||||
],
|
||||
'storage.files.create' => [
|
||||
'table' => 'appwrite_usage_storage_files_create',
|
||||
],
|
||||
'storage.files.read' => [
|
||||
'table' => 'appwrite_usage_storage_files_read',
|
||||
],
|
||||
'storage.files.update' => [
|
||||
'table' => 'appwrite_usage_storage_files_update',
|
||||
],
|
||||
'storage.files.delete' => [
|
||||
'table' => 'appwrite_usage_storage_files_delete',
|
||||
],
|
||||
'storage.buckets.bucketId.files.create' => [
|
||||
'table' => 'appwrite_usage_storage_files_create',
|
||||
'groupBy' => 'bucketId',
|
||||
],
|
||||
'storage.buckets.bucketId.files.read' => [
|
||||
'table' => 'appwrite_usage_storage_files_read',
|
||||
'groupBy' => 'bucketId',
|
||||
],
|
||||
'storage.buckets.bucketId.files.update' => [
|
||||
'table' => 'appwrite_usage_storage_files_update',
|
||||
'groupBy' => 'bucketId',
|
||||
],
|
||||
'storage.buckets.bucketId.files.delete' => [
|
||||
'table' => 'appwrite_usage_storage_files_delete',
|
||||
'groupBy' => 'bucketId',
|
||||
],
|
||||
'users.create' => [
|
||||
'table' => 'appwrite_usage_users_create',
|
||||
],
|
||||
'users.read' => [
|
||||
'table' => 'appwrite_usage_users_read',
|
||||
],
|
||||
'users.update' => [
|
||||
'table' => 'appwrite_usage_users_update',
|
||||
],
|
||||
'users.delete' => [
|
||||
'table' => 'appwrite_usage_users_delete',
|
||||
],
|
||||
'users.sessions.create' => [
|
||||
'table' => 'appwrite_usage_users_sessions_create',
|
||||
],
|
||||
'users.sessions.provider.create' => [
|
||||
'table' => 'appwrite_usage_users_sessions_create',
|
||||
'groupBy' => 'provider',
|
||||
],
|
||||
'users.sessions.delete' => [
|
||||
'table' => 'appwrite_usage_users_sessions_delete',
|
||||
],
|
||||
'functions.functionId.executions' => [
|
||||
'table' => 'appwrite_usage_executions_all',
|
||||
'groupBy' => 'functionId',
|
||||
],
|
||||
'functions.functionId.compute' => [
|
||||
'table' => 'appwrite_usage_executions_time',
|
||||
'groupBy' => 'functionId',
|
||||
],
|
||||
'functions.functionId.failures' => [
|
||||
'table' => 'appwrite_usage_executions_all',
|
||||
'groupBy' => 'functionId',
|
||||
'filters' => [
|
||||
'functionStatus' => 'failed',
|
||||
],
|
||||
],
|
||||
];
|
||||
|
||||
protected array $periods = [
|
||||
[
|
||||
'key' => '30m',
|
||||
'multiplier' => 1800,
|
||||
'startTime' => '-24 hours',
|
||||
],
|
||||
[
|
||||
'key' => '1d',
|
||||
'multiplier' => 86400,
|
||||
'startTime' => '-90 days',
|
||||
],
|
||||
];
|
||||
|
||||
public function __construct(Database $database, InfluxDatabase $influxDB, callable $errorHandler = null)
|
||||
{
|
||||
$this->database = $database;
|
||||
$this->influxDB = $influxDB;
|
||||
$this->errorHandler = $errorHandler;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create or Update Mertic
|
||||
* Create or update each metric in the stats collection for the given project
|
||||
*
|
||||
* @param string $projectId
|
||||
* @param int $time
|
||||
* @param string $period
|
||||
* @param string $metric
|
||||
* @param int $value
|
||||
* @param int $type
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
private function createOrUpdateMetric(string $projectId, int $time, string $period, string $metric, int $value, int $type): void
|
||||
{
|
||||
$id = \md5("{$time}_{$period}_{$metric}");
|
||||
$this->database->setNamespace('_' . $projectId);
|
||||
try {
|
||||
$document = $this->database->getDocument('stats', $id);
|
||||
if ($document->isEmpty()) {
|
||||
$this->database->createDocument('stats', new Document([
|
||||
'$id' => $id,
|
||||
'period' => $period,
|
||||
'time' => $time,
|
||||
'metric' => $metric,
|
||||
'value' => $value,
|
||||
'type' => $type,
|
||||
]));
|
||||
} else {
|
||||
$this->database->updateDocument(
|
||||
'stats',
|
||||
$document->getId(),
|
||||
$document->setAttribute('value', $value)
|
||||
);
|
||||
}
|
||||
$this->latestTime[$metric][$period] = $time;
|
||||
} catch (\Exception $e) { // if projects are deleted this might fail
|
||||
if (is_callable($this->errorHandler)) {
|
||||
call_user_func($this->errorHandler, $e, "sync_project_{$projectId}_metric_{$metric}");
|
||||
} else {
|
||||
throw $e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Sync From InfluxDB
|
||||
* Sync stats from influxDB to stats collection in the Appwrite database
|
||||
*
|
||||
* @param string $metric
|
||||
* @param array $options
|
||||
* @param array $period
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
private function syncFromInfluxDB(string $metric, array $options, array $period): void
|
||||
{
|
||||
$start = DateTime::createFromFormat('U', \strtotime($period['startTime']))->format(DateTime::RFC3339);
|
||||
if (!empty($this->latestTime[$metric][$period['key']])) {
|
||||
$start = DateTime::createFromFormat('U', $this->latestTime[$metric][$period['key']])->format(DateTime::RFC3339);
|
||||
}
|
||||
$end = DateTime::createFromFormat('U', \strtotime('now'))->format(DateTime::RFC3339);
|
||||
|
||||
$table = $options['table']; //Which influxdb table to query for this metric
|
||||
$groupBy = empty($options['groupBy']) ? '' : ', "' . $options['groupBy'] . '"'; //Some sub level metrics may be grouped by other tags like collectionId, bucketId, etc
|
||||
|
||||
$filters = $options['filters'] ?? []; // Some metrics might have additional filters, like function's status
|
||||
if (!empty($filters)) {
|
||||
$filters = ' AND ' . implode(' AND ', array_map(fn ($filter, $value) => "\"{$filter}\"='{$value}'", array_keys($filters), array_values($filters)));
|
||||
} else {
|
||||
$filters = '';
|
||||
}
|
||||
|
||||
$query = "SELECT sum(value) AS \"value\" ";
|
||||
$query .= "FROM \"{$table}\" ";
|
||||
$query .= "WHERE \"time\" > '{$start}' ";
|
||||
$query .= "AND \"time\" < '{$end}' ";
|
||||
$query .= "AND \"metric_type\"='counter' {$filters} ";
|
||||
$query .= "GROUP BY time({$period['key']}), \"projectId\" {$groupBy} ";
|
||||
$query .= "FILL(null)";
|
||||
$result = $this->influxDB->query($query);
|
||||
|
||||
$points = $result->getPoints();
|
||||
foreach ($points as $point) {
|
||||
$projectId = $point['projectId'];
|
||||
|
||||
if (!empty($projectId) && $projectId !== 'console') {
|
||||
$metricUpdated = $metric;
|
||||
|
||||
if (!empty($groupBy)) {
|
||||
$groupedBy = $point[$options['groupBy']] ?? '';
|
||||
if (empty($groupedBy)) {
|
||||
continue;
|
||||
}
|
||||
$metricUpdated = str_replace($options['groupBy'], $groupedBy, $metric);
|
||||
}
|
||||
|
||||
$time = \strtotime($point['time']);
|
||||
$value = (!empty($point['value'])) ? $point['value'] : 0;
|
||||
|
||||
$this->createOrUpdateMetric(
|
||||
$projectId,
|
||||
$time,
|
||||
$period['key'],
|
||||
$metricUpdated,
|
||||
$value,
|
||||
0
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Collect Stats
|
||||
* Collect all the stats from Influd DB to Database
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
public function collect(): void
|
||||
{
|
||||
foreach ($this->metrics as $metric => $options) { //for each metrics
|
||||
foreach ($this->periods as $period) { // aggregate data for each period
|
||||
try {
|
||||
$this->syncFromInfluxDB($metric, $options, $period);
|
||||
} catch (\Exception $e) {
|
||||
if (is_callable($this->errorHandler)) {
|
||||
call_user_func($this->errorHandler, $e);
|
||||
} else {
|
||||
throw $e;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
262
src/Appwrite/Stats/UsageDB.php
Normal file
262
src/Appwrite/Stats/UsageDB.php
Normal file
|
@ -0,0 +1,262 @@
|
|||
<?php
|
||||
|
||||
namespace Appwrite\Stats;
|
||||
|
||||
use Utopia\Database\Database;
|
||||
use Utopia\Database\Document;
|
||||
|
||||
class UsageDB extends Usage
|
||||
{
|
||||
public function __construct(Database $database, callable $errorHandler = null)
|
||||
{
|
||||
$this->database = $database;
|
||||
$this->errorHandler = $errorHandler;
|
||||
}
|
||||
/**
|
||||
* Create or Update Mertic
|
||||
* Create or update each metric in the stats collection for the given project
|
||||
*
|
||||
* @param string $projectId
|
||||
* @param string $metric
|
||||
* @param int $value
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
private function createOrUpdateMetric(string $projectId, string $metric, int $value): void
|
||||
{
|
||||
foreach ($this->periods as $options) {
|
||||
$period = $options['key'];
|
||||
$time = (int) (floor(time() / $options['multiplier']) * $options['multiplier']);
|
||||
$id = \md5("{$time}_{$period}_{$metric}");
|
||||
$this->database->setNamespace('_' . $projectId);
|
||||
try {
|
||||
$document = $this->database->getDocument('stats', $id);
|
||||
if ($document->isEmpty()) {
|
||||
$this->database->createDocument('stats', new Document([
|
||||
'$id' => $id,
|
||||
'period' => $period,
|
||||
'time' => $time,
|
||||
'metric' => $metric,
|
||||
'value' => $value,
|
||||
'type' => 1,
|
||||
]));
|
||||
} else {
|
||||
$this->database->updateDocument(
|
||||
'stats',
|
||||
$document->getId(),
|
||||
$document->setAttribute('value', $value)
|
||||
);
|
||||
}
|
||||
} catch (\Exception$e) { // if projects are deleted this might fail
|
||||
if (is_callable($this->errorHandler)) {
|
||||
call_user_func($this->errorHandler, $e, "sync_project_{$projectId}_metric_{$metric}");
|
||||
} else {
|
||||
throw $e;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Foreach Document
|
||||
* Call provided callback for each document in the collection
|
||||
*
|
||||
* @param string $projectId
|
||||
* @param string $collection
|
||||
* @param array $queries
|
||||
* @param callable $callback
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
private function foreachDocument(string $projectId, string $collection, array $queries, callable $callback): void
|
||||
{
|
||||
$limit = 50;
|
||||
$results = [];
|
||||
$sum = $limit;
|
||||
$latestDocument = null;
|
||||
$this->database->setNamespace('_' . $projectId);
|
||||
|
||||
while ($sum === $limit) {
|
||||
try {
|
||||
$results = $this->database->find($collection, $queries, $limit, cursor:$latestDocument);
|
||||
} catch (\Exception $e) {
|
||||
if (is_callable($this->errorHandler)) {
|
||||
call_user_func($this->errorHandler, $e, "fetch_documents_project_{$projectId}_collection_{$collection}");
|
||||
return;
|
||||
} else {
|
||||
throw $e;
|
||||
}
|
||||
}
|
||||
if (empty($results)) {
|
||||
return;
|
||||
}
|
||||
|
||||
$sum = count($results);
|
||||
|
||||
foreach ($results as $document) {
|
||||
if (is_callable($callback)) {
|
||||
$callback($document);
|
||||
}
|
||||
}
|
||||
$latestDocument = $results[array_key_last($results)];
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Sum
|
||||
* Calculate sum of a attribute of documents in collection
|
||||
*
|
||||
* @param string $projectId
|
||||
* @param string $collection
|
||||
* @param string $attribute
|
||||
* @param string $metric
|
||||
*
|
||||
* @return int
|
||||
*/
|
||||
private function sum(string $projectId, string $collection, string $attribute, string $metric): int
|
||||
{
|
||||
$this->database->setNamespace('_' . $projectId);
|
||||
try {
|
||||
$sum = (int) $this->database->sum($collection, $attribute);
|
||||
$this->createOrUpdateMetric($projectId, $metric, $sum);
|
||||
return $sum;
|
||||
} catch (\Exception $e) {
|
||||
if (is_callable($this->errorHandler)) {
|
||||
call_user_func($this->errorHandler, $e, "fetch_sum_project_{$projectId}_collection_{$collection}");
|
||||
} else {
|
||||
throw $e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Count
|
||||
* Count number of documents in collection
|
||||
*
|
||||
* @param string $projectId
|
||||
* @param string $collection
|
||||
* @param string $metric
|
||||
*
|
||||
* @return int
|
||||
*/
|
||||
private function count(string $projectId, string $collection, string $metric): int
|
||||
{
|
||||
$this->database->setNamespace("_{$projectId}");
|
||||
try {
|
||||
$count = $this->database->count($collection);
|
||||
$this->createOrUpdateMetric($projectId, $metric, $count);
|
||||
return $count;
|
||||
} catch (\Exception $e) {
|
||||
if (is_callable($this->errorHandler)) {
|
||||
call_user_func($this->errorHandler, $e, "fetch_count_project_{$projectId}_collection_{$collection}");
|
||||
} else {
|
||||
throw $e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Deployments Total
|
||||
* Total sum of storage used by deployments
|
||||
*
|
||||
* @param string $projectId
|
||||
*
|
||||
* @return int
|
||||
*/
|
||||
private function deploymentsTotal(string $projectId): int
|
||||
{
|
||||
return $this->sum($projectId, 'deployments', 'size', 'stroage.deployments.total');
|
||||
}
|
||||
|
||||
/**
|
||||
* Users Stats
|
||||
* Metric: users.count
|
||||
*
|
||||
* @param string $projectId
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
private function usersStats(string $projectId): void
|
||||
{
|
||||
$this->count($projectId, 'users', 'users.count');
|
||||
}
|
||||
|
||||
/**
|
||||
* Storage Stats
|
||||
* Metrics: storage.total, storage.files.total, storage.buckets.{bucketId}.files.total,
|
||||
* storage.buckets.count, storage.files.count, storage.buckets.{bucketId}.files.count
|
||||
*
|
||||
* @param string $projectId
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
private function storageStats(string $projectId): void
|
||||
{
|
||||
$deploymentsTotal = $this->deploymentsTotal($projectId);
|
||||
|
||||
$projectFilesTotal = 0;
|
||||
$projectFilesCount = 0;
|
||||
|
||||
$metric = 'storage.buckets.count';
|
||||
$this->count($projectId, 'buckets', $metric);
|
||||
|
||||
$this->foreachDocument($projectId, 'buckets', [], function ($bucket) use (&$projectFilesCount, &$projectFilesTotal, $projectId,) {
|
||||
$metric = "storage.buckets.{$bucket->getId()}.files.count";
|
||||
|
||||
$count = $this->count($projectId, 'bucket_' . $bucket->getInternalId(), $metric);
|
||||
$projectFilesCount += $count;
|
||||
|
||||
$metric = "storage.buckets.{$bucket->getId()}.files.total";
|
||||
$sum = $this->sum($projectId, 'bucket_' . $bucket->getInternalId(), 'sizeOriginal', $metric);
|
||||
$projectFilesTotal += $sum;
|
||||
});
|
||||
|
||||
$this->createOrUpdateMetric($projectId, 'storage.files.count', $projectFilesCount);
|
||||
$this->createOrUpdateMetric($projectId, 'storage.files.total', $projectFilesTotal);
|
||||
|
||||
$this->createOrUpdateMetric($projectId, 'storage.total', $projectFilesTotal + $deploymentsTotal);
|
||||
}
|
||||
|
||||
/**
|
||||
* Database Stats
|
||||
* Collect all database stats
|
||||
* Metrics: database.collections.count, database.collections.{collectionId}.documents.count,
|
||||
* database.documents.count
|
||||
*
|
||||
* @param string $projectId
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
private function databaseStats(string $projectId): void
|
||||
{
|
||||
$projectDocumentsCount = 0;
|
||||
|
||||
$metric = 'database.collections.count';
|
||||
$this->count($projectId, 'collections', $metric);
|
||||
|
||||
$this->foreachDocument($projectId, 'collections', [], function ($collection) use (&$projectDocumentsCount, $projectId,) {
|
||||
$metric = "database.collections.{$collection->getId()}.documents.count";
|
||||
|
||||
$count = $this->count($projectId, 'collection_' . $collection->getInternalId(), $metric);
|
||||
$projectDocumentsCount += $count;
|
||||
});
|
||||
|
||||
$this->createOrUpdateMetric($projectId, 'database.documents.count', $projectDocumentsCount);
|
||||
}
|
||||
|
||||
/**
|
||||
* Collect Stats
|
||||
* Collect all database related stats
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
public function collect(): void
|
||||
{
|
||||
$this->foreachDocument('console', 'projects', [], function ($project) {
|
||||
$projectId = $project->getId();
|
||||
$this->usersStats($projectId);
|
||||
$this->databaseStats($projectId);
|
||||
$this->storageStats($projectId);
|
||||
});
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue