diff --git a/app/tasks/usage.php b/app/tasks/usage.php index 2ed08af63..106ace910 100644 --- a/app/tasks/usage.php +++ b/app/tasks/usage.php @@ -2,271 +2,131 @@ global $cli, $register; +use Appwrite\Stats\Usage; +use Appwrite\Stats\UsageDB; +use InfluxDB\Database as InfluxDatabase; use Utopia\App; -use Utopia\Cache\Adapter\Redis; +use Utopia\Cache\Adapter\Redis as RedisCache; use Utopia\Cache\Cache; use Utopia\CLI\Console; use Utopia\Database\Adapter\MariaDB; use Utopia\Database\Database; -use Utopia\Database\Document; use Utopia\Database\Validator\Authorization; +use Utopia\Registry\Registry; +use Utopia\Logger\Log; -/** - * Metrics We collect - * - * General - * - * requests - * network - * executions - * - * Database - * - * database.collections.create - * database.collections.read - * database.collections.update - * database.collections.delete - * database.documents.create - * database.documents.read - * database.documents.update - * database.documents.delete - * database.collections.{collectionId}.documents.create - * database.collections.{collectionId}.documents.read - * database.collections.{collectionId}.documents.update - * database.collections.{collectionId}.documents.delete - * - * Storage - * - * storage.buckets.create - * storage.buckets.read - * storage.buckets.update - * storage.buckets.delete - * storage.files.create - * storage.files.read - * storage.files.update - * storage.files.delete - * storage.buckets.{bucketId}.files.create - * storage.buckets.{bucketId}.files.read - * storage.buckets.{bucketId}.files.update - * storage.buckets.{bucketId}.files.delete - * - * Users - * - * users.create - * users.read - * users.update - * users.delete - * users.sessions.create - * users.sessions.{provider}.create - * users.sessions.delete - * - * Functions - * - * functions.{functionId}.executions - * functions.{functionId}.failures - * functions.{functionId}.compute - * - * Counters - * - * users.count - * storage.buckets.count - * storage.files.count - * storage.buckets.{bucketId}.files.count - * database.collections.count - * database.documents.count - * database.collections.{collectionId}.documents.count - * - * Totals - * - * storage.total - * - */ +function getDatabase(Registry &$register, string $namespace): Database +{ + $attempts = 0; + + do { + try { + $attempts++; + + $db = $register->get('db'); + $redis = $register->get('cache'); + + $cache = new Cache(new RedisCache($redis)); + $database = new Database(new MariaDB($db), $cache); + $database->setDefaultDatabase(App::getEnv('_APP_DB_SCHEMA', 'appwrite')); + $database->setNamespace($namespace); + + if (!$database->exists($database->getDefaultDatabase(), 'projects')) { + throw new Exception('Projects collection not ready'); + } + break; // leave loop if successful + } catch (\Exception$e) { + Console::warning("Database not ready. Retrying connection ({$attempts})..."); + if ($attempts >= DATABASE_RECONNECT_MAX_ATTEMPTS) { + throw new \Exception('Failed to connect to database: ' . $e->getMessage()); + } + sleep(DATABASE_RECONNECT_SLEEP); + } + } while ($attempts < DATABASE_RECONNECT_MAX_ATTEMPTS); + + return $database; +} + +function getInfluxDB(Registry &$register): InfluxDatabase +{ + /** @var InfluxDB\Client $client */ + $client = $register->get('influxdb'); + $attempts = 0; + $max = 10; + $sleep = 1; + + do { // check if telegraf database is ready + try { + $attempts++; + $database = $client->selectDB('telegraf'); + if (in_array('telegraf', $client->listDatabases())) { + break; // leave the do-while if successful + } + } catch (\Throwable$th) { + Console::warning("InfluxDB not ready. Retrying connection ({$attempts})..."); + if ($attempts >= $max) { + throw new \Exception('InfluxDB database not ready yet'); + } + sleep($sleep); + } + } while ($attempts < $max); + return $database; +} + +$logError = function (Throwable $error, string $action = 'syncUsageStats') use ($register) { + $logger = $register->get('logger'); + + if ($logger) { + $version = App::getEnv('_APP_VERSION', 'UNKNOWN'); + + $log = new Log(); + $log->setNamespace("realtime"); + $log->setServer(\gethostname()); + $log->setVersion($version); + $log->setType(Log::TYPE_ERROR); + $log->setMessage($error->getMessage()); + + $log->addTag('code', $error->getCode()); + $log->addTag('verboseType', get_class($error)); + + $log->addExtra('file', $error->getFile()); + $log->addExtra('line', $error->getLine()); + $log->addExtra('trace', $error->getTraceAsString()); + $log->addExtra('detailedTrace', $error->getTrace()); + + $log->setAction($action); + + $isProduction = App::getEnv('_APP_ENV', 'development') === 'production'; + $log->setEnvironment($isProduction ? Log::ENVIRONMENT_PRODUCTION : Log::ENVIRONMENT_STAGING); + + $responseCode = $logger->addLog($log); + Console::info('Usage stats log pushed with status code: ' . $responseCode); + } + + Console::warning("Failed: {$error->getMessage()}"); + Console::warning($error->getTraceAsString()); +}; $cli ->task('usage') ->desc('Schedules syncing data from influxdb to Appwrite console db') - ->action(function () use ($register) { + ->action(function () use ($register, $logError) { Console::title('Usage Aggregation V1'); Console::success(APP_NAME . ' usage aggregation process v1 has started'); $interval = (int) App::getEnv('_APP_USAGE_AGGREGATION_INTERVAL', '30'); // 30 seconds (by default) - $periods = [ - [ - 'key' => '30m', - 'startTime' => '-24 hours', - ], - [ - 'key' => '1d', - 'startTime' => '-90 days', - ], - ]; - // all the metrics that we are collecting at the moment - $globalMetrics = [ - 'requests' => [ - 'table' => 'appwrite_usage_requests_all', - ], - 'network' => [ - 'table' => 'appwrite_usage_network_all', - ], - 'executions' => [ - 'table' => 'appwrite_usage_executions_all', - ], - 'database.collections.create' => [ - 'table' => 'appwrite_usage_database_collections_create', - ], - 'database.collections.read' => [ - 'table' => 'appwrite_usage_database_collections_read', - ], - 'database.collections.update' => [ - 'table' => 'appwrite_usage_database_collections_update', - ], - 'database.collections.delete' => [ - 'table' => 'appwrite_usage_database_collections_delete', - ], - 'database.documents.create' => [ - 'table' => 'appwrite_usage_database_documents_create', - ], - 'database.documents.read' => [ - 'table' => 'appwrite_usage_database_documents_read', - ], - 'database.documents.update' => [ - 'table' => 'appwrite_usage_database_documents_update', - ], - 'database.documents.delete' => [ - 'table' => 'appwrite_usage_database_documents_delete', - ], - 'database.collections.collectionId.documents.create' => [ - 'table' => 'appwrite_usage_database_documents_create', - 'groupBy' => 'collectionId', - ], - 'database.collections.collectionId.documents.read' => [ - 'table' => 'appwrite_usage_database_documents_read', - 'groupBy' => 'collectionId', - ], - 'database.collections.collectionId.documents.update' => [ - 'table' => 'appwrite_usage_database_documents_update', - 'groupBy' => 'collectionId', - ], - 'database.collections.collectionId.documents.delete' => [ - 'table' => 'appwrite_usage_database_documents_delete', - 'groupBy' => 'collectionId', - ], - 'storage.buckets.create' => [ - 'table' => 'appwrite_usage_storage_buckets_create', - ], - 'storage.buckets.read' => [ - 'table' => 'appwrite_usage_storage_buckets_read', - ], - 'storage.buckets.update' => [ - 'table' => 'appwrite_usage_storage_buckets_update', - ], - 'storage.buckets.delete' => [ - 'table' => 'appwrite_usage_storage_buckets_delete', - ], - 'storage.files.create' => [ - 'table' => 'appwrite_usage_storage_files_create', - ], - 'storage.files.read' => [ - 'table' => 'appwrite_usage_storage_files_read', - ], - 'storage.files.update' => [ - 'table' => 'appwrite_usage_storage_files_update', - ], - 'storage.files.delete' => [ - 'table' => 'appwrite_usage_storage_files_delete', - ], - 'storage.buckets.bucketId.files.create' => [ - 'table' => 'appwrite_usage_storage_files_create', - 'groupBy' => 'bucketId', - ], - 'storage.buckets.bucketId.files.read' => [ - 'table' => 'appwrite_usage_storage_files_read', - 'groupBy' => 'bucketId', - ], - 'storage.buckets.bucketId.files.update' => [ - 'table' => 'appwrite_usage_storage_files_update', - 'groupBy' => 'bucketId', - ], - 'storage.buckets.bucketId.files.delete' => [ - 'table' => 'appwrite_usage_storage_files_delete', - 'groupBy' => 'bucketId', - ], - 'users.create' => [ - 'table' => 'appwrite_usage_users_create', - ], - 'users.read' => [ - 'table' => 'appwrite_usage_users_read', - ], - 'users.update' => [ - 'table' => 'appwrite_usage_users_update', - ], - 'users.delete' => [ - 'table' => 'appwrite_usage_users_delete', - ], - 'users.sessions.create' => [ - 'table' => 'appwrite_usage_users_sessions_create', - ], - 'users.sessions.provider.create' => [ - 'table' => 'appwrite_usage_users_sessions_create', - 'groupBy' => 'provider', - ], - 'users.sessions.delete' => [ - 'table' => 'appwrite_usage_users_sessions_delete', - ], - 'functions.functionId.executions' => [ - 'table' => 'appwrite_usage_executions_all', - 'groupBy' => 'functionId', - ], - 'functions.functionId.compute' => [ - 'table' => 'appwrite_usage_executions_time', - 'groupBy' => 'functionId', - ], - 'functions.functionId.failures' => [ - 'table' => 'appwrite_usage_executions_all', - 'groupBy' => 'functionId', - 'filters' => [ - 'functionStatus' => 'failed', - ], - ], - ]; + $database = getDatabase($register, '_console'); + $influxDB = getInfluxDB($register); - // TODO Maybe move this to the setResource method, and reuse in the http.php file - $attempts = 0; - $max = 10; - $sleep = 1; + $usage = new Usage($database, $influxDB, $logError); - $db = null; - $redis = null; - do { // connect to db - try { - $attempts++; - $db = $register->get('db'); - $redis = $register->get('cache'); - break; // leave the do-while if successful - } catch (\Exception $e) { - Console::warning("Database not ready. Retrying connection ({$attempts})..."); - if ($attempts >= $max) { - throw new \Exception('Failed to connect to database: ' . $e->getMessage()); - } - sleep($sleep); - } - } while ($attempts < $max); - - // TODO use inject - $cacheAdapter = new Cache(new Redis($redis)); - $dbForProject = new Database(new MariaDB($db), $cacheAdapter); - $dbForConsole = new Database(new MariaDB($db), $cacheAdapter); - $dbForProject->setDefaultDatabase(App::getEnv('_APP_DB_SCHEMA', 'appwrite')); - $dbForConsole->setDefaultDatabase(App::getEnv('_APP_DB_SCHEMA', 'appwrite')); - $dbForConsole->setNamespace('_console'); - - $latestTime = []; + $usageDB = new UsageDB($database, $logError); Authorization::disable(); $iterations = 0; - Console::loop(function () use ($interval, $register, $dbForProject, $dbForConsole, $globalMetrics, $periods, &$latestTime, &$iterations) { + Console::loop(function () use ($interval, $usage, $usageDB, &$iterations) { $now = date('d-m-Y H:i:s', time()); Console::info("[{$now}] Aggregating usage data every {$interval} seconds"); @@ -274,105 +134,10 @@ $cli /** * Aggregate InfluxDB every 30 seconds - * @var InfluxDB\Client $client */ - $client = $register->get('influxdb'); - $attempts = 0; - $max = 10; - $sleep = 1; + $usage->collect(); - do { // check if telegraf database is ready - try { - $attempts++; - $database = $client->selectDB('telegraf'); - if (in_array('telegraf', $client->listDatabases())) { - break; // leave the do-while if successful - } - } catch (\Throwable $th) { - Console::warning("InfluxDB not ready. Retrying connection ({$attempts})..."); - if ($attempts >= $max) { - throw new \Exception('InfluxDB database not ready yet'); - } - sleep($sleep); - } - } while ($attempts < $max); - - // sync data - foreach ($globalMetrics as $metric => $options) { //for each metrics - foreach ($periods as $period) { // aggregate data for each period - $start = DateTime::createFromFormat('U', \strtotime($period['startTime']))->format(DateTime::RFC3339); - if (!empty($latestTime[$metric][$period['key']])) { - $start = DateTime::createFromFormat('U', $latestTime[$metric][$period['key']])->format(DateTime::RFC3339); - } - $end = DateTime::createFromFormat('U', \strtotime('now'))->format(DateTime::RFC3339); - - $table = $options['table']; //Which influxdb table to query for this metric - $groupBy = empty($options['groupBy']) ? '' : ', "' . $options['groupBy'] . '"'; //Some sub level metrics may be grouped by other tags like collectionId, bucketId, etc - - $filters = $options['filters'] ?? []; // Some metrics might have additional filters, like function's status - if (!empty($filters)) { - $filters = ' AND ' . implode(' AND ', array_map(fn ($filter, $value) => "\"{$filter}\"='{$value}'", array_keys($filters), array_values($filters))); - } else { - $filters = ''; - } - - $query = "SELECT sum(value) AS \"value\" FROM \"{$table}\" WHERE \"time\" > '{$start}' AND \"time\" < '{$end}' AND \"metric_type\"='counter' {$filters} GROUP BY time({$period['key']}), \"projectId\" {$groupBy} FILL(null)"; - try { - $result = $database->query($query); - - $points = $result->getPoints(); - foreach ($points as $point) { - $projectId = $point['projectId']; - - if (!empty($projectId) && $projectId !== 'console') { - $dbForProject->setNamespace('_' . $projectId); - $metricUpdated = $metric; - - if (!empty($groupBy)) { - $groupedBy = $point[$options['groupBy']] ?? ''; - if (empty($groupedBy)) { - continue; - } - $metricUpdated = str_replace($options['groupBy'], $groupedBy, $metric); - } - - $time = \strtotime($point['time']); - $id = \md5($time . '_' . $period['key'] . '_' . $metricUpdated); //Construct unique id for each metric using time, period and metric - $value = (!empty($point['value'])) ? $point['value'] : 0; - - try { - $document = $dbForProject->getDocument('stats', $id); - if ($document->isEmpty()) { - $dbForProject->createDocument('stats', new Document([ - '$id' => $id, - 'period' => $period['key'], - 'time' => $time, - 'metric' => $metricUpdated, - 'value' => $value, - 'type' => 0, - ])); - } else { - $dbForProject->updateDocument( - 'stats', - $document->getId(), - $document->setAttribute('value', $value) - ); - } - $latestTime[$metric][$period['key']] = $time; - } catch (\Exception $e) { // if projects are deleted this might fail - Console::warning("Failed to save data for project {$projectId} and metric {$metricUpdated}: {$e->getMessage()}"); - Console::warning($e->getTraceAsString()); - } - } - } - } catch (\Exception $e) { - Console::warning("Failed to Query: {$e->getMessage()}"); - Console::warning($e->getTraceAsString()); - } - } - } - - if ($iterations % 30 != 0) { // Aggregate aggregate number of objects in database only after 15 minutes + if ($iterations % 30 != 0) { // return if 30 iterations has not passed $iterations++; $loopTook = microtime(true) - $loopStart; $now = date('d-m-Y H:i:s', time()); @@ -380,6 +145,7 @@ $cli return; } + $iterations = 0; // Reset iterations to prevent overflow when running for long time /** * Aggregate MariaDB every 15 minutes * Some of the queries here might contain full-table scans. @@ -387,435 +153,7 @@ $cli $now = date('d-m-Y H:i:s', time()); Console::info("[{$now}] Aggregating database counters."); - $latestProject = null; - do { // Loop over all the projects - $attempts = 0; - $max = 10; - $sleep = 1; - - do { // list projects - try { - $attempts++; - $projects = $dbForConsole->find('projects', [], 100, cursor: $latestProject); - break; // leave the do-while if successful - } catch (\Exception $e) { - Console::warning("Console DB not ready yet. Retrying ({$attempts})..."); - if ($attempts >= $max) { - throw new \Exception('Failed access console db: ' . $e->getMessage()); - } - sleep($sleep); - } - } while ($attempts < $max); - - if (empty($projects)) { - continue; - } - - $latestProject = $projects[array_key_last($projects)]; - - foreach ($projects as $project) { - $projectId = $project->getId(); - - // Get total storage - $dbForProject->setNamespace('_' . $projectId); - $deploymentsTotal = $dbForProject->sum('deployments', 'size'); - - $time = (int) (floor(time() / 1800) * 1800); // Time rounded to nearest 30 minutes - $id = \md5($time . '_30m_storage.deployments.total'); //Construct unique id for each metric using time, period and metric - $document = $dbForProject->getDocument('stats', $id); - try { - if ($document->isEmpty()) { - $dbForProject->createDocument('stats', new Document([ - '$id' => $id, - 'period' => '30m', - 'time' => $time, - 'metric' => 'storage.deployments.total', - 'value' => $deploymentsTotal, - 'type' => 1, - ])); - } else { - $dbForProject->updateDocument( - 'stats', - $document->getId(), - $document->setAttribute('value', $deploymentsTotal) - ); - } - $time = (int) (floor(time() / 86400) * 86400); // Time rounded to nearest day - $id = \md5($time . '_1d_storage.deployments.total'); //Construct unique id for each metric using time, period and metric - $document = $dbForProject->getDocument('stats', $id); - if ($document->isEmpty()) { - $dbForProject->createDocument('stats', new Document([ - '$id' => $id, - 'period' => '1d', - 'time' => $time, - 'metric' => 'storage.deployments.total', - 'value' => $deploymentsTotal, - 'type' => 1, - ])); - } else { - $dbForProject->updateDocument( - 'stats', - $document->getId(), - $document->setAttribute('value', $deploymentsTotal) - ); - } - } catch (\Exception $e) { - Console::warning("Failed to save data for project {$projectId} and metric storage.deployments.total: {$e->getMessage()}"); - Console::warning($e->getTraceAsString()); - } - - - $collections = [ - 'users' => [ - 'namespace' => '', - ], - 'collections' => [ - 'metricPrefix' => 'database', - 'namespace' => '', - 'subCollections' => [ // Some collections, like collections and later buckets have child collections that need counting - 'documents' => [ - 'collectionPrefix' => 'collection_', - 'namespace' => '', - ], - ], - ], - 'buckets' => [ - 'metricPrefix' => 'storage', - 'namespace' => '', - 'subCollections' => [ - 'files' => [ - 'namespace' => '', - 'collectionPrefix' => 'bucket_', - 'total' => [ - 'field' => 'sizeOriginal' - ] - ], - ] - ] - ]; - - foreach ($collections as $collection => $options) { - try { - $dbForProject->setNamespace("_{$projectId}"); - $count = $dbForProject->count($collection); - $metricPrefix = $options['metricPrefix'] ?? ''; - $metric = empty($metricPrefix) ? "{$collection}.count" : "{$metricPrefix}.{$collection}.count"; - - $time = (int) (floor(time() / 1800) * 1800); // Time rounded to nearest 30 minutes - $id = \md5($time . '_30m_' . $metric); //Construct unique id for each metric using time, period and metric - $document = $dbForProject->getDocument('stats', $id); - if ($document->isEmpty()) { - $dbForProject->createDocument('stats', new Document([ - '$id' => $id, - 'time' => $time, - 'period' => '30m', - 'metric' => $metric, - 'value' => $count, - 'type' => 1, - ])); - } else { - $dbForProject->updateDocument( - 'stats', - $document->getId(), - $document->setAttribute('value', $count) - ); - } - - $time = (int) (floor(time() / 86400) * 86400); // Time rounded to nearest day - $id = \md5($time . '_1d_' . $metric); //Construct unique id for each metric using time, period and metric - $document = $dbForProject->getDocument('stats', $id); - if ($document->isEmpty()) { - $dbForProject->createDocument('stats', new Document([ - '$id' => $id, - 'time' => $time, - 'period' => '1d', - 'metric' => $metric, - 'value' => $count, - 'type' => 1, - ])); - } else { - $dbForProject->updateDocument( - 'stats', - $document->getId(), - $document->setAttribute('value', $count) - ); - } - - $subCollections = $options['subCollections'] ?? []; - - if (empty($subCollections)) { - continue; - } - - $latestParent = null; - $subCollectionCounts = []; //total project level count of sub collections - $subCollectionTotals = []; //total project level sum of sub collections - - do { // Loop over all the parent collection document for each sub collection - $dbForProject->setNamespace("_{$projectId}"); - $parents = $dbForProject->find($collection, [], 100, cursor: $latestParent); // Get all the parents for the sub collections for example for documents, this will get all the collections - - if (empty($parents)) { - continue; - } - - $latestParent = $parents[array_key_last($parents)]; - - foreach ($parents as $parent) { - foreach ($subCollections as $subCollection => $subOptions) { // Sub collection counts, like database.collections.collectionId.documents.count - $dbForProject->setNamespace("_{$projectId}"); - $count = $dbForProject->count(($subOptions['collectionPrefix'] ?? '') . $parent->getInternalId()); - - $subCollectionCounts[$subCollection] = ($subCollectionCounts[$subCollection] ?? 0) + $count; // Project level counts for sub collections like database.documents.count - - $dbForProject->setNamespace("_{$projectId}"); - - $metric = empty($metricPrefix) ? "{$collection}.{$parent->getId()}.{$subCollection}.count" : "{$metricPrefix}.{$collection}.{$parent->getInternalId()}.{$subCollection}.count"; - $time = (int) (floor(time() / 1800) * 1800); // Time rounded to nearest 30 minutes - $id = \md5($time . '_30m_' . $metric); //Construct unique id for each metric using time, period and metric - $document = $dbForProject->getDocument('stats', $id); - if ($document->isEmpty()) { - $dbForProject->createDocument('stats', new Document([ - '$id' => $id, - 'time' => $time, - 'period' => '30m', - 'metric' => $metric, - 'value' => $count, - 'type' => 1, - ])); - } else { - $dbForProject->updateDocument( - 'stats', - $document->getId(), - $document->setAttribute('value', $count) - ); - } - - $time = (int) (floor(time() / 86400) * 86400); // Time rounded to nearest day - $id = \md5($time . '_1d_' . $metric); //Construct unique id for each metric using time, period and metric - $document = $dbForProject->getDocument('stats', $id); - if ($document->isEmpty()) { - $dbForProject->createDocument('stats', new Document([ - '$id' => $id, - 'time' => $time, - 'period' => '1d', - 'metric' => $metric, - 'value' => $count, - 'type' => 1, - ])); - } else { - $dbForProject->updateDocument( - 'stats', - $document->getId(), - $document->setAttribute('value', $count) - ); - } - - // check if sum calculation is required - $total = $subOptions['total'] ?? []; - if (empty($total)) { - continue; - } - - $dbForProject->setNamespace("_{$projectId}"); - $total = (int) $dbForProject->sum(($subOptions['collectionPrefix'] ?? '') . $parent->getInternalId(), $total['field']); - - $subCollectionTotals[$subCollection] = ($ssubCollectionTotals[$subCollection] ?? 0) + $total; // Project level sum for sub collections like storage.total - - $dbForProject->setNamespace("_{$projectId}"); - - $metric = empty($metricPrefix) ? "{$collection}.{$parent->getId()}.{$subCollection}.total" : "{$metricPrefix}.{$collection}.{$parent->getInternalId()}.{$subCollection}.total"; - $time = (int) (floor(time() / 1800) * 1800); // Time rounded to nearest 30 minutes - $id = \md5($time . '_30m_' . $metric); //Construct unique id for each metric using time, period and metric - $document = $dbForProject->getDocument('stats', $id); - if ($document->isEmpty()) { - $dbForProject->createDocument('stats', new Document([ - '$id' => $id, - 'time' => $time, - 'period' => '30m', - 'metric' => $metric, - 'value' => $total, - 'type' => 1, - ])); - } else { - $dbForProject->updateDocument( - 'stats', - $document->getId(), - $document->setAttribute('value', $total) - ); - } - - $time = (int) (floor(time() / 86400) * 86400); // Time rounded to nearest day - $id = \md5($time . '_1d_' . $metric); //Construct unique id for each metric using time, period and metric - $document = $dbForProject->getDocument('stats', $id); - if ($document->isEmpty()) { - $dbForProject->createDocument('stats', new Document([ - '$id' => $id, - 'time' => $time, - 'period' => '1d', - 'metric' => $metric, - 'value' => $total, - 'type' => 1, - ])); - } else { - $dbForProject->updateDocument( - 'stats', - $document->getId(), - $document->setAttribute('value', $total) - ); - } - } - } - } while (!empty($parents)); - - /** - * Inserting project level counts for sub collections like database.documents.count - */ - foreach ($subCollectionCounts as $subCollection => $count) { - $dbForProject->setNamespace("_{$projectId}"); - - $metric = empty($metricPrefix) ? "{$subCollection}.count" : "{$metricPrefix}.{$subCollection}.count"; - - $time = (int) (floor(time() / 1800) * 1800); // Time rounded to nearest 30 minutes - $id = \md5($time . '_30m_' . $metric); //Construct unique id for each metric using time, period and metric - $document = $dbForProject->getDocument('stats', $id); - if ($document->isEmpty()) { - $dbForProject->createDocument('stats', new Document([ - '$id' => $id, - 'time' => $time, - 'period' => '30m', - 'metric' => $metric, - 'value' => $count, - 'type' => 1, - ])); - } else { - $dbForProject->updateDocument( - 'stats', - $document->getId(), - $document->setAttribute('value', $count) - ); - } - - $time = (int) (floor(time() / 86400) * 86400); // Time rounded to nearest day - $id = \md5($time . '_1d_' . $metric); //Construct unique id for each metric using time, period and metric - $document = $dbForProject->getDocument('stats', $id); - if ($document->isEmpty()) { - $dbForProject->createDocument('stats', new Document([ - '$id' => $id, - 'time' => $time, - 'period' => '1d', - 'metric' => $metric, - 'value' => $count, - 'type' => 1, - ])); - } else { - $dbForProject->updateDocument( - 'stats', - $document->getId(), - $document->setAttribute('value', $count) - ); - } - } - - /** - * Inserting project level sums for sub collections like storage.files.total - */ - foreach ($subCollectionTotals as $subCollection => $count) { - $dbForProject->setNamespace("_{$projectId}"); - - $metric = empty($metricPrefix) ? "{$subCollection}.total" : "{$metricPrefix}.{$subCollection}.total"; - - $time = (int) (floor(time() / 1800) * 1800); // Time rounded to nearest 30 minutes - $id = \md5($time . '_30m_' . $metric); //Construct unique id for each metric using time, period and metric - $document = $dbForProject->getDocument('stats', $id); - if ($document->isEmpty()) { - $dbForProject->createDocument('stats', new Document([ - '$id' => $id, - 'time' => $time, - 'period' => '30m', - 'metric' => $metric, - 'value' => $count, - 'type' => 1, - ])); - } else { - $dbForProject->updateDocument( - 'stats', - $document->getId(), - $document->setAttribute('value', $count) - ); - } - - $time = (int) (floor(time() / 86400) * 86400); // Time rounded to nearest day - $id = \md5($time . '_1d_' . $metric); //Construct unique id for each metric using time, period and metric - $document = $dbForProject->getDocument('stats', $id); - if ($document->isEmpty()) { - $dbForProject->createDocument('stats', new Document([ - '$id' => $id, - 'time' => $time, - 'period' => '1d', - 'metric' => $metric, - 'value' => $count, - 'type' => 1, - ])); - } else { - $dbForProject->updateDocument( - 'stats', - $document->getId(), - $document->setAttribute('value', $count) - ); - } - - // aggregate storage.total = storage.files.total + storage.deployments.total - if ($metricPrefix === 'storage' && $subCollection === 'files') { - $metric = 'storage.total'; - $time = (int) (floor(time() / 1800) * 1800); // Time rounded to nearest 30 minutes - $id = \md5($time . '_30m_' . $metric); //Construct unique id for each metric using time, period and metric - $document = $dbForProject->getDocument('stats', $id); - if ($document->isEmpty()) { - $dbForProject->createDocument('stats', new Document([ - '$id' => $id, - 'time' => $time, - 'period' => '30m', - 'metric' => $metric, - 'value' => $count + $deploymentsTotal, - 'type' => 1, - ])); - } else { - $dbForProject->updateDocument( - 'stats', - $document->getId(), - $document->setAttribute('value', $count + $deploymentsTotal) - ); - } - - $time = (int) (floor(time() / 86400) * 86400); // Time rounded to nearest day - $id = \md5($time . '_1d_' . $metric); //Construct unique id for each metric using time, period and metric - $document = $dbForProject->getDocument('stats', $id); - if ($document->isEmpty()) { - $dbForProject->createDocument('stats', new Document([ - '$id' => $id, - 'time' => $time, - 'period' => '1d', - 'metric' => $metric, - 'value' => $count + $deploymentsTotal, - 'type' => 1, - ])); - } else { - $dbForProject->updateDocument( - 'stats', - $document->getId(), - $document->setAttribute('value', $count + $deploymentsTotal) - ); - } - } - } - } catch (\Exception$e) { - Console::warning("Failed to save database counters data for project {$collection}: {$e->getMessage()}"); - Console::warning($e->getTraceAsString()); - } - } - } - } while (!empty($projects)); + $usageDB->collect(); $iterations++; $loopTook = microtime(true) - $loopStart; diff --git a/app/views/install/compose.phtml b/app/views/install/compose.phtml index 6fa8335bf..cf9a11139 100644 --- a/app/views/install/compose.phtml +++ b/app/views/install/compose.phtml @@ -552,6 +552,8 @@ services: - _APP_REDIS_PORT - _APP_REDIS_USER - _APP_REDIS_PASS + - _APP_LOGGING_PROVIDER + - _APP_LOGGING_CONFIG appwrite-schedule: image: /: diff --git a/docker-compose.yml b/docker-compose.yml index b6e1ed68b..c5e11f1c5 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -588,6 +588,8 @@ services: - _APP_REDIS_PORT - _APP_REDIS_USER - _APP_REDIS_PASS + - _APP_LOGGING_PROVIDER + - _APP_LOGGING_CONFIG appwrite-schedule: entrypoint: schedule diff --git a/src/Appwrite/Stats/Usage.php b/src/Appwrite/Stats/Usage.php new file mode 100644 index 000000000..9a885c011 --- /dev/null +++ b/src/Appwrite/Stats/Usage.php @@ -0,0 +1,301 @@ + [ + 'table' => 'appwrite_usage_requests_all', + ], + 'network' => [ + 'table' => 'appwrite_usage_network_all', + ], + 'executions' => [ + 'table' => 'appwrite_usage_executions_all', + ], + 'database.collections.create' => [ + 'table' => 'appwrite_usage_database_collections_create', + ], + 'database.collections.read' => [ + 'table' => 'appwrite_usage_database_collections_read', + ], + 'database.collections.update' => [ + 'table' => 'appwrite_usage_database_collections_update', + ], + 'database.collections.delete' => [ + 'table' => 'appwrite_usage_database_collections_delete', + ], + 'database.documents.create' => [ + 'table' => 'appwrite_usage_database_documents_create', + ], + 'database.documents.read' => [ + 'table' => 'appwrite_usage_database_documents_read', + ], + 'database.documents.update' => [ + 'table' => 'appwrite_usage_database_documents_update', + ], + 'database.documents.delete' => [ + 'table' => 'appwrite_usage_database_documents_delete', + ], + 'database.collections.collectionId.documents.create' => [ + 'table' => 'appwrite_usage_database_documents_create', + 'groupBy' => 'collectionId', + ], + 'database.collections.collectionId.documents.read' => [ + 'table' => 'appwrite_usage_database_documents_read', + 'groupBy' => 'collectionId', + ], + 'database.collections.collectionId.documents.update' => [ + 'table' => 'appwrite_usage_database_documents_update', + 'groupBy' => 'collectionId', + ], + 'database.collections.collectionId.documents.delete' => [ + 'table' => 'appwrite_usage_database_documents_delete', + 'groupBy' => 'collectionId', + ], + 'storage.buckets.create' => [ + 'table' => 'appwrite_usage_storage_buckets_create', + ], + 'storage.buckets.read' => [ + 'table' => 'appwrite_usage_storage_buckets_read', + ], + 'storage.buckets.update' => [ + 'table' => 'appwrite_usage_storage_buckets_update', + ], + 'storage.buckets.delete' => [ + 'table' => 'appwrite_usage_storage_buckets_delete', + ], + 'storage.files.create' => [ + 'table' => 'appwrite_usage_storage_files_create', + ], + 'storage.files.read' => [ + 'table' => 'appwrite_usage_storage_files_read', + ], + 'storage.files.update' => [ + 'table' => 'appwrite_usage_storage_files_update', + ], + 'storage.files.delete' => [ + 'table' => 'appwrite_usage_storage_files_delete', + ], + 'storage.buckets.bucketId.files.create' => [ + 'table' => 'appwrite_usage_storage_files_create', + 'groupBy' => 'bucketId', + ], + 'storage.buckets.bucketId.files.read' => [ + 'table' => 'appwrite_usage_storage_files_read', + 'groupBy' => 'bucketId', + ], + 'storage.buckets.bucketId.files.update' => [ + 'table' => 'appwrite_usage_storage_files_update', + 'groupBy' => 'bucketId', + ], + 'storage.buckets.bucketId.files.delete' => [ + 'table' => 'appwrite_usage_storage_files_delete', + 'groupBy' => 'bucketId', + ], + 'users.create' => [ + 'table' => 'appwrite_usage_users_create', + ], + 'users.read' => [ + 'table' => 'appwrite_usage_users_read', + ], + 'users.update' => [ + 'table' => 'appwrite_usage_users_update', + ], + 'users.delete' => [ + 'table' => 'appwrite_usage_users_delete', + ], + 'users.sessions.create' => [ + 'table' => 'appwrite_usage_users_sessions_create', + ], + 'users.sessions.provider.create' => [ + 'table' => 'appwrite_usage_users_sessions_create', + 'groupBy' => 'provider', + ], + 'users.sessions.delete' => [ + 'table' => 'appwrite_usage_users_sessions_delete', + ], + 'functions.functionId.executions' => [ + 'table' => 'appwrite_usage_executions_all', + 'groupBy' => 'functionId', + ], + 'functions.functionId.compute' => [ + 'table' => 'appwrite_usage_executions_time', + 'groupBy' => 'functionId', + ], + 'functions.functionId.failures' => [ + 'table' => 'appwrite_usage_executions_all', + 'groupBy' => 'functionId', + 'filters' => [ + 'functionStatus' => 'failed', + ], + ], + ]; + + protected array $periods = [ + [ + 'key' => '30m', + 'multiplier' => 1800, + 'startTime' => '-24 hours', + ], + [ + 'key' => '1d', + 'multiplier' => 86400, + 'startTime' => '-90 days', + ], + ]; + + public function __construct(Database $database, InfluxDatabase $influxDB, callable $errorHandler = null) + { + $this->database = $database; + $this->influxDB = $influxDB; + $this->errorHandler = $errorHandler; + } + + /** + * Create or Update Mertic + * Create or update each metric in the stats collection for the given project + * + * @param string $projectId + * @param int $time + * @param string $period + * @param string $metric + * @param int $value + * @param int $type + * + * @return void + */ + private function createOrUpdateMetric(string $projectId, int $time, string $period, string $metric, int $value, int $type): void + { + $id = \md5("{$time}_{$period}_{$metric}"); + $this->database->setNamespace('_' . $projectId); + try { + $document = $this->database->getDocument('stats', $id); + if ($document->isEmpty()) { + $this->database->createDocument('stats', new Document([ + '$id' => $id, + 'period' => $period, + 'time' => $time, + 'metric' => $metric, + 'value' => $value, + 'type' => $type, + ])); + } else { + $this->database->updateDocument( + 'stats', + $document->getId(), + $document->setAttribute('value', $value) + ); + } + $this->latestTime[$metric][$period] = $time; + } catch (\Exception $e) { // if projects are deleted this might fail + if (is_callable($this->errorHandler)) { + call_user_func($this->errorHandler, $e, "sync_project_{$projectId}_metric_{$metric}"); + } else { + throw $e; + } + } + } + + /** + * Sync From InfluxDB + * Sync stats from influxDB to stats collection in the Appwrite database + * + * @param string $metric + * @param array $options + * @param array $period + * + * @return void + */ + private function syncFromInfluxDB(string $metric, array $options, array $period): void + { + $start = DateTime::createFromFormat('U', \strtotime($period['startTime']))->format(DateTime::RFC3339); + if (!empty($this->latestTime[$metric][$period['key']])) { + $start = DateTime::createFromFormat('U', $this->latestTime[$metric][$period['key']])->format(DateTime::RFC3339); + } + $end = DateTime::createFromFormat('U', \strtotime('now'))->format(DateTime::RFC3339); + + $table = $options['table']; //Which influxdb table to query for this metric + $groupBy = empty($options['groupBy']) ? '' : ', "' . $options['groupBy'] . '"'; //Some sub level metrics may be grouped by other tags like collectionId, bucketId, etc + + $filters = $options['filters'] ?? []; // Some metrics might have additional filters, like function's status + if (!empty($filters)) { + $filters = ' AND ' . implode(' AND ', array_map(fn ($filter, $value) => "\"{$filter}\"='{$value}'", array_keys($filters), array_values($filters))); + } else { + $filters = ''; + } + + $query = "SELECT sum(value) AS \"value\" "; + $query .= "FROM \"{$table}\" "; + $query .= "WHERE \"time\" > '{$start}' "; + $query .= "AND \"time\" < '{$end}' "; + $query .= "AND \"metric_type\"='counter' {$filters} "; + $query .= "GROUP BY time({$period['key']}), \"projectId\" {$groupBy} "; + $query .= "FILL(null)"; + $result = $this->influxDB->query($query); + + $points = $result->getPoints(); + foreach ($points as $point) { + $projectId = $point['projectId']; + + if (!empty($projectId) && $projectId !== 'console') { + $metricUpdated = $metric; + + if (!empty($groupBy)) { + $groupedBy = $point[$options['groupBy']] ?? ''; + if (empty($groupedBy)) { + continue; + } + $metricUpdated = str_replace($options['groupBy'], $groupedBy, $metric); + } + + $time = \strtotime($point['time']); + $value = (!empty($point['value'])) ? $point['value'] : 0; + + $this->createOrUpdateMetric( + $projectId, + $time, + $period['key'], + $metricUpdated, + $value, + 0 + ); + } + } + } + + /** + * Collect Stats + * Collect all the stats from Influd DB to Database + * + * @return void + */ + public function collect(): void + { + foreach ($this->metrics as $metric => $options) { //for each metrics + foreach ($this->periods as $period) { // aggregate data for each period + try { + $this->syncFromInfluxDB($metric, $options, $period); + } catch (\Exception $e) { + if (is_callable($this->errorHandler)) { + call_user_func($this->errorHandler, $e); + } else { + throw $e; + } + } + } + } + } +} diff --git a/src/Appwrite/Stats/UsageDB.php b/src/Appwrite/Stats/UsageDB.php new file mode 100644 index 000000000..cc312b69a --- /dev/null +++ b/src/Appwrite/Stats/UsageDB.php @@ -0,0 +1,262 @@ +database = $database; + $this->errorHandler = $errorHandler; + } + /** + * Create or Update Mertic + * Create or update each metric in the stats collection for the given project + * + * @param string $projectId + * @param string $metric + * @param int $value + * + * @return void + */ + private function createOrUpdateMetric(string $projectId, string $metric, int $value): void + { + foreach ($this->periods as $options) { + $period = $options['key']; + $time = (int) (floor(time() / $options['multiplier']) * $options['multiplier']); + $id = \md5("{$time}_{$period}_{$metric}"); + $this->database->setNamespace('_' . $projectId); + try { + $document = $this->database->getDocument('stats', $id); + if ($document->isEmpty()) { + $this->database->createDocument('stats', new Document([ + '$id' => $id, + 'period' => $period, + 'time' => $time, + 'metric' => $metric, + 'value' => $value, + 'type' => 1, + ])); + } else { + $this->database->updateDocument( + 'stats', + $document->getId(), + $document->setAttribute('value', $value) + ); + } + } catch (\Exception$e) { // if projects are deleted this might fail + if (is_callable($this->errorHandler)) { + call_user_func($this->errorHandler, $e, "sync_project_{$projectId}_metric_{$metric}"); + } else { + throw $e; + } + } + } + } + + /** + * Foreach Document + * Call provided callback for each document in the collection + * + * @param string $projectId + * @param string $collection + * @param array $queries + * @param callable $callback + * + * @return void + */ + private function foreachDocument(string $projectId, string $collection, array $queries, callable $callback): void + { + $limit = 50; + $results = []; + $sum = $limit; + $latestDocument = null; + $this->database->setNamespace('_' . $projectId); + + while ($sum === $limit) { + try { + $results = $this->database->find($collection, $queries, $limit, cursor:$latestDocument); + } catch (\Exception $e) { + if (is_callable($this->errorHandler)) { + call_user_func($this->errorHandler, $e, "fetch_documents_project_{$projectId}_collection_{$collection}"); + return; + } else { + throw $e; + } + } + if (empty($results)) { + return; + } + + $sum = count($results); + + foreach ($results as $document) { + if (is_callable($callback)) { + $callback($document); + } + } + $latestDocument = $results[array_key_last($results)]; + } + } + + /** + * Sum + * Calculate sum of a attribute of documents in collection + * + * @param string $projectId + * @param string $collection + * @param string $attribute + * @param string $metric + * + * @return int + */ + private function sum(string $projectId, string $collection, string $attribute, string $metric): int + { + $this->database->setNamespace('_' . $projectId); + try { + $sum = (int) $this->database->sum($collection, $attribute); + $this->createOrUpdateMetric($projectId, $metric, $sum); + return $sum; + } catch (\Exception $e) { + if (is_callable($this->errorHandler)) { + call_user_func($this->errorHandler, $e, "fetch_sum_project_{$projectId}_collection_{$collection}"); + } else { + throw $e; + } + } + } + + /** + * Count + * Count number of documents in collection + * + * @param string $projectId + * @param string $collection + * @param string $metric + * + * @return int + */ + private function count(string $projectId, string $collection, string $metric): int + { + $this->database->setNamespace("_{$projectId}"); + try { + $count = $this->database->count($collection); + $this->createOrUpdateMetric($projectId, $metric, $count); + return $count; + } catch (\Exception $e) { + if (is_callable($this->errorHandler)) { + call_user_func($this->errorHandler, $e, "fetch_count_project_{$projectId}_collection_{$collection}"); + } else { + throw $e; + } + } + } + + /** + * Deployments Total + * Total sum of storage used by deployments + * + * @param string $projectId + * + * @return int + */ + private function deploymentsTotal(string $projectId): int + { + return $this->sum($projectId, 'deployments', 'size', 'stroage.deployments.total'); + } + + /** + * Users Stats + * Metric: users.count + * + * @param string $projectId + * + * @return void + */ + private function usersStats(string $projectId): void + { + $this->count($projectId, 'users', 'users.count'); + } + + /** + * Storage Stats + * Metrics: storage.total, storage.files.total, storage.buckets.{bucketId}.files.total, + * storage.buckets.count, storage.files.count, storage.buckets.{bucketId}.files.count + * + * @param string $projectId + * + * @return void + */ + private function storageStats(string $projectId): void + { + $deploymentsTotal = $this->deploymentsTotal($projectId); + + $projectFilesTotal = 0; + $projectFilesCount = 0; + + $metric = 'storage.buckets.count'; + $this->count($projectId, 'buckets', $metric); + + $this->foreachDocument($projectId, 'buckets', [], function ($bucket) use (&$projectFilesCount, &$projectFilesTotal, $projectId,) { + $metric = "storage.buckets.{$bucket->getId()}.files.count"; + + $count = $this->count($projectId, 'bucket_' . $bucket->getInternalId(), $metric); + $projectFilesCount += $count; + + $metric = "storage.buckets.{$bucket->getId()}.files.total"; + $sum = $this->sum($projectId, 'bucket_' . $bucket->getInternalId(), 'sizeOriginal', $metric); + $projectFilesTotal += $sum; + }); + + $this->createOrUpdateMetric($projectId, 'storage.files.count', $projectFilesCount); + $this->createOrUpdateMetric($projectId, 'storage.files.total', $projectFilesTotal); + + $this->createOrUpdateMetric($projectId, 'storage.total', $projectFilesTotal + $deploymentsTotal); + } + + /** + * Database Stats + * Collect all database stats + * Metrics: database.collections.count, database.collections.{collectionId}.documents.count, + * database.documents.count + * + * @param string $projectId + * + * @return void + */ + private function databaseStats(string $projectId): void + { + $projectDocumentsCount = 0; + + $metric = 'database.collections.count'; + $this->count($projectId, 'collections', $metric); + + $this->foreachDocument($projectId, 'collections', [], function ($collection) use (&$projectDocumentsCount, $projectId,) { + $metric = "database.collections.{$collection->getId()}.documents.count"; + + $count = $this->count($projectId, 'collection_' . $collection->getInternalId(), $metric); + $projectDocumentsCount += $count; + }); + + $this->createOrUpdateMetric($projectId, 'database.documents.count', $projectDocumentsCount); + } + + /** + * Collect Stats + * Collect all database related stats + * + * @return void + */ + public function collect(): void + { + $this->foreachDocument('console', 'projects', [], function ($project) { + $projectId = $project->getId(); + $this->usersStats($projectId); + $this->databaseStats($projectId); + $this->storageStats($projectId); + }); + } +}