From e5ad8d63f03e05451a9be9f31c314f4f42c9ffe8 Mon Sep 17 00:00:00 2001 From: Damodar Lohani Date: Fri, 14 Oct 2022 09:42:48 +0000 Subject: [PATCH] fix usage task --- src/Appwrite/CLI/Tasks/Usage.php | 96 +++++++++++++++++--------------- 1 file changed, 50 insertions(+), 46 deletions(-) diff --git a/src/Appwrite/CLI/Tasks/Usage.php b/src/Appwrite/CLI/Tasks/Usage.php index b40f17c06..addf682be 100644 --- a/src/Appwrite/CLI/Tasks/Usage.php +++ b/src/Appwrite/CLI/Tasks/Usage.php @@ -3,12 +3,14 @@ namespace Appwrite\CLI\Tasks; use Appwrite\Platform\Task; -use Throwable; -use Appwrite\Stats\Usage as InfluxUsage; -use Appwrite\Stats\UsageDB; +use Appwrite\Usage\Calculators\Aggregator; +use Appwrite\Usage\Calculators\Database; +use Appwrite\Usage\Calculators\TimeSeries; +use InfluxDB\Database as InfluxDatabase; use Utopia\App; use Utopia\CLI\Console; -use Utopia\Database\Validator\Authorization; +use Utopia\Database\Database as UtopiaDatabase; +use Utopia\Validator\WhiteList; class Usage extends Task { @@ -20,65 +22,67 @@ class Usage extends Task public function __construct() { $this + ->param('type', 'timeseries', new WhiteList(['timeseries', 'database'])) ->desc('Schedules syncing data from influxdb to Appwrite console db') - ->callback(fn () => $this->action()); + ->callback(fn ($type) => $this->action($type)); } - public function action() + + protected function aggregateTimeseries(UtopiaDatabase $database, InfluxDatabase $influxDB, callable $logError): void { - global $register; + $interval = (int) App::getEnv('_APP_USAGE_TIMESERIES_INTERVAL', '30'); // 30 seconds (by default) + $usage = new TimeSeries($database, $influxDB, $logError); - Authorization::disable(); - Authorization::setDefaultStatus(false); - - $logError = fn(Throwable $error, string $action = 'syncUsageStats') => $this->logError($register, $error, $action); - - Console::title('Usage Aggregation V1'); - Console::success(APP_NAME . ' usage aggregation process v1 has started'); - - $interval = (int) App::getEnv('_APP_USAGE_AGGREGATION_INTERVAL', '30'); // 30 seconds (by default) - - $database = self::getDatabase($register, '_console'); - $influxDB = self::getInfluxDB($register); - - $usage = new InfluxUsage($database, $influxDB, $logError); - $usageDB = new UsageDB($database, $logError); - - $iterations = 0; - Console::loop(function () use ($interval, $usage, $usageDB, &$iterations) { + Console::loop(function () use ($interval, $usage) { $now = date('d-m-Y H:i:s', time()); - Console::info("[{$now}] Aggregating usage data every {$interval} seconds"); - + Console::info("[{$now}] Aggregating Timeseries Usage data every {$interval} seconds"); $loopStart = microtime(true); - /** - * Aggregate InfluxDB every 30 seconds - */ $usage->collect(); - if ($iterations % 30 != 0) { // return if 30 iterations has not passed - $iterations++; - $loopTook = microtime(true) - $loopStart; - $now = date('d-m-Y H:i:s', time()); - Console::info("[{$now}] Aggregation took {$loopTook} seconds"); - return; - } - - $iterations = 0; // Reset iterations to prevent overflow when running for long time - /** - * Aggregate MariaDB every 15 minutes - * Some of the queries here might contain full-table scans. - */ + $loopTook = microtime(true) - $loopStart; $now = date('d-m-Y H:i:s', time()); - Console::info("[{$now}] Aggregating database counters."); + Console::info("[{$now}] Aggregation took {$loopTook} seconds"); + }, $interval); + } - $usageDB->collect(); + protected function aggregateDatabase(UtopiaDatabase $database, callable $logError): void + { + $interval = (int) App::getEnv('_APP_USAGE_DATABASE_INTERVAL', '900'); // 15 minutes (by default) + $usage = new Database($database, $logError); + $aggregrator = new Aggregator($database, $logError); - $iterations++; + Console::loop(function () use ($interval, $usage, $aggregrator) { + $now = date('d-m-Y H:i:s', time()); + Console::info("[{$now}] Aggregating database usage every {$interval} seconds."); + $loopStart = microtime(true); + $usage->collect(); + $aggregrator->collect(); $loopTook = microtime(true) - $loopStart; $now = date('d-m-Y H:i:s', time()); Console::info("[{$now}] Aggregation took {$loopTook} seconds"); }, $interval); } + + public function action(string $type) + { + global $register; + Console::title('Usage Aggregation V1'); + Console::success(APP_NAME . ' usage aggregation process v1 has started'); + + $database = $this->getDatabase($register, '_console'); + $influxDB = $this->getInfluxDB($register); + + switch ($type) { + case 'timeseries': + $this->aggregateTimeseries($database, $influxDB, $this->logError); + break; + case 'database': + $this->aggregateDatabase($database, $this->logError); + break; + default: + Console::error("Unsupported usage aggregation type"); + } + } }