Merge pull request #4616 from appwrite/feat-usage-refactor-fo-db-pools
Feat usage refactor for db pools
This commit is contained in:
commit
e7a51fac74
11 changed files with 62 additions and 65 deletions
2
.env
2
.env
|
@ -65,9 +65,9 @@ _APP_MAINTENANCE_RETENTION_EXECUTION=1209600
|
|||
_APP_MAINTENANCE_RETENTION_ABUSE=86400
|
||||
_APP_MAINTENANCE_RETENTION_AUDIT=1209600
|
||||
_APP_MAINTENANCE_RETENTION_SCHEDULES=86400
|
||||
_APP_USAGE_TIMESERIES_INTERVAL=20
|
||||
_APP_MAINTENANCE_RETENTION_USAGE_HOURLY=8640000
|
||||
_APP_USAGE_STATS=enabled
|
||||
_APP_USAGE_AGGREGATION_INTERVAL=30
|
||||
_APP_LOGGING_PROVIDER=
|
||||
_APP_LOGGING_CONFIG=
|
||||
_APP_REGION=default
|
||||
|
|
1
.github/workflows/tests.yml
vendored
1
.github/workflows/tests.yml
vendored
|
@ -33,6 +33,7 @@ jobs:
|
|||
docker compose build appwrite
|
||||
docker compose up -d
|
||||
sleep 30
|
||||
|
||||
- name: Doctor
|
||||
run: docker compose exec -T appwrite doctor
|
||||
|
||||
|
|
|
@ -938,6 +938,15 @@ return [
|
|||
'question' => '',
|
||||
'filter' => ''
|
||||
],
|
||||
[
|
||||
'name' => '_APP_MAINTENANCE_RETENTION_USAGE_HOURLY',
|
||||
'description' => 'The maximum duration (in seconds) upto which to retain hourly usage metrics. The default value is 8640000 seconds (100 days).',
|
||||
'introduction' => '',
|
||||
'default' => '8640000',
|
||||
'required' => false,
|
||||
'question' => '',
|
||||
'filter' => ''
|
||||
],
|
||||
[
|
||||
'name' => '_APP_MAINTENANCE_RETENTION_SCHEDULES',
|
||||
'description' => 'Schedules deletion interval ( in seconds ) ',
|
||||
|
|
|
@ -552,10 +552,10 @@ $register->set('pools', function () {
|
|||
],
|
||||
];
|
||||
|
||||
$maxConnections = App::getenv('_APP_CONNECTIONS_MAX', 151);
|
||||
$instanceConnections = $maxConnections / App::getenv('_APP_POOL_CLIENTS', 14);
|
||||
$maxConnections = App::getEnv('_APP_CONNECTIONS_MAX', 151);
|
||||
$instanceConnections = $maxConnections / App::getEnv('_APP_POOL_CLIENTS', 14);
|
||||
|
||||
$multiprocessing = App::getenv('_APP_SERVER_MULTIPROCESS', 'disabled') === 'enabled';
|
||||
$multiprocessing = App::getEnv('_APP_SERVER_MULTIPROCESS', 'disabled') === 'enabled';
|
||||
|
||||
if ($multiprocessing) {
|
||||
$workerCount = swoole_cpu_num() * intval(App::getEnv('_APP_WORKER_PER_CORE', 6));
|
||||
|
|
|
@ -139,6 +139,7 @@ services:
|
|||
- _APP_MAINTENANCE_RETENTION_CACHE
|
||||
- _APP_MAINTENANCE_RETENTION_ABUSE
|
||||
- _APP_MAINTENANCE_RETENTION_AUDIT
|
||||
- _APP_MAINTENANCE_RETENTION_USAGE_HOURLY
|
||||
- _APP_MAINTENANCE_RETENTION_SCHEDULES
|
||||
- _APP_SMS_PROVIDER
|
||||
- _APP_SMS_FROM
|
||||
|
@ -483,44 +484,13 @@ services:
|
|||
- _APP_DB_SCHEMA
|
||||
- _APP_DB_USER
|
||||
- _APP_DB_PASS
|
||||
- _APP_USAGE_AGGREGATION_INTERVAL
|
||||
- _APP_REDIS_HOST
|
||||
- _APP_REDIS_PORT
|
||||
- _APP_REDIS_USER
|
||||
- _APP_REDIS_PASS
|
||||
- _APP_INFLUXDB_HOST
|
||||
- _APP_INFLUXDB_PORT
|
||||
- _APP_USAGE_TIMESERIES_INTERVAL
|
||||
- _APP_LOGGING_PROVIDER
|
||||
- _APP_LOGGING_CONFIG
|
||||
|
||||
appwrite-usage-database:
|
||||
image: <?php echo $organization; ?>/<?php echo $image; ?>:<?php echo $version."\n"; ?>
|
||||
entrypoint:
|
||||
- usage
|
||||
- --type=database
|
||||
container_name: appwrite-usage-database
|
||||
<<: *x-logging
|
||||
restart: unless-stopped
|
||||
networks:
|
||||
- appwrite
|
||||
depends_on:
|
||||
- influxdb
|
||||
- mariadb
|
||||
environment:
|
||||
- _APP_ENV
|
||||
- _APP_OPENSSL_KEY_V1
|
||||
- _APP_DB_HOST
|
||||
- _APP_DB_PORT
|
||||
- _APP_DB_SCHEMA
|
||||
- _APP_DB_USER
|
||||
- _APP_DB_PASS
|
||||
- _APP_REDIS_HOST
|
||||
- _APP_REDIS_PORT
|
||||
- _APP_REDIS_USER
|
||||
- _APP_REDIS_PASS
|
||||
- _APP_INFLUXDB_HOST
|
||||
- _APP_INFLUXDB_PORT
|
||||
- _APP_USAGE_TIMESERIES_INTERVAL
|
||||
- _APP_LOGGING_PROVIDER
|
||||
- _APP_LOGGING_CONFIG
|
||||
|
||||
|
|
|
@ -260,6 +260,7 @@ class DeletesV1 extends Worker
|
|||
{
|
||||
$this->deleteForProjectIds(function (Document $project) use ($hourlyUsageRetentionDatetime) {
|
||||
$dbForProject = $this->getProjectDB($project);
|
||||
// Delete Usage stats
|
||||
$this->deleteByGroup('stats', [
|
||||
Query::lessThan('time', $hourlyUsageRetentionDatetime),
|
||||
Query::equal('period', ['1h']),
|
||||
|
|
2
composer.lock
generated
2
composer.lock
generated
|
@ -4,7 +4,7 @@
|
|||
"Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies",
|
||||
"This file is @generated automatically"
|
||||
],
|
||||
"content-hash": "4893e1c13630239fe6a20d1c652eb484",
|
||||
"content-hash": "7a4830071d4d0c427adc32da23ed1856",
|
||||
"packages": [
|
||||
{
|
||||
"name": "adhocore/jwt",
|
||||
|
|
|
@ -93,6 +93,7 @@ services:
|
|||
- ./public:/usr/src/code/public
|
||||
- ./src:/usr/src/code/src
|
||||
- ./dev:/usr/local/dev
|
||||
|
||||
depends_on:
|
||||
- mariadb
|
||||
- redis
|
||||
|
@ -586,6 +587,7 @@ services:
|
|||
- _APP_MAINTENANCE_RETENTION_CACHE
|
||||
- _APP_MAINTENANCE_RETENTION_ABUSE
|
||||
- _APP_MAINTENANCE_RETENTION_AUDIT
|
||||
- _APP_MAINTENANCE_RETENTION_USAGE_HOURLY
|
||||
- _APP_MAINTENANCE_RETENTION_SCHEDULES
|
||||
|
||||
appwrite-usage:
|
||||
|
@ -615,6 +617,7 @@ services:
|
|||
- _APP_DB_PASS
|
||||
- _APP_INFLUXDB_HOST
|
||||
- _APP_INFLUXDB_PORT
|
||||
- _APP_USAGE_AGGREGATION_INTERVAL
|
||||
- _APP_REDIS_HOST
|
||||
- _APP_REDIS_PORT
|
||||
- _APP_REDIS_USER
|
||||
|
@ -622,7 +625,6 @@ services:
|
|||
- _APP_CONNECTIONS_DB_CONSOLE
|
||||
- _APP_CONNECTIONS_DB_PROJECT
|
||||
- _APP_CONNECTIONS_CACHE
|
||||
- _APP_USAGE_TIMESERIES_INTERVAL
|
||||
- _APP_LOGGING_PROVIDER
|
||||
- _APP_LOGGING_CONFIG
|
||||
|
||||
|
|
|
@ -136,7 +136,7 @@ class Maintenance extends Action
|
|||
$cacheRetention = (int) App::getEnv('_APP_MAINTENANCE_RETENTION_CACHE', '2592000'); // 30 days
|
||||
$schedulesDeletionRetention = (int) App::getEnv('_APP_MAINTENANCE_RETENTION_SCHEDULES', '86400'); // 1 Day
|
||||
|
||||
Console::loop(function () use ($interval, $executionLogsRetention, $abuseLogsRetention, $auditLogRetention, $usageStatsRetentionHourly, $cacheRetention, $schedulesDeletionRetention, $dbForConsole) {
|
||||
Console::loop(function () use ($interval, $executionLogsRetention, $abuseLogsRetention, $auditLogRetention, $cacheRetention, $schedulesDeletionRetention, $usageStatsRetentionHourly, $dbForConsole) {
|
||||
$time = DateTime::now();
|
||||
|
||||
Console::info("[{$time}] Notifying workers with maintenance tasks every {$interval} seconds");
|
||||
|
|
|
@ -7,9 +7,9 @@ use InfluxDB\Database as InfluxDatabase;
|
|||
use Utopia\App;
|
||||
use Utopia\CLI\Console;
|
||||
use Utopia\Database\Database as UtopiaDatabase;
|
||||
use Utopia\Validator\WhiteList;
|
||||
use Throwable;
|
||||
use Utopia\Platform\Action;
|
||||
use Utopia\Registry\Registry;
|
||||
|
||||
class Usage extends Action
|
||||
{
|
||||
|
@ -22,18 +22,28 @@ class Usage extends Action
|
|||
{
|
||||
$this
|
||||
->desc('Schedules syncing data from influxdb to Appwrite console db')
|
||||
->param('type', 'timeseries', new WhiteList(['timeseries', 'database']))
|
||||
->inject('dbForConsole')
|
||||
->inject('influxdb')
|
||||
->inject('register')
|
||||
->inject('getProjectDB')
|
||||
->inject('logError')
|
||||
->callback(fn ($type, $dbForConsole, $influxDB, $logError) => $this->action($type, $dbForConsole, $influxDB, $logError));
|
||||
->callback(fn ($dbForConsole, $influxDB, $register, $getProjectDB, $logError) => $this->action($dbForConsole, $influxDB, $register, $getProjectDB, $logError));
|
||||
}
|
||||
|
||||
protected function aggregateTimeseries(UtopiaDatabase $database, InfluxDatabase $influxDB, callable $logError): void
|
||||
{
|
||||
$interval = (int) App::getEnv('_APP_USAGE_TIMESERIES_INTERVAL', '30'); // 30 seconds (by default)
|
||||
}
|
||||
|
||||
public function action(UtopiaDatabase $dbForConsole, InfluxDatabase $influxDB, Registry $register, callable $getProjectDB, callable $logError)
|
||||
{
|
||||
Console::title('Usage Aggregation V1');
|
||||
Console::success(APP_NAME . ' usage aggregation process v1 has started');
|
||||
|
||||
$errorLogger = fn(Throwable $error, string $action = 'syncUsageStats') => $logError($error, "usage", $action);
|
||||
|
||||
$interval = (int) App::getEnv('_APP_USAGE_AGGREGATION_INTERVAL', '30'); // 30 seconds (by default)
|
||||
$region = App::getEnv('region', 'default');
|
||||
$usage = new TimeSeries($region, $database, $influxDB, $logError);
|
||||
$usage = new TimeSeries($region, $dbForConsole, $influxDB, $getProjectDB, $register, $errorLogger);
|
||||
|
||||
Console::loop(function () use ($interval, $usage) {
|
||||
$now = date('d-m-Y H:i:s', time());
|
||||
|
@ -47,20 +57,4 @@ class Usage extends Action
|
|||
Console::info("[{$now}] Aggregation took {$loopTook} seconds");
|
||||
}, $interval);
|
||||
}
|
||||
|
||||
public function action(string $type, UtopiaDatabase $dbForConsole, InfluxDatabase $influxDB, callable $logError)
|
||||
{
|
||||
Console::title('Usage Aggregation V1');
|
||||
Console::success(APP_NAME . ' usage aggregation process v1 has started');
|
||||
|
||||
$errorLogger = fn(Throwable $error, string $action = 'syncUsageStats') => $logError($error, "usage", $action);
|
||||
|
||||
switch ($type) {
|
||||
case 'timeseries':
|
||||
$this->aggregateTimeseries($dbForConsole, $influxDB, $errorLogger);
|
||||
break;
|
||||
default:
|
||||
Console::error("Unsupported usage aggregation type");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -8,6 +8,7 @@ use Utopia\Database\Database;
|
|||
use Utopia\Database\Document;
|
||||
use InfluxDB\Database as InfluxDatabase;
|
||||
use DateTime;
|
||||
use Utopia\Registry\Registry;
|
||||
|
||||
class TimeSeries extends Calculator
|
||||
{
|
||||
|
@ -32,6 +33,20 @@ class TimeSeries extends Calculator
|
|||
*/
|
||||
protected $errorHandler;
|
||||
|
||||
/**
|
||||
* Callback to get project DB
|
||||
*
|
||||
* @var callable
|
||||
*/
|
||||
protected mixed $getProjectDB;
|
||||
|
||||
/**
|
||||
* Registry
|
||||
*
|
||||
* @var Registry
|
||||
*/
|
||||
protected Registry $register;
|
||||
|
||||
/**
|
||||
* Latest times for metric that was synced to the database
|
||||
*
|
||||
|
@ -382,11 +397,13 @@ class TimeSeries extends Calculator
|
|||
]
|
||||
];
|
||||
|
||||
public function __construct(string $region, Database $database, InfluxDatabase $influxDB, callable $errorHandler = null)
|
||||
public function __construct(string $region, Database $database, InfluxDatabase $influxDB, callable $getProjectDB, Registry $register, callable $errorHandler = null)
|
||||
{
|
||||
parent::__construct($region);
|
||||
$this->database = $database;
|
||||
$this->influxDB = $influxDB;
|
||||
$this->getProjectDB = $getProjectDB;
|
||||
$this->register = $register;
|
||||
$this->errorHandler = $errorHandler;
|
||||
}
|
||||
|
||||
|
@ -406,12 +423,13 @@ class TimeSeries extends Calculator
|
|||
private function createOrUpdateMetric(string $projectId, string $time, string $period, string $metric, int $value, int $type): void
|
||||
{
|
||||
$id = \md5("{$time}_{$period}_{$metric}");
|
||||
$this->database->setNamespace('_' . $projectId);
|
||||
$project = $this->database->getDocument('projects', $projectId);
|
||||
$database = call_user_func($this->getProjectDB, $project);
|
||||
|
||||
try {
|
||||
$document = $this->database->getDocument('stats', $id);
|
||||
$document = $database->getDocument('stats', $id);
|
||||
if ($document->isEmpty()) {
|
||||
$this->database->createDocument('stats', new Document([
|
||||
$database->createDocument('stats', new Document([
|
||||
'$id' => $id,
|
||||
'period' => $period,
|
||||
'time' => $time,
|
||||
|
@ -421,7 +439,7 @@ class TimeSeries extends Calculator
|
|||
'region' => $this->region,
|
||||
]));
|
||||
} else {
|
||||
$this->database->updateDocument(
|
||||
$database->updateDocument(
|
||||
'stats',
|
||||
$document->getId(),
|
||||
$document->setAttribute('value', $value)
|
||||
|
@ -434,6 +452,8 @@ class TimeSeries extends Calculator
|
|||
throw $e;
|
||||
}
|
||||
}
|
||||
|
||||
$this->register->get('pools')->reclaim();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
Loading…
Reference in a new issue