1
0
Fork 0
mirror of synced 2024-10-02 10:16:27 +13:00

Updated Realtime server

This commit is contained in:
Eldad Fux 2022-10-17 20:26:21 +03:00
parent 1632ae61eb
commit 52d44f0599

View file

@ -1,7 +1,6 @@
<?php <?php
use Appwrite\Auth\Auth; use Appwrite\Auth\Auth;
use Appwrite\Database\Pools;
use Appwrite\Messaging\Adapter\Realtime; use Appwrite\Messaging\Adapter\Realtime;
use Appwrite\Network\Validator\Origin; use Appwrite\Network\Validator\Origin;
use Appwrite\Utopia\Response; use Appwrite\Utopia\Response;
@ -21,8 +20,11 @@ use Utopia\Database\DateTime;
use Utopia\Database\Document; use Utopia\Database\Document;
use Utopia\Database\Query; use Utopia\Database\Query;
use Utopia\Database\Validator\Authorization; use Utopia\Database\Validator\Authorization;
use Utopia\Registry\Registry;
use Appwrite\Utopia\Request; use Appwrite\Utopia\Request;
use Utopia\Cache\Adapter\Sharding;
use Utopia\Cache\Cache;
use Utopia\Config\Config;
use Utopia\Database\Database;
use Utopia\WebSocket\Server; use Utopia\WebSocket\Server;
use Utopia\WebSocket\Adapter; use Utopia\WebSocket\Adapter;
@ -30,6 +32,70 @@ require_once __DIR__ . '/init.php';
Runtime::enableCoroutine(SWOOLE_HOOK_ALL); Runtime::enableCoroutine(SWOOLE_HOOK_ALL);
function getConsoleDB(): Database
{
global $register;
$pools = $register->get('pools'); /** @var \Utopia\Pools\Group $pools */
$dbAdapter = $pools
->get('console')
->pop()
->getResource()
;
$database = new Database($dbAdapter, getCache());
$database->setNamespace('console');
$database->setDefaultDatabase('appwrite');
return $database;
}
function getProjectDB(Document $project): Database
{
global $register;
$pools = $register->get('pools'); /** @var \Utopia\Pools\Group $pools */
if($project->isEmpty() || $project->getId() === 'console') {
return getConsoleDB();
}
$dbAdapter = $pools
->get($project->getAttribute('database'))
->pop()
->getResource()
;
$database = new Database($dbAdapter, getCache());
$database->setNamespace('_'.$project->getInternalId());
$database->setDefaultDatabase('appwrite');
return $database;
}
function getCache(): Cache
{
global $register;
$pools = $register->get('pools'); /** @var \Utopia\Pools\Group $pools */
$list = Config::getParam('pools-cache', []);
$adapters = [];
foreach ($list as $value) {
$adapters[] = $pools
->get($value)
->pop()
->getResource()
;
}
return new Cache(new Sharding($adapters));
}
$realtime = new Realtime(); $realtime = new Realtime();
/** /**
@ -92,38 +158,6 @@ $logError = function (Throwable $error, string $action) use ($register) {
$server->error($logError); $server->error($logError);
function getDatabase(Registry &$register, string $projectId)
{
$redis = $register->get('redisPool')->get();
$dbPool = $register->get('dbPool');
/** Get the console DB */
$database = $dbPool->getConsoleDB();
$pdo = $dbPool->getPDOFromPool($database);
$database = Pools::wait(
Pools::getDatabase($pdo->getConnection(), $redis, '_console'),
'realtime'
);
if ($projectId !== 'console') {
$project = Authorization::skip(fn() => $database->getDocument('projects', $projectId));
$database = $project->getAttribute('database', '');
$pdo = $dbPool->getPDOFromPool($database);
$database = Pools::wait(
Pools::getDatabase($pdo->getConnection(), $redis, "_{$project->getInternalId()}"),
'realtime'
);
}
return [
$database,
function () use ($register, $redis) {
$register->get('dbPool')->reset();
$register->get('redisPool')->put($redis);
}
];
}
$server->onStart(function () use ($stats, $register, $containerId, &$statsDocument, $logError) { $server->onStart(function () use ($stats, $register, $containerId, &$statsDocument, $logError) {
sleep(5); // wait for the initial database schema to be ready sleep(5); // wait for the initial database schema to be ready
Console::success('Server started successfully'); Console::success('Server started successfully');
@ -133,7 +167,8 @@ $server->onStart(function () use ($stats, $register, $containerId, &$statsDocume
*/ */
go(function () use ($register, $containerId, &$statsDocument) { go(function () use ($register, $containerId, &$statsDocument) {
$attempts = 0; $attempts = 0;
[$database, $returnDatabase] = getDatabase($register, 'console'); $database = getConsoleDB();
do { do {
try { try {
$attempts++; $attempts++;
@ -153,7 +188,7 @@ $server->onStart(function () use ($stats, $register, $containerId, &$statsDocume
sleep(DATABASE_RECONNECT_SLEEP); sleep(DATABASE_RECONNECT_SLEEP);
} }
} while (true); } while (true);
call_user_func($returnDatabase); $register->get('pools')->reclaim();
}); });
/** /**
@ -169,7 +204,7 @@ $server->onStart(function () use ($stats, $register, $containerId, &$statsDocume
} }
try { try {
[$database, $returnDatabase] = getDatabase($register, 'console'); $database = getConsoleDB();
$statsDocument $statsDocument
->setAttribute('timestamp', DateTime::now()) ->setAttribute('timestamp', DateTime::now())
@ -179,7 +214,7 @@ $server->onStart(function () use ($stats, $register, $containerId, &$statsDocume
} catch (\Throwable $th) { } catch (\Throwable $th) {
call_user_func($logError, $th, "updateWorkerDocument"); call_user_func($logError, $th, "updateWorkerDocument");
} finally { } finally {
call_user_func($returnDatabase); $register->get('pools')->reclaim();
} }
}); });
}); });
@ -195,7 +230,7 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats,
* Sending current connections to project channels on the console project every 5 seconds. * Sending current connections to project channels on the console project every 5 seconds.
*/ */
if ($realtime->hasSubscriber('console', Role::users()->toString(), 'project')) { if ($realtime->hasSubscriber('console', Role::users()->toString(), 'project')) {
[$database, $returnDatabase] = getDatabase($register, '_console'); $database = getConsoleDB();
$payload = []; $payload = [];
@ -240,7 +275,7 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats,
])); ]));
} }
call_user_func($returnDatabase); $register->get('pools')->reclaim();
} }
/** /**
* Sending test message for SDK E2E tests every 5 seconds. * Sending test message for SDK E2E tests every 5 seconds.
@ -275,8 +310,7 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats,
} }
$start = time(); $start = time();
/** @var Redis $redis */ $redis = $register->get('pools')->get('pubsub')->pop()->getResource(); /** @var Redis $redis */
$redis = $register->get('redisPool')->get();
$redis->setOption(Redis::OPT_READ_TIMEOUT, -1); $redis->setOption(Redis::OPT_READ_TIMEOUT, -1);
if ($redis->ping(true)) { if ($redis->ping(true)) {
@ -295,18 +329,17 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats,
if ($realtime->hasSubscriber($projectId, 'user:' . $userId)) { if ($realtime->hasSubscriber($projectId, 'user:' . $userId)) {
$connection = array_key_first(reset($realtime->subscriptions[$projectId]['user:' . $userId])); $connection = array_key_first(reset($realtime->subscriptions[$projectId]['user:' . $userId]));
[$consoleDatabase, $returnConsoleDatabase] = getDatabase($register, 'console'); $consoleDatabase = getConsoleDB();
$project = Authorization::skip(fn() => $consoleDatabase->getDocument('projects', $projectId)); $project = Authorization::skip(fn() => $consoleDatabase->getDocument('projects', $projectId));
[$database, $returnDatabase] = getDatabase($register, $project->getId()); $database = getProjectDB($project);
$user = $database->getDocument('users', $userId); $user = $database->getDocument('users', $userId);
$roles = Auth::getRoles($user); $roles = Auth::getRoles($user);
$realtime->subscribe($projectId, $connection, $roles, $realtime->connections[$connection]['channels']); $realtime->subscribe($projectId, $connection, $roles, $realtime->connections[$connection]['channels']);
call_user_func($returnDatabase); $register->get('pools')->reclaim();
call_user_func($returnConsoleDatabase);
} }
} }
@ -334,7 +367,7 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats,
call_user_func($logError, $th, "pubSubConnection"); call_user_func($logError, $th, "pubSubConnection");
Console::error('Pub/sub error: ' . $th->getMessage()); Console::error('Pub/sub error: ' . $th->getMessage());
$register->get('redisPool')->put($redis); $register->get('pools')->reclaim();
$attempts++; $attempts++;
sleep(DATABASE_RECONNECT_SLEEP); sleep(DATABASE_RECONNECT_SLEEP);
continue; continue;
@ -349,15 +382,8 @@ $server->onOpen(function (int $connection, SwooleRequest $request) use ($server,
$request = new Request($request); $request = new Request($request);
$response = new Response(new SwooleResponse()); $response = new Response(new SwooleResponse());
/** @var PDO $db */
$dbPool = $register->get('dbPool');
/** @var Redis $redis */
$redis = $register->get('redisPool')->get();
Console::info("Connection open (user: {$connection})"); Console::info("Connection open (user: {$connection})");
App::setResource('dbPool', fn() => $dbPool);
App::setResource('cache', fn() => $redis);
App::setResource('request', fn() => $request); App::setResource('request', fn() => $request);
App::setResource('response', fn() => $response); App::setResource('response', fn() => $response);
@ -372,13 +398,9 @@ $server->onOpen(function (int $connection, SwooleRequest $request) use ($server,
throw new Exception('Missing or unknown project ID', 1008); throw new Exception('Missing or unknown project ID', 1008);
} }
$dbForProject = $app->getResource('dbForProject'); $dbForProject = getProjectDB($project);
$console = $app->getResource('console'); /** @var \Utopia\Database\Document $console */
/** @var \Utopia\Database\Document $console */ $user = $app->getResource('user'); /** @var \Utopia\Database\Document $user */
$console = $app->getResource('console');
/** @var \Utopia\Database\Document $user */
$user = $app->getResource('user');
/* /*
* Abuse Check * Abuse Check
@ -457,31 +479,19 @@ $server->onOpen(function (int $connection, SwooleRequest $request) use ($server,
Console::error('[Error] Message: ' . $response['data']['message']); Console::error('[Error] Message: ' . $response['data']['message']);
} }
} finally { } finally {
/** $register->get('pools')->reclaim();
* Put used PDO and Redis Connections back into their pools.
*/
$dbPool->reset();
$register->get('redisPool')->put($redis);
} }
}); });
$server->onMessage(function (int $connection, string $message) use ($server, $register, $realtime, $containerId) { $server->onMessage(function (int $connection, string $message) use ($server, $register, $realtime, $containerId) {
try { try {
$response = new Response(new SwooleResponse()); $response = new Response(new SwooleResponse());
$redis = $register->get('redisPool')->get();
$dbPool = $register->get('dbPool');
$projectId = $realtime->connections[$connection]['projectId']; $projectId = $realtime->connections[$connection]['projectId'];
$database = getConsoleDB();
/** Get the console DB */
$database = $dbPool->getConsoleDB();
$pdo = $dbPool->getPDOFromPool($database);
$database = Pools::getDatabase($pdo->getConnection(), $redis, '_console');
if ($projectId !== 'console') { if ($projectId !== 'console') {
$project = Authorization::skip(fn() => $database->getDocument('projects', $projectId)); $project = Authorization::skip(fn() => $database->getDocument('projects', $projectId));
$database = $project->getAttribute('database', ''); $database = getProjectDB($project);
$pdo = $dbPool->getPDOFromPool($database);
$database = Pools::getDatabase($pdo->getConnection(), $redis, "_{$project->getInternalId()}");
} }
/* /*
@ -565,8 +575,7 @@ $server->onMessage(function (int $connection, string $message) use ($server, $re
$server->close($connection, $th->getCode()); $server->close($connection, $th->getCode());
} }
} finally { } finally {
$dbPool->reset(); $register->get('pools')->reclaim();
$register->get('redisPool')->put($redis);
} }
}); });