From 899bba64d1b97178cd241d286765413af5238fc5 Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Mon, 8 Apr 2024 14:55:41 +1200 Subject: [PATCH] Add try/catch/reclaim around sending stats --- app/realtime.php | 130 ++++++++++++++++++++++++++++------------------- 1 file changed, 77 insertions(+), 53 deletions(-) diff --git a/app/realtime.php b/app/realtime.php index 8fdfce3e4a..8a0b2f2c38 100644 --- a/app/realtime.php +++ b/app/realtime.php @@ -252,15 +252,16 @@ $server->onStart(function () use ($stats, $register, $containerId, &$statsDocume go(function () use ($register, $containerId, &$statsDocument) { $attempts = 0; - /** - * @var Database $database - * @var callable $reclaim - */ - [$database, $reclaim] = getConsoleDB(); - do { try { + /** + * @var Database $database + * @var callable $reclaim + */ + [$database, $reclaim] = getConsoleDB(); + $attempts++; + $document = new Document([ '$id' => ID::unique(), '$collection' => ID::custom('realtime'), @@ -270,7 +271,9 @@ $server->onStart(function () use ($stats, $register, $containerId, &$statsDocume 'value' => '{}' ]); - $statsDocument = Authorization::skip(fn() => $database->createDocument('realtime', $document)); + $statsDocument = Authorization::skip(function () use ($database, $document) { + return $database->createDocument('realtime', $document); + }); break; } catch (Throwable) { @@ -279,7 +282,9 @@ $server->onStart(function () use ($stats, $register, $containerId, &$statsDocume } } while (true); - $reclaim(); + if (isset($reclaim)) { + $reclaim(); + } }); /** @@ -331,56 +336,64 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, * Sending current connections to project channels on the console project every 5 seconds. */ if ($realtime->hasSubscriber('console', Role::users()->toString(), 'project')) { - /** - * @var Database $database - * @var callable $reclaim - */ - [$database, $reclaim] = getConsoleDB(); + try { + /** + * @var Database $database + * @var callable $reclaim + */ + [$database, $reclaim] = getConsoleDB(); - $payload = []; + $payload = []; - $list = Authorization::skip(fn() => $database->find('realtime', [ - Query::greaterThan('timestamp', DateTime::addSeconds(new \DateTime(), -15)), - ])); + $list = Authorization::skip(function () use ($database) { + return $database->find('realtime', [ + Query::greaterThan('timestamp', DateTime::addSeconds(new \DateTime(), -15)), + ]); + }); - /** - * Aggregate stats across containers. - */ - foreach ($list as $document) { - foreach (json_decode($document->getAttribute('value')) as $projectId => $value) { - if (array_key_exists($projectId, $payload)) { - $payload[$projectId] += $value; - } else { - $payload[$projectId] = $value; + /** + * Aggregate stats across containers. + */ + foreach ($list as $document) { + foreach (json_decode($document->getAttribute('value')) as $projectId => $value) { + if (array_key_exists($projectId, $payload)) { + $payload[$projectId] += $value; + } else { + $payload[$projectId] = $value; + } } } - } - foreach ($stats as $projectId => $value) { - if (!array_key_exists($projectId, $payload)) { - continue; - } + foreach ($stats as $projectId => $value) { + if (!array_key_exists($projectId, $payload)) { + continue; + } - $event = [ - 'project' => 'console', - 'roles' => ['team:' . $stats->get($projectId, 'teamId')], - 'data' => [ - 'events' => ['stats.connections'], - 'channels' => ['project'], - 'timestamp' => DateTime::formatTz(DateTime::now()), - 'payload' => [ - $projectId => $payload[$projectId] + $event = [ + 'project' => 'console', + 'roles' => ['team:' . $stats->get($projectId, 'teamId')], + 'data' => [ + 'events' => ['stats.connections'], + 'channels' => ['project'], + 'timestamp' => DateTime::formatTz(DateTime::now()), + 'payload' => [ + $projectId => $payload[$projectId] + ] ] - ] - ]; + ]; - $server->send($realtime->getSubscribers($event), json_encode([ - 'type' => 'event', - 'data' => $event['data'] - ])); + $server->send($realtime->getSubscribers($event), json_encode([ + 'type' => 'event', + 'data' => $event['data'] + ])); + } + } catch (Throwable $th) { + logError($th, 'sendStats'); + } finally { + if (isset($reclaim)) { + $reclaim(); + } } - - $reclaim(); } /** @@ -450,7 +463,9 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, */ [$dbForConsole, $reclaimForConsole] = getConsoleDB(); - $project = Authorization::skip(fn() => $dbForConsole->getDocument('projects', $projectId)); + $project = Authorization::skip(function () use ($dbForConsole, $projectId) { + return $dbForConsole->getDocument('projects', $projectId); + }); [$dbForProject, $reclaimForProject] = getProjectDB($project); @@ -521,9 +536,15 @@ $server->onOpen(function (int $connection, SwooleRequest $request) use ($server, Console::info("Connection open (user: {$connection})"); - App::setResource('pools', fn() => $register->get('pools')); - App::setResource('request', fn() => $request); - App::setResource('response', fn() => $response); + App::setResource('pools', function () use ($register) { + return $register->get('pools'); + }); + App::setResource('request', function () use ($request) { + return $request; + }); + App::setResource('response', function () use ($response) { + return $response; + }); try { /** @var Document $project */ @@ -634,7 +655,10 @@ $server->onMessage(function (int $connection, string $message) use ($server, $re [$database, $reclaimForConsole] = getConsoleDB(); if ($projectId !== 'console') { - $project = Authorization::skip(fn() => $database->getDocument('projects', $projectId)); + $project = Authorization::skip(function () use ($database, $projectId) { + return $database->getDocument('projects', $projectId); + }); + [$database, $reclaimForProject] = getProjectDB($project); } else { $project = null;