Merge remote-tracking branch 'origin/refactor-usage-sn' into sync-1.5.x-with-cloud
This commit is contained in:
commit
aa8460cfb9
1 changed files with 67 additions and 61 deletions
128
app/realtime.php
128
app/realtime.php
|
@ -233,29 +233,32 @@ $server->onStart(function () use ($stats, $register, $containerId, &$statsDocume
|
||||||
/**
|
/**
|
||||||
* Save current connections to the Database every 5 seconds.
|
* Save current connections to the Database every 5 seconds.
|
||||||
*/
|
*/
|
||||||
// Timer::tick(5000, function () use ($register, $stats, &$statsDocument, $logError) {
|
// TODO: Remove this if check once it doesn't cause issues for cloud
|
||||||
// $payload = [];
|
if (System::getEnv('_APP_EDITION', 'self-hosted') === 'self-hosted') {
|
||||||
// foreach ($stats as $projectId => $value) {
|
Timer::tick(5000, function () use ($register, $stats, &$statsDocument, $logError) {
|
||||||
// $payload[$projectId] = $stats->get($projectId, 'connectionsTotal');
|
$payload = [];
|
||||||
// }
|
foreach ($stats as $projectId => $value) {
|
||||||
// if (empty($payload) || empty($statsDocument)) {
|
$payload[$projectId] = $stats->get($projectId, 'connectionsTotal');
|
||||||
// return;
|
}
|
||||||
// }
|
if (empty($payload) || empty($statsDocument)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// try {
|
try {
|
||||||
// $database = getConsoleDB();
|
$database = getConsoleDB();
|
||||||
|
|
||||||
// $statsDocument
|
$statsDocument
|
||||||
// ->setAttribute('timestamp', DateTime::now())
|
->setAttribute('timestamp', DateTime::now())
|
||||||
// ->setAttribute('value', json_encode($payload));
|
->setAttribute('value', json_encode($payload));
|
||||||
|
|
||||||
// Authorization::skip(fn () => $database->updateDocument('realtime', $statsDocument->getId(), $statsDocument));
|
Authorization::skip(fn () => $database->updateDocument('realtime', $statsDocument->getId(), $statsDocument));
|
||||||
// } catch (Throwable $th) {
|
} catch (Throwable $th) {
|
||||||
// call_user_func($logError, $th, "updateWorkerDocument");
|
call_user_func($logError, $th, "updateWorkerDocument");
|
||||||
// } finally {
|
} finally {
|
||||||
// $register->get('pools')->reclaim();
|
$register->get('pools')->reclaim();
|
||||||
// }
|
}
|
||||||
// });
|
});
|
||||||
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
$server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, $realtime, $logError) {
|
$server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, $realtime, $logError) {
|
||||||
|
@ -268,54 +271,57 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats,
|
||||||
/**
|
/**
|
||||||
* Sending current connections to project channels on the console project every 5 seconds.
|
* Sending current connections to project channels on the console project every 5 seconds.
|
||||||
*/
|
*/
|
||||||
// if ($realtime->hasSubscriber('console', Role::users()->toString(), 'project')) {
|
// TODO: Remove this if check once it doesn't cause issues for cloud
|
||||||
// $database = getConsoleDB();
|
if (System::getEnv('_APP_EDITION', 'self-hosted') === 'self-hosted') {
|
||||||
|
if ($realtime->hasSubscriber('console', Role::users()->toString(), 'project')) {
|
||||||
|
$database = getConsoleDB();
|
||||||
|
|
||||||
// $payload = [];
|
$payload = [];
|
||||||
|
|
||||||
// $list = Authorization::skip(fn () => $database->find('realtime', [
|
$list = Authorization::skip(fn () => $database->find('realtime', [
|
||||||
// Query::greaterThan('timestamp', DateTime::addSeconds(new \DateTime(), -15)),
|
Query::greaterThan('timestamp', DateTime::addSeconds(new \DateTime(), -15)),
|
||||||
// ]));
|
]));
|
||||||
|
|
||||||
// /**
|
/**
|
||||||
// * Aggregate stats across containers.
|
* Aggregate stats across containers.
|
||||||
// */
|
*/
|
||||||
// foreach ($list as $document) {
|
foreach ($list as $document) {
|
||||||
// foreach (json_decode($document->getAttribute('value')) as $projectId => $value) {
|
foreach (json_decode($document->getAttribute('value')) as $projectId => $value) {
|
||||||
// if (array_key_exists($projectId, $payload)) {
|
if (array_key_exists($projectId, $payload)) {
|
||||||
// $payload[$projectId] += $value;
|
$payload[$projectId] += $value;
|
||||||
// } else {
|
} else {
|
||||||
// $payload[$projectId] = $value;
|
$payload[$projectId] = $value;
|
||||||
// }
|
}
|
||||||
// }
|
}
|
||||||
// }
|
}
|
||||||
|
|
||||||
// foreach ($stats as $projectId => $value) {
|
foreach ($stats as $projectId => $value) {
|
||||||
// if (!array_key_exists($projectId, $payload)) {
|
if (!array_key_exists($projectId, $payload)) {
|
||||||
// continue;
|
continue;
|
||||||
// }
|
}
|
||||||
|
|
||||||
// $event = [
|
$event = [
|
||||||
// 'project' => 'console',
|
'project' => 'console',
|
||||||
// 'roles' => ['team:' . $stats->get($projectId, 'teamId')],
|
'roles' => ['team:' . $stats->get($projectId, 'teamId')],
|
||||||
// 'data' => [
|
'data' => [
|
||||||
// 'events' => ['stats.connections'],
|
'events' => ['stats.connections'],
|
||||||
// 'channels' => ['project'],
|
'channels' => ['project'],
|
||||||
// 'timestamp' => DateTime::formatTz(DateTime::now()),
|
'timestamp' => DateTime::formatTz(DateTime::now()),
|
||||||
// 'payload' => [
|
'payload' => [
|
||||||
// $projectId => $payload[$projectId]
|
$projectId => $payload[$projectId]
|
||||||
// ]
|
]
|
||||||
// ]
|
]
|
||||||
// ];
|
];
|
||||||
|
|
||||||
// $server->send($realtime->getSubscribers($event), json_encode([
|
$server->send($realtime->getSubscribers($event), json_encode([
|
||||||
// 'type' => 'event',
|
'type' => 'event',
|
||||||
// 'data' => $event['data']
|
'data' => $event['data']
|
||||||
// ]));
|
]));
|
||||||
// }
|
}
|
||||||
|
|
||||||
// $register->get('pools')->reclaim();
|
$register->get('pools')->reclaim();
|
||||||
// }
|
}
|
||||||
|
}
|
||||||
/**
|
/**
|
||||||
* Sending test message for SDK E2E tests every 5 seconds.
|
* Sending test message for SDK E2E tests every 5 seconds.
|
||||||
*/
|
*/
|
||||||
|
|
Loading…
Reference in a new issue