1
0
Fork 0
mirror of synced 2024-08-02 12:01:37 +12:00

fix usage task

This commit is contained in:
Damodar Lohani 2022-10-14 09:42:48 +00:00
parent e5b3d74515
commit e5ad8d63f0

View file

@ -3,12 +3,14 @@
namespace Appwrite\CLI\Tasks; namespace Appwrite\CLI\Tasks;
use Appwrite\Platform\Task; use Appwrite\Platform\Task;
use Throwable; use Appwrite\Usage\Calculators\Aggregator;
use Appwrite\Stats\Usage as InfluxUsage; use Appwrite\Usage\Calculators\Database;
use Appwrite\Stats\UsageDB; use Appwrite\Usage\Calculators\TimeSeries;
use InfluxDB\Database as InfluxDatabase;
use Utopia\App; use Utopia\App;
use Utopia\CLI\Console; use Utopia\CLI\Console;
use Utopia\Database\Validator\Authorization; use Utopia\Database\Database as UtopiaDatabase;
use Utopia\Validator\WhiteList;
class Usage extends Task class Usage extends Task
{ {
@ -20,65 +22,67 @@ class Usage extends Task
public function __construct() public function __construct()
{ {
$this $this
->param('type', 'timeseries', new WhiteList(['timeseries', 'database']))
->desc('Schedules syncing data from influxdb to Appwrite console db') ->desc('Schedules syncing data from influxdb to Appwrite console db')
->callback(fn () => $this->action()); ->callback(fn ($type) => $this->action($type));
} }
public function action()
protected function aggregateTimeseries(UtopiaDatabase $database, InfluxDatabase $influxDB, callable $logError): void
{ {
global $register; $interval = (int) App::getEnv('_APP_USAGE_TIMESERIES_INTERVAL', '30'); // 30 seconds (by default)
$usage = new TimeSeries($database, $influxDB, $logError);
Authorization::disable(); Console::loop(function () use ($interval, $usage) {
Authorization::setDefaultStatus(false);
$logError = fn(Throwable $error, string $action = 'syncUsageStats') => $this->logError($register, $error, $action);
Console::title('Usage Aggregation V1');
Console::success(APP_NAME . ' usage aggregation process v1 has started');
$interval = (int) App::getEnv('_APP_USAGE_AGGREGATION_INTERVAL', '30'); // 30 seconds (by default)
$database = self::getDatabase($register, '_console');
$influxDB = self::getInfluxDB($register);
$usage = new InfluxUsage($database, $influxDB, $logError);
$usageDB = new UsageDB($database, $logError);
$iterations = 0;
Console::loop(function () use ($interval, $usage, $usageDB, &$iterations) {
$now = date('d-m-Y H:i:s', time()); $now = date('d-m-Y H:i:s', time());
Console::info("[{$now}] Aggregating usage data every {$interval} seconds"); Console::info("[{$now}] Aggregating Timeseries Usage data every {$interval} seconds");
$loopStart = microtime(true); $loopStart = microtime(true);
/**
* Aggregate InfluxDB every 30 seconds
*/
$usage->collect(); $usage->collect();
if ($iterations % 30 != 0) { // return if 30 iterations has not passed $loopTook = microtime(true) - $loopStart;
$iterations++;
$loopTook = microtime(true) - $loopStart;
$now = date('d-m-Y H:i:s', time());
Console::info("[{$now}] Aggregation took {$loopTook} seconds");
return;
}
$iterations = 0; // Reset iterations to prevent overflow when running for long time
/**
* Aggregate MariaDB every 15 minutes
* Some of the queries here might contain full-table scans.
*/
$now = date('d-m-Y H:i:s', time()); $now = date('d-m-Y H:i:s', time());
Console::info("[{$now}] Aggregating database counters."); Console::info("[{$now}] Aggregation took {$loopTook} seconds");
}, $interval);
}
$usageDB->collect(); protected function aggregateDatabase(UtopiaDatabase $database, callable $logError): void
{
$interval = (int) App::getEnv('_APP_USAGE_DATABASE_INTERVAL', '900'); // 15 minutes (by default)
$usage = new Database($database, $logError);
$aggregrator = new Aggregator($database, $logError);
$iterations++; Console::loop(function () use ($interval, $usage, $aggregrator) {
$now = date('d-m-Y H:i:s', time());
Console::info("[{$now}] Aggregating database usage every {$interval} seconds.");
$loopStart = microtime(true);
$usage->collect();
$aggregrator->collect();
$loopTook = microtime(true) - $loopStart; $loopTook = microtime(true) - $loopStart;
$now = date('d-m-Y H:i:s', time()); $now = date('d-m-Y H:i:s', time());
Console::info("[{$now}] Aggregation took {$loopTook} seconds"); Console::info("[{$now}] Aggregation took {$loopTook} seconds");
}, $interval); }, $interval);
} }
public function action(string $type)
{
global $register;
Console::title('Usage Aggregation V1');
Console::success(APP_NAME . ' usage aggregation process v1 has started');
$database = $this->getDatabase($register, '_console');
$influxDB = $this->getInfluxDB($register);
switch ($type) {
case 'timeseries':
$this->aggregateTimeseries($database, $influxDB, $this->logError);
break;
case 'database':
$this->aggregateDatabase($database, $this->logError);
break;
default:
Console::error("Unsupported usage aggregation type");
}
}
} }