diff --git a/app/controllers/api/health.php b/app/controllers/api/health.php index 24acaf96cc..47d80dbf71 100644 --- a/app/controllers/api/health.php +++ b/app/controllers/api/health.php @@ -179,6 +179,8 @@ Http::get('/v1/health/cache') 'status' => 'fail', 'ping' => \round((\microtime(true) - $checkStart) / 1000) ]); + } finally { + $connections->reclaim(); } } } @@ -241,6 +243,8 @@ Http::get('/v1/health/queue') 'status' => 'fail', 'ping' => \round((\microtime(true) - $checkStart) / 1000) ]); + } finally { + $connections->reclaim(); } } } @@ -277,11 +281,11 @@ Http::get('/v1/health/pubsub') foreach ($config as $database) { try { $pool = $pools['pools-pubsub-' . $database]['pool']; - $dsn = $pools['pools-pubsub-' . $database]['dsn']; + $connection = $pool->get(); $connections->add($connection, $pool); - $adapter = new Connection\Redis($dsn->getHost(), $dsn->getPort()); + $adapter = new Connection\Redis($connection); $checkStart = \microtime(true); @@ -304,6 +308,8 @@ Http::get('/v1/health/pubsub') 'status' => 'fail', 'ping' => \round((\microtime(true) - $checkStart) / 1000) ]); + } finally { + $connections->reclaim(); } } } diff --git a/app/http.php b/app/http.php index bb50c46230..5bf67108bc 100644 --- a/app/http.php +++ b/app/http.php @@ -58,13 +58,18 @@ function startCoroutineServer(float|int $payloadSize, float|int $workerNumber, R $http->setRequestClass(Request::class); $http->setResponseClass(Response::class); + Http::onEnd() + ->inject('connections') + ->action(function (Connections $connections) use ($workerId) { + $connections->reclaim(); + }); Http::onStart() ->inject('authorization') ->inject('cache') ->inject('pools') ->inject('connections') ->action(function (Authorization $authorization, Cache $cache, array $pools, Connections $connections) use ($workerId) { - if($workerId !== 0) { + if ($workerId !== 0) { return; } try { diff --git a/app/init2.php b/app/init2.php index 9e5c9ad25a..8441a84f81 100644 --- a/app/init2.php +++ b/app/init2.php @@ -257,8 +257,7 @@ $global->set( ]; $pools = []; - $poolSize = (int)System::getEnv('_APP_POOL_CLIENTS', 9000); - $poolSize = 9000; + $poolSize = (int)System::getEnv('_APP_POOL_CLIENTS', 64); foreach ($connections as $key => $connection) { $dsns = $connection['dsns'] ?? ''; @@ -841,11 +840,10 @@ $queue ->inject('connections') ->setCallback(function (array $pools, Connections $connections) { $pool = $pools['pools-queue-main']['pool']; - $dsn = $pools['pools-queue-main']['dsn']; $connection = $pool->get(); $connections->add($connection, $pool); - return new Queue\Connection\Redis($dsn->getHost(), $dsn->getPort()); + return new Queue\Connection\Redis($connection); }); $queueForMessaging diff --git a/app/realtime.php b/app/realtime.php index 117941a86e..0e95ce1735 100644 --- a/app/realtime.php +++ b/app/realtime.php @@ -5,6 +5,7 @@ use Appwrite\Extend\Exception; use Appwrite\Extend\Exception as AppwriteException; use Appwrite\Messaging\Adapter\Realtime; use Appwrite\Network\Validator\Origin; +use Appwrite\Utopia\Queue\Connections; use Appwrite\Utopia\Request; use Appwrite\Utopia\Response; use Swoole\Http\Request as SwooleRequest; @@ -28,6 +29,7 @@ use Utopia\Http\Adapter\Swoole\Response as HttpResponse; use Utopia\Http\Adapter\Swoole\Response as UtopiaResponse; use Utopia\Http\Http; use Utopia\Logger\Log; +use Utopia\Pools\Connection; use Utopia\Registry\Registry; use Utopia\System\System; use Utopia\WebSocket\Adapter; @@ -270,10 +272,14 @@ $server->onWorkerStart(function (int $workerId) use ($server, $container, $stats $start = time(); $pools = $container->get('pools'); + /** @var Connections $connections */ + $connections = $container->get('connections'); + $pool = $pools['pools-pubsub-main']['pool']; - $dsn = $pools['pools-pubsub-main']['dsn']; - $redis = new \Redis(); - $redis->connect($dsn->getHost(), $dsn->getPort()); + $connection = $pool->get(); + $connections->add($connection, $pool); + + $redis = $connection; /** @var Redis $redis */ $redis->setOption(Redis::OPT_READ_TIMEOUT, -1); @@ -477,6 +483,11 @@ $server->onOpen(function (int $connection, SwooleRequest $request) use ($server, } }); +$server->onWorkerStop(function (int $workerId) use ($container) { + $connections = $container->get('connections'); + $connections->reclaim(); +}); + $server->onMessage(function (int $connection, string $message) use ($server, $container, $realtime, $containerId) { try { $response = new Response(new HttpResponse(new SwooleHttpResponse())); @@ -486,7 +497,6 @@ $server->onMessage(function (int $connection, string $message) use ($server, $co $authentication = $container->get('authentication'); if ($projectId !== 'console') { - $project = $authorization->skip(fn () => $database->getDocument('projects', $projectId)); $database = $container->get('getProjectDB')($project); } else {