1
0
Fork 0
mirror of synced 2024-06-12 15:54:47 +12:00

Updates to realtime

This commit is contained in:
Eldad Fux 2024-05-08 21:01:07 +01:00
parent 0b7c795e2b
commit 525383c725

View file

@ -32,19 +32,21 @@ use Utopia\System\System;
use Utopia\WebSocket\Adapter;
use Utopia\WebSocket\Server;
global $global;
/**
* @var \Utopia\Registry\Registry $register
* @var \Utopia\Registry\Registry $global
*/
require_once __DIR__ . '/init.php';
require_once __DIR__ . '/init2.php';
Runtime::enableCoroutine(SWOOLE_HOOK_ALL);
function getConsoleDB(Authorization $auth): Database
{
global $register;
global $global;
/** @var \Utopia\Pools\Group $pools */
$pools = $register->get('pools');
$pools = $global->get('pools');
$dbAdapter = $pools
->get('console')
@ -65,10 +67,10 @@ function getConsoleDB(Authorization $auth): Database
function getProjectDB(Document $project, Authorization $auth): Database
{
global $register;
global $global;
/** @var \Utopia\Pools\Group $pools */
$pools = $register->get('pools');
$pools = $global->get('pools');
if ($project->isEmpty() || $project->getId() === 'console') {
return getConsoleDB($auth);
@ -93,9 +95,9 @@ function getProjectDB(Document $project, Authorization $auth): Database
function getCache(): Cache
{
global $register;
global $global;
$pools = $register->get('pools'); /** @var \Utopia\Pools\Group $pools */
$pools = $global->get('pools'); /** @var \Utopia\Pools\Group $pools */
$list = Config::getParam('pools-cache', []);
$adapters = [];
@ -135,8 +137,8 @@ $adapter
$server = new Server($adapter);
$logError = function (Throwable $error, string $action) use ($register) {
$logger = $register->get('logger');
$logError = function (Throwable $error, string $action) use ($global) {
$logger = $global->get('logger');
if ($logger && !$error instanceof Exception) {
$version = System::getEnv('_APP_VERSION', 'UNKNOWN');
@ -173,7 +175,7 @@ $logError = function (Throwable $error, string $action) use ($register) {
$server->error($logError);
$server->onStart(function () use ($stats, $register, $containerId, &$statsDocument, $logError) {
$server->onStart(function () use ($stats, $global, $containerId, &$statsDocument, $logError) {
$auth = new Authorization();
sleep(5); // wait for the initial database schema to be ready
@ -182,7 +184,7 @@ $server->onStart(function () use ($stats, $register, $containerId, &$statsDocume
/**
* Create document for this worker to share stats across Containers.
*/
go(function () use ($register, $containerId, &$statsDocument, $auth) {
go(function () use ($global, $containerId, &$statsDocument, $auth) {
$attempts = 0;
$database = getConsoleDB($auth);
@ -206,13 +208,13 @@ $server->onStart(function () use ($stats, $register, $containerId, &$statsDocume
sleep(DATABASE_RECONNECT_SLEEP);
}
} while (true);
$register->get('pools')->reclaim();
$global->get('pools')->reclaim();
});
/**
* Save current connections to the Database every 5 seconds.
*/
Timer::tick(5000, function () use ($register, $stats, &$statsDocument, $logError, $auth) {
Timer::tick(5000, function () use ($global, $stats, &$statsDocument, $logError, $auth) {
$payload = [];
foreach ($stats as $projectId => $value) {
$payload[$projectId] = $stats->get($projectId, 'connectionsTotal');
@ -233,12 +235,12 @@ $server->onStart(function () use ($stats, $register, $containerId, &$statsDocume
} catch (Throwable $th) {
call_user_func($logError, $th, "updateWorkerDocument");
} finally {
$register->get('pools')->reclaim();
$global->get('pools')->reclaim();
}
});
});
$server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, $realtime, $logError) {
$server->onWorkerStart(function (int $workerId) use ($server, $global, $stats, $realtime, $logError) {
Console::success('Worker ' . $workerId . ' started successfully');
$attempts = 0;
@ -246,7 +248,7 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats,
$auth = new Authorization();
Timer::tick(5000, function () use ($server, $register, $realtime, $stats, $logError, $auth) {
Timer::tick(5000, function () use ($server, $global, $realtime, $stats, $logError, $auth) {
/**
* Sending current connections to project channels on the console project every 5 seconds.
*/
@ -296,7 +298,7 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats,
]));
}
$register->get('pools')->reclaim();
$global->get('pools')->reclaim();
}
/**
* Sending test message for SDK E2E tests every 5 seconds.
@ -329,9 +331,10 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats,
Attempting restart in 5 seconds (attempt #' . $attempts . ')');
sleep(5); // 5 sec delay between connection attempts
}
$start = time();
$redis = $register->get('pools')->get('pubsub')->pop()->getResource(); /** @var Redis $redis */
$redis = $global->get('pools')->get('pubsub')->pop()->getResource(); /** @var Redis $redis */
$redis->setOption(Redis::OPT_READ_TIMEOUT, -1);
if ($redis->ping(true)) {
@ -341,7 +344,7 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats,
Console::error('Pub/sub failed (worker: ' . $workerId . ')');
}
$redis->subscribe(['realtime'], function (Redis $redis, string $channel, string $payload) use ($server, $workerId, $stats, $register, $realtime, $auth) {
$redis->subscribe(['realtime'], function (Redis $redis, string $channel, string $payload) use ($server, $workerId, $stats, $global, $realtime, $auth) {
$event = json_decode($payload, true);
if ($event['permissionsChanged'] && isset($event['userId'])) {
@ -361,7 +364,7 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats,
$realtime->subscribe($projectId, $connection, $roles, $realtime->connections[$connection]['channels']);
$register->get('pools')->reclaim();
$global->get('pools')->reclaim();
}
}
@ -393,14 +396,15 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats,
sleep(DATABASE_RECONNECT_SLEEP);
continue;
} finally {
$register->get('pools')->reclaim();
//$global->get('pools')->reclaim();
// TODO eldad add connections reclaim
}
}
Console::error('Failed to restart pub/sub...');
});
$server->onOpen(function (int $connection, SwooleRequest $request) use ($server, $register, $stats, &$realtime, $logError) {
$server->onOpen(function (int $connection, SwooleRequest $request) use ($server, $global, $stats, &$realtime, $logError) {
$auth = new Authorization();
$http = new Http(new FPMServer(), 'UTC');
@ -409,7 +413,7 @@ $server->onOpen(function (int $connection, SwooleRequest $request) use ($server,
Console::info("Connection open (user: {$connection})");
Http::setResource('pools', fn () => $register->get('pools'));
Http::setResource('pools', fn () => $global->get('pools'));
Http::setResource('request', fn () => $request);
Http::setResource('response', fn () => $response);
@ -515,11 +519,11 @@ $server->onOpen(function (int $connection, SwooleRequest $request) use ($server,
Console::error('[Error] Message: ' . $response['data']['message']);
}
} finally {
$register->get('pools')->reclaim();
$global->get('pools')->reclaim();
}
});
$server->onMessage(function (int $connection, string $message) use ($server, $register, $realtime, $containerId) {
$server->onMessage(function (int $connection, string $message) use ($server, $global, $realtime, $containerId) {
$auth = new Authorization();
try {
@ -615,7 +619,7 @@ $server->onMessage(function (int $connection, string $message) use ($server, $re
$server->close($connection, $th->getCode());
}
} finally {
$register->get('pools')->reclaim();
$global->get('pools')->reclaim();
}
});