1
0
Fork 0
mirror of synced 2024-10-05 20:53:27 +13:00

Merge pull request #6886 from appwrite/feat-reclaim-only-current-connection

Fix connection reclaim logic.
This commit is contained in:
Jake Barnby 2024-04-08 16:14:04 +12:00 committed by GitHub
commit c5c0c8a8e6
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 344 additions and 133 deletions

View file

@ -323,7 +323,28 @@ $http->on('request', function (SwooleRequest $swooleRequest, SwooleResponse $swo
$swooleResponse->end(\json_encode($output));
} finally {
$pools->reclaim();
$connectionForConsole = $app->getResource('connectionForConsole');
$connectionForProject = $app->getResource('connectionForProject');
$connectionForQueue = $app->getResource('connectionForQueue');
$connectionsForCache = $app->getResource('connectionsForCache');
if (!is_null($connectionForConsole)) {
$connectionForConsole->reclaim();
}
if (!is_null($connectionForProject)) {
$connectionForProject->reclaim();
}
if (!is_null($connectionForQueue)) {
$connectionForQueue->reclaim();
}
if (!empty($connectionsForCache)) {
foreach ($connectionsForCache as $connection) {
$connection->reclaim();
}
}
}
});

View file

@ -884,7 +884,13 @@ App::setResource('localeCodes', function () {
// Queues
App::setResource('queue', function (Group $pools) {
return $pools->get('queue')->pop()->getResource();
$connection = $pools->get('queue')->pop();
App::setResource('connectionForQueue', function () use ($connection) {
return $connection;
});
return $connection->getResource();
}, ['pools']);
App::setResource('queueForMessaging', function (Connection $queue) {
return new Phone($queue);
@ -1126,15 +1132,33 @@ App::setResource('console', function () {
]);
}, []);
App::setResource('connectionForProject', function () {
return null;
});
App::setResource('connectionForConsole', function () {
return null;
});
App::setResource('connectionForQueue', function () {
return null;
});
App::setResource('connectionsForCache', function () {
return [];
});
App::setResource('dbForProject', function (Group $pools, Database $dbForConsole, Cache $cache, Document $project) {
if ($project->isEmpty() || $project->getId() === 'console') {
return $dbForConsole;
}
$dbAdapter = $pools
$connection = $pools
->get($project->getAttribute('database'))
->pop()
->getResource();
->pop();
App::setResource('connectionForProject', function () use ($connection) {
return $connection;
});
$dbAdapter = $connection->getResource();
$database = new Database($dbAdapter, $cache);
@ -1148,11 +1172,15 @@ App::setResource('dbForProject', function (Group $pools, Database $dbForConsole,
}, ['pools', 'dbForConsole', 'cache', 'project']);
App::setResource('dbForConsole', function (Group $pools, Cache $cache) {
$dbAdapter = $pools
$connection = $pools
->get('console')
->pop()
->getResource()
;
->pop();
App::setResource('connectionForConsole', function () use ($connection) {
return $connection;
});
$dbAdapter = $connection->getResource();
$database = new Database($dbAdapter, $cache);
@ -1187,15 +1215,18 @@ App::setResource('getProjectDB', function (Group $pools, Database $dbForConsole,
return $database;
}
$dbAdapter = $pools
$connection = $pools
->get($databaseName)
->pop()
->getResource();
->pop();
$dbAdapter = $connection->getResource();
App::setResource('connectionForProject', function () use ($connection) {
return $connection;
}, []);
$database = new Database($dbAdapter, $cache);
$databases[$databaseName] = $database;
$database
->setNamespace('_' . $project->getInternalId())
->setMetadata('host', \gethostname())
@ -1211,15 +1242,21 @@ App::setResource('getProjectDB', function (Group $pools, Database $dbForConsole,
App::setResource('cache', function (Group $pools) {
$list = Config::getParam('pools-cache', []);
$adapters = [];
$connections = [];
foreach ($list as $value) {
$adapters[] = $pools
$connection = $pools
->get($value)
->pop()
->getResource()
;
->pop();
$connections[] = $connection;
$adapters[] = $connection->getResource();
}
App::setResource('connectionsForCache', function () use ($connections) {
return $connections;
}, []);
return new Cache(new Sharding($adapters));
}, ['pools']);

View file

@ -26,92 +26,151 @@ use Utopia\Cache\Adapter\Sharding;
use Utopia\Cache\Cache;
use Utopia\Config\Config;
use Utopia\Database\Database;
use Utopia\Pools\Group;
use Utopia\Registry\Registry;
use Utopia\WebSocket\Server;
use Utopia\WebSocket\Adapter;
/**
* @var \Utopia\Registry\Registry $register
* @var Registry $register
*/
require_once __DIR__ . '/init.php';
Runtime::enableCoroutine(SWOOLE_HOOK_ALL);
Runtime::enableCoroutine();
// Allows overriding
if (!function_exists("getConsoleDB")) {
function getConsoleDB(): Database
if (!function_exists('getConsoleDB')) {
/**
* @return array{Database, callable}
* @throws Exception|\Exception
*/
function getConsoleDB(): array
{
global $register;
/** @var \Utopia\Pools\Group $pools */
/** @var Group $pools */
$pools = $register->get('pools');
$dbAdapter = $pools
$dbConnection = $pools
->get('console')
->pop()
->getResource()
;
->pop();
$database = new Database($dbAdapter, getCache());
$dbAdapter = $dbConnection->getResource();
[$cache, $reclaimCache] = getCache();
$database = new Database($dbAdapter, $cache);
$database
->setNamespace('_console')
->setMetadata('host', \gethostname())
->setMetadata('project', '_console');
return $database;
return [$database, function () use ($dbConnection, $reclaimCache) {
$dbConnection->reclaim();
$reclaimCache();
}];
}
}
// Allows overriding
if (!function_exists("getProjectDB")) {
function getProjectDB(Document $project): Database
if (!function_exists('getProjectDB')) {
/**
* @param Document $project
* @return array{Database, callable}
* @throws Exception
*/
function getProjectDB(Document $project): array
{
global $register;
/** @var \Utopia\Pools\Group $pools */
/** @var Group $pools */
$pools = $register->get('pools');
if ($project->isEmpty() || $project->getId() === 'console') {
return getConsoleDB();
}
$dbAdapter = $pools
$dbConnection = $pools
->get($project->getAttribute('database'))
->pop()
->getResource()
;
->pop();
$database = new Database($dbAdapter, getCache());
$dbAdapter = $dbConnection->getResource();
[$cache, $reclaimCache] = getCache();
$database = new Database($dbAdapter, $cache);
$database
->setNamespace('_' . $project->getInternalId())
->setMetadata('host', \gethostname())
->setMetadata('project', $project->getId());
return $database;
return [$database, function () use ($dbConnection, $reclaimCache) {
$dbConnection->reclaim();
$reclaimCache();
}];
}
}
// Allows overriding
if (!function_exists("getCache")) {
function getCache(): Cache
if (!function_exists('getCache')) {
/**
* @return array{Cache, callable}
* @throws Exception|\Exception
*/
function getCache(): array
{
global $register;
$pools = $register->get('pools'); /** @var \Utopia\Pools\Group $pools */
/** @var Group $pools */
$pools = $register->get('pools');
$list = Config::getParam('pools-cache', []);
$connections = [];
$adapters = [];
foreach ($list as $value) {
$adapters[] = $pools
$connection = $pools
->get($value)
->pop()
->getResource()
;
->pop();
$connections[] = $connection;
$adapters[] = $connection->getResource();
}
return new Cache(new Sharding($adapters));
$cache = new Cache(new Sharding($adapters));
return [$cache, function () use ($connections) {
foreach ($connections as $connection) {
$connection->reclaim();
}
}];
}
}
if (!function_exists('getPubSub')) {
/**
* @return array{Redis, callable}
* @throws Exception|\Exception
*/
function getPubSub(): array
{
global $register;
/** @var Group $pools */
$pools = $register->get('pools');
$connection = $pools
->get('pubsub')
->pop();
$redis = $connection->getResource();
return [$redis, function () use ($connection) {
$connection->reclaim();
}];
}
}
@ -133,13 +192,17 @@ $statsDocument = null;
$workerNumber = swoole_cpu_num() * intval(App::getEnv('_APP_WORKER_PER_CORE', 6));
$adapter = new Adapter\Swoole(port: App::getEnv('PORT', 80));
$adapter
->setPackageMaxLength(64000) // Default maximum Package Size (64kb)
->setWorkerNumber($workerNumber);
$server = new Server($adapter);
$logError = function (Throwable $error, string $action) use ($register) {
function logError(Throwable $error, string $action): void
{
global $register;
$logger = $register->get('logger');
if ($logger && !$error instanceof Exception) {
@ -173,11 +236,13 @@ $logError = function (Throwable $error, string $action) use ($register) {
Console::error('[Error] Message: ' . $error->getMessage());
Console::error('[Error] File: ' . $error->getFile());
Console::error('[Error] Line: ' . $error->getLine());
};
}
$server->error($logError);
$server->error(function (Throwable $th, string $method) {
logError($th, $method);
});
$server->onStart(function () use ($stats, $register, $containerId, &$statsDocument, $logError) {
$server->onStart(function () use ($stats, $register, $containerId, &$statsDocument) {
sleep(5); // wait for the initial database schema to be ready
Console::success('Server started successfully');
@ -186,11 +251,17 @@ $server->onStart(function () use ($stats, $register, $containerId, &$statsDocume
*/
go(function () use ($register, $containerId, &$statsDocument) {
$attempts = 0;
$database = getConsoleDB();
do {
try {
/**
* @var Database $database
* @var callable $reclaim
*/
[$database, $reclaim] = getConsoleDB();
$attempts++;
$document = new Document([
'$id' => ID::unique(),
'$collection' => ID::custom('realtime'),
@ -200,102 +271,131 @@ $server->onStart(function () use ($stats, $register, $containerId, &$statsDocume
'value' => '{}'
]);
$statsDocument = Authorization::skip(fn () => $database->createDocument('realtime', $document));
$statsDocument = Authorization::skip(function () use ($database, $document) {
return $database->createDocument('realtime', $document);
});
break;
} catch (Throwable) {
Console::warning("Collection not ready. Retrying connection ({$attempts})...");
sleep(DATABASE_RECONNECT_SLEEP);
}
} while (true);
$register->get('pools')->reclaim();
if (isset($reclaim)) {
$reclaim();
}
});
/**
* Save current connections to the Database every 5 seconds.
*/
// Timer::tick(5000, function () use ($register, $stats, &$statsDocument, $logError) {
// $payload = [];
// foreach ($stats as $projectId => $value) {
// $payload[$projectId] = $stats->get($projectId, 'connectionsTotal');
// }
// if (empty($payload) || empty($statsDocument)) {
// return;
// }
Timer::tick(5000, function () use ($register, $stats, &$statsDocument) {
$payload = [];
// try {
// $database = getConsoleDB();
foreach ($stats as $projectId => $value) {
$payload[$projectId] = $stats->get($projectId, 'connectionsTotal');
}
// $statsDocument
// ->setAttribute('timestamp', DateTime::now())
// ->setAttribute('value', json_encode($payload));
if (empty($payload) || empty($statsDocument)) {
return;
}
// Authorization::skip(fn () => $database->updateDocument('realtime', $statsDocument->getId(), $statsDocument));
// } catch (Throwable $th) {
// call_user_func($logError, $th, "updateWorkerDocument");
// } finally {
// $register->get('pools')->reclaim();
// }
// });
try {
/**
* @var Database $database
* @var callable $reclaim
*/
[$database, $reclaim] = getConsoleDB();
$statsDocument
->setAttribute('timestamp', DateTime::now())
->setAttribute('value', json_encode($payload));
Authorization::skip(function () use ($database, $statsDocument) {
$database->updateDocument('realtime', $statsDocument->getId(), $statsDocument);
});
} catch (Throwable $th) {
logError($th, 'updateWorkerDocument');
} finally {
if (isset($reclaim)) {
$reclaim();
}
}
});
});
$server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, $realtime, $logError) {
$server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, $realtime) {
Console::success('Worker ' . $workerId . ' started successfully');
$attempts = 0;
$start = time();
Timer::tick(5000, function () use ($server, $register, $realtime, $stats, $logError) {
Timer::tick(5000, function () use ($server, $register, $realtime, $stats) {
/**
* Sending current connections to project channels on the console project every 5 seconds.
*/
if ($realtime->hasSubscriber('console', Role::users()->toString(), 'project')) {
$database = getConsoleDB();
try {
/**
* @var Database $database
* @var callable $reclaim
*/
[$database, $reclaim] = getConsoleDB();
$payload = [];
$payload = [];
$list = Authorization::skip(fn () => $database->find('realtime', [
Query::greaterThan('timestamp', DateTime::addSeconds(new \DateTime(), -15)),
]));
$list = Authorization::skip(function () use ($database) {
return $database->find('realtime', [
Query::greaterThan('timestamp', DateTime::addSeconds(new \DateTime(), -15)),
]);
});
/**
* Aggregate stats across containers.
*/
foreach ($list as $document) {
foreach (json_decode($document->getAttribute('value')) as $projectId => $value) {
if (array_key_exists($projectId, $payload)) {
$payload[$projectId] += $value;
} else {
$payload[$projectId] = $value;
/**
* Aggregate stats across containers.
*/
foreach ($list as $document) {
foreach (json_decode($document->getAttribute('value')) as $projectId => $value) {
if (array_key_exists($projectId, $payload)) {
$payload[$projectId] += $value;
} else {
$payload[$projectId] = $value;
}
}
}
}
foreach ($stats as $projectId => $value) {
if (!array_key_exists($projectId, $payload)) {
continue;
}
foreach ($stats as $projectId => $value) {
if (!array_key_exists($projectId, $payload)) {
continue;
}
$event = [
'project' => 'console',
'roles' => ['team:' . $stats->get($projectId, 'teamId')],
'data' => [
'events' => ['stats.connections'],
'channels' => ['project'],
'timestamp' => DateTime::formatTz(DateTime::now()),
'payload' => [
$projectId => $payload[$projectId]
$event = [
'project' => 'console',
'roles' => ['team:' . $stats->get($projectId, 'teamId')],
'data' => [
'events' => ['stats.connections'],
'channels' => ['project'],
'timestamp' => DateTime::formatTz(DateTime::now()),
'payload' => [
$projectId => $payload[$projectId]
]
]
]
];
];
$server->send($realtime->getSubscribers($event), json_encode([
'type' => 'event',
'data' => $event['data']
]));
$server->send($realtime->getSubscribers($event), json_encode([
'type' => 'event',
'data' => $event['data']
]));
}
} catch (Throwable $th) {
logError($th, 'sendStats');
} finally {
if (isset($reclaim)) {
$reclaim();
}
}
$register->get('pools')->reclaim();
}
/**
* Sending test message for SDK E2E tests every 5 seconds.
*/
@ -327,9 +427,15 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats,
Attempting restart in 5 seconds (attempt #' . $attempts . ')');
sleep(5); // 5 sec delay between connection attempts
}
$start = time();
$redis = $register->get('pools')->get('pubsub')->pop()->getResource(); /** @var Redis $redis */
/**
* @var Redis $redis
* @var callable $reclaimForRedis
*/
[$redis, $reclaimForRedis] = getPubSub();
$redis->setOption(Redis::OPT_READ_TIMEOUT, -1);
if ($redis->ping(true)) {
@ -348,17 +454,36 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats,
if ($realtime->hasSubscriber($projectId, 'user:' . $userId)) {
$connection = array_key_first(reset($realtime->subscriptions[$projectId]['user:' . $userId]));
$consoleDatabase = getConsoleDB();
$project = Authorization::skip(fn() => $consoleDatabase->getDocument('projects', $projectId));
$database = getProjectDB($project);
$user = $database->getDocument('users', $userId);
/**
* @var Database $dbForConsole
* @var Database $dbForProject
* @var callable $reclaimForConsole
* @var callable $reclaimForProject
*/
[$dbForConsole, $reclaimForConsole] = getConsoleDB();
$project = Authorization::skip(function () use ($dbForConsole, $projectId) {
return $dbForConsole->getDocument('projects', $projectId);
});
[$dbForProject, $reclaimForProject] = getProjectDB($project);
$user = $dbForProject->getDocument('users', $userId);
$roles = Auth::getRoles($user);
$realtime->subscribe($projectId, $connection, $roles, $realtime->connections[$connection]['channels']);
$register->get('pools')->reclaim();
/**
* If we successfully reclaim, clear the callbacks
* so the finally block doesn't try to reclaim again.
*/
$reclaimForConsole();
$reclaimForConsole = null;
$reclaimForProject();
$reclaimForProject = null;
}
}
@ -383,30 +508,43 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats,
}
});
} catch (Throwable $th) {
call_user_func($logError, $th, "pubSubConnection");
logError($th, 'pubSubConnection');
Console::error('Pub/sub error: ' . $th->getMessage());
$attempts++;
sleep(DATABASE_RECONNECT_SLEEP);
continue;
} finally {
$register->get('pools')->reclaim();
if (isset($reclaimForRedis)) {
$reclaimForRedis();
}
if (isset($reclaimForConsole)) {
$reclaimForConsole();
}
if (isset($reclaimForProject)) {
$reclaimForProject();
}
}
}
Console::error('Failed to restart pub/sub...');
});
$server->onOpen(function (int $connection, SwooleRequest $request) use ($server, $register, $stats, &$realtime, $logError) {
$server->onOpen(function (int $connection, SwooleRequest $request) use ($server, $register, $stats, &$realtime) {
$app = new App('UTC');
$request = new Request($request);
$response = new Response(new SwooleResponse());
Console::info("Connection open (user: {$connection})");
App::setResource('pools', fn() => $register->get('pools'));
App::setResource('request', fn() => $request);
App::setResource('response', fn() => $response);
App::setResource('pools', function () use ($register) {
return $register->get('pools');
});
App::setResource('request', function () use ($request) {
return $request;
});
App::setResource('response', function () use ($response) {
return $response;
});
try {
/** @var Document $project */
@ -419,9 +557,13 @@ $server->onOpen(function (int $connection, SwooleRequest $request) use ($server,
throw new Exception(Exception::REALTIME_POLICY_VIOLATION, 'Missing or unknown project ID');
}
$dbForProject = getProjectDB($project);
$console = $app->getResource('console'); /** @var Document $console */
$user = $app->getResource('user'); /** @var Document $user */
[$dbForProject, $reclaimForProject] = getProjectDB($project);
/** @var Document $console */
$console = $app->getResource('console');
/** @var Document $user */
$user = $app->getResource('user');
/*
* Abuse Check
@ -481,7 +623,7 @@ $server->onOpen(function (int $connection, SwooleRequest $request) use ($server,
$stats->incr($project->getId(), 'connections');
$stats->incr($project->getId(), 'connectionsTotal');
} catch (Throwable $th) {
call_user_func($logError, $th, "initServer");
logError($th, 'initServer');
$response = [
'type' => 'error',
@ -500,7 +642,9 @@ $server->onOpen(function (int $connection, SwooleRequest $request) use ($server,
Console::error('[Error] Message: ' . $response['data']['message']);
}
} finally {
$register->get('pools')->reclaim();
if (isset($reclaimForProject)) {
$reclaimForProject();
}
}
});
@ -508,11 +652,14 @@ $server->onMessage(function (int $connection, string $message) use ($server, $re
try {
$response = new Response(new SwooleResponse());
$projectId = $realtime->connections[$connection]['projectId'];
$database = getConsoleDB();
[$database, $reclaimForConsole] = getConsoleDB();
if ($projectId !== 'console') {
$project = Authorization::skip(fn() => $database->getDocument('projects', $projectId));
$database = getProjectDB($project);
$project = Authorization::skip(function () use ($database, $projectId) {
return $database->getDocument('projects', $projectId);
});
[$database, $reclaimForProject] = getProjectDB($project);
} else {
$project = null;
}
@ -598,7 +745,12 @@ $server->onMessage(function (int $connection, string $message) use ($server, $re
$server->close($connection, $th->getCode());
}
} finally {
$register->get('pools')->reclaim();
if (isset($reclaimForConsole)) {
$reclaimForConsole();
}
if (isset($reclaimForProject)) {
$reclaimForProject();
}
}
});
@ -606,6 +758,7 @@ $server->onClose(function (int $connection) use ($realtime, $stats) {
if (array_key_exists($connection, $realtime->connections)) {
$stats->decr($realtime->connections[$connection]['projectId'], 'connectionsTotal');
}
$realtime->unsubscribe($connection);
Console::info('Connection close: ' . $connection);