refactor: Connection pools
This commit is contained in:
parent
664c3ad466
commit
1217aee296
4 changed files with 30 additions and 11 deletions
|
@ -179,6 +179,8 @@ Http::get('/v1/health/cache')
|
|||
'status' => 'fail',
|
||||
'ping' => \round((\microtime(true) - $checkStart) / 1000)
|
||||
]);
|
||||
} finally {
|
||||
$connections->reclaim();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -241,6 +243,8 @@ Http::get('/v1/health/queue')
|
|||
'status' => 'fail',
|
||||
'ping' => \round((\microtime(true) - $checkStart) / 1000)
|
||||
]);
|
||||
} finally {
|
||||
$connections->reclaim();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -277,11 +281,11 @@ Http::get('/v1/health/pubsub')
|
|||
foreach ($config as $database) {
|
||||
try {
|
||||
$pool = $pools['pools-pubsub-' . $database]['pool'];
|
||||
$dsn = $pools['pools-pubsub-' . $database]['dsn'];
|
||||
|
||||
$connection = $pool->get();
|
||||
$connections->add($connection, $pool);
|
||||
|
||||
$adapter = new Connection\Redis($dsn->getHost(), $dsn->getPort());
|
||||
$adapter = new Connection\Redis($connection);
|
||||
|
||||
$checkStart = \microtime(true);
|
||||
|
||||
|
@ -304,6 +308,8 @@ Http::get('/v1/health/pubsub')
|
|||
'status' => 'fail',
|
||||
'ping' => \round((\microtime(true) - $checkStart) / 1000)
|
||||
]);
|
||||
} finally {
|
||||
$connections->reclaim();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -58,13 +58,18 @@ function startCoroutineServer(float|int $payloadSize, float|int $workerNumber, R
|
|||
$http->setRequestClass(Request::class);
|
||||
$http->setResponseClass(Response::class);
|
||||
|
||||
Http::onEnd()
|
||||
->inject('connections')
|
||||
->action(function (Connections $connections) use ($workerId) {
|
||||
$connections->reclaim();
|
||||
});
|
||||
Http::onStart()
|
||||
->inject('authorization')
|
||||
->inject('cache')
|
||||
->inject('pools')
|
||||
->inject('connections')
|
||||
->action(function (Authorization $authorization, Cache $cache, array $pools, Connections $connections) use ($workerId) {
|
||||
if($workerId !== 0) {
|
||||
if ($workerId !== 0) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
|
|
|
@ -257,8 +257,7 @@ $global->set(
|
|||
];
|
||||
|
||||
$pools = [];
|
||||
$poolSize = (int)System::getEnv('_APP_POOL_CLIENTS', 9000);
|
||||
$poolSize = 9000;
|
||||
$poolSize = (int)System::getEnv('_APP_POOL_CLIENTS', 64);
|
||||
|
||||
foreach ($connections as $key => $connection) {
|
||||
$dsns = $connection['dsns'] ?? '';
|
||||
|
@ -841,11 +840,10 @@ $queue
|
|||
->inject('connections')
|
||||
->setCallback(function (array $pools, Connections $connections) {
|
||||
$pool = $pools['pools-queue-main']['pool'];
|
||||
$dsn = $pools['pools-queue-main']['dsn'];
|
||||
$connection = $pool->get();
|
||||
$connections->add($connection, $pool);
|
||||
|
||||
return new Queue\Connection\Redis($dsn->getHost(), $dsn->getPort());
|
||||
return new Queue\Connection\Redis($connection);
|
||||
});
|
||||
|
||||
$queueForMessaging
|
||||
|
|
|
@ -5,6 +5,7 @@ use Appwrite\Extend\Exception;
|
|||
use Appwrite\Extend\Exception as AppwriteException;
|
||||
use Appwrite\Messaging\Adapter\Realtime;
|
||||
use Appwrite\Network\Validator\Origin;
|
||||
use Appwrite\Utopia\Queue\Connections;
|
||||
use Appwrite\Utopia\Request;
|
||||
use Appwrite\Utopia\Response;
|
||||
use Swoole\Http\Request as SwooleRequest;
|
||||
|
@ -28,6 +29,7 @@ use Utopia\Http\Adapter\Swoole\Response as HttpResponse;
|
|||
use Utopia\Http\Adapter\Swoole\Response as UtopiaResponse;
|
||||
use Utopia\Http\Http;
|
||||
use Utopia\Logger\Log;
|
||||
use Utopia\Pools\Connection;
|
||||
use Utopia\Registry\Registry;
|
||||
use Utopia\System\System;
|
||||
use Utopia\WebSocket\Adapter;
|
||||
|
@ -270,10 +272,14 @@ $server->onWorkerStart(function (int $workerId) use ($server, $container, $stats
|
|||
$start = time();
|
||||
|
||||
$pools = $container->get('pools');
|
||||
/** @var Connections $connections */
|
||||
$connections = $container->get('connections');
|
||||
|
||||
$pool = $pools['pools-pubsub-main']['pool'];
|
||||
$dsn = $pools['pools-pubsub-main']['dsn'];
|
||||
$redis = new \Redis();
|
||||
$redis->connect($dsn->getHost(), $dsn->getPort());
|
||||
$connection = $pool->get();
|
||||
$connections->add($connection, $pool);
|
||||
|
||||
$redis = $connection;
|
||||
|
||||
/** @var Redis $redis */
|
||||
$redis->setOption(Redis::OPT_READ_TIMEOUT, -1);
|
||||
|
@ -477,6 +483,11 @@ $server->onOpen(function (int $connection, SwooleRequest $request) use ($server,
|
|||
}
|
||||
});
|
||||
|
||||
$server->onWorkerStop(function (int $workerId) use ($container) {
|
||||
$connections = $container->get('connections');
|
||||
$connections->reclaim();
|
||||
});
|
||||
|
||||
$server->onMessage(function (int $connection, string $message) use ($server, $container, $realtime, $containerId) {
|
||||
try {
|
||||
$response = new Response(new HttpResponse(new SwooleHttpResponse()));
|
||||
|
@ -486,7 +497,6 @@ $server->onMessage(function (int $connection, string $message) use ($server, $co
|
|||
$authentication = $container->get('authentication');
|
||||
|
||||
if ($projectId !== 'console') {
|
||||
|
||||
$project = $authorization->skip(fn () => $database->getDocument('projects', $projectId));
|
||||
$database = $container->get('getProjectDB')($project);
|
||||
} else {
|
||||
|
|
Loading…
Reference in a new issue