1
0
Fork 0
mirror of synced 2024-06-01 18:39:57 +12:00

refactor usage worker

This commit is contained in:
shimon 2023-12-24 20:38:15 +02:00
parent f1e066d965
commit d1751565cf
5 changed files with 91 additions and 124 deletions

2
.env
View file

@ -75,7 +75,7 @@ _APP_MAINTENANCE_RETENTION_CACHE=2592000
_APP_MAINTENANCE_RETENTION_EXECUTION=1209600
_APP_MAINTENANCE_RETENTION_ABUSE=86400
_APP_MAINTENANCE_RETENTION_AUDIT=1209600
_APP_USAGE_AGGREGATION_INTERVAL=5
_APP_USAGE_AGGREGATION_INTERVAL=50000
_APP_MAINTENANCE_RETENTION_USAGE_HOURLY=8640000
_APP_MAINTENANCE_RETENTION_SCHEDULES=86400
_APP_USAGE_STATS=enabled

View file

@ -1,3 +0,0 @@
#!/bin/sh
php /usr/src/code/app/cli.php usage $@

View file

@ -673,6 +673,7 @@ services:
- _APP_USAGE_STATS
- _APP_LOGGING_PROVIDER
- _APP_LOGGING_CONFIG
- _APP_USAGE_AGGREGATION_INTERVAL
appwrite-schedule:
entrypoint: schedule

View file

