Merge pull request #7326 from appwrite/refactor-usage-worker-sn
Refactor usage worker sn
This commit is contained in:
commit
a8af6fde8d
5 changed files with 90 additions and 120 deletions
2
.env
2
.env
|
@ -75,7 +75,7 @@ _APP_MAINTENANCE_RETENTION_CACHE=2592000
|
|||
_APP_MAINTENANCE_RETENTION_EXECUTION=1209600
|
||||
_APP_MAINTENANCE_RETENTION_ABUSE=86400
|
||||
_APP_MAINTENANCE_RETENTION_AUDIT=1209600
|
||||
_APP_USAGE_AGGREGATION_INTERVAL=5
|
||||
_APP_USAGE_AGGREGATION_INTERVAL=60000
|
||||
_APP_MAINTENANCE_RETENTION_USAGE_HOURLY=8640000
|
||||
_APP_MAINTENANCE_RETENTION_SCHEDULES=86400
|
||||
_APP_USAGE_STATS=enabled
|
||||
|
|
|
@ -1,3 +0,0 @@
|
|||
#!/bin/sh
|
||||
|
||||
php /usr/src/code/app/cli.php usage $@
|
|
@ -673,6 +673,7 @@ services:
|
|||
- _APP_USAGE_STATS
|
||||
- _APP_LOGGING_PROVIDER
|
||||
- _APP_LOGGING_CONFIG
|
||||
- _APP_USAGE_AGGREGATION_INTERVAL
|
||||
|
||||
appwrite-schedule:
|
||||
entrypoint: schedule
|
||||
|
|
|
@ -32,23 +32,22 @@ class Usage extends Action
|
|||
{
|
||||
|
||||
$this
|
||||
->desc('Usage worker')
|
||||
->inject('message')
|
||||
->inject('pools')
|
||||
->inject('cache')
|
||||
->callback(function ($message, $pools, $cache) {
|
||||
$this->action($message, $pools, $cache);
|
||||
});
|
||||
->desc('Usage worker')
|
||||
->inject('message')
|
||||
->inject('getProjectDB')
|
||||
->callback(function (Message $message, callable $getProjectDB) {
|
||||
$this->action($message, $getProjectDB);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* @param Message $message
|
||||
* @param $pools
|
||||
* @param $cache
|
||||
* @param callable $getProjectDB
|
||||
* @return void
|
||||
* @throws \Utopia\Database\Exception
|
||||
* @throws Exception
|
||||
*/
|
||||
public function action(Message $message, $pools, $cache): void
|
||||
public function action(Message $message, callable $getProjectDB): void
|
||||
{
|
||||
$payload = $message->getPayload() ?? [];
|
||||
|
||||
|
@ -65,16 +64,14 @@ class Usage extends Action
|
|||
}
|
||||
|
||||
$this->reduce(
|
||||
database: $project->getAttribute('database'),
|
||||
projectInternalId: $project->getInternalId(),
|
||||
project: $project,
|
||||
document: new Document($document),
|
||||
metrics: $payload['metrics'],
|
||||
pools: $pools,
|
||||
cache: $cache
|
||||
getProjectDB: $getProjectDB
|
||||
);
|
||||
}
|
||||
|
||||
self::$stats[$projectId]['database'] = $project->getAttribute('database');
|
||||
self::$stats[$projectId]['project'] = $project;
|
||||
foreach ($payload['metrics'] ?? [] as $metric) {
|
||||
if (!isset(self::$stats[$projectId]['keys'][$metric['key']])) {
|
||||
self::$stats[$projectId]['keys'][$metric['key']] = $metric['value'];
|
||||
|
@ -88,35 +85,25 @@ class Usage extends Action
|
|||
/**
|
||||
* On Documents that tied by relations like functions>deployments>build || documents>collection>database || buckets>files.
|
||||
* When we remove a parent document we need to deduct his children aggregation from the project scope.
|
||||
|
||||
* @param $database
|
||||
* @param $projectInternalId
|
||||
* @param Document $project
|
||||
* @param Document $document
|
||||
* @param array $metrics
|
||||
* @param $pools
|
||||
* @param $cache
|
||||
* @param callable $getProjectDB
|
||||
* @return void
|
||||
*/
|
||||
private function reduce($database, $projectInternalId, Document $document, array &$metrics, $pools, $cache)
|
||||
private function reduce(Document $project, Document $document, array &$metrics, callable $getProjectDB): void
|
||||
{
|
||||
|
||||
$dbForProject = $getProjectDB($project);
|
||||
|
||||
try {
|
||||
$dbForProject = new Database(
|
||||
$pools
|
||||
->get($database)
|
||||
->pop()
|
||||
->getResource(),
|
||||
$cache
|
||||
);
|
||||
|
||||
$dbForProject->setNamespace('_' . $projectInternalId);
|
||||
|
||||
switch (true) {
|
||||
case $document->getCollection() === 'users': // users
|
||||
$sessions = count($document->getAttribute(METRIC_SESSIONS, 0));
|
||||
if (!empty($sessions)) {
|
||||
$metrics[] = [
|
||||
'key' => METRIC_SESSIONS,
|
||||
'value' => ($sessions * -1),
|
||||
'key' => METRIC_SESSIONS,
|
||||
'value' => ($sessions * -1),
|
||||
];
|
||||
}
|
||||
break;
|
||||
|
@ -144,12 +131,12 @@ class Usage extends Action
|
|||
|
||||
if (!empty($documents['value'])) {
|
||||
$metrics[] = [
|
||||
'key' => METRIC_DOCUMENTS,
|
||||
'value' => ($documents['value'] * -1),
|
||||
'key' => METRIC_DOCUMENTS,
|
||||
'value' => ($documents['value'] * -1),
|
||||
];
|
||||
$metrics[] = [
|
||||
'key' => str_replace('{databaseInternalId}', $databaseInternalId, METRIC_DATABASE_ID_DOCUMENTS),
|
||||
'value' => ($documents['value'] * -1),
|
||||
'key' => str_replace('{databaseInternalId}', $databaseInternalId, METRIC_DATABASE_ID_DOCUMENTS),
|
||||
'value' => ($documents['value'] * -1),
|
||||
];
|
||||
}
|
||||
break;
|
||||
|
@ -160,15 +147,15 @@ class Usage extends Action
|
|||
|
||||
if (!empty($files['value'])) {
|
||||
$metrics[] = [
|
||||
'key' => METRIC_FILES,
|
||||
'value' => ($files['value'] * -1),
|
||||
'key' => METRIC_FILES,
|
||||
'value' => ($files['value'] * -1),
|
||||
];
|
||||
}
|
||||
|
||||
if (!empty($storage['value'])) {
|
||||
$metrics[] = [
|
||||
'key' => METRIC_FILES_STORAGE,
|
||||
'value' => ($storage['value'] * -1),
|
||||
'key' => METRIC_FILES_STORAGE,
|
||||
'value' => ($storage['value'] * -1),
|
||||
];
|
||||
}
|
||||
break;
|
||||
|
@ -176,58 +163,58 @@ class Usage extends Action
|
|||
case $document->getCollection() === 'functions':
|
||||
$deployments = $dbForProject->getDocument('stats_v2', md5(self::INFINITY_PERIOD . str_replace(['{resourceType}', '{resourceInternalId}'], ['functions', $document->getInternalId()], METRIC_FUNCTION_ID_DEPLOYMENTS)));
|
||||
$deploymentsStorage = $dbForProject->getDocument('stats_v2', md5(self::INFINITY_PERIOD . str_replace(['{resourceType}', '{resourceInternalId}'], ['functions', $document->getInternalId()], METRIC_FUNCTION_ID_DEPLOYMENTS_STORAGE)));
|
||||
$builds = $dbForProject->getDocument('stats_v2', md5(self::INFINITY_PERIOD . str_replace('{functionInternalId}', $document->getInternalId(), METRIC_FUNCTION_ID_BUILDS)));
|
||||
$builds = $dbForProject->getDocument('stats_v2', md5(self::INFINITY_PERIOD . str_replace('{functionInternalId}', $document->getInternalId(), METRIC_FUNCTION_ID_BUILDS)));
|
||||
$buildsStorage = $dbForProject->getDocument('stats_v2', md5(self::INFINITY_PERIOD . str_replace('{functionInternalId}', $document->getInternalId(), METRIC_FUNCTION_ID_BUILDS_STORAGE)));
|
||||
$buildsCompute = $dbForProject->getDocument('stats_v2', md5(self::INFINITY_PERIOD . str_replace('{functionInternalId}', $document->getInternalId(), METRIC_FUNCTION_ID_BUILDS_COMPUTE)));
|
||||
$executions = $dbForProject->getDocument('stats_v2', md5(self::INFINITY_PERIOD . str_replace('{functionInternalId}', $document->getInternalId(), METRIC_FUNCTION_ID_EXECUTIONS)));
|
||||
$executions = $dbForProject->getDocument('stats_v2', md5(self::INFINITY_PERIOD . str_replace('{functionInternalId}', $document->getInternalId(), METRIC_FUNCTION_ID_EXECUTIONS)));
|
||||
$executionsCompute = $dbForProject->getDocument('stats_v2', md5(self::INFINITY_PERIOD . str_replace('{functionInternalId}', $document->getInternalId(), METRIC_FUNCTION_ID_EXECUTIONS_COMPUTE)));
|
||||
|
||||
if (!empty($deployments['value'])) {
|
||||
$metrics[] = [
|
||||
'key' => METRIC_DEPLOYMENTS,
|
||||
'value' => ($deployments['value'] * -1),
|
||||
'key' => METRIC_DEPLOYMENTS,
|
||||
'value' => ($deployments['value'] * -1),
|
||||
];
|
||||
}
|
||||
|
||||
if (!empty($deploymentsStorage['value'])) {
|
||||
$metrics[] = [
|
||||
'key' => METRIC_DEPLOYMENTS_STORAGE,
|
||||
'value' => ($deploymentsStorage['value'] * -1),
|
||||
'key' => METRIC_DEPLOYMENTS_STORAGE,
|
||||
'value' => ($deploymentsStorage['value'] * -1),
|
||||
];
|
||||
}
|
||||
|
||||
if (!empty($builds['value'])) {
|
||||
$metrics[] = [
|
||||
'key' => METRIC_BUILDS,
|
||||
'value' => ($builds['value'] * -1),
|
||||
'key' => METRIC_BUILDS,
|
||||
'value' => ($builds['value'] * -1),
|
||||
];
|
||||
}
|
||||
|
||||
if (!empty($buildsStorage['value'])) {
|
||||
$metrics[] = [
|
||||
'key' => METRIC_BUILDS_STORAGE,
|
||||
'value' => ($buildsStorage['value'] * -1),
|
||||
'key' => METRIC_BUILDS_STORAGE,
|
||||
'value' => ($buildsStorage['value'] * -1),
|
||||
];
|
||||
}
|
||||
|
||||
if (!empty($buildsCompute['value'])) {
|
||||
$metrics[] = [
|
||||
'key' => METRIC_BUILDS_COMPUTE,
|
||||
'value' => ($buildsCompute['value'] * -1),
|
||||
'key' => METRIC_BUILDS_COMPUTE,
|
||||
'value' => ($buildsCompute['value'] * -1),
|
||||
];
|
||||
}
|
||||
|
||||
if (!empty($executions['value'])) {
|
||||
$metrics[] = [
|
||||
'key' => METRIC_EXECUTIONS,
|
||||
'value' => ($executions['value'] * -1),
|
||||
'key' => METRIC_EXECUTIONS,
|
||||
'value' => ($executions['value'] * -1),
|
||||
];
|
||||
}
|
||||
|
||||
if (!empty($executionsCompute['value'])) {
|
||||
$metrics[] = [
|
||||
'key' => METRIC_EXECUTIONS_COMPUTE,
|
||||
'value' => ($executionsCompute['value'] * -1),
|
||||
'key' => METRIC_EXECUTIONS_COMPUTE,
|
||||
'value' => ($executionsCompute['value'] * -1),
|
||||
];
|
||||
}
|
||||
break;
|
||||
|
@ -235,10 +222,7 @@ class Usage extends Action
|
|||
break;
|
||||
}
|
||||
} catch (\Exception $e) {
|
||||
console::error("[reducer] " . " {DateTime::now()} " . " {$projectInternalId} " . " {$e->getMessage()}");
|
||||
} catch (\Throwable $e) {
|
||||
} finally {
|
||||
$pools->reclaim();
|
||||
console::error("[reducer] " . " {DateTime::now()} " . " {$project->getInternalId()} " . " {$e->getMessage()}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -9,6 +9,7 @@ use Utopia\Database\Exception\Duplicate;
|
|||
use Utopia\Platform\Action;
|
||||
use Utopia\CLI\Console;
|
||||
use Swoole\Timer;
|
||||
use Utopia\Registry\Registry;
|
||||
|
||||
class UsageHook extends Usage
|
||||
{
|
||||
|
@ -23,83 +24,70 @@ class UsageHook extends Usage
|
|||
$this
|
||||
->setType(Action::TYPE_WORKER_START)
|
||||
->inject('register')
|
||||
->inject('cache')
|
||||
->inject('pools')
|
||||
->callback(function ($register, $cache, $pools) {
|
||||
$this->action($register, $cache, $pools);
|
||||
->inject('getProjectDB')
|
||||
->callback(function ($register, callable $getProjectDB) {
|
||||
$this->action($register, $getProjectDB);
|
||||
})
|
||||
;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param $register
|
||||
* @param $cache
|
||||
* @param $pools
|
||||
* @param $getProjectDB
|
||||
* @return void
|
||||
*/
|
||||
public function action($register, $cache, $pools): void
|
||||
public function action($register, $getProjectDB): void
|
||||
{
|
||||
Timer::tick(30000, function () use ($register, $cache, $pools) {
|
||||
|
||||
$offset = count(self::$stats);
|
||||
$projects = array_slice(self::$stats, 0, $offset, true);
|
||||
array_splice(self::$stats, 0, $offset);
|
||||
$interval = (int) App::getEnv('_APP_USAGE_AGGREGATION_INTERVAL', '60000');
|
||||
Timer::tick($interval, function () use ($register, $getProjectDB) {
|
||||
$offset = count(self::$stats);
|
||||
$projects = array_slice(self::$stats, 0, $offset, true);
|
||||
array_splice(self::$stats, 0, $offset);
|
||||
foreach ($projects as $data) {
|
||||
try {
|
||||
$dbForProject = $getProjectDB($data['project']);
|
||||
foreach ($data['keys'] ?? [] as $key => $value) {
|
||||
if ($value == 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
foreach ($projects as $projectInternalId => $project) {
|
||||
try {
|
||||
$dbForProject = new Database(
|
||||
$pools
|
||||
->get($project['database'])
|
||||
->pop()
|
||||
->getResource(),
|
||||
$cache
|
||||
);
|
||||
foreach ($this->periods as $period => $format) {
|
||||
$time = 'inf' === $period ? null : date($format, time());
|
||||
$id = \md5("{$time}_{$period}_{$key}");
|
||||
|
||||
$dbForProject->setNamespace('_' . $projectInternalId);
|
||||
|
||||
foreach ($project['keys'] ?? [] as $key => $value) {
|
||||
if ($value == 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
foreach ($this->periods as $period => $format) {
|
||||
$time = 'inf' === $period ? null : date($format, time());
|
||||
$id = \md5("{$time}_{$period}_{$key}");
|
||||
|
||||
try {
|
||||
$dbForProject->createDocument('stats_v2', new Document([
|
||||
try {
|
||||
$dbForProject->createDocument('stats_v2', new Document([
|
||||
'$id' => $id,
|
||||
'period' => $period,
|
||||
'time' => $time,
|
||||
'metric' => $key,
|
||||
'value' => $value,
|
||||
'region' => App::getEnv('_APP_REGION', 'default'),
|
||||
]));
|
||||
} catch (Duplicate $th) {
|
||||
if ($value < 0) {
|
||||
$dbForProject->decreaseDocumentAttribute(
|
||||
'stats_v2',
|
||||
$id,
|
||||
'value',
|
||||
abs($value)
|
||||
);
|
||||
} else {
|
||||
$dbForProject->increaseDocumentAttribute(
|
||||
'stats_v2',
|
||||
$id,
|
||||
'value',
|
||||
$value
|
||||
);
|
||||
]));
|
||||
} catch (Duplicate $th) {
|
||||
if ($value < 0) {
|
||||
$dbForProject->decreaseDocumentAttribute(
|
||||
'stats_v2',
|
||||
$id,
|
||||
'value',
|
||||
abs($value)
|
||||
);
|
||||
} else {
|
||||
$dbForProject->increaseDocumentAttribute(
|
||||
'stats_v2',
|
||||
$id,
|
||||
'value',
|
||||
$value
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (\Exception $e) {
|
||||
console::error("[logger] " . " {DateTime::now()} " . " {$data->getInternalId()} " . " {$e->getMessage()}");
|
||||
}
|
||||
} catch (\Exception $e) {
|
||||
console::error("[logger] " . " {DateTime::now()} " . " {$projectInternalId} " . " {$e->getMessage()}");
|
||||
} finally {
|
||||
$pools->reclaim();
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue