Merge pull request #8192 from appwrite/fix-realtime-stats
Wrap realtime stats in an edition check
This commit is contained in:
commit
7b76455f0b
1 changed files with 67 additions and 61 deletions
128
app/realtime.php
128
app/realtime.php
|
@ -233,29 +233,32 @@ $server->onStart(function () use ($stats, $register, $containerId, &$statsDocume
|
|||
/**
|
||||
* Save current connections to the Database every 5 seconds.
|
||||
*/
|
||||
// Timer::tick(5000, function () use ($register, $stats, &$statsDocument, $logError) {
|
||||
// $payload = [];
|
||||
// foreach ($stats as $projectId => $value) {
|
||||
// $payload[$projectId] = $stats->get($projectId, 'connectionsTotal');
|
||||
// }
|
||||
// if (empty($payload) || empty($statsDocument)) {
|
||||
// return;
|
||||
// }
|
||||
// TODO: Remove this if check once it doesn't cause issues for cloud
|
||||
if (System::getEnv('_APP_EDITION', 'self-hosted') === 'self-hosted') {
|
||||
Timer::tick(5000, function () use ($register, $stats, &$statsDocument, $logError) {
|
||||
$payload = [];
|
||||
foreach ($stats as $projectId => $value) {
|
||||
$payload[$projectId] = $stats->get($projectId, 'connectionsTotal');
|
||||
}
|
||||
if (empty($payload) || empty($statsDocument)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// try {
|
||||
// $database = getConsoleDB();
|
||||
try {
|
||||
$database = getConsoleDB();
|
||||
|
||||
// $statsDocument
|
||||
// ->setAttribute('timestamp', DateTime::now())
|
||||
// ->setAttribute('value', json_encode($payload));
|
||||
$statsDocument
|
||||
->setAttribute('timestamp', DateTime::now())
|
||||
->setAttribute('value', json_encode($payload));
|
||||
|
||||
// Authorization::skip(fn () => $database->updateDocument('realtime', $statsDocument->getId(), $statsDocument));
|
||||
// } catch (Throwable $th) {
|
||||
// call_user_func($logError, $th, "updateWorkerDocument");
|
||||
// } finally {
|
||||
// $register->get('pools')->reclaim();
|
||||
// }
|
||||
// });
|
||||
Authorization::skip(fn () => $database->updateDocument('realtime', $statsDocument->getId(), $statsDocument));
|
||||
} catch (Throwable $th) {
|
||||
call_user_func($logError, $th, "updateWorkerDocument");
|
||||
} finally {
|
||||
$register->get('pools')->reclaim();
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
$server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, $realtime, $logError) {
|
||||
|
@ -268,54 +271,57 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats,
|
|||
/**
|
||||
* Sending current connections to project channels on the console project every 5 seconds.
|
||||
*/
|
||||
// if ($realtime->hasSubscriber('console', Role::users()->toString(), 'project')) {
|
||||
// $database = getConsoleDB();
|
||||
// TODO: Remove this if check once it doesn't cause issues for cloud
|
||||
if (System::getEnv('_APP_EDITION', 'self-hosted') === 'self-hosted') {
|
||||
if ($realtime->hasSubscriber('console', Role::users()->toString(), 'project')) {
|
||||
$database = getConsoleDB();
|
||||
|
||||
// $payload = [];
|
||||
$payload = [];
|
||||
|
||||
// $list = Authorization::skip(fn () => $database->find('realtime', [
|
||||
// Query::greaterThan('timestamp', DateTime::addSeconds(new \DateTime(), -15)),
|
||||
// ]));
|
||||
$list = Authorization::skip(fn () => $database->find('realtime', [
|
||||
Query::greaterThan('timestamp', DateTime::addSeconds(new \DateTime(), -15)),
|
||||
]));
|
||||
|
||||
// /**
|
||||
// * Aggregate stats across containers.
|
||||
// */
|
||||
// foreach ($list as $document) {
|
||||
// foreach (json_decode($document->getAttribute('value')) as $projectId => $value) {
|
||||
// if (array_key_exists($projectId, $payload)) {
|
||||
// $payload[$projectId] += $value;
|
||||
// } else {
|
||||
// $payload[$projectId] = $value;
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
/**
|
||||
* Aggregate stats across containers.
|
||||
*/
|
||||
foreach ($list as $document) {
|
||||
foreach (json_decode($document->getAttribute('value')) as $projectId => $value) {
|
||||
if (array_key_exists($projectId, $payload)) {
|
||||
$payload[$projectId] += $value;
|
||||
} else {
|
||||
$payload[$projectId] = $value;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// foreach ($stats as $projectId => $value) {
|
||||
// if (!array_key_exists($projectId, $payload)) {
|
||||
// continue;
|
||||
// }
|
||||
foreach ($stats as $projectId => $value) {
|
||||
if (!array_key_exists($projectId, $payload)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// $event = [
|
||||
// 'project' => 'console',
|
||||
// 'roles' => ['team:' . $stats->get($projectId, 'teamId')],
|
||||
// 'data' => [
|
||||
// 'events' => ['stats.connections'],
|
||||
// 'channels' => ['project'],
|
||||
// 'timestamp' => DateTime::formatTz(DateTime::now()),
|
||||
// 'payload' => [
|
||||
// $projectId => $payload[$projectId]
|
||||
// ]
|
||||
// ]
|
||||
// ];
|
||||
$event = [
|
||||
'project' => 'console',
|
||||
'roles' => ['team:' . $stats->get($projectId, 'teamId')],
|
||||
'data' => [
|
||||
'events' => ['stats.connections'],
|
||||
'channels' => ['project'],
|
||||
'timestamp' => DateTime::formatTz(DateTime::now()),
|
||||
'payload' => [
|
||||
$projectId => $payload[$projectId]
|
||||
]
|
||||
]
|
||||
];
|
||||
|
||||
// $server->send($realtime->getSubscribers($event), json_encode([
|
||||
// 'type' => 'event',
|
||||
// 'data' => $event['data']
|
||||
// ]));
|
||||
// }
|
||||
$server->send($realtime->getSubscribers($event), json_encode([
|
||||
'type' => 'event',
|
||||
'data' => $event['data']
|
||||
]));
|
||||
}
|
||||
|
||||
// $register->get('pools')->reclaim();
|
||||
// }
|
||||
$register->get('pools')->reclaim();
|
||||
}
|
||||
}
|
||||
/**
|
||||
* Sending test message for SDK E2E tests every 5 seconds.
|
||||
*/
|
||||
|
|
Loading…
Reference in a new issue