From f7d9efc18e4ebe3a76b7fb67ee87d643d2a7d286 Mon Sep 17 00:00:00 2001 From: Binyamin Yawitz <316103+byawitz@users.noreply.github.com> Date: Tue, 4 Jun 2024 13:12:10 -0400 Subject: [PATCH] feat: Adding DI to realtime --- app/realtime.php | 221 +++++++++++++++++++++++------------------------ 1 file changed, 109 insertions(+), 112 deletions(-) diff --git a/app/realtime.php b/app/realtime.php index 2e2024216a..eb2eb4d7e4 100644 --- a/app/realtime.php +++ b/app/realtime.php @@ -5,6 +5,7 @@ use Appwrite\Extend\Exception; use Appwrite\Extend\Exception as AppwriteException; use Appwrite\Messaging\Adapter\Realtime; use Appwrite\Network\Validator\Origin; +use Appwrite\Utopia\Queue\Connections; use Appwrite\Utopia\Request; use Appwrite\Utopia\Response; use Swoole\Http\Request as SwooleRequest; @@ -14,10 +15,11 @@ use Swoole\Table; use Swoole\Timer; use Utopia\Abuse\Abuse; use Utopia\Abuse\Adapters\TimeLimit; -use Utopia\Cache\Adapter\Sharding; +use Utopia\Cache\Adapter\None; use Utopia\Cache\Cache; use Utopia\CLI\Console; -use Utopia\Config\Config; +use Utopia\Database\Adapter\MariaDB; +use Utopia\Database\Adapter\MySQL; use Utopia\Database\Database; use Utopia\Database\DateTime; use Utopia\Database\Document; @@ -25,93 +27,80 @@ use Utopia\Database\Helpers\ID; use Utopia\Database\Helpers\Role; use Utopia\Database\Query; use Utopia\Database\Validator\Authorization; -use Utopia\Http\Adapter\FPM\Server as FPMServer; +use Utopia\DI\Container; +use Utopia\DI\Dependency; +use Utopia\Http\Adapter\Swoole\Request as UtopiaRequest; +use Utopia\Http\Adapter\Swoole\Response as UtopiaResponse; use Utopia\Http\Http; use Utopia\Logger\Log; +use Utopia\Registry\Registry; use Utopia\System\System; use Utopia\WebSocket\Adapter; use Utopia\WebSocket\Server; -global $global; - /** - * @var \Utopia\Registry\Registry $global + * @var Registry $global + * @var Container $container */ +global $global, $container; + + require_once __DIR__ . '/init2.php'; Runtime::enableCoroutine(SWOOLE_HOOK_ALL); -function getConsoleDB(Authorization $auth): Database -{ - global $global; +$auth = new Dependency(); +$cache = new Dependency(); +$getProjectDB = new Dependency(); - /** @var \Utopia\Pools\Group $pools */ - $pools = $global->get('pools'); +$auth + ->setName('auth') + ->setCallback(fn () => new Authorization()); - $dbAdapter = $pools - ->get('console') - ->pop() - ->getResource() - ; +$getProjectDB + ->setName('getProjectDB') + ->inject('pools') + ->inject('dbForConsole') + ->inject('cache') + ->inject('auth') + ->inject('connections') + ->setCallback(function (array $pools, Database $dbForConsole, Cache $cache, Authorization $auth, Connections $connections) { + return function (Document $project) use ($pools, $dbForConsole, $cache, &$databases, $auth, $connections): Database { + if ($project->isEmpty() || $project->getId() === 'console') { + return $dbForConsole; + } - $database = new Database($dbAdapter, getCache()); - $database->setAuthorization($auth); + $databaseName = $project->getAttribute('database'); - $database - ->setNamespace('_console') - ->setMetadata('host', \gethostname()) - ->setMetadata('project', '_console'); + $pool = $pools['pools-database-' . $databaseName]['pool']; + $dsn = $pools['pools-database-' . $databaseName]['dsn']; - return $database; -} + $connection = $pool->get(); + $connections->add($connection, $pool); + $adapter = match ($dsn->getScheme()) { + 'mariadb' => new MariaDB($connection), + 'mysql' => new MySQL($connection), + default => null + }; + $adapter->setDatabase($dsn->getPath()); -function getProjectDB(Document $project, Authorization $auth): Database -{ - global $global; + $database = new Database($adapter, $cache); + $database->setAuthorization($auth); + $database->setNamespace('_' . $project->getInternalId()); - /** @var \Utopia\Pools\Group $pools */ - $pools = $global->get('pools'); + return $database; + }; + }); - if ($project->isEmpty() || $project->getId() === 'console') { - return getConsoleDB($auth); - } +$cache + ->setName('cache') + ->setCallback(function () { + return new Cache(new None()); + }); - $dbAdapter = $pools - ->get($project->getAttribute('database')) - ->pop() - ->getResource() - ; - - $database = new Database($dbAdapter, getCache()); - $database->setAuthorization($auth); - - $database - ->setNamespace('_' . $project->getInternalId()) - ->setMetadata('host', \gethostname()) - ->setMetadata('project', $project->getId()); - - return $database; -} - -function getCache(): Cache -{ - global $global; - - $pools = $global->get('pools'); /** @var \Utopia\Pools\Group $pools */ - - $list = Config::getParam('pools-cache', []); - $adapters = []; - - foreach ($list as $value) { - $adapters[] = $pools - ->get($value) - ->pop() - ->getResource() - ; - } - - return new Cache(new Sharding($adapters)); -} +$container->set($auth); +$container->set($cache); +$container->set($getProjectDB); $realtime = new Realtime(); @@ -175,18 +164,16 @@ $logError = function (Throwable $error, string $action) use ($global) { $server->error($logError); -$server->onStart(function () use ($stats, $global, $containerId, &$statsDocument, $logError) { - $auth = new Authorization(); - +$server->onStart(function () use ($stats, $container, $containerId, &$statsDocument, $logError) { sleep(5); // wait for the initial database schema to be ready Console::success('Server started successfully'); /** * Create document for this worker to share stats across Containers. */ - go(function () use ($global, $containerId, &$statsDocument, $auth) { + go(function () use ($container, $containerId, &$statsDocument) { $attempts = 0; - $database = getConsoleDB($auth); + $database = $container->get('dbForConsole'); do { try { @@ -208,13 +195,13 @@ $server->onStart(function () use ($stats, $global, $containerId, &$statsDocument sleep(DATABASE_RECONNECT_SLEEP); } } while (true); - $global->get('pools')->reclaim(); + // TODO NOW $global->get('pools')->reclaim(); }); /** * Save current connections to the Database every 5 seconds. */ - Timer::tick(5000, function () use ($global, $stats, &$statsDocument, $logError, $auth) { + Timer::tick(5000, function () use ($container, $stats, &$statsDocument, $logError) { $payload = []; foreach ($stats as $projectId => $value) { $payload[$projectId] = $stats->get($projectId, 'connectionsTotal'); @@ -224,7 +211,7 @@ $server->onStart(function () use ($stats, $global, $containerId, &$statsDocument } try { - $database = getConsoleDB($auth); + $database = $container->get('dbForConsole'); $statsDocument ->setAttribute('timestamp', DateTime::now()) @@ -235,12 +222,12 @@ $server->onStart(function () use ($stats, $global, $containerId, &$statsDocument } catch (Throwable $th) { call_user_func($logError, $th, "updateWorkerDocument"); } finally { - $global->get('pools')->reclaim(); + // TODO NOW $global->get('pools')->reclaim(); } }); }); -$server->onWorkerStart(function (int $workerId) use ($server, $global, $stats, $realtime, $logError) { +$server->onWorkerStart(function (int $workerId) use ($server, $container, $stats, $realtime, $logError) { Console::success('Worker ' . $workerId . ' started successfully'); $attempts = 0; @@ -248,12 +235,12 @@ $server->onWorkerStart(function (int $workerId) use ($server, $global, $stats, $ $auth = new Authorization(); - Timer::tick(5000, function () use ($server, $global, $realtime, $stats, $logError, $auth) { + Timer::tick(5000, function () use ($server, $container, $realtime, $stats, $logError, $auth) { /** * Sending current connections to project channels on the console project every 5 seconds. */ if ($realtime->hasSubscriber('console', Role::users()->toString(), 'project')) { - $database = getConsoleDB($auth); + $database = $container->get('dbForConsole'); $payload = []; @@ -267,9 +254,9 @@ $server->onWorkerStart(function (int $workerId) use ($server, $global, $stats, $ foreach ($list as $document) { foreach (json_decode($document->getAttribute('value')) as $projectId => $value) { if (array_key_exists($projectId, $payload)) { - $payload[$projectId] += $value; + $payload[$projectId] += $value; } else { - $payload[$projectId] = $value; + $payload[$projectId] = $value; } } } @@ -297,8 +284,7 @@ $server->onWorkerStart(function (int $workerId) use ($server, $global, $stats, $ 'data' => $event['data'] ])); } - - $global->get('pools')->reclaim(); + // TODO NOW $global->get('pools')->reclaim(); } /** * Sending test message for SDK E2E tests every 5 seconds. @@ -327,14 +313,22 @@ $server->onWorkerStart(function (int $workerId) use ($server, $global, $stats, $ while ($attempts < 300) { try { if ($attempts > 0) { - Console::error('Pub/sub connection lost (lasted ' . (time() - $start) . ' seconds, worker: ' . $workerId . '). - Attempting restart in 5 seconds (attempt #' . $attempts . ')'); + Console::error( + 'Pub/sub connection lost (lasted ' . (time() - $start) . ' seconds, worker: ' . $workerId . '). + Attempting restart in 5 seconds (attempt #' . $attempts . ')' + ); sleep(5); // 5 sec delay between connection attempts } $start = time(); - $redis = $global->get('pools')->get('pubsub')->pop()->getResource(); /** @var Redis $redis */ + $pools = $container->get('pools'); + $pool = $pools['pools-pubsub-main']['pool']; + $dsn = $pools['pools-pubsub-main']['dsn']; + $redis = new \Redis(); + $redis->connect($dsn->getHost(), $dsn->getPort()); + + /** @var Redis $redis */ $redis->setOption(Redis::OPT_READ_TIMEOUT, -1); if ($redis->ping(true)) { @@ -344,7 +338,7 @@ $server->onWorkerStart(function (int $workerId) use ($server, $global, $stats, $ Console::error('Pub/sub failed (worker: ' . $workerId . ')'); } - $redis->subscribe(['realtime'], function (Redis $redis, string $channel, string $payload) use ($server, $workerId, $stats, $global, $realtime, $auth) { + $redis->subscribe(['realtime'], function (Redis $redis, string $channel, string $payload) use ($server, $workerId, $stats, $realtime, $auth, $container) { $event = json_decode($payload, true); if ($event['permissionsChanged'] && isset($event['userId'])) { @@ -353,18 +347,17 @@ $server->onWorkerStart(function (int $workerId) use ($server, $global, $stats, $ if ($realtime->hasSubscriber($projectId, 'user:' . $userId)) { $connection = array_key_first(reset($realtime->subscriptions[$projectId]['user:' . $userId])); - $consoleDatabase = getConsoleDB($auth); + $consoleDatabase = $container->get('dbForConsole'); $auth = new Authorization(); $project = $auth->skip(fn () => $consoleDatabase->getDocument('projects', $projectId)); - $database = getProjectDB($project, $auth); + $database = $container->get('getProjectDB')($project); $user = $database->getDocument('users', $userId); $roles = Auth::getRoles($user, $auth); $realtime->subscribe($projectId, $connection, $roles, $realtime->connections[$connection]['channels']); - - $global->get('pools')->reclaim(); + //TODO NOW $global->get('pools')->reclaim(); } } @@ -404,25 +397,29 @@ $server->onWorkerStart(function (int $workerId) use ($server, $global, $stats, $ Console::error('Failed to restart pub/sub...'); }); -$server->onOpen(function (int $connection, SwooleRequest $request) use ($server, $global, $stats, &$realtime, $logError) { +$server->onOpen(function (int $connection, SwooleRequest $request) use ($server, $container, $stats, &$realtime, $logError) { $auth = new Authorization(); - $http = new Http(new FPMServer(), 'UTC'); - $request = new Request($request); - $response = new Response(new SwooleResponse()); + $request = new Request(new UtopiaRequest($request)); + $response = new Response(new UtopiaResponse(new SwooleResponse())); + + $requestInjection = new Dependency(); + $responseInjection = new Dependency(); + + $requestInjection->setName('request')->setCallback(fn () => $request); + $responseInjection->setName('response')->setCallback(fn () => $response); + + $container->set($requestInjection); + $container->set($responseInjection); Console::info("Connection open (user: {$connection})"); - Http::setResource('pools', fn () => $global->get('pools')); - Http::setResource('request', fn () => $request); - Http::setResource('response', fn () => $response); - try { /** @var Document $project */ - $project = $http->getResource('project'); + $project = $container->get('project'); /* - * Project Check + * Project Check */ if (empty($project->getId())) { throw new Exception(Exception::REALTIME_POLICY_VIOLATION, 'Missing or unknown project ID'); @@ -437,9 +434,11 @@ $server->onOpen(function (int $connection, SwooleRequest $request) use ($server, throw new AppwriteException(AppwriteException::GENERAL_API_DISABLED); } - $dbForProject = getProjectDB($project, $auth); - $console = $http->getResource('console'); /** @var Document $console */ - $user = $http->getResource('user'); /** @var Document $user */ + $dbForProject = $container->get('getProjectDB')($project); + $console = $container->get('console'); + /** @var Document $console */ + $user = $container->get('user'); + /** @var Document $user */ /* * Abuse Check @@ -519,22 +518,20 @@ $server->onOpen(function (int $connection, SwooleRequest $request) use ($server, Console::error('[Error] Message: ' . $response['data']['message']); } } finally { - $global->get('pools')->reclaim(); + // TODO NOW $global->get('pools')->reclaim(); } }); -$server->onMessage(function (int $connection, string $message) use ($server, $global, $realtime, $containerId) { - $auth = new Authorization(); - +$server->onMessage(function (int $connection, string $message) use ($server, $container, $realtime, $containerId) { try { $response = new Response(new SwooleResponse()); $projectId = $realtime->connections[$connection]['projectId']; - $database = getConsoleDB($auth); + $database = $container->get('dbForConsole'); if ($projectId !== 'console') { $auth = new Authorization(); $project = $auth->skip(fn () => $database->getDocument('projects', $projectId)); - $database = getProjectDB($project, $auth); + $database = $container->get('getProjectDB')($project); } else { $project = null; } @@ -619,7 +616,7 @@ $server->onMessage(function (int $connection, string $message) use ($server, $gl $server->close($connection, $th->getCode()); } } finally { - $global->get('pools')->reclaim(); + // TODO NOW $global->get('pools')->reclaim(); } });