From b6839fa8f1e0d52c7785cd292467872f8a54dbab Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Wed, 9 Nov 2022 16:39:15 +1300 Subject: [PATCH] Capture pools on init --- app/realtime.php | 52 +++++++++++++++++++++++++----------------------- 1 file changed, 27 insertions(+), 25 deletions(-) diff --git a/app/realtime.php b/app/realtime.php index 8ee4e64761..a346cb4dc8 100644 --- a/app/realtime.php +++ b/app/realtime.php @@ -4,6 +4,7 @@ use Appwrite\Auth\Auth; use Appwrite\Messaging\Adapter\Realtime; use Appwrite\Network\Validator\Origin; use Appwrite\Utopia\Response; +use Swoole\ConnectionPool; use Swoole\Http\Request as SwooleRequest; use Swoole\Http\Response as SwooleResponse; use Swoole\Runtime; @@ -35,7 +36,7 @@ Runtime::enableCoroutine(SWOOLE_HOOK_ALL); $realtime = new Realtime(); -$dbPool = $register->get('dbPool'); +$dbPool = $register->get('dbPool', args: [getWorkerPoolSize()]); $redisPool = $register->get('redisPool'); /** @@ -98,7 +99,7 @@ $logError = function (Throwable $error, string $action) use ($register) { $server->error($logError); -function getDatabase(Registry &$register, string $namespace) +function getDatabase(ConnectionPool $dbPool, ConnectionPool $redisPool, string $namespace) { $attempts = 0; @@ -106,8 +107,8 @@ function getDatabase(Registry &$register, string $namespace) try { $attempts++; - $db = $register->get('dbPool')->get(); - $redis = $register->get('redisPool')->get(); + $db = $dbPool->get(); + $redis = $redisPool->get(); $cache = new Cache(new RedisCache($redis)); $database = new Database(new MariaDB($db), $cache); @@ -130,23 +131,23 @@ function getDatabase(Registry &$register, string $namespace) return [ $database, - function () use ($register, $db, $redis) { - $register->get('dbPool')->put($db); - $register->get('redisPool')->put($redis); + function () use ($dbPool, $redisPool, $db, $redis) { + $dbPool->put($db); + $redisPool->put($redis); } ]; } -$server->onStart(function () use ($stats, $register, $containerId, &$statsDocument, $logError) { +$server->onStart(function () use ($stats, $dbPool, $redisPool, $containerId, &$statsDocument, $logError) { sleep(5); // wait for the initial database schema to be ready Console::success('Server started successfully'); /** * Create document for this worker to share stats across Containers. */ - go(function () use ($register, $containerId, &$statsDocument) { + go(function () use ($dbPool, $redisPool, $containerId, &$statsDocument) { $attempts = 0; - [$database, $returnDatabase] = getDatabase($register, '_console'); + [$database, $returnDatabase] = getDatabase($dbPool, $redisPool, '_console'); do { try { $attempts++; @@ -172,7 +173,7 @@ $server->onStart(function () use ($stats, $register, $containerId, &$statsDocume /** * Save current connections to the Database every 5 seconds. */ - Timer::tick(5000, function () use ($register, $stats, &$statsDocument, $logError) { + Timer::tick(5000, function () use ($dbPool, $redisPool, $stats, &$statsDocument, $logError) { $payload = []; foreach ($stats as $projectId => $value) { $payload[$projectId] = $stats->get($projectId, 'connectionsTotal'); @@ -182,7 +183,7 @@ $server->onStart(function () use ($stats, $register, $containerId, &$statsDocume } try { - [$database, $returnDatabase] = getDatabase($register, '_console'); + [$database, $returnDatabase] = getDatabase($dbPool, $redisPool, '_console'); $statsDocument ->setAttribute('timestamp', DateTime::now()) @@ -197,18 +198,18 @@ $server->onStart(function () use ($stats, $register, $containerId, &$statsDocume }); }); -$server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, $realtime, $logError) { +$server->onWorkerStart(function (int $workerId) use ($server, $dbPool, $redisPool, $stats, $realtime, $logError) { Console::success('Worker ' . $workerId . ' started successfully'); $attempts = 0; $start = time(); - Timer::tick(5000, function () use ($server, $register, $realtime, $stats, $logError) { + Timer::tick(5000, function () use ($server, $dbPool, $redisPool, $realtime, $stats, $logError) { /** * Sending current connections to project channels on the console project every 5 seconds. */ if ($realtime->hasSubscriber('console', Role::users()->toString(), 'project')) { - [$database, $returnDatabase] = getDatabase($register, '_console'); + [$database, $returnDatabase] = getDatabase($dbPool, $redisPool, '_console'); $payload = []; @@ -289,7 +290,7 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, $start = time(); /** @var Redis $redis */ - $redis = $register->get('redisPool')->get(); + $redis = $redisPool->get(); $redis->setOption(Redis::OPT_READ_TIMEOUT, -1); if ($redis->ping(true)) { @@ -299,7 +300,7 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, Console::error('Pub/sub failed (worker: ' . $workerId . ')'); } - $redis->subscribe(['realtime'], function (Redis $redis, string $channel, string $payload) use ($server, $workerId, $stats, $register, $realtime) { + $redis->subscribe(['realtime'], function (Redis $redis, string $channel, string $payload) use ($server, $workerId, $stats, $dbPool, $redisPool, $realtime) { $event = json_decode($payload, true); if ($event['permissionsChanged'] && isset($event['userId'])) { @@ -308,9 +309,9 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, if ($realtime->hasSubscriber($projectId, 'user:' . $userId)) { $connection = array_key_first(reset($realtime->subscriptions[$projectId]['user:' . $userId])); - [$consoleDatabase, $returnConsoleDatabase] = getDatabase($register, '_console'); + [$consoleDatabase, $returnConsoleDatabase] = getDatabase($dbPool, $redisPool, '_console'); $project = Authorization::skip(fn() => $consoleDatabase->getDocument('projects', $projectId)); - [$database, $returnDatabase] = getDatabase($register, "_{$project->getInternalId()}"); + [$database, $returnDatabase] = getDatabase($dbPool, $redisPool, "_{$project->getInternalId()}"); $user = $database->getDocument('users', $userId); @@ -347,10 +348,11 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, call_user_func($logError, $th, "pubSubConnection"); Console::error('Pub/sub error: ' . $th->getMessage()); - $register->get('redisPool')->put($redis); $attempts++; sleep(DATABASE_RECONNECT_SLEEP); continue; + } finally { + $redisPool->put($redis); } } @@ -485,11 +487,11 @@ $server->onOpen(function (int $connection, SwooleRequest $request) use ($server, } }); -$server->onMessage(function (int $connection, string $message) use ($server, $register, $realtime, $containerId) { +$server->onMessage(function (int $connection, string $message) use ($server, $dbPool, $redisPool, $realtime, $containerId) { try { $response = new Response(new SwooleResponse()); - $db = $register->get('dbPool')->get(); - $redis = $register->get('redisPool')->get(); + $db = $dbPool->get(); + $redis = $redisPool->get(); $cache = new Cache(new RedisCache($redis)); $database = new Database(new MariaDB($db), $cache); @@ -583,8 +585,8 @@ $server->onMessage(function (int $connection, string $message) use ($server, $re $server->close($connection, $th->getCode()); } } finally { - $register->get('dbPool')->put($db); - $register->get('redisPool')->put($redis); + $dbPool->put($db); + $redisPool->put($redis); } });