refactor to use db pools
This commit is contained in:
parent
982a449143
commit
546d8ba583
|
@ -3,13 +3,11 @@
|
||||||
global $cli, $register;
|
global $cli, $register;
|
||||||
|
|
||||||
use Appwrite\Usage\Calculators\TimeSeries;
|
use Appwrite\Usage\Calculators\TimeSeries;
|
||||||
use InfluxDB\Database as InfluxDatabase;
|
|
||||||
use Utopia\App;
|
use Utopia\App;
|
||||||
use Utopia\CLI\Console;
|
use Utopia\CLI\Console;
|
||||||
use Utopia\Database\Database as UtopiaDatabase;
|
use Utopia\Database\Document;
|
||||||
use Utopia\Database\Validator\Authorization;
|
use Utopia\Database\Validator\Authorization;
|
||||||
use Utopia\Logger\Log;
|
use Utopia\Logger\Log;
|
||||||
use Utopia\Validator\WhiteList;
|
|
||||||
|
|
||||||
Authorization::disable();
|
Authorization::disable();
|
||||||
Authorization::setDefaultStatus(false);
|
Authorization::setDefaultStatus(false);
|
||||||
|
@ -51,15 +49,16 @@ $logError = function (Throwable $error, string $action = 'syncUsageStats') use (
|
||||||
$cli
|
$cli
|
||||||
->task('usage')
|
->task('usage')
|
||||||
->desc('Schedules syncing data from influxdb to Appwrite console db')
|
->desc('Schedules syncing data from influxdb to Appwrite console db')
|
||||||
->action(function () use ($logError) {
|
->action(function () use ($register, $logError) {
|
||||||
Console::title('Usage Aggregation V1');
|
Console::title('Usage Aggregation V1');
|
||||||
Console::success(APP_NAME . ' usage aggregation process v1 has started');
|
Console::success(APP_NAME . ' usage aggregation process v1 has started');
|
||||||
|
|
||||||
$database = getConsoleDB();
|
$database = getConsoleDB();
|
||||||
$influxDB = getInfluxDB();
|
$influxDB = getInfluxDB();
|
||||||
|
$getProjectDB = fn (Document $project) => getProjectDB($project);
|
||||||
|
|
||||||
$interval = (int) App::getEnv('_APP_USAGE_AGGREGATION_INTERVAL', '30'); // 30 seconds (by default)
|
$interval = (int) App::getEnv('_APP_USAGE_AGGREGATION_INTERVAL', '30'); // 30 seconds (by default)
|
||||||
$usage = new TimeSeries($database, $influxDB, $logError);
|
$usage = new TimeSeries($database, $influxDB, $getProjectDB, $register, $logError);
|
||||||
|
|
||||||
Console::loop(function () use ($interval, $usage) {
|
Console::loop(function () use ($interval, $usage) {
|
||||||
$now = date('d-m-Y H:i:s', time());
|
$now = date('d-m-Y H:i:s', time());
|
||||||
|
|
|
@ -7,6 +7,7 @@ use Utopia\Database\Database;
|
||||||
use Utopia\Database\Document;
|
use Utopia\Database\Document;
|
||||||
use InfluxDB\Database as InfluxDatabase;
|
use InfluxDB\Database as InfluxDatabase;
|
||||||
use DateTime;
|
use DateTime;
|
||||||
|
use Utopia\Registry\Registry;
|
||||||
|
|
||||||
class TimeSeries extends Calculator
|
class TimeSeries extends Calculator
|
||||||
{
|
{
|
||||||
|
@ -31,6 +32,20 @@ class TimeSeries extends Calculator
|
||||||
*/
|
*/
|
||||||
protected $errorHandler;
|
protected $errorHandler;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Callback to get project DB
|
||||||
|
*
|
||||||
|
* @var callable
|
||||||
|
*/
|
||||||
|
protected mixed $getProjectDB;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Registry
|
||||||
|
*
|
||||||
|
* @var Registry
|
||||||
|
*/
|
||||||
|
protected Registry $register;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Latest times for metric that was synced to the database
|
* Latest times for metric that was synced to the database
|
||||||
*
|
*
|
||||||
|
@ -381,10 +396,12 @@ class TimeSeries extends Calculator
|
||||||
]
|
]
|
||||||
];
|
];
|
||||||
|
|
||||||
public function __construct(Database $database, InfluxDatabase $influxDB, callable $errorHandler = null)
|
public function __construct(Database $database, InfluxDatabase $influxDB, callable $getProjectDB, Registry $register, callable $errorHandler = null)
|
||||||
{
|
{
|
||||||
$this->database = $database;
|
$this->database = $database;
|
||||||
$this->influxDB = $influxDB;
|
$this->influxDB = $influxDB;
|
||||||
|
$this->getProjectDB = $getProjectDB;
|
||||||
|
$this->register = $register;
|
||||||
$this->errorHandler = $errorHandler;
|
$this->errorHandler = $errorHandler;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -404,12 +421,13 @@ class TimeSeries extends Calculator
|
||||||
private function createOrUpdateMetric(string $projectId, string $time, string $period, string $metric, int $value, int $type): void
|
private function createOrUpdateMetric(string $projectId, string $time, string $period, string $metric, int $value, int $type): void
|
||||||
{
|
{
|
||||||
$id = \md5("{$time}_{$period}_{$metric}");
|
$id = \md5("{$time}_{$period}_{$metric}");
|
||||||
$this->database->setNamespace('_' . $projectId);
|
$project = $this->database->getDocument('projects', $projectId);
|
||||||
|
$database = call_user_func($this->getProjectDB, $project);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
$document = $this->database->getDocument('stats', $id);
|
$document = $database->getDocument('stats', $id);
|
||||||
if ($document->isEmpty()) {
|
if ($document->isEmpty()) {
|
||||||
$this->database->createDocument('stats', new Document([
|
$database->createDocument('stats', new Document([
|
||||||
'$id' => $id,
|
'$id' => $id,
|
||||||
'period' => $period,
|
'period' => $period,
|
||||||
'time' => $time,
|
'time' => $time,
|
||||||
|
@ -418,7 +436,7 @@ class TimeSeries extends Calculator
|
||||||
'type' => $type,
|
'type' => $type,
|
||||||
]));
|
]));
|
||||||
} else {
|
} else {
|
||||||
$this->database->updateDocument(
|
$database->updateDocument(
|
||||||
'stats',
|
'stats',
|
||||||
$document->getId(),
|
$document->getId(),
|
||||||
$document->setAttribute('value', $value)
|
$document->setAttribute('value', $value)
|
||||||
|
@ -431,6 +449,8 @@ class TimeSeries extends Calculator
|
||||||
throw $e;
|
throw $e;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
$this->register->get('pools')->reclaim();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -466,7 +486,7 @@ class TimeSeries extends Calculator
|
||||||
$query .= "WHERE \"time\" > '{$start}' ";
|
$query .= "WHERE \"time\" > '{$start}' ";
|
||||||
$query .= "AND \"time\" < '{$end}' ";
|
$query .= "AND \"time\" < '{$end}' ";
|
||||||
$query .= "AND \"metric_type\"='counter' {$filters} ";
|
$query .= "AND \"metric_type\"='counter' {$filters} ";
|
||||||
$query .= "GROUP BY time({$period['key']}), \"projectId\", \"projectInternalId\" {$groupBy} ";
|
$query .= "GROUP BY time({$period['key']}), \"projectId\" {$groupBy} ";
|
||||||
$query .= "FILL(null)";
|
$query .= "FILL(null)";
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
@ -490,7 +510,7 @@ class TimeSeries extends Calculator
|
||||||
$value = (!empty($point['value'])) ? $point['value'] : 0;
|
$value = (!empty($point['value'])) ? $point['value'] : 0;
|
||||||
|
|
||||||
$this->createOrUpdateMetric(
|
$this->createOrUpdateMetric(
|
||||||
$point['projectInternalId'],
|
$point['projectId'],
|
||||||
$point['time'],
|
$point['time'],
|
||||||
$period['key'],
|
$period['key'],
|
||||||
$metricUpdated,
|
$metricUpdated,
|
||||||
|
|
Loading…
Reference in a new issue