refactor usage worker
This commit is contained in:
parent
c02ef7b340
commit
5e394fed5d
7 changed files with 75 additions and 83 deletions
|
@ -3167,7 +3167,7 @@ $collections = [
|
|||
],
|
||||
[
|
||||
'$id' => ID::custom('_key_metric_period_time'),
|
||||
'type' => Database::INDEX_KEY,
|
||||
'type' => Database::INDEX_UNIQUE,
|
||||
'attributes' => ['metric', 'period', 'time'],
|
||||
'lengths' => [],
|
||||
'orders' => [Database::ORDER_DESC],
|
||||
|
|
|
@ -1470,6 +1470,7 @@ App::get('/v1/storage/:bucketId/usage')
|
|||
foreach ($metrics as $metric) {
|
||||
$usage[$metric] = [];
|
||||
$leap = time() - ($days['limit'] * $days['factor']);
|
||||
|
||||
while ($leap < time()) {
|
||||
$leap += $days['factor'];
|
||||
$formatDate = date($format, $leap);
|
||||
|
|
|
@ -76,9 +76,9 @@ $databaseListener = function (string $event, array $args, Document $project, Usa
|
|||
|
||||
//Project level sessions deduction
|
||||
if ($event === Database::EVENT_DOCUMENT_DELETE) {
|
||||
$sessions = (count($document->getAttribute('sessions')));
|
||||
$sessions = count($document->getAttribute('sessions'));
|
||||
$queueForUsage
|
||||
->addMetric("databases", ($sessions['value'] * -1)); // per project
|
||||
->addMetric("sessions", ($sessions * -1)); // per project
|
||||
}
|
||||
break;
|
||||
case $document->getCollection() === 'sessions': // sessions
|
||||
|
|
|
@ -112,7 +112,7 @@ if (empty(App::getEnv('QUEUE'))) {
|
|||
throw new Exception('Please configure "QUEUE" environemnt variable.');
|
||||
}
|
||||
|
||||
$adapter = new Swoole($connection, $workerNumber, App::getEnv('QUEUE'));
|
||||
$adapter = new Swoole($connection, 1, App::getEnv('QUEUE'));
|
||||
$server = new Server($adapter);
|
||||
|
||||
$server
|
||||
|
|
|
@ -25,23 +25,18 @@ $periods['inf'] = '0000-00-00 00:00';
|
|||
$server->job()
|
||||
->inject('message')
|
||||
->action(function (Message $message) use (&$stats) {
|
||||
|
||||
$payload = $message->getPayload() ?? [];
|
||||
$project = new Document($payload['project'] ?? []);
|
||||
$projectId = $project->getInternalId();
|
||||
$stats[$projectId]['database'] = $project->getAttribute('database');
|
||||
|
||||
foreach ($payload['metrics'] ?? [] as $metric) {
|
||||
$uniq = md5($metric['key']);
|
||||
|
||||
if (!isset($stats[$uniq])) {
|
||||
$stats[$uniq] = [
|
||||
'projectInternalId' => $project->getInternalId(),
|
||||
'database' => $project->getAttribute('database'),
|
||||
'key' => $metric['key'],
|
||||
'value' => $metric['value']
|
||||
];
|
||||
|
||||
if (!isset($stats[$projectId]['keys'][$metric['key']])) {
|
||||
$stats[$projectId]['keys'][$metric['key']] = $metric['value'];
|
||||
continue;
|
||||
}
|
||||
$stats[$uniq]['value'] += $metric['value'];
|
||||
$stats[$projectId]['keys'][$metric['key']] += $metric['value'];
|
||||
}
|
||||
});
|
||||
|
||||
|
@ -51,77 +46,71 @@ $server
|
|||
->inject('cache')
|
||||
->inject('pools')
|
||||
->action(function ($register, $cache, $pools) use ($periods, &$stats) {
|
||||
Timer::tick(3000, function () use ($register, $cache, $pools, $periods, &$stats) {
|
||||
$slice = array_slice($stats, 0, count($stats));
|
||||
array_splice($stats, 0, count($stats));
|
||||
//$log = [];
|
||||
Timer::tick(30000, function () use ($register, $cache, $pools, $periods, &$stats) {
|
||||
|
||||
foreach ($slice as $metric) {
|
||||
if ($metric['value'] == 0) {
|
||||
continue;
|
||||
}
|
||||
$offset = count($stats);
|
||||
$projects = array_slice($stats, 0, $offset, true);
|
||||
array_splice($stats, 0, $offset);
|
||||
|
||||
foreach ($projects as $projectInternalId => $project) {
|
||||
try {
|
||||
$dbForProject = new Database(
|
||||
$pools
|
||||
->get($metric['database'])
|
||||
->get($project['database'])
|
||||
->pop()
|
||||
->getResource(),
|
||||
$cache
|
||||
);
|
||||
|
||||
$dbForProject->setNamespace('_' . $metric['projectInternalId']);
|
||||
$dbForProject->setNamespace('_' . $projectInternalId);
|
||||
|
||||
foreach ($project['keys'] as $key => $value) {
|
||||
if ($value == 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
foreach ($periods as $period => $format) {
|
||||
$time = 'inf' === $period ? null : date($format, time());
|
||||
$id = \md5("{$time}_{$period}_{$metric['key']}");
|
||||
try {
|
||||
$id = \md5("{$time}_{$period}_{$key}");
|
||||
|
||||
try {
|
||||
$dbForProject->createDocument('stats', new Document([
|
||||
'$id' => $id,
|
||||
'period' => $period,
|
||||
'time' => $time,
|
||||
'metric' => $metric['key'],
|
||||
'value' => $metric['value'],
|
||||
'metric' => $key,
|
||||
'value' => $value,
|
||||
'region' => App::getEnv('_APP_REGION', 'default'),
|
||||
]));
|
||||
} catch (Duplicate $th) {
|
||||
if ($metric['value'] < 0) {
|
||||
if ($value < 0) {
|
||||
$dbForProject->decreaseDocumentAttribute(
|
||||
'stats',
|
||||
$id,
|
||||
'value',
|
||||
abs($metric['value'])
|
||||
abs($value)
|
||||
);
|
||||
} else {
|
||||
$dbForProject->increaseDocumentAttribute(
|
||||
'stats',
|
||||
$id,
|
||||
'value',
|
||||
$metric['value']
|
||||
$value
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// $log[] = [
|
||||
// 'id' => $id,
|
||||
// 'period' => $period,
|
||||
// 'time' => $time,
|
||||
// 'metric' => $metric['key'],
|
||||
// 'value' => $metric['value'],
|
||||
// 'region' => App::getEnv('_APP_REGION', 'default'),
|
||||
// ];
|
||||
}
|
||||
}
|
||||
$dbForProject->createDocument('statsLogger', new Document([
|
||||
'time' => DateTime::now(),
|
||||
'metrics' => $project['keys'],
|
||||
]));
|
||||
} catch (\Exception $e) {
|
||||
console::error($e->getMessage());
|
||||
} finally {
|
||||
$pools->reclaim();
|
||||
}
|
||||
}
|
||||
|
||||
// if (!empty($log)) {
|
||||
// $dbForProject->createDocument('statsLogger', new Document([
|
||||
// 'time' => DateTime::now(),
|
||||
// 'metrics' => $log,
|
||||
// ]));
|
||||
// }
|
||||
}
|
||||
});
|
||||
});
|
||||
$server->start();
|
||||
|
|
|
@ -39,8 +39,6 @@ class Usage extends Event
|
|||
public function trigger(): string|bool
|
||||
{
|
||||
$client = new Client($this->queue, $this->connection);
|
||||
var_dump('triger');
|
||||
var_dump($this->metrics);
|
||||
|
||||
return $client->enqueue([
|
||||
'project' => $this->getProject(),
|
||||
|
|
|
@ -18,7 +18,7 @@ class UsageTest extends Scope
|
|||
use SideServer;
|
||||
use FunctionsBase;
|
||||
|
||||
private const WAIT = 5;
|
||||
private const WAIT = 30;
|
||||
private const CREATE = 20;
|
||||
|
||||
protected string $projectId;
|
||||
|
@ -111,7 +111,11 @@ class UsageTest extends Scope
|
|||
$consoleHeaders
|
||||
);
|
||||
$res = $res['body'];
|
||||
var_dump($res['users']);
|
||||
var_dump(array_key_last($res['users']));
|
||||
var_dump($res['users'][array_key_last($res['users'])]['value']);
|
||||
|
||||
exit;
|
||||
$this->assertEquals('24h', $res['range']);
|
||||
$this->assertEquals(9, count($res));
|
||||
$this->assertEquals(24, count($res['requests']));
|
||||
|
|
Loading…
Reference in a new issue