1
0
Fork 0
mirror of synced 2024-10-01 01:37:56 +13:00

Capture pools on init

This commit is contained in:
Jake Barnby 2022-11-09 16:39:15 +13:00
parent 4fe2289e7f
commit b6839fa8f1
No known key found for this signature in database
GPG key ID: C437A8CC85B96E9C

View file

@ -4,6 +4,7 @@ use Appwrite\Auth\Auth;
use Appwrite\Messaging\Adapter\Realtime;
use Appwrite\Network\Validator\Origin;
use Appwrite\Utopia\Response;
use Swoole\ConnectionPool;
use Swoole\Http\Request as SwooleRequest;
use Swoole\Http\Response as SwooleResponse;
use Swoole\Runtime;
@ -35,7 +36,7 @@ Runtime::enableCoroutine(SWOOLE_HOOK_ALL);
$realtime = new Realtime();
$dbPool = $register->get('dbPool');
$dbPool = $register->get('dbPool', args: [getWorkerPoolSize()]);
$redisPool = $register->get('redisPool');
/**
@ -98,7 +99,7 @@ $logError = function (Throwable $error, string $action) use ($register) {
$server->error($logError);
function getDatabase(Registry &$register, string $namespace)
function getDatabase(ConnectionPool $dbPool, ConnectionPool $redisPool, string $namespace)
{
$attempts = 0;
@ -106,8 +107,8 @@ function getDatabase(Registry &$register, string $namespace)
try {
$attempts++;
$db = $register->get('dbPool')->get();
$redis = $register->get('redisPool')->get();
$db = $dbPool->get();
$redis = $redisPool->get();
$cache = new Cache(new RedisCache($redis));
$database = new Database(new MariaDB($db), $cache);
@ -130,23 +131,23 @@ function getDatabase(Registry &$register, string $namespace)
return [
$database,
function () use ($register, $db, $redis) {
$register->get('dbPool')->put($db);
$register->get('redisPool')->put($redis);
function () use ($dbPool, $redisPool, $db, $redis) {
$dbPool->put($db);
$redisPool->put($redis);
}
];
}
$server->onStart(function () use ($stats, $register, $containerId, &$statsDocument, $logError) {
$server->onStart(function () use ($stats, $dbPool, $redisPool, $containerId, &$statsDocument, $logError) {
sleep(5); // wait for the initial database schema to be ready
Console::success('Server started successfully');
/**
* Create document for this worker to share stats across Containers.
*/
go(function () use ($register, $containerId, &$statsDocument) {
go(function () use ($dbPool, $redisPool, $containerId, &$statsDocument) {
$attempts = 0;
[$database, $returnDatabase] = getDatabase($register, '_console');
[$database, $returnDatabase] = getDatabase($dbPool, $redisPool, '_console');
do {
try {
$attempts++;
@ -172,7 +173,7 @@ $server->onStart(function () use ($stats, $register, $containerId, &$statsDocume
/**
* Save current connections to the Database every 5 seconds.
*/
Timer::tick(5000, function () use ($register, $stats, &$statsDocument, $logError) {
Timer::tick(5000, function () use ($dbPool, $redisPool, $stats, &$statsDocument, $logError) {
$payload = [];
foreach ($stats as $projectId => $value) {
$payload[$projectId] = $stats->get($projectId, 'connectionsTotal');
@ -182,7 +183,7 @@ $server->onStart(function () use ($stats, $register, $containerId, &$statsDocume
}
try {
[$database, $returnDatabase] = getDatabase($register, '_console');
[$database, $returnDatabase] = getDatabase($dbPool, $redisPool, '_console');
$statsDocument
->setAttribute('timestamp', DateTime::now())
@ -197,18 +198,18 @@ $server->onStart(function () use ($stats, $register, $containerId, &$statsDocume
});
});
$server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, $realtime, $logError) {
$server->onWorkerStart(function (int $workerId) use ($server, $dbPool, $redisPool, $stats, $realtime, $logError) {
Console::success('Worker ' . $workerId . ' started successfully');
$attempts = 0;
$start = time();
Timer::tick(5000, function () use ($server, $register, $realtime, $stats, $logError) {
Timer::tick(5000, function () use ($server, $dbPool, $redisPool, $realtime, $stats, $logError) {
/**
* Sending current connections to project channels on the console project every 5 seconds.
*/
if ($realtime->hasSubscriber('console', Role::users()->toString(), 'project')) {
[$database, $returnDatabase] = getDatabase($register, '_console');
[$database, $returnDatabase] = getDatabase($dbPool, $redisPool, '_console');
$payload = [];
@ -289,7 +290,7 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats,
$start = time();
/** @var Redis $redis */
$redis = $register->get('redisPool')->get();
$redis = $redisPool->get();
$redis->setOption(Redis::OPT_READ_TIMEOUT, -1);
if ($redis->ping(true)) {
@ -299,7 +300,7 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats,
Console::error('Pub/sub failed (worker: ' . $workerId . ')');
}
$redis->subscribe(['realtime'], function (Redis $redis, string $channel, string $payload) use ($server, $workerId, $stats, $register, $realtime) {
$redis->subscribe(['realtime'], function (Redis $redis, string $channel, string $payload) use ($server, $workerId, $stats, $dbPool, $redisPool, $realtime) {
$event = json_decode($payload, true);
if ($event['permissionsChanged'] && isset($event['userId'])) {
@ -308,9 +309,9 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats,
if ($realtime->hasSubscriber($projectId, 'user:' . $userId)) {
$connection = array_key_first(reset($realtime->subscriptions[$projectId]['user:' . $userId]));
[$consoleDatabase, $returnConsoleDatabase] = getDatabase($register, '_console');
[$consoleDatabase, $returnConsoleDatabase] = getDatabase($dbPool, $redisPool, '_console');
$project = Authorization::skip(fn() => $consoleDatabase->getDocument('projects', $projectId));
[$database, $returnDatabase] = getDatabase($register, "_{$project->getInternalId()}");
[$database, $returnDatabase] = getDatabase($dbPool, $redisPool, "_{$project->getInternalId()}");
$user = $database->getDocument('users', $userId);
@ -347,10 +348,11 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats,
call_user_func($logError, $th, "pubSubConnection");
Console::error('Pub/sub error: ' . $th->getMessage());
$register->get('redisPool')->put($redis);
$attempts++;
sleep(DATABASE_RECONNECT_SLEEP);
continue;
} finally {
$redisPool->put($redis);
}
}
@ -485,11 +487,11 @@ $server->onOpen(function (int $connection, SwooleRequest $request) use ($server,
}
});
$server->onMessage(function (int $connection, string $message) use ($server, $register, $realtime, $containerId) {
$server->onMessage(function (int $connection, string $message) use ($server, $dbPool, $redisPool, $realtime, $containerId) {
try {
$response = new Response(new SwooleResponse());
$db = $register->get('dbPool')->get();
$redis = $register->get('redisPool')->get();
$db = $dbPool->get();
$redis = $redisPool->get();
$cache = new Cache(new RedisCache($redis));
$database = new Database(new MariaDB($db), $cache);
@ -583,8 +585,8 @@ $server->onMessage(function (int $connection, string $message) use ($server, $re
$server->close($connection, $th->getCode());
}
} finally {
$register->get('dbPool')->put($db);
$register->get('redisPool')->put($redis);
$dbPool->put($db);
$redisPool->put($redis);
}
});