@ -32,23 +32,22 @@ class Usage extends Action
{
$this
->desc('Usage worker')
->inject('message')
->inject('pools')
->inject('cache')
->callback(function ($message, $pools, $cache) {
$this->action($message, $pools, $cache);
});
->desc('Usage worker')
->inject('message')
->inject('getProjectDB')
->callback(function (Message $message, callable $getProjectDB) {
$this->action($message, $getProjectDB);
});
}
/**
* @param Message $message
* @param $pools
* @param $cache
* @param callable $getProjectDB
* @return void
* @throws \Utopia\Database\Exception
* @throws Exception
*/
public function action(Message $message, $pools, $cache): void
public function action(Message $message, callable $getProjectDB): void
{
$payload = $message->getPayload() ?? [];
@ -65,16 +64,14 @@ class Usage extends Action
}
$this->reduce(
database: $project->getAttribute('database'),
projectInternalId: $project->getInternalId(),
project: $project,
document: new Document($document),
metrics: $payload['metrics'],
pools: $pools,
cache: $cache
getProjectDB: $getProjectDB
);
}
self::$stats[$projectId]['database'] = $project->getAttribute('database');
self::$stats[$projectId]['project'] = $project;
foreach ($payload['metrics'] ?? [] as $metric) {
if (!isset(self::$stats[$projectId]['keys'][$metric['key']])) {
self::$stats[$projectId]['keys'][$metric['key']] = $metric['value'];
@ -88,35 +85,25 @@ class Usage extends Action
/**
* On Documents that tied by relations like functions>deployments>build || documents>collection>database || buckets>files.
* When we remove a parent document we need to deduct his children aggregation from the project scope.
* @param $database
* @param $projectInternalId
* @param Document $project
* @param Document $document
* @param array $metrics
* @param $pools
* @param $cache
* @param callable $getProjectDB
* @return void
*/
private function reduce($database, $projectInternalId, Document $document, array &$metrics, $pools, $cache)
private function reduce(Document $project, Document $document, array &$metrics, callable $getProjectDB): void
{
$dbForProject = $getProjectDB($project);
try {
$dbForProject = new Database(
$pools
->get($database)
->pop()
->getResource(),
$cache
);
$dbForProject->setNamespace('_' . $projectInternalId);
switch (true) {
case $document->getCollection() === 'users': // users
$sessions = count($document->getAttribute(METRIC_SESSIONS, 0));
if (!empty($sessions)) {
$metrics[] = [
'key' => METRIC_SESSIONS,
'value' => ($sessions * -1),
'key' => METRIC_SESSIONS,
'value' => ($sessions * -1),
];
}
break;
@ -144,12 +131,12 @@ class Usage extends Action
if (!empty($documents['value'])) {
$metrics[] = [
'key' => METRIC_DOCUMENTS,
'value' => ($documents['value'] * -1),
'key' => METRIC_DOCUMENTS,
'value' => ($documents['value'] * -1),
];
$metrics[] = [
'key' => str_replace('{databaseInternalId}', $databaseInternalId, METRIC_DATABASE_ID_DOCUMENTS),
'value' => ($documents['value'] * -1),
'key' => str_replace('{databaseInternalId}', $databaseInternalId, METRIC_DATABASE_ID_DOCUMENTS),
'value' => ($documents['value'] * -1),
];
}
break;
@ -160,15 +147,15 @@ class Usage extends Action
if (!empty($files['value'])) {
$metrics[] = [
'key' => METRIC_FILES,
'value' => ($files['value'] * -1),
'key' => METRIC_FILES,
'value' => ($files['value'] * -1),
];
}
if (!empty($storage['value'])) {
$metrics[] = [
'key' => METRIC_FILES_STORAGE,
'value' => ($storage['value'] * -1),
'key' => METRIC_FILES_STORAGE,
'value' => ($storage['value'] * -1),
];
}
break;
@ -176,58 +163,58 @@ class Usage extends Action
case $document->getCollection() === 'functions':
$deployments = $dbForProject->getDocument('stats_v2', md5(self::INFINITY_PERIOD . str_replace(['{resourceType}', '{resourceInternalId}'], ['functions', $document->getInternalId()], METRIC_FUNCTION_ID_DEPLOYMENTS)));
$deploymentsStorage = $dbForProject->getDocument('stats_v2', md5(self::INFINITY_PERIOD . str_replace(['{resourceType}', '{resourceInternalId}'], ['functions', $document->getInternalId()], METRIC_FUNCTION_ID_DEPLOYMENTS_STORAGE)));
$builds = $dbForProject->getDocument('stats_v2', md5(self::INFINITY_PERIOD . str_replace('{functionInternalId}', $document->getInternalId(), METRIC_FUNCTION_ID_BUILDS)));
$builds = $dbForProject->getDocument('stats_v2', md5(self::INFINITY_PERIOD . str_replace('{functionInternalId}', $document->getInternalId(), METRIC_FUNCTION_ID_BUILDS)));
$buildsStorage = $dbForProject->getDocument('stats_v2', md5(self::INFINITY_PERIOD . str_replace('{functionInternalId}', $document->getInternalId(), METRIC_FUNCTION_ID_BUILDS_STORAGE)));
$buildsCompute = $dbForProject->getDocument('stats_v2', md5(self::INFINITY_PERIOD . str_replace('{functionInternalId}', $document->getInternalId(), METRIC_FUNCTION_ID_BUILDS_COMPUTE)));
$executions = $dbForProject->getDocument('stats_v2', md5(self::INFINITY_PERIOD . str_replace('{functionInternalId}', $document->getInternalId(), METRIC_FUNCTION_ID_EXECUTIONS)));
$executions = $dbForProject->getDocument('stats_v2', md5(self::INFINITY_PERIOD . str_replace('{functionInternalId}', $document->getInternalId(), METRIC_FUNCTION_ID_EXECUTIONS)));
$executionsCompute = $dbForProject->getDocument('stats_v2', md5(self::INFINITY_PERIOD . str_replace('{functionInternalId}', $document->getInternalId(), METRIC_FUNCTION_ID_EXECUTIONS_COMPUTE)));
if (!empty($deployments['value'])) {
$metrics[] = [
'key' => METRIC_DEPLOYMENTS,
'value' => ($deployments['value'] * -1),
'key' => METRIC_DEPLOYMENTS,
'value' => ($deployments['value'] * -1),
];
}
if (!empty($deploymentsStorage['value'])) {
$metrics[] = [
'key' => METRIC_DEPLOYMENTS_STORAGE,
'value' => ($deploymentsStorage['value'] * -1),
'key' => METRIC_DEPLOYMENTS_STORAGE,
'value' => ($deploymentsStorage['value'] * -1),
];
}
if (!empty($builds['value'])) {
$metrics[] = [
'key' => METRIC_BUILDS,
'value' => ($builds['value'] * -1),
'key' => METRIC_BUILDS,
'value' => ($builds['value'] * -1),
];
}
if (!empty($buildsStorage['value'])) {
$metrics[] = [
'key' => METRIC_BUILDS_STORAGE,
'value' => ($buildsStorage['value'] * -1),
'key' => METRIC_BUILDS_STORAGE,
'value' => ($buildsStorage['value'] * -1),
];
}
if (!empty($buildsCompute['value'])) {
$metrics[] = [
'key' => METRIC_BUILDS_COMPUTE,
'value' => ($buildsCompute['value'] * -1),
'key' => METRIC_BUILDS_COMPUTE,
'value' => ($buildsCompute['value'] * -1),
];
}
if (!empty($executions['value'])) {
$metrics[] = [
'key' => METRIC_EXECUTIONS,
'value' => ($executions['value'] * -1),
'key' => METRIC_EXECUTIONS,
'value' => ($executions['value'] * -1),
];
}
if (!empty($executionsCompute['value'])) {
$metrics[] = [
'key' => METRIC_EXECUTIONS_COMPUTE,
'value' => ($executionsCompute['value'] * -1),
'key' => METRIC_EXECUTIONS_COMPUTE,
'value' => ($executionsCompute['value'] * -1),
];
}
break;
@ -235,10 +222,7 @@ class Usage extends Action
break;
}
} catch (\Exception $e) {
console::error("[reducer] " . " {DateTime::now()} " . " {$projectInternalId} " . " {$e->getMessage()}");
} catch (\Throwable $e) {
} finally {
$pools->reclaim();
console::error("[reducer] " . " {DateTime::now()} " . " {$project->getInternalId()} " . " {$e->getMessage()}");
}
}
}

View file

@ -9,6 +9,7 @@ use Utopia\Database\Exception\Duplicate;
use Utopia\Platform\Action;
use Utopia\CLI\Console;
use Swoole\Timer;
use Utopia\Registry\Registry;
class UsageHook extends Usage
{
@ -23,87 +24,71 @@ class UsageHook extends Usage
$this
->setType(Action::TYPE_WORKER_START)
->inject('register')
->inject('cache')
->inject('pools')
->callback(function ($register, $cache, $pools) {
$this->action($register, $cache, $pools);
->inject('getProjectDB')
->callback(function ($register, callable $getProjectDB) {
$this->action($register, $getProjectDB);
})
;
}
/**
* @param $register
* @param $cache
* @param $pools
* @param $getProjectDB
* @return void
*/
public function action($register, $cache, $pools): void
public function action($register, $getProjectDB): void
{
Timer::tick(30000, function () use ($register, $cache, $pools) {
var_dump('innn');
$offset = count(self::$stats);
$projects = array_slice(self::$stats, 0, $offset, true);
array_splice(self::$stats, 0, $offset);
$interval = (int) App::getEnv('_APP_USAGE_AGGREGATION_INTERVAL', '50000');
Timer::tick($interval, function () use ($register, $getProjectDB) {
$offset = count(self::$stats);
$projects = array_slice(self::$stats, 0, $offset, true);
array_splice(self::$stats, 0, $offset);
var_dump($projects);
foreach ($projects as $data) {
try {
$dbForProject = $getProjectDB($data['project']);
foreach ($data['keys'] ?? [] as $key => $value) {
if ($value == 0) {
continue;
}
foreach ($projects as $projectInternalId => $project) {
try {
$dbForProject = new Database(
$pools
->get($project['database'])
->pop()
->getResource(),
$cache
);
foreach ($this->periods as $period => $format) {
$time = 'inf' === $period ? null : date($format, time());
$id = \md5("{$time}_{$period}_{$key}");
$dbForProject->setNamespace('_' . $projectInternalId);
foreach ($project['keys'] ?? [] as $key => $value) {
if ($value == 0) {
continue;
}
foreach ($this->periods as $period => $format) {
$time = 'inf' === $period ? null : date($format, time());
$id = \md5("{$time}_{$period}_{$key}");
var_dump($key);
var_dump($value);
var_dump('-------------');
try {
$dbForProject->createDocument('stats_v2', new Document([
try {
$dbForProject->createDocument('stats_v2', new Document([
'$id' => $id,
'period' => $period,
'time' => $time,
'metric' => $key,
'value' => $value,
'region' => App::getEnv('_APP_REGION', 'default'),
]));
} catch (Duplicate $th) {
if ($value < 0) {
$dbForProject->decreaseDocumentAttribute(
'stats_v2',
$id,
'value',
abs($value)
);
} else {
$dbForProject->increaseDocumentAttribute(
'stats_v2',
$id,
'value',
$value
);
]));
} catch (Duplicate $th) {
if ($value < 0) {
$dbForProject->decreaseDocumentAttribute(
'stats_v2',
$id,
'value',
abs($value)
);
} else {
$dbForProject->increaseDocumentAttribute(
'stats_v2',
$id,
'value',
$value
);
}
}
}
}
} catch (\Exception $e) {
console::error("[logger] " . " {DateTime::now()} " . " {$data->getInternalId()} " . " {$e->getMessage()}");
}
} catch (\Exception $e) {
console::error("[logger] " . " {DateTime::now()} " . " {$projectInternalId} " . " {$e->getMessage()}");
} finally {
$pools->reclaim();
}
}
});
});
}
}