dependency injection in usage task
This commit is contained in:
parent
85794beb95
commit
30a541a5c9
3 changed files with 35 additions and 69 deletions
23
app/cli.php
23
app/cli.php
|
@ -50,6 +50,29 @@ CLI::setResource('dbForConsole', function ($db, $cache) {
|
|||
return $database;
|
||||
}, ['db', 'cache']);
|
||||
|
||||
/** @var InfluxDB\Client $client */
|
||||
$client = $register->get('influxdb');
|
||||
$attempts = 0;
|
||||
$max = 10;
|
||||
$sleep = 1;
|
||||
|
||||
do { // check if telegraf database is ready
|
||||
try {
|
||||
$attempts++;
|
||||
$database = $client->selectDB('telegraf');
|
||||
if (in_array('telegraf', $client->listDatabases())) {
|
||||
break; // leave the do-while if successful
|
||||
}
|
||||
} catch (\Throwable$th) {
|
||||
Console::warning("InfluxDB not ready. Retrying connection ({$attempts})...");
|
||||
if ($attempts >= $max) {
|
||||
throw new \Exception('InfluxDB database not ready yet');
|
||||
}
|
||||
sleep($sleep);
|
||||
}
|
||||
} while ($attempts < $max);
|
||||
CLI::setResource('influxdb', fn() => $database);
|
||||
|
||||
$cliPlatform = new Tasks();
|
||||
$cliPlatform->init(Service::TYPE_CLI);
|
||||
|
||||
|
|
|
@ -12,6 +12,7 @@ use Utopia\CLI\Console;
|
|||
use Utopia\Database\Database as UtopiaDatabase;
|
||||
use Utopia\Validator\WhiteList;
|
||||
use Throwable;
|
||||
use Utopia\Registry\Registry;
|
||||
|
||||
class Usage extends Task
|
||||
{
|
||||
|
@ -23,9 +24,12 @@ class Usage extends Task
|
|||
public function __construct()
|
||||
{
|
||||
$this
|
||||
->param('type', 'timeseries', new WhiteList(['timeseries', 'database']))
|
||||
->desc('Schedules syncing data from influxdb to Appwrite console db')
|
||||
->callback(fn ($type) => $this->action($type));
|
||||
->param('type', 'timeseries', new WhiteList(['timeseries', 'database']))
|
||||
->inject('dbForConsole')
|
||||
->inject('influxdb')
|
||||
->inject('register')
|
||||
->callback(fn ($type, $dbForConsole, $influxDB, $register) => $this->action($type, $dbForConsole, $influxDB, $register));
|
||||
}
|
||||
|
||||
|
||||
|
@ -66,22 +70,19 @@ class Usage extends Task
|
|||
}, $interval);
|
||||
}
|
||||
|
||||
public function action(string $type)
|
||||
public function action(string $type, UtopiaDatabase $dbForConsole, InfluxDatabase $influxDB, Registry $register)
|
||||
{
|
||||
global $register;
|
||||
Console::title('Usage Aggregation V1');
|
||||
Console::success(APP_NAME . ' usage aggregation process v1 has started');
|
||||
|
||||
$database = $this->getDatabase($register, '_console');
|
||||
$influxDB = $this->getInfluxDB($register);
|
||||
$logError = fn(Throwable $error, string $action = 'syncUsageStats') => $this->logError($register, $error, $action);
|
||||
$logError = fn(Throwable $error, string $action = 'syncUsageStats') => $this->logError($register, $error, "usage", $action);
|
||||
|
||||
switch ($type) {
|
||||
case 'timeseries':
|
||||
$this->aggregateTimeseries($database, $influxDB, $logError);
|
||||
$this->aggregateTimeseries($dbForConsole, $influxDB, $logError);
|
||||
break;
|
||||
case 'database':
|
||||
$this->aggregateDatabase($database, $logError);
|
||||
$this->aggregateDatabase($dbForConsole, $logError);
|
||||
break;
|
||||
default:
|
||||
Console::error("Unsupported usage aggregation type");
|
||||
|
|
|
@ -17,7 +17,7 @@ use Throwable;
|
|||
|
||||
abstract class Task extends Action
|
||||
{
|
||||
protected function logError(Registry $register, Throwable $error, string $action = 'syncUsageStats')
|
||||
protected function logError(Registry $register, Throwable $error, string $namespace, string $action)
|
||||
{
|
||||
$logger = $register->get('logger');
|
||||
|
||||
|
@ -25,7 +25,7 @@ abstract class Task extends Action
|
|||
$version = App::getEnv('_APP_VERSION', 'UNKNOWN');
|
||||
|
||||
$log = new Log();
|
||||
$log->setNamespace("usage");
|
||||
$log->setNamespace($namespace);
|
||||
$log->setServer(\gethostname());
|
||||
$log->setVersion($version);
|
||||
$log->setType(Log::TYPE_ERROR);
|
||||
|
@ -51,62 +51,4 @@ abstract class Task extends Action
|
|||
Console::warning("Failed: {$error->getMessage()}");
|
||||
Console::warning($error->getTraceAsString());
|
||||
}
|
||||
|
||||
protected function getDatabase(Registry &$register, string $namespace): Database
|
||||
{
|
||||
$attempts = 0;
|
||||
|
||||
do {
|
||||
try {
|
||||
$attempts++;
|
||||
|
||||
$db = $register->get('db');
|
||||
$redis = $register->get('cache');
|
||||
|
||||
$cache = new Cache(new RedisCache($redis));
|
||||
$database = new Database(new MariaDB($db), $cache);
|
||||
$database->setDefaultDatabase(App::getEnv('_APP_DB_SCHEMA', 'appwrite'));
|
||||
$database->setNamespace($namespace);
|
||||
|
||||
if (!$database->exists($database->getDefaultDatabase(), 'projects')) {
|
||||
throw new Exception('Projects collection not ready');
|
||||
}
|
||||
break; // leave loop if successful
|
||||
} catch (\Exception$e) {
|
||||
Console::warning("Database not ready. Retrying connection ({$attempts})...");
|
||||
if ($attempts >= DATABASE_RECONNECT_MAX_ATTEMPTS) {
|
||||
throw new \Exception('Failed to connect to database: ' . $e->getMessage());
|
||||
}
|
||||
sleep(DATABASE_RECONNECT_SLEEP);
|
||||
}
|
||||
} while ($attempts < DATABASE_RECONNECT_MAX_ATTEMPTS);
|
||||
|
||||
return $database;
|
||||
}
|
||||
|
||||
protected function getInfluxDB(Registry &$register): InfluxDatabase
|
||||
{
|
||||
/** @var InfluxDB\Client $client */
|
||||
$client = $register->get('influxdb');
|
||||
$attempts = 0;
|
||||
$max = 10;
|
||||
$sleep = 1;
|
||||
|
||||
do { // check if telegraf database is ready
|
||||
try {
|
||||
$attempts++;
|
||||
$database = $client->selectDB('telegraf');
|
||||
if (in_array('telegraf', $client->listDatabases())) {
|
||||
break; // leave the do-while if successful
|
||||
}
|
||||
} catch (\Throwable$th) {
|
||||
Console::warning("InfluxDB not ready. Retrying connection ({$attempts})...");
|
||||
if ($attempts >= $max) {
|
||||
throw new \Exception('InfluxDB database not ready yet');
|
||||
}
|
||||
sleep($sleep);
|
||||
}
|
||||
} while ($attempts < $max);
|
||||
return $database;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue